add query api endpoint

This commit is contained in:
Lou Knauer 2022-01-07 08:52:55 +01:00
parent 4509a4a355
commit 97dcc95a2f

105
api.go
View File

@ -31,7 +31,7 @@ type ApiRequestBody struct {
}
type ApiMetricData struct {
Error *string `json:"error"`
Error *string `json:"error,omitempty"`
From int64 `json:"from"`
To int64 `json:"to"`
Data []Float `json:"data"`
@ -339,33 +339,86 @@ func handleAllNodes(rw http.ResponseWriter, r *http.Request) {
}
}
// func handleCheckpoint(rw http.ResponseWriter, r *http.Request) {
// vars := mux.Vars(r)
// from, err := strconv.ParseInt(vars["from"], 10, 64)
// if err != nil {
// http.Error(rw, err.Error(), http.StatusBadRequest)
// return
// }
// to, err := strconv.ParseInt(vars["to"], 10, 64)
// if err != nil {
// http.Error(rw, err.Error(), http.StatusBadRequest)
// return
// }
type ApiQueryRequest struct {
Cluster string `json:"cluster"`
From int64 `json:"from"`
To int64 `json:"to"`
Queries []ApiQuery `json:"queries"`
}
// log.Println("Checkpoint creation started...")
// n, err := memoryStore.ToCheckpoint(conf.Checkpoints.RootDir, from, to)
// if err != nil {
// log.Printf("Checkpoint creation failed: %s\n", err.Error())
// rw.WriteHeader(http.StatusInternalServerError)
// return
// } else {
// log.Printf("Checkpoint finished (%d files)\n", n)
// }
type ApiQueryResponse struct {
ApiMetricData
Query *ApiQuery `json:"query"`
}
// memoryStore.FreeAll()
type ApiQuery struct {
Metric string `json:"metric"`
Hostname string `json:"hostname"`
Type *string `json:"type,omitempty"`
TypeIds []string `json:"type-ids,omitempty"`
SubType *string `json:"subtype,omitempty"`
SubTypeIds []string `json:"subtype-ids,omitempty"`
}
// rw.WriteHeader(http.StatusOK)
// }
func handleQuery(rw http.ResponseWriter, r *http.Request) {
var err error
var req ApiQueryRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
response := make([]ApiQueryResponse, 0, len(req.Queries))
for _, query := range req.Queries {
res := ApiQueryResponse{
Query: &query,
}
sel := Selector{SelectorElement{String: req.Cluster}, SelectorElement{String: query.Hostname}}
if query.Type != nil {
if len(query.TypeIds) == 1 {
sel = append(sel, SelectorElement{String: query.TypeIds[0]})
} else {
ids := make([]string, len(query.TypeIds))
for i, id := range query.TypeIds {
ids[i] = *query.Type + id
}
sel = append(sel, SelectorElement{Group: ids})
}
if query.SubType != nil {
if len(query.SubTypeIds) == 1 {
sel = append(sel, SelectorElement{String: query.SubTypeIds[0]})
} else {
ids := make([]string, len(query.SubTypeIds))
for i, id := range query.SubTypeIds {
ids[i] = *query.SubType + id
}
sel = append(sel, SelectorElement{Group: ids})
}
}
}
res.Data, res.From, res.To, err = memoryStore.Read(sel, query.Metric, req.From, req.To)
if err != nil {
msg := err.Error()
res.Error = &msg
response = append(response, res)
continue
}
res.AddStats()
response = append(response, res)
}
rw.Header().Set("Content-Type", "application/json")
bw := bufio.NewWriter(rw)
defer bw.Flush()
if err := json.NewEncoder(bw).Encode(response); err != nil {
log.Print(err)
return
}
}
func authentication(next http.Handler, publicKey ed25519.PublicKey) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
@ -405,7 +458,7 @@ func StartApiServer(address string, ctx context.Context) error {
r.HandleFunc("/api/{cluster}/peek", handlePeek)
r.HandleFunc("/api/{cluster}/{from:[0-9]+}/{to:[0-9]+}/all-nodes", handleAllNodes)
r.HandleFunc("/api/write", handleWrite)
// r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/checkpoint", handleCheckpoint)
r.HandleFunc("/api/query", handleQuery)
server := &http.Server{
Handler: r,