2024-06-25 20:08:25 +02:00
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
2024-05-03 21:08:01 +02:00
package api
2021-08-20 12:54:11 +02:00
import (
2021-10-11 16:28:05 +02:00
"bufio"
2021-08-20 12:54:11 +02:00
"encoding/json"
2024-06-25 20:08:25 +02:00
"errors"
2021-09-07 09:24:50 +02:00
"fmt"
2022-01-24 09:55:33 +01:00
"io"
2021-08-20 12:54:11 +02:00
"log"
2021-11-26 09:51:18 +01:00
"math"
2021-08-20 12:54:11 +02:00
"net/http"
"strconv"
2021-09-20 09:27:31 +02:00
"strings"
2021-08-20 12:54:11 +02:00
2024-05-06 09:27:28 +02:00
"github.com/ClusterCockpit/cc-metric-store/internal/memorystore"
"github.com/ClusterCockpit/cc-metric-store/internal/util"
2021-10-11 16:28:05 +02:00
"github.com/influxdata/line-protocol/v2/lineprotocol"
2021-08-20 12:54:11 +02:00
)
2024-06-25 20:08:25 +02:00
// @title cc-metric-store REST API
// @version 1.0.0
// @description API for cc-metric-store
// @contact.name ClusterCockpit Project
// @contact.url https://clustercockpit.org
// @contact.email support@clustercockpit.org
// @license.name MIT License
// @license.url https://opensource.org/licenses/MIT
// @host localhost:8082
// @basePath /api/
// @securityDefinitions.apikey ApiKeyAuth
// @in header
// @name X-Auth-Token
// ErrorResponse is the JSON document returned to the client when a request
// could not be served.
type ErrorResponse struct {
	// Status is the status text belonging to the HTTP error code.
	Status string `json:"status"`
	// Error is the error message.
	Error string `json:"error"`
}
2021-08-31 15:17:36 +02:00
type ApiMetricData struct {
2024-06-25 20:08:25 +02:00
Error * string ` json:"error,omitempty" `
2024-05-06 14:20:43 +02:00
Data util . FloatArray ` json:"data,omitempty" `
2024-06-25 20:08:25 +02:00
From int64 ` json:"from" `
To int64 ` json:"to" `
2024-05-06 09:27:28 +02:00
Avg util . Float ` json:"avg" `
Min util . Float ` json:"min" `
Max util . Float ` json:"max" `
2021-11-26 09:51:18 +01:00
}
2024-06-25 20:08:25 +02:00
// handleError answers the request with the HTTP status statusCode and a
// JSON-encoded ErrorResponse carrying the status text and the message of err.
//
// The Content-Type header is written with Set rather than Add, so a handler
// that already declared a content type before failing does not cause the
// header to be sent twice.
func handleError(err error, statusCode int, rw http.ResponseWriter) {
	// log.Warnf("REST ERROR : %s", err.Error())
	rw.Header().Set("Content-Type", "application/json")
	rw.WriteHeader(statusCode)

	resp := ErrorResponse{
		Status: http.StatusText(statusCode),
		Error:  err.Error(),
	}
	// The status line is already out, so a failed write can only be logged.
	if encErr := json.NewEncoder(rw).Encode(resp); encErr != nil {
		log.Printf("writing error response failed: %s", encErr.Error())
	}
}
2021-11-26 09:51:18 +01:00
// TODO: Optimize this, just like the stats endpoint!
func ( data * ApiMetricData ) AddStats ( ) {
n := 0
2021-12-15 09:59:33 +01:00
sum , min , max := 0.0 , math . MaxFloat64 , - math . MaxFloat64
2021-11-26 09:51:18 +01:00
for _ , x := range data . Data {
if x . IsNaN ( ) {
continue
}
n += 1
sum += float64 ( x )
min = math . Min ( min , float64 ( x ) )
max = math . Max ( max , float64 ( x ) )
}
2021-12-15 09:59:33 +01:00
if n > 0 {
avg := sum / float64 ( n )
2024-05-06 09:27:28 +02:00
data . Avg = util . Float ( avg )
data . Min = util . Float ( min )
data . Max = util . Float ( max )
2021-12-15 09:59:33 +01:00
} else {
2024-05-06 09:27:28 +02:00
data . Avg , data . Min , data . Max = util . NaN , util . NaN , util . NaN
2021-12-15 09:59:33 +01:00
}
2021-08-20 12:54:11 +02:00
}
2024-05-06 09:27:28 +02:00
func ( data * ApiMetricData ) ScaleBy ( f util . Float ) {
2022-03-31 14:17:27 +02:00
if f == 0 || f == 1 {
return
}
data . Avg *= f
data . Min *= f
data . Max *= f
for i := 0 ; i < len ( data . Data ) ; i ++ {
data . Data [ i ] *= f
}
}
2024-05-06 14:20:43 +02:00
func ( data * ApiMetricData ) PadDataWithNull ( ms * memorystore . MemoryStore , from , to int64 , metric string ) {
minfo , ok := ms . Metrics [ metric ]
2022-01-31 16:32:50 +01:00
if ! ok {
return
}
2022-03-08 09:27:44 +01:00
if ( data . From / minfo . Frequency ) > ( from / minfo . Frequency ) {
padfront := int ( ( data . From / minfo . Frequency ) - ( from / minfo . Frequency ) )
2024-05-06 09:27:28 +02:00
ndata := make ( [ ] util . Float , 0 , padfront + len ( data . Data ) )
2022-01-31 16:32:50 +01:00
for i := 0 ; i < padfront ; i ++ {
2024-05-06 09:27:28 +02:00
ndata = append ( ndata , util . NaN )
2022-01-31 16:32:50 +01:00
}
for j := 0 ; j < len ( data . Data ) ; j ++ {
ndata = append ( ndata , data . Data [ j ] )
}
data . Data = ndata
}
}
2024-06-25 20:08:25 +02:00
// handleFree godoc
// @summary
// @tags free
// @description
// @produce json
// @param to query string false "up to timestamp"
// @success 200 {string} string "ok"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /free/ [get]
2022-01-20 10:42:44 +01:00
func handleFree ( rw http . ResponseWriter , r * http . Request ) {
rawTo := r . URL . Query ( ) . Get ( "to" )
if rawTo == "" {
2024-06-25 20:08:25 +02:00
handleError ( errors . New ( "'to' is a required query parameter" ) , http . StatusBadRequest , rw )
2021-09-01 08:48:35 +02:00
return
}
2022-01-20 10:42:44 +01:00
to , err := strconv . ParseInt ( rawTo , 10 , 64 )
2021-09-07 09:24:50 +02:00
if err != nil {
2024-06-25 20:08:25 +02:00
handleError ( err , http . StatusInternalServerError , rw )
2021-09-07 09:24:50 +02:00
return
}
2024-05-06 14:20:43 +02:00
// // TODO: lastCheckpoint might be modified by different go-routines.
// // Load it using the sync/atomic package?
// freeUpTo := lastCheckpoint.Unix()
// if to < freeUpTo {
// freeUpTo = to
// }
2021-09-07 09:24:50 +02:00
bodyDec := json . NewDecoder ( r . Body )
2022-03-08 09:27:44 +01:00
var selectors [ ] [ ] string
2021-09-07 09:24:50 +02:00
err = bodyDec . Decode ( & selectors )
if err != nil {
http . Error ( rw , err . Error ( ) , http . StatusBadRequest )
return
}
2024-05-06 14:20:43 +02:00
ms := memorystore . GetMemoryStore ( )
2021-09-07 09:24:50 +02:00
n := 0
for _ , sel := range selectors {
2024-05-06 14:20:43 +02:00
bn , err := ms . Free ( sel , to )
2021-09-07 09:24:50 +02:00
if err != nil {
2024-06-25 20:08:25 +02:00
handleError ( err , http . StatusInternalServerError , rw )
2021-09-07 09:24:50 +02:00
return
}
n += bn
}
rw . WriteHeader ( http . StatusOK )
2024-05-06 14:20:43 +02:00
fmt . Fprintf ( rw , "buffers freed: %d\n" , n )
2021-09-07 09:24:50 +02:00
}
2024-06-25 20:08:25 +02:00
// handleWrite godoc
// @summary Receive metrics in line-protocol
// @tags write
// @description Receives metrics in the influx line-protocol using [this format](https://github.com/ClusterCockpit/cc-specifications/blob/master/metrics/lineprotocol_alternative.md)
// @accept plain
// @produce json
// @param cluster query string false "If the lines in the body do not have a cluster tag, use this value instead."
// @success 200 {string} string "ok"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /write/ [post]
2021-10-11 16:28:05 +02:00
func handleWrite ( rw http . ResponseWriter , r * http . Request ) {
2022-01-24 09:55:33 +01:00
bytes , err := io . ReadAll ( r . Body )
2024-06-25 20:08:25 +02:00
rw . Header ( ) . Add ( "Content-Type" , "application/json" )
2022-01-24 09:55:33 +01:00
if err != nil {
2024-06-25 20:08:25 +02:00
handleError ( err , http . StatusInternalServerError , rw )
2022-01-24 09:55:33 +01:00
return
}
2024-05-06 14:20:43 +02:00
ms := memorystore . GetMemoryStore ( )
2022-01-24 09:55:33 +01:00
dec := lineprotocol . NewDecoderWithBytes ( bytes )
2024-05-06 14:20:43 +02:00
if err := decodeLine ( dec , ms , r . URL . Query ( ) . Get ( "cluster" ) ) ; err != nil {
2022-03-08 09:27:44 +01:00
log . Printf ( "/api/write error: %s" , err . Error ( ) )
2024-06-25 20:08:25 +02:00
handleError ( err , http . StatusBadRequest , rw )
2021-10-11 16:28:05 +02:00
return
}
rw . WriteHeader ( http . StatusOK )
}
2022-01-07 08:52:55 +01:00
type ApiQueryRequest struct {
2022-01-20 10:42:44 +01:00
Cluster string ` json:"cluster" `
2024-05-06 14:20:43 +02:00
Queries [ ] ApiQuery ` json:"queries" `
ForAllNodes [ ] string ` json:"for-all-nodes" `
2022-01-20 10:42:44 +01:00
From int64 ` json:"from" `
To int64 ` json:"to" `
WithStats bool ` json:"with-stats" `
WithData bool ` json:"with-data" `
2022-01-31 16:32:50 +01:00
WithPadding bool ` json:"with-padding" `
2022-01-07 08:52:55 +01:00
}
2022-02-02 11:26:05 +01:00
// ApiQueryResponse is the JSON document returned by the query endpoint.
type ApiQueryResponse struct {
	// Queries holds the queries that were generated from the request's
	// for-all-nodes list; it is omitted if none were generated.
	Queries []ApiQuery `json:"queries,omitempty"`
	// Results holds one entry per evaluated query, each of which contains
	// one ApiMetricData per selector the query expanded to.
	Results [][]ApiMetricData `json:"results"`
}
2022-01-07 08:52:55 +01:00
type ApiQuery struct {
2024-05-06 14:20:43 +02:00
Type * string ` json:"type,omitempty" `
SubType * string ` json:"subtype,omitempty" `
2024-06-25 20:08:25 +02:00
Metric string ` json:"metric" `
Hostname string ` json:"host" `
TypeIds [ ] string ` json:"type-ids,omitempty" `
SubTypeIds [ ] string ` json:"subtype-ids,omitempty" `
2024-05-06 14:20:43 +02:00
ScaleFactor util . Float ` json:"scale-by,omitempty" `
Aggregate bool ` json:"aggreg" `
2022-01-07 08:52:55 +01:00
}
2024-06-25 20:08:25 +02:00
// handleQuery godoc
// @summary Query metrics
// @tags query
// @description Query metrics.
// @accept json
// @produce json
// @param request body api.ApiQueryRequest true "API query payload object"
// @success 200 {object} api.ApiQueryResponse "API query response object"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /query/ [get]
2022-01-07 08:52:55 +01:00
func handleQuery ( rw http . ResponseWriter , r * http . Request ) {
var err error
2024-05-06 14:20:43 +02:00
req := ApiQueryRequest { WithStats : true , WithData : true , WithPadding : true }
2022-01-07 08:52:55 +01:00
if err := json . NewDecoder ( r . Body ) . Decode ( & req ) ; err != nil {
2024-06-25 20:08:25 +02:00
handleError ( err , http . StatusBadRequest , rw )
2022-01-07 08:52:55 +01:00
return
}
2024-05-06 09:27:28 +02:00
ms := memorystore . GetMemoryStore ( )
2022-02-02 11:26:05 +01:00
response := ApiQueryResponse {
Results : make ( [ ] [ ] ApiMetricData , 0 , len ( req . Queries ) ) ,
}
2022-01-20 10:42:44 +01:00
if req . ForAllNodes != nil {
2024-05-06 09:27:28 +02:00
nodes := ms . ListChildren ( [ ] string { req . Cluster } )
2022-01-20 10:42:44 +01:00
for _ , node := range nodes {
for _ , metric := range req . ForAllNodes {
2022-02-02 11:26:05 +01:00
q := ApiQuery {
2022-01-20 10:42:44 +01:00
Metric : metric ,
Hostname : node ,
2022-02-02 11:26:05 +01:00
}
req . Queries = append ( req . Queries , q )
response . Queries = append ( response . Queries , q )
2022-01-07 08:52:55 +01:00
}
2022-01-20 10:42:44 +01:00
}
}
2022-01-07 08:52:55 +01:00
2022-01-20 10:42:44 +01:00
for _ , query := range req . Queries {
2024-05-06 14:20:43 +02:00
sels := make ( [ ] util . Selector , 0 , 1 )
2022-01-20 10:42:44 +01:00
if query . Aggregate || query . Type == nil {
2024-05-06 14:20:43 +02:00
sel := util . Selector { { String : req . Cluster } , { String : query . Hostname } }
2022-01-20 10:42:44 +01:00
if query . Type != nil {
if len ( query . TypeIds ) == 1 {
2024-05-06 14:20:43 +02:00
sel = append ( sel , util . SelectorElement { String : * query . Type + query . TypeIds [ 0 ] } )
2022-01-07 08:52:55 +01:00
} else {
2022-01-20 10:42:44 +01:00
ids := make ( [ ] string , len ( query . TypeIds ) )
for i , id := range query . TypeIds {
2022-05-04 09:18:56 +02:00
ids [ i ] = * query . Type + id
2022-01-07 08:52:55 +01:00
}
2024-05-06 14:20:43 +02:00
sel = append ( sel , util . SelectorElement { Group : ids } )
2022-01-07 08:52:55 +01:00
}
2022-01-20 10:42:44 +01:00
if query . SubType != nil {
if len ( query . SubTypeIds ) == 1 {
2024-05-06 14:20:43 +02:00
sel = append ( sel , util . SelectorElement { String : * query . SubType + query . SubTypeIds [ 0 ] } )
2022-01-20 10:42:44 +01:00
} else {
ids := make ( [ ] string , len ( query . SubTypeIds ) )
for i , id := range query . SubTypeIds {
2022-05-04 09:18:56 +02:00
ids [ i ] = * query . SubType + id
2022-01-20 10:42:44 +01:00
}
2024-05-06 14:20:43 +02:00
sel = append ( sel , util . SelectorElement { Group : ids } )
2022-01-20 10:42:44 +01:00
}
}
}
sels = append ( sels , sel )
} else {
for _ , typeId := range query . TypeIds {
if query . SubType != nil {
for _ , subTypeId := range query . SubTypeIds {
2024-05-06 14:20:43 +02:00
sels = append ( sels , util . Selector {
2024-05-03 21:08:01 +02:00
{ String : req . Cluster } ,
{ String : query . Hostname } ,
2022-05-04 09:18:56 +02:00
{ String : * query . Type + typeId } ,
2024-05-03 21:08:01 +02:00
{ String : * query . SubType + subTypeId } ,
} )
2022-01-20 10:42:44 +01:00
}
} else {
2024-05-06 14:20:43 +02:00
sels = append ( sels , util . Selector {
2022-01-20 10:42:44 +01:00
{ String : req . Cluster } ,
{ String : query . Hostname } ,
2024-05-03 21:08:01 +02:00
{ String : * query . Type + typeId } ,
} )
2022-01-20 10:42:44 +01:00
}
2022-01-07 08:52:55 +01:00
}
}
2022-02-02 11:45:52 +01:00
// log.Printf("query: %#v\n", query)
// log.Printf("sels: %#v\n", sels)
2022-01-20 10:42:44 +01:00
res := make ( [ ] ApiMetricData , 0 , len ( sels ) )
for _ , sel := range sels {
data := ApiMetricData { }
2024-05-06 14:20:43 +02:00
data . Data , data . From , data . To , err = ms . Read ( sel , query . Metric , req . From , req . To )
2022-02-02 11:45:52 +01:00
// log.Printf("data: %#v, %#v, %#v, %#v", data.Data, data.From, data.To, err)
2022-01-20 10:42:44 +01:00
if err != nil {
msg := err . Error ( )
data . Error = & msg
2022-02-02 11:45:52 +01:00
res = append ( res , data )
2022-01-20 10:42:44 +01:00
continue
}
2022-01-07 08:52:55 +01:00
2022-01-20 10:42:44 +01:00
if req . WithStats {
data . AddStats ( )
}
2022-03-31 14:17:27 +02:00
if query . ScaleFactor != 0 {
data . ScaleBy ( query . ScaleFactor )
}
2022-01-31 16:32:50 +01:00
if req . WithPadding {
2024-05-06 14:20:43 +02:00
data . PadDataWithNull ( ms , req . From , req . To , query . Metric )
2022-01-31 16:32:50 +01:00
}
2022-01-20 10:42:44 +01:00
if ! req . WithData {
data . Data = nil
}
res = append ( res , data )
}
2022-02-02 11:26:05 +01:00
response . Results = append ( response . Results , res )
2022-01-07 08:52:55 +01:00
}
rw . Header ( ) . Set ( "Content-Type" , "application/json" )
bw := bufio . NewWriter ( rw )
defer bw . Flush ( )
if err := json . NewEncoder ( bw ) . Encode ( response ) ; err != nil {
log . Print ( err )
return
}
}
2021-11-22 17:04:09 +01:00
2024-06-25 20:08:25 +02:00
// handleDebug godoc
// @summary Debug endpoint
2024-06-26 05:31:28 +02:00
// @tags debug
2024-06-25 20:08:25 +02:00
// @description Write metrics to store
// @produce json
2024-06-26 05:31:28 +02:00
// @param selector query string false "Selector"
2024-06-25 20:08:25 +02:00
// @success 200 {string} string "Debug dump"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /debug/ [post]
2024-05-06 14:20:43 +02:00
func handleDebug ( rw http . ResponseWriter , r * http . Request ) {
raw := r . URL . Query ( ) . Get ( "selector" )
2024-06-25 20:08:25 +02:00
rw . Header ( ) . Add ( "Content-Type" , "application/json" )
2024-05-06 14:20:43 +02:00
selector := [ ] string { }
if len ( raw ) != 0 {
selector = strings . Split ( raw , ":" )
2024-06-25 20:08:25 +02:00
}
2024-05-06 14:20:43 +02:00
ms := memorystore . GetMemoryStore ( )
if err := ms . DebugDump ( bufio . NewWriter ( rw ) , selector ) ; err != nil {
2024-06-25 20:08:25 +02:00
handleError ( err , http . StatusBadRequest , rw )
return
2021-08-20 12:54:11 +02:00
}
}