diff --git a/api.go b/api.go index 599e805..38bccb5 100644 --- a/api.go +++ b/api.go @@ -20,21 +20,11 @@ import ( "github.com/influxdata/line-protocol/v2/lineprotocol" ) -// Example: -// { -// "metrics": ["flops_sp", "flops_dp"] -// "selectors": [["emmy", "host123", "cpu", "0"], ["emmy", "host123", "cpu", "1"]] -// } -type ApiRequestBody struct { - Metrics []string `json:"metrics"` - Selectors []Selector `json:"selectors"` -} - type ApiMetricData struct { Error *string `json:"error,omitempty"` From int64 `json:"from"` To int64 `json:"to"` - Data []Float `json:"data"` + Data []Float `json:"data,omitempty"` Avg Float `json:"avg"` Min Float `json:"min"` Max Float `json:"max"` @@ -65,136 +55,14 @@ func (data *ApiMetricData) AddStats() { } } -type ApiStatsData struct { - Error *string `json:"error"` - From int64 `json:"from"` - To int64 `json:"to"` - Samples int `json:"samples"` - Avg Float `json:"avg"` - Min Float `json:"min"` - Max Float `json:"max"` -} - -func handleTimeseries(rw http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - from, err := strconv.ParseInt(vars["from"], 10, 64) - if err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - to, err := strconv.ParseInt(vars["to"], 10, 64) - if err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - - if r.Method != http.MethodPost { - http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed) - return - } - - withStats := r.URL.Query().Get("with-stats") == "true" - - bodyDec := json.NewDecoder(r.Body) - var reqBody ApiRequestBody - err = bodyDec.Decode(&reqBody) - if err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - - res := make([]map[string]ApiMetricData, 0, len(reqBody.Selectors)) - for _, selector := range reqBody.Selectors { - metrics := make(map[string]ApiMetricData) - for _, metric := range reqBody.Metrics { - data, f, t, err := memoryStore.Read(selector, metric, from, to) - if err != nil { - // http.Error(rw, err.Error(), http.StatusInternalServerError) - msg := err.Error() - metrics[metric] = ApiMetricData{Error: &msg} - continue - } - - amd := ApiMetricData{ - From: f, - To: t, - Data: data, - } - if withStats { - amd.AddStats() - } - metrics[metric] = amd - } - res = append(res, metrics) - } - - rw.Header().Set("Content-Type", "application/json") - err = json.NewEncoder(rw).Encode(res) - if err != nil { - log.Println(err.Error()) - } -} - -func handleStats(rw http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - from, err := strconv.ParseInt(vars["from"], 10, 64) - if err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - to, err := strconv.ParseInt(vars["to"], 10, 64) - if err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - - if r.Method != http.MethodPost { - http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed) - return - } - - bodyDec := json.NewDecoder(r.Body) - var reqBody ApiRequestBody - err = bodyDec.Decode(&reqBody) - if err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - - res := make([]map[string]ApiStatsData, 0, len(reqBody.Selectors)) - for _, selector := range reqBody.Selectors { - metrics := make(map[string]ApiStatsData) - for _, metric := range reqBody.Metrics { - stats, f, t, err := memoryStore.Stats(selector, metric, from, to) - if err != nil { - // http.Error(rw, err.Error(), http.StatusInternalServerError) - msg := err.Error() - metrics[metric] = ApiStatsData{Error: &msg} - continue - } - - metrics[metric] = ApiStatsData{ - From: f, - To: t, - Samples: stats.Samples, - Avg: stats.Avg, - Min: stats.Min, - Max: stats.Max, - } - } - res = append(res, metrics) - } - - rw.Header().Set("Content-Type", "application/json") - err = json.NewEncoder(rw).Encode(res) - if err != nil { - log.Println(err.Error()) - } -} - func handleFree(rw http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - to, err := strconv.ParseInt(vars["to"], 10, 64) + rawTo := r.URL.Query().Get("to") + if rawTo == "" { + http.Error(rw, "'to' is a required query parameter", http.StatusBadRequest) + return + } + + to, err := strconv.ParseInt(rawTo, 10, 64) if err != nil { http.Error(rw, err.Error(), http.StatusBadRequest) return @@ -235,22 +103,6 @@ func handleFree(rw http.ResponseWriter, r *http.Request) { rw.Write([]byte(fmt.Sprintf("buffers freed: %d\n", n))) } -func handlePeek(rw http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - cluster := vars["cluster"] - res, err := memoryStore.Peek(cluster) - if err != nil { - http.Error(rw, err.Error(), http.StatusInternalServerError) - return - } - - rw.Header().Set("Content-Type", "application/json") - err = json.NewEncoder(rw).Encode(res) - if err != nil { - log.Println(err.Error()) - } -} - func handleWrite(rw http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed) @@ -258,158 +110,118 @@ func handleWrite(rw http.ResponseWriter, r *http.Request) { } dec := lineprotocol.NewDecoder(bufio.NewReader(r.Body)) - // Unlike the name suggests, handleLine can handle multiple lines - if err := handleLine(dec); err != nil { + if err := decodeLine(dec); err != nil { http.Error(rw, err.Error(), http.StatusBadRequest) return } rw.WriteHeader(http.StatusOK) } -func handleAllNodes(rw http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - clusterId := vars["cluster"] - from, err := strconv.ParseInt(vars["from"], 10, 64) - if err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - to, err := strconv.ParseInt(vars["to"], 10, 64) - if err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - - if r.Method != http.MethodPost { - http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed) - return - } - - bodyDec := json.NewDecoder(r.Body) - var reqBody struct { - Metrics []string `json:"metrics"` - } - err = bodyDec.Decode(&reqBody) - if err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - - res := make(map[string]map[string]ApiMetricData) - - memoryStore.root.lock.RLock() - cluster, ok := memoryStore.root.children[clusterId] - memoryStore.root.lock.RUnlock() - if !ok { - http.Error(rw, fmt.Sprintf("cluster '%s' does not exist", clusterId), http.StatusBadRequest) - return - } - - cluster.lock.RLock() - hosts := make([]string, 0, len(cluster.children)) - for host := range cluster.children { - hosts = append(hosts, host) - } - cluster.lock.RUnlock() - - for _, host := range hosts { - metrics := make(map[string]ApiMetricData) - for _, metric := range reqBody.Metrics { - data, f, t, err := memoryStore.Read(Selector{SelectorElement{String: clusterId}, SelectorElement{String: host}}, metric, from, to) - if err != nil { - // http.Error(rw, err.Error(), http.StatusInternalServerError) - msg := err.Error() - metrics[metric] = ApiMetricData{Error: &msg} - continue - } - - metrics[metric] = ApiMetricData{ - From: f, - To: t, - Data: data, - } - } - res[host] = metrics - } - - rw.Header().Set("Content-Type", "application/json") - err = json.NewEncoder(rw).Encode(res) - if err != nil { - log.Println(err.Error()) - } -} - type ApiQueryRequest struct { - Cluster string `json:"cluster"` - From int64 `json:"from"` - To int64 `json:"to"` - Queries []ApiQuery `json:"queries"` -} - -type ApiQueryResponse struct { - ApiMetricData - Query *ApiQuery `json:"query"` + Cluster string `json:"cluster"` + From int64 `json:"from"` + To int64 `json:"to"` + WithStats bool `json:"with-stats"` + WithData bool `json:"with-data"` + Queries []ApiQuery `json:"queries"` + ForAllNodes []string `json:"for-all-nodes"` } type ApiQuery struct { - Metric string `json:"metric"` - Hostname string `json:"hostname"` - Type *string `json:"type,omitempty"` - TypeIds []string `json:"type-ids,omitempty"` - SubType *string `json:"subtype,omitempty"` - SubTypeIds []string `json:"subtype-ids,omitempty"` + Metric string `json:"metric"` + Hostname string `json:"host"` + Aggregate bool `json:"aggreg"` + Type *string `json:"type,omitempty"` + TypeIds []int `json:"type-ids,omitempty"` + SubType *string `json:"subtype,omitempty"` + SubTypeIds []int `json:"subtype-ids,omitempty"` } func handleQuery(rw http.ResponseWriter, r *http.Request) { var err error - var req ApiQueryRequest + var req ApiQueryRequest = ApiQueryRequest{WithStats: true, WithData: true} if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(rw, err.Error(), http.StatusBadRequest) return } - response := make([]ApiQueryResponse, 0, len(req.Queries)) - for _, query := range req.Queries { - q := query // One of the few shitty things about Go: range looped-over variables are in the outer scope - res := ApiQueryResponse{ - Query: &q, - } - - sel := Selector{SelectorElement{String: req.Cluster}, SelectorElement{String: query.Hostname}} - if query.Type != nil { - if len(query.TypeIds) == 1 { - sel = append(sel, SelectorElement{String: *query.Type + query.TypeIds[0]}) - } else { - ids := make([]string, len(query.TypeIds)) - for i, id := range query.TypeIds { - ids[i] = *query.Type + id - } - sel = append(sel, SelectorElement{Group: ids}) + if req.ForAllNodes != nil { + nodes := memoryStore.ListChildren([]string{req.Cluster}) + for _, node := range nodes { + for _, metric := range req.ForAllNodes { + req.Queries = append(req.Queries, ApiQuery{ + Metric: metric, + Hostname: node, + }) } + } + } - if query.SubType != nil { - if len(query.SubTypeIds) == 1 { - sel = append(sel, SelectorElement{String: *query.SubType + query.SubTypeIds[0]}) + response := make([][]ApiMetricData, 0, len(req.Queries)) + for _, query := range req.Queries { + sels := make([]Selector, 0, 1) + if query.Aggregate || query.Type == nil { + sel := Selector{{String: req.Cluster}, {String: query.Hostname}} + if query.Type != nil { + if len(query.TypeIds) == 1 { + sel = append(sel, SelectorElement{String: fmt.Sprintf("%s%d", *query.Type, query.TypeIds[0])}) } else { - ids := make([]string, len(query.SubTypeIds)) - for i, id := range query.SubTypeIds { - ids[i] = *query.SubType + id + ids := make([]string, len(query.TypeIds)) + for i, id := range query.TypeIds { + ids[i] = fmt.Sprintf("%s%d", *query.Type, id) } sel = append(sel, SelectorElement{Group: ids}) } + + if query.SubType != nil { + if len(query.SubTypeIds) == 1 { + sel = append(sel, SelectorElement{String: fmt.Sprintf("%s%d", *query.SubType, query.SubTypeIds[0])}) + } else { + ids := make([]string, len(query.SubTypeIds)) + for i, id := range query.SubTypeIds { + ids[i] = fmt.Sprintf("%s%d", *query.SubType, id) + } + sel = append(sel, SelectorElement{Group: ids}) + } + } + } + sels = append(sels, sel) + } else { + for _, typeId := range query.TypeIds { + if query.SubType != nil { + for _, subTypeId := range query.SubTypeIds { + sels = append(sels, Selector{ + {String: req.Cluster}, {String: query.Hostname}, + {String: fmt.Sprintf("%s%d", *query.Type, typeId)}, + {String: fmt.Sprintf("%s%d", *query.SubType, subTypeId)}}) + } + } else { + sels = append(sels, Selector{ + {String: req.Cluster}, + {String: query.Hostname}, + {String: fmt.Sprintf("%s%d", *query.Type, typeId)}}) + } } } - // log.Printf("selector (metric: %s): %v", query.Metric, sel) - res.Data, res.From, res.To, err = memoryStore.Read(sel, query.Metric, req.From, req.To) - if err != nil { - msg := err.Error() - res.Error = &msg - response = append(response, res) - continue - } + res := make([]ApiMetricData, 0, len(sels)) + for _, sel := range sels { + data := ApiMetricData{} + data.Data, data.From, data.To, err = memoryStore.Read(sel, query.Metric, req.From, req.To) + if err != nil { + msg := err.Error() + data.Error = &msg + continue + } - res.AddStats() + if req.WithStats { + data.AddStats() + } + if !req.WithData { + data.Data = nil + } + res = append(res, data) + } response = append(response, res) } @@ -454,11 +266,7 @@ func authentication(next http.Handler, publicKey ed25519.PublicKey) http.Handler func StartApiServer(address string, ctx context.Context) error { r := mux.NewRouter() - r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/timeseries", handleTimeseries) - r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/stats", handleStats) - r.HandleFunc("/api/{to:[0-9]+}/free", handleFree) - r.HandleFunc("/api/{cluster}/peek", handlePeek) - r.HandleFunc("/api/{cluster}/{from:[0-9]+}/{to:[0-9]+}/all-nodes", handleAllNodes) + r.HandleFunc("/api/free", handleFree) r.HandleFunc("/api/write", handleWrite) r.HandleFunc("/api/query", handleQuery) diff --git a/memstore.go b/memstore.go index 54ce3b7..5337fd8 100644 --- a/memstore.go +++ b/memstore.go @@ -247,8 +247,9 @@ type level struct { // Find the correct level for the given selector, creating it if // it does not exist. Example selector in the context of the -// ClusterCockpit could be: []string{ "emmy", "host123", "cpu", "0" } +// ClusterCockpit could be: []string{ "emmy", "host123", "cpu0" }. // This function would probably benefit a lot from `level.children` beeing a `sync.Map`? +// If nMetrics is -1, do not create new levels. func (l *level) findLevelOrCreate(selector []string, nMetrics int) *level { if len(selector) == 0 { return l @@ -261,6 +262,9 @@ func (l *level) findLevelOrCreate(selector []string, nMetrics int) *level { if l.children == nil { // Children map needs to be created... l.lock.RUnlock() + if nMetrics == -1 { + return nil + } } else { child, ok := l.children[selector[0]] l.lock.RUnlock() @@ -458,3 +462,21 @@ func (m *MemoryStore) FreeAll() error { return nil } + +// Given a selector, return a list of all children of the level selected. +func (m *MemoryStore) ListChildren(selector []string) []string { + lvl := m.root.findLevelOrCreate(selector, -1) + if lvl == nil { + return nil + } + + lvl.lock.RLock() + defer lvl.lock.RUnlock() + + children := make([]string, 0, len(lvl.children)) + for child := range lvl.children { + children = append(children, child) + } + + return children +} diff --git a/stats.go b/stats.go index 30f55ee..2e56dd5 100644 --- a/stats.go +++ b/stats.go @@ -116,60 +116,3 @@ func (m *MemoryStore) Stats(selector Selector, metric string, from, to int64) (* Max: Float(max), }, from, to, nil } - -// Return the newest value of the metric at offset `offset`. -// In case the level does not hold the metric itself, -// sum up the values from all lower levels. -func (l *level) peek(offset int) (Float, int) { - b := l.metrics[offset] - if b != nil { - x := b.data[len(b.data)-1] - return x, 1 - } - - n, sum := 0, Float(0) - for _, lvl := range l.children { - lvl.lock.RLock() - x, m := lvl.peek(offset) - lvl.lock.RUnlock() - n += m - sum += x - } - - return sum, n -} - -// Return the newest value for every metric of every node for the given cluster. -// All values are always aggregated to a node. -func (m *MemoryStore) Peek(cluster string) (map[string]map[string]Float, error) { - m.root.lock.RLock() - clusterLevel, ok := m.root.children[cluster] - m.root.lock.RUnlock() - if !ok { - return nil, errors.New("no such cluster: " + cluster) - } - - clusterLevel.lock.RLock() - defer clusterLevel.lock.RUnlock() - - nodes := make(map[string]map[string]Float) - for node, l := range clusterLevel.children { - l.lock.RLock() - metrics := make(map[string]Float) - for metric, minfo := range m.metrics { - x, n := l.peek(minfo.offset) - if n > 1 { - if minfo.aggregation == NoAggregation { - return nil, errors.New("cannot aggregate: " + metric) - } else if minfo.aggregation == AvgAggregation { - x /= Float(n) - } - } - metrics[metric] = x - } - nodes[node] = metrics - l.lock.RUnlock() - } - - return nodes, nil -}