cc-metric-store/internal/api/api.go

410 lines
12 KiB
Go
Raw Normal View History

// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
2024-05-03 21:08:01 +02:00
package api
2021-08-20 12:54:11 +02:00
import (
2021-10-11 16:28:05 +02:00
"bufio"
2021-08-20 12:54:11 +02:00
"encoding/json"
"errors"
2021-09-07 09:24:50 +02:00
"fmt"
2022-01-24 09:55:33 +01:00
"io"
2021-08-20 12:54:11 +02:00
"log"
"math"
2021-08-20 12:54:11 +02:00
"net/http"
"strconv"
"strings"
2021-08-20 12:54:11 +02:00
"github.com/ClusterCockpit/cc-metric-store/internal/memorystore"
"github.com/ClusterCockpit/cc-metric-store/internal/util"
2021-10-11 16:28:05 +02:00
"github.com/influxdata/line-protocol/v2/lineprotocol"
2021-08-20 12:54:11 +02:00
)
// @title cc-metric-store REST API
// @version 1.0.0
// @description API for cc-metric-store
// @contact.name ClusterCockpit Project
// @contact.url https://clustercockpit.org
// @contact.email support@clustercockpit.org
// @license.name MIT License
// @license.url https://opensource.org/licenses/MIT
// @host localhost:8082
// @basePath /api/
// @securityDefinitions.apikey ApiKeyAuth
// @in header
// @name X-Auth-Token
// ErrorResponse model
type ErrorResponse struct {
// Statustext of Errorcode
Status string `json:"status"`
Error string `json:"error"` // Error Message
}
type ApiMetricData struct {
2024-08-21 09:47:16 +02:00
Error *string `json:"error,omitempty"`
Data util.FloatArray `json:"data,omitempty"`
From int64 `json:"from"`
To int64 `json:"to"`
Resolution int64 `json:"resolution"`
Avg util.Float `json:"avg"`
Min util.Float `json:"min"`
Max util.Float `json:"max"`
}
func handleError(err error, statusCode int, rw http.ResponseWriter) {
// log.Warnf("REST ERROR : %s", err.Error())
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(statusCode)
json.NewEncoder(rw).Encode(ErrorResponse{
Status: http.StatusText(statusCode),
Error: err.Error(),
})
}
// TODO: Optimize this, just like the stats endpoint!
func (data *ApiMetricData) AddStats() {
n := 0
2021-12-15 09:59:33 +01:00
sum, min, max := 0.0, math.MaxFloat64, -math.MaxFloat64
for _, x := range data.Data {
if x.IsNaN() {
continue
}
n += 1
sum += float64(x)
min = math.Min(min, float64(x))
max = math.Max(max, float64(x))
}
2021-12-15 09:59:33 +01:00
if n > 0 {
avg := sum / float64(n)
data.Avg = util.Float(avg)
data.Min = util.Float(min)
data.Max = util.Float(max)
2021-12-15 09:59:33 +01:00
} else {
data.Avg, data.Min, data.Max = util.NaN, util.NaN, util.NaN
2021-12-15 09:59:33 +01:00
}
2021-08-20 12:54:11 +02:00
}
func (data *ApiMetricData) ScaleBy(f util.Float) {
2022-03-31 14:17:27 +02:00
if f == 0 || f == 1 {
return
}
data.Avg *= f
data.Min *= f
data.Max *= f
for i := 0; i < len(data.Data); i++ {
data.Data[i] *= f
}
}
2024-05-06 14:20:43 +02:00
func (data *ApiMetricData) PadDataWithNull(ms *memorystore.MemoryStore, from, to int64, metric string) {
minfo, ok := ms.Metrics[metric]
if !ok {
return
}
2022-03-08 09:27:44 +01:00
if (data.From / minfo.Frequency) > (from / minfo.Frequency) {
padfront := int((data.From / minfo.Frequency) - (from / minfo.Frequency))
ndata := make([]util.Float, 0, padfront+len(data.Data))
for i := 0; i < padfront; i++ {
ndata = append(ndata, util.NaN)
}
for j := 0; j < len(data.Data); j++ {
ndata = append(ndata, data.Data[j])
}
data.Data = ndata
}
}
// handleFree godoc
// @summary
// @tags free
// @description
// @produce json
// @param to query string false "up to timestamp"
// @success 200 {string} string "ok"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
2024-09-17 10:49:32 +02:00
// @router /free/ [post]
func handleFree(rw http.ResponseWriter, r *http.Request) {
rawTo := r.URL.Query().Get("to")
if rawTo == "" {
handleError(errors.New("'to' is a required query parameter"), http.StatusBadRequest, rw)
2021-09-01 08:48:35 +02:00
return
}
to, err := strconv.ParseInt(rawTo, 10, 64)
2021-09-07 09:24:50 +02:00
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
2021-09-07 09:24:50 +02:00
return
}
2024-05-06 14:20:43 +02:00
// // TODO: lastCheckpoint might be modified by different go-routines.
// // Load it using the sync/atomic package?
// freeUpTo := lastCheckpoint.Unix()
// if to < freeUpTo {
// freeUpTo = to
// }
2021-09-07 09:24:50 +02:00
bodyDec := json.NewDecoder(r.Body)
2022-03-08 09:27:44 +01:00
var selectors [][]string
2021-09-07 09:24:50 +02:00
err = bodyDec.Decode(&selectors)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
2024-05-06 14:20:43 +02:00
ms := memorystore.GetMemoryStore()
2021-09-07 09:24:50 +02:00
n := 0
for _, sel := range selectors {
2024-05-06 14:20:43 +02:00
bn, err := ms.Free(sel, to)
2021-09-07 09:24:50 +02:00
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
2021-09-07 09:24:50 +02:00
return
}
n += bn
}
rw.WriteHeader(http.StatusOK)
2024-05-06 14:20:43 +02:00
fmt.Fprintf(rw, "buffers freed: %d\n", n)
2021-09-07 09:24:50 +02:00
}
// handleWrite godoc
// @summary Receive metrics in line-protocol
// @tags write
// @description Receives metrics in the influx line-protocol using [this format](https://github.com/ClusterCockpit/cc-specifications/blob/master/metrics/lineprotocol_alternative.md)
// @accept plain
// @produce json
// @param cluster query string false "If the lines in the body do not have a cluster tag, use this value instead."
// @success 200 {string} string "ok"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /write/ [post]
2021-10-11 16:28:05 +02:00
func handleWrite(rw http.ResponseWriter, r *http.Request) {
2022-01-24 09:55:33 +01:00
bytes, err := io.ReadAll(r.Body)
rw.Header().Add("Content-Type", "application/json")
2022-01-24 09:55:33 +01:00
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
2022-01-24 09:55:33 +01:00
return
}
2024-05-06 14:20:43 +02:00
ms := memorystore.GetMemoryStore()
2022-01-24 09:55:33 +01:00
dec := lineprotocol.NewDecoderWithBytes(bytes)
2024-05-06 14:20:43 +02:00
if err := decodeLine(dec, ms, r.URL.Query().Get("cluster")); err != nil {
2022-03-08 09:27:44 +01:00
log.Printf("/api/write error: %s", err.Error())
handleError(err, http.StatusBadRequest, rw)
2021-10-11 16:28:05 +02:00
return
}
rw.WriteHeader(http.StatusOK)
}
2022-01-07 08:52:55 +01:00
type ApiQueryRequest struct {
Cluster string `json:"cluster"`
2024-05-06 14:20:43 +02:00
Queries []ApiQuery `json:"queries"`
ForAllNodes []string `json:"for-all-nodes"`
From int64 `json:"from"`
To int64 `json:"to"`
WithStats bool `json:"with-stats"`
WithData bool `json:"with-data"`
WithPadding bool `json:"with-padding"`
2022-01-07 08:52:55 +01:00
}
2022-02-02 11:26:05 +01:00
type ApiQueryResponse struct {
Queries []ApiQuery `json:"queries,omitempty"`
Results [][]ApiMetricData `json:"results"`
}
2022-01-07 08:52:55 +01:00
type ApiQuery struct {
2024-05-06 14:20:43 +02:00
Type *string `json:"type,omitempty"`
SubType *string `json:"subtype,omitempty"`
Metric string `json:"metric"`
Hostname string `json:"host"`
2024-08-21 09:47:16 +02:00
Resolution int64 `json:"resolution"`
TypeIds []string `json:"type-ids,omitempty"`
SubTypeIds []string `json:"subtype-ids,omitempty"`
2024-05-06 14:20:43 +02:00
ScaleFactor util.Float `json:"scale-by,omitempty"`
Aggregate bool `json:"aggreg"`
2022-01-07 08:52:55 +01:00
}
// handleQuery godoc
// @summary Query metrics
// @tags query
// @description Query metrics.
// @accept json
// @produce json
// @param request body api.ApiQueryRequest true "API query payload object"
// @success 200 {object} api.ApiQueryResponse "API query response object"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /query/ [get]
2022-01-07 08:52:55 +01:00
func handleQuery(rw http.ResponseWriter, r *http.Request) {
var err error
2024-09-17 14:37:32 +02:00
ver := r.URL.Query().Get("version")
if ver == "" {
ver = "v2"
}
2024-05-06 14:20:43 +02:00
req := ApiQueryRequest{WithStats: true, WithData: true, WithPadding: true}
2022-01-07 08:52:55 +01:00
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
handleError(err, http.StatusBadRequest, rw)
2022-01-07 08:52:55 +01:00
return
}
ms := memorystore.GetMemoryStore()
2022-02-02 11:26:05 +01:00
response := ApiQueryResponse{
Results: make([][]ApiMetricData, 0, len(req.Queries)),
}
if req.ForAllNodes != nil {
nodes := ms.ListChildren([]string{req.Cluster})
for _, node := range nodes {
for _, metric := range req.ForAllNodes {
2022-02-02 11:26:05 +01:00
q := ApiQuery{
Metric: metric,
Hostname: node,
2022-02-02 11:26:05 +01:00
}
req.Queries = append(req.Queries, q)
response.Queries = append(response.Queries, q)
2022-01-07 08:52:55 +01:00
}
}
}
2022-01-07 08:52:55 +01:00
for _, query := range req.Queries {
2024-05-06 14:20:43 +02:00
sels := make([]util.Selector, 0, 1)
if query.Aggregate || query.Type == nil {
2024-05-06 14:20:43 +02:00
sel := util.Selector{{String: req.Cluster}, {String: query.Hostname}}
if query.Type != nil {
if len(query.TypeIds) == 1 {
2024-05-06 14:20:43 +02:00
sel = append(sel, util.SelectorElement{String: *query.Type + query.TypeIds[0]})
2022-01-07 08:52:55 +01:00
} else {
ids := make([]string, len(query.TypeIds))
for i, id := range query.TypeIds {
2022-05-04 09:18:56 +02:00
ids[i] = *query.Type + id
2022-01-07 08:52:55 +01:00
}
2024-05-06 14:20:43 +02:00
sel = append(sel, util.SelectorElement{Group: ids})
2022-01-07 08:52:55 +01:00
}
if query.SubType != nil {
if len(query.SubTypeIds) == 1 {
2024-05-06 14:20:43 +02:00
sel = append(sel, util.SelectorElement{String: *query.SubType + query.SubTypeIds[0]})
} else {
ids := make([]string, len(query.SubTypeIds))
for i, id := range query.SubTypeIds {
2022-05-04 09:18:56 +02:00
ids[i] = *query.SubType + id
}
2024-05-06 14:20:43 +02:00
sel = append(sel, util.SelectorElement{Group: ids})
}
}
}
sels = append(sels, sel)
} else {
for _, typeId := range query.TypeIds {
if query.SubType != nil {
for _, subTypeId := range query.SubTypeIds {
2024-05-06 14:20:43 +02:00
sels = append(sels, util.Selector{
2024-05-03 21:08:01 +02:00
{String: req.Cluster},
{String: query.Hostname},
2022-05-04 09:18:56 +02:00
{String: *query.Type + typeId},
2024-05-03 21:08:01 +02:00
{String: *query.SubType + subTypeId},
})
}
} else {
2024-05-06 14:20:43 +02:00
sels = append(sels, util.Selector{
{String: req.Cluster},
{String: query.Hostname},
2024-05-03 21:08:01 +02:00
{String: *query.Type + typeId},
})
}
2022-01-07 08:52:55 +01:00
}
}
// log.Printf("query: %#v\n", query)
// log.Printf("sels: %#v\n", sels)
res := make([]ApiMetricData, 0, len(sels))
for _, sel := range sels {
data := ApiMetricData{}
2024-09-17 14:37:32 +02:00
if ver == "v1" {
data.Data, data.From, data.To, data.Resolution, err = ms.Read(sel, query.Metric, req.From, req.To, 0)
} else {
data.Data, data.From, data.To, data.Resolution, err = ms.Read(sel, query.Metric, req.From, req.To, query.Resolution)
}
if err != nil {
msg := err.Error()
data.Error = &msg
res = append(res, data)
continue
}
2022-01-07 08:52:55 +01:00
if req.WithStats {
data.AddStats()
}
2022-03-31 14:17:27 +02:00
if query.ScaleFactor != 0 {
data.ScaleBy(query.ScaleFactor)
}
if req.WithPadding {
2024-05-06 14:20:43 +02:00
data.PadDataWithNull(ms, req.From, req.To, query.Metric)
}
if !req.WithData {
data.Data = nil
}
res = append(res, data)
}
2022-02-02 11:26:05 +01:00
response.Results = append(response.Results, res)
2022-01-07 08:52:55 +01:00
}
rw.Header().Set("Content-Type", "application/json")
bw := bufio.NewWriter(rw)
defer bw.Flush()
if err := json.NewEncoder(bw).Encode(response); err != nil {
log.Print(err)
return
}
}
// handleDebug godoc
// @summary Debug endpoint
// @tags debug
// @description Write metrics to store
// @produce json
// @param selector query string false "Selector"
// @success 200 {string} string "Debug dump"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /debug/ [post]
2024-05-06 14:20:43 +02:00
func handleDebug(rw http.ResponseWriter, r *http.Request) {
raw := r.URL.Query().Get("selector")
rw.Header().Add("Content-Type", "application/json")
2024-05-06 14:20:43 +02:00
selector := []string{}
if len(raw) != 0 {
selector = strings.Split(raw, ":")
}
2024-05-06 14:20:43 +02:00
ms := memorystore.GetMemoryStore()
if err := ms.DebugDump(bufio.NewWriter(rw), selector); err != nil {
handleError(err, http.StatusBadRequest, rw)
return
2021-08-20 12:54:11 +02:00
}
}