Add all-nodes api endpoint for system view

This commit is contained in:
Lou Knauer 2021-11-16 11:27:28 +01:00
parent eff61ce93e
commit d006e26fd4
2 changed files with 99 additions and 3 deletions

78
api.go
View File

@ -80,7 +80,7 @@ func handleTimeseries(rw http.ResponseWriter, r *http.Request) {
if err != nil { if err != nil {
// http.Error(rw, err.Error(), http.StatusInternalServerError) // http.Error(rw, err.Error(), http.StatusInternalServerError)
msg := err.Error() msg := err.Error()
metrics[metric] = ApiMetricData{ Error: &msg } metrics[metric] = ApiMetricData{Error: &msg}
continue continue
} }
@ -134,7 +134,7 @@ func handleStats(rw http.ResponseWriter, r *http.Request) {
if err != nil { if err != nil {
// http.Error(rw, err.Error(), http.StatusInternalServerError) // http.Error(rw, err.Error(), http.StatusInternalServerError)
msg := err.Error() msg := err.Error()
metrics[metric] = ApiStatsData{ Error: &msg } metrics[metric] = ApiStatsData{Error: &msg}
continue continue
} }
@ -232,6 +232,79 @@ func handleWrite(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(http.StatusOK) rw.WriteHeader(http.StatusOK)
} }
func handleAllNodes(rw http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
clusterId := vars["cluster"]
from, err := strconv.ParseInt(vars["from"], 10, 64)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
to, err := strconv.ParseInt(vars["to"], 10, 64)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
if r.Method != http.MethodPost {
http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed)
return
}
bodyDec := json.NewDecoder(r.Body)
var reqBody struct {
Metrics []string `json:"metrics"`
}
err = bodyDec.Decode(&reqBody)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
res := make(map[string]map[string]ApiMetricData)
memoryStore.root.lock.RLock()
cluster, ok := memoryStore.root.children[clusterId]
memoryStore.root.lock.RUnlock()
if !ok {
http.Error(rw, fmt.Sprintf("cluster '%s' does not exist", clusterId), http.StatusBadRequest)
return
}
cluster.lock.RLock()
hosts := make([]string, 0, len(cluster.children))
for host := range cluster.children {
hosts = append(hosts, host)
}
cluster.lock.RUnlock()
for _, host := range hosts {
metrics := make(map[string]ApiMetricData)
for _, metric := range reqBody.Metrics {
data, f, t, err := memoryStore.Read(Selector{SelectorElement{String: clusterId}, SelectorElement{String: host}}, metric, from, to)
if err != nil {
// http.Error(rw, err.Error(), http.StatusInternalServerError)
msg := err.Error()
metrics[metric] = ApiMetricData{Error: &msg}
continue
}
metrics[metric] = ApiMetricData{
From: f,
To: t,
Data: data,
}
}
res[host] = metrics
}
rw.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(rw).Encode(res)
if err != nil {
log.Println(err.Error())
}
}
func authentication(next http.Handler, publicKey ed25519.PublicKey) http.Handler { func authentication(next http.Handler, publicKey ed25519.PublicKey) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
authheader := r.Header.Get("Authorization") authheader := r.Header.Get("Authorization")
@ -268,6 +341,7 @@ func StartApiServer(address string, ctx context.Context) error {
r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/stats", handleStats) r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/stats", handleStats)
r.HandleFunc("/api/{to:[0-9]+}/free", handleFree) r.HandleFunc("/api/{to:[0-9]+}/free", handleFree)
r.HandleFunc("/api/{cluster}/peek", handlePeek) r.HandleFunc("/api/{cluster}/peek", handlePeek)
r.HandleFunc("/api/{cluster}/{from:[0-9]+}/{to:[0-9]+}/all-nodes", handleAllNodes)
r.HandleFunc("/api/write", handleWrite) r.HandleFunc("/api/write", handleWrite)
server := &http.Server{ server := &http.Server{

View File

@ -6,13 +6,23 @@ import (
) )
type SelectorElement struct { type SelectorElement struct {
Any bool
String string String string
Group []string Group []string
} }
func (se *SelectorElement) UnmarshalJSON(input []byte) error { func (se *SelectorElement) UnmarshalJSON(input []byte) error {
if input[0] == '"' { if input[0] == '"' {
return json.Unmarshal(input, &se.String) if err := json.Unmarshal(input, &se.String); err != nil {
return err
}
if se.String == "*" {
se.Any = true
se.String = ""
}
return nil
} }
if input[0] == '[' { if input[0] == '[' {
@ -23,6 +33,10 @@ func (se *SelectorElement) UnmarshalJSON(input []byte) error {
} }
func (se *SelectorElement) MarshalJSON() ([]byte, error) { func (se *SelectorElement) MarshalJSON() ([]byte, error) {
if se.Any {
return []byte("\"*\""), nil
}
if se.String != "" { if se.String != "" {
return json.Marshal(se.String) return json.Marshal(se.String)
} }
@ -91,5 +105,13 @@ func (l *level) findBuffers(selector Selector, offset int, f func(b *buffer) err
return nil return nil
} }
if sel.Any && l.children != nil {
for _, lvl := range l.children {
if err := lvl.findBuffers(selector[1:], offset, f); err != nil {
return err
}
}
}
panic("impossible") panic("impossible")
} }