From 97dcc95a2ff9122ec750edd05d5835fbdace6e6e Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Fri, 7 Jan 2022 08:52:55 +0100 Subject: [PATCH] add query api endpoint --- api.go | 105 +++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 79 insertions(+), 26 deletions(-) diff --git a/api.go b/api.go index 8dc72c2..6280243 100644 --- a/api.go +++ b/api.go @@ -31,7 +31,7 @@ type ApiRequestBody struct { } type ApiMetricData struct { - Error *string `json:"error"` + Error *string `json:"error,omitempty"` From int64 `json:"from"` To int64 `json:"to"` Data []Float `json:"data"` @@ -339,33 +339,86 @@ func handleAllNodes(rw http.ResponseWriter, r *http.Request) { } } -// func handleCheckpoint(rw http.ResponseWriter, r *http.Request) { -// vars := mux.Vars(r) -// from, err := strconv.ParseInt(vars["from"], 10, 64) -// if err != nil { -// http.Error(rw, err.Error(), http.StatusBadRequest) -// return -// } -// to, err := strconv.ParseInt(vars["to"], 10, 64) -// if err != nil { -// http.Error(rw, err.Error(), http.StatusBadRequest) -// return -// } +type ApiQueryRequest struct { + Cluster string `json:"cluster"` + From int64 `json:"from"` + To int64 `json:"to"` + Queries []ApiQuery `json:"queries"` +} -// log.Println("Checkpoint creation started...") -// n, err := memoryStore.ToCheckpoint(conf.Checkpoints.RootDir, from, to) -// if err != nil { -// log.Printf("Checkpoint creation failed: %s\n", err.Error()) -// rw.WriteHeader(http.StatusInternalServerError) -// return -// } else { -// log.Printf("Checkpoint finished (%d files)\n", n) -// } +type ApiQueryResponse struct { + ApiMetricData + Query *ApiQuery `json:"query"` +} -// memoryStore.FreeAll() +type ApiQuery struct { + Metric string `json:"metric"` + Hostname string `json:"hostname"` + Type *string `json:"type,omitempty"` + TypeIds []string `json:"type-ids,omitempty"` + SubType *string `json:"subtype,omitempty"` + SubTypeIds []string `json:"subtype-ids,omitempty"` +} -// rw.WriteHeader(http.StatusOK) -// } +func handleQuery(rw http.ResponseWriter, r *http.Request) { + var err error + var req ApiQueryRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + response := make([]ApiQueryResponse, 0, len(req.Queries)) + for _, query := range req.Queries { + res := ApiQueryResponse{ + Query: &query, + } + + sel := Selector{SelectorElement{String: req.Cluster}, SelectorElement{String: query.Hostname}} + if query.Type != nil { + if len(query.TypeIds) == 1 { + sel = append(sel, SelectorElement{String: query.TypeIds[0]}) + } else { + ids := make([]string, len(query.TypeIds)) + for i, id := range query.TypeIds { + ids[i] = *query.Type + id + } + sel = append(sel, SelectorElement{Group: ids}) + } + + if query.SubType != nil { + if len(query.SubTypeIds) == 1 { + sel = append(sel, SelectorElement{String: query.SubTypeIds[0]}) + } else { + ids := make([]string, len(query.SubTypeIds)) + for i, id := range query.SubTypeIds { + ids[i] = *query.SubType + id + } + sel = append(sel, SelectorElement{Group: ids}) + } + } + } + + res.Data, res.From, res.To, err = memoryStore.Read(sel, query.Metric, req.From, req.To) + if err != nil { + msg := err.Error() + res.Error = &msg + response = append(response, res) + continue + } + + res.AddStats() + response = append(response, res) + } + + rw.Header().Set("Content-Type", "application/json") + bw := bufio.NewWriter(rw) + defer bw.Flush() + if err := json.NewEncoder(bw).Encode(response); err != nil { + log.Print(err) + return + } +} func authentication(next http.Handler, publicKey ed25519.PublicKey) http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { @@ -405,7 +458,7 @@ func StartApiServer(address string, ctx context.Context) error { r.HandleFunc("/api/{cluster}/peek", handlePeek) r.HandleFunc("/api/{cluster}/{from:[0-9]+}/{to:[0-9]+}/all-nodes", handleAllNodes) r.HandleFunc("/api/write", handleWrite) - // r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/checkpoint", handleCheckpoint) + r.HandleFunc("/api/query", handleQuery) server := &http.Server{ Handler: r,