diff --git a/api.go b/api.go index 899c52d..016ce7d 100644 --- a/api.go +++ b/api.go @@ -80,7 +80,7 @@ func handleTimeseries(rw http.ResponseWriter, r *http.Request) { if err != nil { // http.Error(rw, err.Error(), http.StatusInternalServerError) msg := err.Error() - metrics[metric] = ApiMetricData{ Error: &msg } + metrics[metric] = ApiMetricData{Error: &msg} continue } @@ -134,7 +134,7 @@ func handleStats(rw http.ResponseWriter, r *http.Request) { if err != nil { // http.Error(rw, err.Error(), http.StatusInternalServerError) msg := err.Error() - metrics[metric] = ApiStatsData{ Error: &msg } + metrics[metric] = ApiStatsData{Error: &msg} continue } @@ -232,6 +232,79 @@ func handleWrite(rw http.ResponseWriter, r *http.Request) { rw.WriteHeader(http.StatusOK) } +func handleAllNodes(rw http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + clusterId := vars["cluster"] + from, err := strconv.ParseInt(vars["from"], 10, 64) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + to, err := strconv.ParseInt(vars["to"], 10, 64) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + if r.Method != http.MethodPost { + http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + + bodyDec := json.NewDecoder(r.Body) + var reqBody struct { + Metrics []string `json:"metrics"` + } + err = bodyDec.Decode(&reqBody) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + res := make(map[string]map[string]ApiMetricData) + + memoryStore.root.lock.RLock() + cluster, ok := memoryStore.root.children[clusterId] + memoryStore.root.lock.RUnlock() + if !ok { + http.Error(rw, fmt.Sprintf("cluster '%s' does not exist", clusterId), http.StatusBadRequest) + return + } + + cluster.lock.RLock() + hosts := make([]string, 0, len(cluster.children)) + for host := range cluster.children { + hosts = append(hosts, host) + } + cluster.lock.RUnlock() + + for _, host := range hosts { + metrics := make(map[string]ApiMetricData) + for _, metric := range reqBody.Metrics { + data, f, t, err := memoryStore.Read(Selector{SelectorElement{String: clusterId}, SelectorElement{String: host}}, metric, from, to) + if err != nil { + // http.Error(rw, err.Error(), http.StatusInternalServerError) + msg := err.Error() + metrics[metric] = ApiMetricData{Error: &msg} + continue + } + + metrics[metric] = ApiMetricData{ + From: f, + To: t, + Data: data, + } + } + res[host] = metrics + } + + rw.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(rw).Encode(res) + if err != nil { + log.Println(err.Error()) + } +} + func authentication(next http.Handler, publicKey ed25519.PublicKey) http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { authheader := r.Header.Get("Authorization") @@ -268,6 +341,7 @@ func StartApiServer(address string, ctx context.Context) error { r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/stats", handleStats) r.HandleFunc("/api/{to:[0-9]+}/free", handleFree) r.HandleFunc("/api/{cluster}/peek", handlePeek) + r.HandleFunc("/api/{cluster}/{from:[0-9]+}/{to:[0-9]+}/all-nodes", handleAllNodes) r.HandleFunc("/api/write", handleWrite) server := &http.Server{ diff --git a/selector.go b/selector.go index 357b02f..9aa14a6 100644 --- a/selector.go +++ b/selector.go @@ -6,13 +6,23 @@ import ( ) type SelectorElement struct { + Any bool String string Group []string } func (se *SelectorElement) UnmarshalJSON(input []byte) error { if input[0] == '"' { - return json.Unmarshal(input, &se.String) + if err := json.Unmarshal(input, &se.String); err != nil { + return err + } + + if se.String == "*" { + se.Any = true + se.String = "" + } + + return nil } if input[0] == '[' { @@ -23,6 +33,10 @@ func (se *SelectorElement) UnmarshalJSON(input []byte) error { } func (se *SelectorElement) MarshalJSON() ([]byte, error) { + if se.Any { + return []byte("\"*\""), nil + } + if se.String != "" { return json.Marshal(se.String) } @@ -91,5 +105,13 @@ func (l *level) findBuffers(selector Selector, offset int, f func(b *buffer) err return nil } + if sel.Any && l.children != nil { + for _, lvl := range l.children { + if err := lvl.findBuffers(selector[1:], offset, f); err != nil { + return err + } + } + } + panic("impossible") }