mirror of
https://github.com/ClusterCockpit/cc-metric-store.git
synced 2024-11-10 05:07:25 +01:00
Unified API for all query types; remove dead code
This commit is contained in:
parent
15733cb1b7
commit
b6b219a9ad
328
api.go
328
api.go
@ -20,21 +20,11 @@ import (
|
|||||||
"github.com/influxdata/line-protocol/v2/lineprotocol"
|
"github.com/influxdata/line-protocol/v2/lineprotocol"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Example:
|
|
||||||
// {
|
|
||||||
// "metrics": ["flops_sp", "flops_dp"]
|
|
||||||
// "selectors": [["emmy", "host123", "cpu", "0"], ["emmy", "host123", "cpu", "1"]]
|
|
||||||
// }
|
|
||||||
type ApiRequestBody struct {
|
|
||||||
Metrics []string `json:"metrics"`
|
|
||||||
Selectors []Selector `json:"selectors"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type ApiMetricData struct {
|
type ApiMetricData struct {
|
||||||
Error *string `json:"error,omitempty"`
|
Error *string `json:"error,omitempty"`
|
||||||
From int64 `json:"from"`
|
From int64 `json:"from"`
|
||||||
To int64 `json:"to"`
|
To int64 `json:"to"`
|
||||||
Data []Float `json:"data"`
|
Data []Float `json:"data,omitempty"`
|
||||||
Avg Float `json:"avg"`
|
Avg Float `json:"avg"`
|
||||||
Min Float `json:"min"`
|
Min Float `json:"min"`
|
||||||
Max Float `json:"max"`
|
Max Float `json:"max"`
|
||||||
@ -65,136 +55,14 @@ func (data *ApiMetricData) AddStats() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type ApiStatsData struct {
|
|
||||||
Error *string `json:"error"`
|
|
||||||
From int64 `json:"from"`
|
|
||||||
To int64 `json:"to"`
|
|
||||||
Samples int `json:"samples"`
|
|
||||||
Avg Float `json:"avg"`
|
|
||||||
Min Float `json:"min"`
|
|
||||||
Max Float `json:"max"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleTimeseries(rw http.ResponseWriter, r *http.Request) {
|
|
||||||
vars := mux.Vars(r)
|
|
||||||
from, err := strconv.ParseInt(vars["from"], 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
to, err := strconv.ParseInt(vars["to"], 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if r.Method != http.MethodPost {
|
|
||||||
http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
withStats := r.URL.Query().Get("with-stats") == "true"
|
|
||||||
|
|
||||||
bodyDec := json.NewDecoder(r.Body)
|
|
||||||
var reqBody ApiRequestBody
|
|
||||||
err = bodyDec.Decode(&reqBody)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
res := make([]map[string]ApiMetricData, 0, len(reqBody.Selectors))
|
|
||||||
for _, selector := range reqBody.Selectors {
|
|
||||||
metrics := make(map[string]ApiMetricData)
|
|
||||||
for _, metric := range reqBody.Metrics {
|
|
||||||
data, f, t, err := memoryStore.Read(selector, metric, from, to)
|
|
||||||
if err != nil {
|
|
||||||
// http.Error(rw, err.Error(), http.StatusInternalServerError)
|
|
||||||
msg := err.Error()
|
|
||||||
metrics[metric] = ApiMetricData{Error: &msg}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
amd := ApiMetricData{
|
|
||||||
From: f,
|
|
||||||
To: t,
|
|
||||||
Data: data,
|
|
||||||
}
|
|
||||||
if withStats {
|
|
||||||
amd.AddStats()
|
|
||||||
}
|
|
||||||
metrics[metric] = amd
|
|
||||||
}
|
|
||||||
res = append(res, metrics)
|
|
||||||
}
|
|
||||||
|
|
||||||
rw.Header().Set("Content-Type", "application/json")
|
|
||||||
err = json.NewEncoder(rw).Encode(res)
|
|
||||||
if err != nil {
|
|
||||||
log.Println(err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleStats(rw http.ResponseWriter, r *http.Request) {
|
|
||||||
vars := mux.Vars(r)
|
|
||||||
from, err := strconv.ParseInt(vars["from"], 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
to, err := strconv.ParseInt(vars["to"], 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if r.Method != http.MethodPost {
|
|
||||||
http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
bodyDec := json.NewDecoder(r.Body)
|
|
||||||
var reqBody ApiRequestBody
|
|
||||||
err = bodyDec.Decode(&reqBody)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
res := make([]map[string]ApiStatsData, 0, len(reqBody.Selectors))
|
|
||||||
for _, selector := range reqBody.Selectors {
|
|
||||||
metrics := make(map[string]ApiStatsData)
|
|
||||||
for _, metric := range reqBody.Metrics {
|
|
||||||
stats, f, t, err := memoryStore.Stats(selector, metric, from, to)
|
|
||||||
if err != nil {
|
|
||||||
// http.Error(rw, err.Error(), http.StatusInternalServerError)
|
|
||||||
msg := err.Error()
|
|
||||||
metrics[metric] = ApiStatsData{Error: &msg}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
metrics[metric] = ApiStatsData{
|
|
||||||
From: f,
|
|
||||||
To: t,
|
|
||||||
Samples: stats.Samples,
|
|
||||||
Avg: stats.Avg,
|
|
||||||
Min: stats.Min,
|
|
||||||
Max: stats.Max,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
res = append(res, metrics)
|
|
||||||
}
|
|
||||||
|
|
||||||
rw.Header().Set("Content-Type", "application/json")
|
|
||||||
err = json.NewEncoder(rw).Encode(res)
|
|
||||||
if err != nil {
|
|
||||||
log.Println(err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleFree(rw http.ResponseWriter, r *http.Request) {
|
func handleFree(rw http.ResponseWriter, r *http.Request) {
|
||||||
vars := mux.Vars(r)
|
rawTo := r.URL.Query().Get("to")
|
||||||
to, err := strconv.ParseInt(vars["to"], 10, 64)
|
if rawTo == "" {
|
||||||
|
http.Error(rw, "'to' is a required query parameter", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
to, err := strconv.ParseInt(rawTo, 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
@ -235,22 +103,6 @@ func handleFree(rw http.ResponseWriter, r *http.Request) {
|
|||||||
rw.Write([]byte(fmt.Sprintf("buffers freed: %d\n", n)))
|
rw.Write([]byte(fmt.Sprintf("buffers freed: %d\n", n)))
|
||||||
}
|
}
|
||||||
|
|
||||||
func handlePeek(rw http.ResponseWriter, r *http.Request) {
|
|
||||||
vars := mux.Vars(r)
|
|
||||||
cluster := vars["cluster"]
|
|
||||||
res, err := memoryStore.Peek(cluster)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(rw, err.Error(), http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
rw.Header().Set("Content-Type", "application/json")
|
|
||||||
err = json.NewEncoder(rw).Encode(res)
|
|
||||||
if err != nil {
|
|
||||||
log.Println(err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleWrite(rw http.ResponseWriter, r *http.Request) {
|
func handleWrite(rw http.ResponseWriter, r *http.Request) {
|
||||||
if r.Method != http.MethodPost {
|
if r.Method != http.MethodPost {
|
||||||
http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed)
|
http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed)
|
||||||
@ -258,158 +110,118 @@ func handleWrite(rw http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
dec := lineprotocol.NewDecoder(bufio.NewReader(r.Body))
|
dec := lineprotocol.NewDecoder(bufio.NewReader(r.Body))
|
||||||
// Unlike the name suggests, handleLine can handle multiple lines
|
if err := decodeLine(dec); err != nil {
|
||||||
if err := handleLine(dec); err != nil {
|
|
||||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
rw.WriteHeader(http.StatusOK)
|
rw.WriteHeader(http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleAllNodes(rw http.ResponseWriter, r *http.Request) {
|
|
||||||
vars := mux.Vars(r)
|
|
||||||
clusterId := vars["cluster"]
|
|
||||||
from, err := strconv.ParseInt(vars["from"], 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
to, err := strconv.ParseInt(vars["to"], 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if r.Method != http.MethodPost {
|
|
||||||
http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
bodyDec := json.NewDecoder(r.Body)
|
|
||||||
var reqBody struct {
|
|
||||||
Metrics []string `json:"metrics"`
|
|
||||||
}
|
|
||||||
err = bodyDec.Decode(&reqBody)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
res := make(map[string]map[string]ApiMetricData)
|
|
||||||
|
|
||||||
memoryStore.root.lock.RLock()
|
|
||||||
cluster, ok := memoryStore.root.children[clusterId]
|
|
||||||
memoryStore.root.lock.RUnlock()
|
|
||||||
if !ok {
|
|
||||||
http.Error(rw, fmt.Sprintf("cluster '%s' does not exist", clusterId), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
cluster.lock.RLock()
|
|
||||||
hosts := make([]string, 0, len(cluster.children))
|
|
||||||
for host := range cluster.children {
|
|
||||||
hosts = append(hosts, host)
|
|
||||||
}
|
|
||||||
cluster.lock.RUnlock()
|
|
||||||
|
|
||||||
for _, host := range hosts {
|
|
||||||
metrics := make(map[string]ApiMetricData)
|
|
||||||
for _, metric := range reqBody.Metrics {
|
|
||||||
data, f, t, err := memoryStore.Read(Selector{SelectorElement{String: clusterId}, SelectorElement{String: host}}, metric, from, to)
|
|
||||||
if err != nil {
|
|
||||||
// http.Error(rw, err.Error(), http.StatusInternalServerError)
|
|
||||||
msg := err.Error()
|
|
||||||
metrics[metric] = ApiMetricData{Error: &msg}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
metrics[metric] = ApiMetricData{
|
|
||||||
From: f,
|
|
||||||
To: t,
|
|
||||||
Data: data,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
res[host] = metrics
|
|
||||||
}
|
|
||||||
|
|
||||||
rw.Header().Set("Content-Type", "application/json")
|
|
||||||
err = json.NewEncoder(rw).Encode(res)
|
|
||||||
if err != nil {
|
|
||||||
log.Println(err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type ApiQueryRequest struct {
|
type ApiQueryRequest struct {
|
||||||
Cluster string `json:"cluster"`
|
Cluster string `json:"cluster"`
|
||||||
From int64 `json:"from"`
|
From int64 `json:"from"`
|
||||||
To int64 `json:"to"`
|
To int64 `json:"to"`
|
||||||
|
WithStats bool `json:"with-stats"`
|
||||||
|
WithData bool `json:"with-data"`
|
||||||
Queries []ApiQuery `json:"queries"`
|
Queries []ApiQuery `json:"queries"`
|
||||||
}
|
ForAllNodes []string `json:"for-all-nodes"`
|
||||||
|
|
||||||
type ApiQueryResponse struct {
|
|
||||||
ApiMetricData
|
|
||||||
Query *ApiQuery `json:"query"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type ApiQuery struct {
|
type ApiQuery struct {
|
||||||
Metric string `json:"metric"`
|
Metric string `json:"metric"`
|
||||||
Hostname string `json:"hostname"`
|
Hostname string `json:"host"`
|
||||||
|
Aggregate bool `json:"aggreg"`
|
||||||
Type *string `json:"type,omitempty"`
|
Type *string `json:"type,omitempty"`
|
||||||
TypeIds []string `json:"type-ids,omitempty"`
|
TypeIds []int `json:"type-ids,omitempty"`
|
||||||
SubType *string `json:"subtype,omitempty"`
|
SubType *string `json:"subtype,omitempty"`
|
||||||
SubTypeIds []string `json:"subtype-ids,omitempty"`
|
SubTypeIds []int `json:"subtype-ids,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleQuery(rw http.ResponseWriter, r *http.Request) {
|
func handleQuery(rw http.ResponseWriter, r *http.Request) {
|
||||||
var err error
|
var err error
|
||||||
var req ApiQueryRequest
|
var req ApiQueryRequest = ApiQueryRequest{WithStats: true, WithData: true}
|
||||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
response := make([]ApiQueryResponse, 0, len(req.Queries))
|
if req.ForAllNodes != nil {
|
||||||
for _, query := range req.Queries {
|
nodes := memoryStore.ListChildren([]string{req.Cluster})
|
||||||
q := query // One of the few shitty things about Go: range looped-over variables are in the outer scope
|
for _, node := range nodes {
|
||||||
res := ApiQueryResponse{
|
for _, metric := range req.ForAllNodes {
|
||||||
Query: &q,
|
req.Queries = append(req.Queries, ApiQuery{
|
||||||
|
Metric: metric,
|
||||||
|
Hostname: node,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sel := Selector{SelectorElement{String: req.Cluster}, SelectorElement{String: query.Hostname}}
|
response := make([][]ApiMetricData, 0, len(req.Queries))
|
||||||
|
for _, query := range req.Queries {
|
||||||
|
sels := make([]Selector, 0, 1)
|
||||||
|
if query.Aggregate || query.Type == nil {
|
||||||
|
sel := Selector{{String: req.Cluster}, {String: query.Hostname}}
|
||||||
if query.Type != nil {
|
if query.Type != nil {
|
||||||
if len(query.TypeIds) == 1 {
|
if len(query.TypeIds) == 1 {
|
||||||
sel = append(sel, SelectorElement{String: *query.Type + query.TypeIds[0]})
|
sel = append(sel, SelectorElement{String: fmt.Sprintf("%s%d", *query.Type, query.TypeIds[0])})
|
||||||
} else {
|
} else {
|
||||||
ids := make([]string, len(query.TypeIds))
|
ids := make([]string, len(query.TypeIds))
|
||||||
for i, id := range query.TypeIds {
|
for i, id := range query.TypeIds {
|
||||||
ids[i] = *query.Type + id
|
ids[i] = fmt.Sprintf("%s%d", *query.Type, id)
|
||||||
}
|
}
|
||||||
sel = append(sel, SelectorElement{Group: ids})
|
sel = append(sel, SelectorElement{Group: ids})
|
||||||
}
|
}
|
||||||
|
|
||||||
if query.SubType != nil {
|
if query.SubType != nil {
|
||||||
if len(query.SubTypeIds) == 1 {
|
if len(query.SubTypeIds) == 1 {
|
||||||
sel = append(sel, SelectorElement{String: *query.SubType + query.SubTypeIds[0]})
|
sel = append(sel, SelectorElement{String: fmt.Sprintf("%s%d", *query.SubType, query.SubTypeIds[0])})
|
||||||
} else {
|
} else {
|
||||||
ids := make([]string, len(query.SubTypeIds))
|
ids := make([]string, len(query.SubTypeIds))
|
||||||
for i, id := range query.SubTypeIds {
|
for i, id := range query.SubTypeIds {
|
||||||
ids[i] = *query.SubType + id
|
ids[i] = fmt.Sprintf("%s%d", *query.SubType, id)
|
||||||
}
|
}
|
||||||
sel = append(sel, SelectorElement{Group: ids})
|
sel = append(sel, SelectorElement{Group: ids})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
sels = append(sels, sel)
|
||||||
|
} else {
|
||||||
|
for _, typeId := range query.TypeIds {
|
||||||
|
if query.SubType != nil {
|
||||||
|
for _, subTypeId := range query.SubTypeIds {
|
||||||
|
sels = append(sels, Selector{
|
||||||
|
{String: req.Cluster}, {String: query.Hostname},
|
||||||
|
{String: fmt.Sprintf("%s%d", *query.Type, typeId)},
|
||||||
|
{String: fmt.Sprintf("%s%d", *query.SubType, subTypeId)}})
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
sels = append(sels, Selector{
|
||||||
|
{String: req.Cluster},
|
||||||
|
{String: query.Hostname},
|
||||||
|
{String: fmt.Sprintf("%s%d", *query.Type, typeId)}})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// log.Printf("selector (metric: %s): %v", query.Metric, sel)
|
res := make([]ApiMetricData, 0, len(sels))
|
||||||
res.Data, res.From, res.To, err = memoryStore.Read(sel, query.Metric, req.From, req.To)
|
for _, sel := range sels {
|
||||||
|
data := ApiMetricData{}
|
||||||
|
data.Data, data.From, data.To, err = memoryStore.Read(sel, query.Metric, req.From, req.To)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := err.Error()
|
msg := err.Error()
|
||||||
res.Error = &msg
|
data.Error = &msg
|
||||||
response = append(response, res)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
res.AddStats()
|
if req.WithStats {
|
||||||
|
data.AddStats()
|
||||||
|
}
|
||||||
|
if !req.WithData {
|
||||||
|
data.Data = nil
|
||||||
|
}
|
||||||
|
res = append(res, data)
|
||||||
|
}
|
||||||
response = append(response, res)
|
response = append(response, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -454,11 +266,7 @@ func authentication(next http.Handler, publicKey ed25519.PublicKey) http.Handler
|
|||||||
func StartApiServer(address string, ctx context.Context) error {
|
func StartApiServer(address string, ctx context.Context) error {
|
||||||
r := mux.NewRouter()
|
r := mux.NewRouter()
|
||||||
|
|
||||||
r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/timeseries", handleTimeseries)
|
r.HandleFunc("/api/free", handleFree)
|
||||||
r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/stats", handleStats)
|
|
||||||
r.HandleFunc("/api/{to:[0-9]+}/free", handleFree)
|
|
||||||
r.HandleFunc("/api/{cluster}/peek", handlePeek)
|
|
||||||
r.HandleFunc("/api/{cluster}/{from:[0-9]+}/{to:[0-9]+}/all-nodes", handleAllNodes)
|
|
||||||
r.HandleFunc("/api/write", handleWrite)
|
r.HandleFunc("/api/write", handleWrite)
|
||||||
r.HandleFunc("/api/query", handleQuery)
|
r.HandleFunc("/api/query", handleQuery)
|
||||||
|
|
||||||
|
24
memstore.go
24
memstore.go
@ -247,8 +247,9 @@ type level struct {
|
|||||||
|
|
||||||
// Find the correct level for the given selector, creating it if
|
// Find the correct level for the given selector, creating it if
|
||||||
// it does not exist. Example selector in the context of the
|
// it does not exist. Example selector in the context of the
|
||||||
// ClusterCockpit could be: []string{ "emmy", "host123", "cpu", "0" }
|
// ClusterCockpit could be: []string{ "emmy", "host123", "cpu0" }.
|
||||||
// This function would probably benefit a lot from `level.children` beeing a `sync.Map`?
|
// This function would probably benefit a lot from `level.children` beeing a `sync.Map`?
|
||||||
|
// If nMetrics is -1, do not create new levels.
|
||||||
func (l *level) findLevelOrCreate(selector []string, nMetrics int) *level {
|
func (l *level) findLevelOrCreate(selector []string, nMetrics int) *level {
|
||||||
if len(selector) == 0 {
|
if len(selector) == 0 {
|
||||||
return l
|
return l
|
||||||
@ -261,6 +262,9 @@ func (l *level) findLevelOrCreate(selector []string, nMetrics int) *level {
|
|||||||
if l.children == nil {
|
if l.children == nil {
|
||||||
// Children map needs to be created...
|
// Children map needs to be created...
|
||||||
l.lock.RUnlock()
|
l.lock.RUnlock()
|
||||||
|
if nMetrics == -1 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
child, ok := l.children[selector[0]]
|
child, ok := l.children[selector[0]]
|
||||||
l.lock.RUnlock()
|
l.lock.RUnlock()
|
||||||
@ -458,3 +462,21 @@ func (m *MemoryStore) FreeAll() error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Given a selector, return a list of all children of the level selected.
|
||||||
|
func (m *MemoryStore) ListChildren(selector []string) []string {
|
||||||
|
lvl := m.root.findLevelOrCreate(selector, -1)
|
||||||
|
if lvl == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
lvl.lock.RLock()
|
||||||
|
defer lvl.lock.RUnlock()
|
||||||
|
|
||||||
|
children := make([]string, 0, len(lvl.children))
|
||||||
|
for child := range lvl.children {
|
||||||
|
children = append(children, child)
|
||||||
|
}
|
||||||
|
|
||||||
|
return children
|
||||||
|
}
|
||||||
|
57
stats.go
57
stats.go
@ -116,60 +116,3 @@ func (m *MemoryStore) Stats(selector Selector, metric string, from, to int64) (*
|
|||||||
Max: Float(max),
|
Max: Float(max),
|
||||||
}, from, to, nil
|
}, from, to, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return the newest value of the metric at offset `offset`.
|
|
||||||
// In case the level does not hold the metric itself,
|
|
||||||
// sum up the values from all lower levels.
|
|
||||||
func (l *level) peek(offset int) (Float, int) {
|
|
||||||
b := l.metrics[offset]
|
|
||||||
if b != nil {
|
|
||||||
x := b.data[len(b.data)-1]
|
|
||||||
return x, 1
|
|
||||||
}
|
|
||||||
|
|
||||||
n, sum := 0, Float(0)
|
|
||||||
for _, lvl := range l.children {
|
|
||||||
lvl.lock.RLock()
|
|
||||||
x, m := lvl.peek(offset)
|
|
||||||
lvl.lock.RUnlock()
|
|
||||||
n += m
|
|
||||||
sum += x
|
|
||||||
}
|
|
||||||
|
|
||||||
return sum, n
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return the newest value for every metric of every node for the given cluster.
|
|
||||||
// All values are always aggregated to a node.
|
|
||||||
func (m *MemoryStore) Peek(cluster string) (map[string]map[string]Float, error) {
|
|
||||||
m.root.lock.RLock()
|
|
||||||
clusterLevel, ok := m.root.children[cluster]
|
|
||||||
m.root.lock.RUnlock()
|
|
||||||
if !ok {
|
|
||||||
return nil, errors.New("no such cluster: " + cluster)
|
|
||||||
}
|
|
||||||
|
|
||||||
clusterLevel.lock.RLock()
|
|
||||||
defer clusterLevel.lock.RUnlock()
|
|
||||||
|
|
||||||
nodes := make(map[string]map[string]Float)
|
|
||||||
for node, l := range clusterLevel.children {
|
|
||||||
l.lock.RLock()
|
|
||||||
metrics := make(map[string]Float)
|
|
||||||
for metric, minfo := range m.metrics {
|
|
||||||
x, n := l.peek(minfo.offset)
|
|
||||||
if n > 1 {
|
|
||||||
if minfo.aggregation == NoAggregation {
|
|
||||||
return nil, errors.New("cannot aggregate: " + metric)
|
|
||||||
} else if minfo.aggregation == AvgAggregation {
|
|
||||||
x /= Float(n)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
metrics[metric] = x
|
|
||||||
}
|
|
||||||
nodes[node] = metrics
|
|
||||||
l.lock.RUnlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
return nodes, nil
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user