Port logging to cclog, use loglevels
Separate REST API from pkg API
2025-10-19 09:33:40 +02:00
parent 047b997a22
commit 67be9aa27b
10 changed files with 268 additions and 301 deletions
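The logging half of this change swaps the standard library logger for the leveled cclog logger from cc-lib, so messages can be filtered by severity at runtime. A minimal sketch of the substitution pattern, using only cclog calls that appear in this diff (the message strings are illustrative):

package main

import (
	cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
)

func main() {
	// before: log.Printf("[METRICSTORE]> start checkpointing ...")
	// after:  informational progress messages get an explicit level
	cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)", "2025-10-19T09:33:40+02:00")

	// before: log.Printf("[METRICSTORE]> checkpointing failed: %s", err)
	// after:  failures are reported as errors
	cclog.Errorf("[METRICSTORE]> checkpointing failed: %s", "example error")

	// log.Fatal / log.Fatalf become cclog.Fatal / cclog.Fatalf, which still
	// terminate the process, and cclog.Printf remains for level-less output.
	cclog.Printf("[METRICSTORE]> done: %d checkpoint files created\n", 3)
}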

internal/api/memorystore.go (new file, 177 lines)

@@ -0,0 +1,177 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package api
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"github.com/ClusterCockpit/cc-backend/internal/memorystore"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/influxdata/line-protocol/v2/lineprotocol"
)
// freeMetrics godoc
// @summary Free buffers
// @tags free
// @description This endpoint allows users to free buffers in the metric
// store. Buffers can be removed selectively, and the data below a single
// node can be pruned without removing the whole node.
// @produce json
// @param to query string false "up to timestamp"
// @success 200 {string} string "ok"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /free/ [post]
func freeMetrics(rw http.ResponseWriter, r *http.Request) {
rawTo := r.URL.Query().Get("to")
if rawTo == "" {
handleError(errors.New("'to' is a required query parameter"), http.StatusBadRequest, rw)
return
}
to, err := strconv.ParseInt(rawTo, 10, 64)
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
}
// // TODO: lastCheckpoint might be modified by different go-routines.
// // Load it using the sync/atomic package?
// freeUpTo := lastCheckpoint.Unix()
// if to < freeUpTo {
// freeUpTo = to
// }
bodyDec := json.NewDecoder(r.Body)
var selectors [][]string
err = bodyDec.Decode(&selectors)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
ms := memorystore.GetMemoryStore()
n := 0
for _, sel := range selectors {
bn, err := ms.Free(sel, to)
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
}
n += bn
}
rw.WriteHeader(http.StatusOK)
fmt.Fprintf(rw, "buffers freed: %d\n", n)
}
// writeMetrics godoc
// @summary Receive metrics in InfluxDB line-protocol
// @tags write
// @description Write data to the in-memory store in the InfluxDB line-protocol using [this format](https://github.com/ClusterCockpit/cc-specifications/blob/master/metrics/lineprotocol_alternative.md)
// @accept plain
// @produce json
// @param cluster query string false "If the lines in the body do not have a cluster tag, use this value instead."
// @success 200 {string} string "ok"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /write/ [post]
func writeMetrics(rw http.ResponseWriter, r *http.Request) {
bytes, err := io.ReadAll(r.Body)
rw.Header().Add("Content-Type", "application/json")
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
}
ms := memorystore.GetMemoryStore()
dec := lineprotocol.NewDecoderWithBytes(bytes)
if err := memorystore.DecodeLine(dec, ms, r.URL.Query().Get("cluster")); err != nil {
cclog.Errorf("/api/write error: %s", err.Error())
handleError(err, http.StatusBadRequest, rw)
return
}
rw.WriteHeader(http.StatusOK)
}
// debugMetrics godoc
// @summary Debug endpoint
// @tags debug
// @description This endpoint allows users to dump the contents of
// nodes/clusters/metrics in order to inspect the current state of the data.
// @produce json
// @param selector query string false "Selector"
// @success 200 {string} string "Debug dump"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /debug/ [get]
func debugMetrics(rw http.ResponseWriter, r *http.Request) {
raw := r.URL.Query().Get("selector")
rw.Header().Add("Content-Type", "application/json")
selector := []string{}
if len(raw) != 0 {
selector = strings.Split(raw, ":")
}
ms := memorystore.GetMemoryStore()
if err := ms.DebugDump(bufio.NewWriter(rw), selector); err != nil {
handleError(err, http.StatusBadRequest, rw)
return
}
}
// metricsHealth godoc
// @summary HealthCheck endpoint
// @tags healthcheck
// @description This endpoint allows users to check whether a node is healthy
// @produce json
// @param cluster query string true "Cluster name"
// @param node query string true "Node name"
// @success 200 {string} string "Node health status"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /healthcheck/ [get]
func metricsHealth(rw http.ResponseWriter, r *http.Request) {
rawCluster := r.URL.Query().Get("cluster")
rawNode := r.URL.Query().Get("node")
if rawCluster == "" || rawNode == "" {
handleError(errors.New("'cluster' and 'node' are required query parameter"), http.StatusBadRequest, rw)
return
}
rw.Header().Add("Content-Type", "application/json")
selector := []string{rawCluster, rawNode}
ms := memorystore.GetMemoryStore()
if err := ms.HealthCheck(bufio.NewWriter(rw), selector); err != nil {
handleError(err, http.StatusBadRequest, rw)
return
}
}
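The four handlers above are thin HTTP wrappers around the memorystore package. A rough client-side sketch of how they might be exercised once mounted under /api; the base URL, token header, cluster/host names and selector contents are assumptions about a concrete deployment, not taken from this commit:

package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

func main() {
	base := "http://localhost:8080/api" // assumed cc-backend address
	token := "API_TOKEN"                // assumed token for the TokenAuth middleware

	do := func(method, path string, body io.Reader) {
		req, err := http.NewRequest(method, base+path, body)
		if err != nil {
			panic(err)
		}
		req.Header.Set("X-Auth-Token", token)
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()
		out, _ := io.ReadAll(resp.Body)
		fmt.Printf("%s %s -> %s %s\n", method, path, resp.Status, out)
	}

	// writeMetrics: the body is InfluxDB line protocol; if the lines carry no
	// cluster tag, ?cluster=... supplies a default.
	do(http.MethodPost, "/write?cluster=fritz",
		strings.NewReader("cpu_load,hostname=f0101,type=node value=42.0\n"))

	// freeMetrics: 'to' is required; the body is a JSON array of selectors.
	do(http.MethodPost, "/free?to=1760857200",
		strings.NewReader(`[["fritz","f0101"]]`))

	// metricsHealth: 'cluster' and 'node' are required query parameters.
	do(http.MethodGet, "/healthcheck?cluster=fritz&node=f0101", nil)
}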


@@ -15,7 +15,6 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/memorystore"
"github.com/ClusterCockpit/cc-backend/internal/repository"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema"
@@ -98,15 +97,11 @@ func (api *RestApi) MountUserApiRoutes(r *mux.Router) {
func (api *RestApi) MountMetricStoreApiRoutes(r *mux.Router) {
// REST API Uses TokenAuth
r.HandleFunc("/api/free", memorystore.HandleFree).Methods(http.MethodPost)
r.HandleFunc("/api/write", memorystore.HandleWrite).Methods(http.MethodPost)
r.HandleFunc("/api/debug", memorystore.HandleDebug).Methods(http.MethodGet)
r.HandleFunc("/api/healthcheck", memorystore.HandleHealthCheck).Methods(http.MethodGet)
// Refactor
r.HandleFunc("/api/free/", memorystore.HandleFree).Methods(http.MethodPost)
r.HandleFunc("/api/write/", memorystore.HandleWrite).Methods(http.MethodPost)
r.HandleFunc("/api/debug/", memorystore.HandleDebug).Methods(http.MethodGet)
r.HandleFunc("/api/healthcheck/", memorystore.HandleHealthCheck).Methods(http.MethodGet)
// Refactor ??
r.HandleFunc("/api/free", freeMetrics).Methods(http.MethodPost)
r.HandleFunc("/api/write", writeMetrics).Methods(http.MethodPost)
r.HandleFunc("/api/debug", debugMetrics).Methods(http.MethodGet)
r.HandleFunc("/api/healthcheck", metricsHealth).Methods(http.MethodGet)
}
func (api *RestApi) MountConfigApiRoutes(r *mux.Router) {
@@ -269,7 +264,7 @@ func (api *RestApi) putMachineState(rw http.ResponseWriter, r *http.Request) {
cluster := vars["cluster"]
host := vars["host"]
dir := filepath.Join(api.MachineStateDir, cluster)
if err := os.MkdirAll(dir, 0755); err != nil {
if err := os.MkdirAll(dir, 0o755); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}


@@ -1,53 +1,17 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package memorystore
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"math"
"net/http"
"strconv"
"strings"
"github.com/ClusterCockpit/cc-lib/schema"
"github.com/ClusterCockpit/cc-lib/util"
"github.com/influxdata/line-protocol/v2/lineprotocol"
)
// @title cc-metric-store REST API
// @version 1.0.0
// @description API for cc-metric-store
// @contact.name ClusterCockpit Project
// @contact.url https://clustercockpit.org
// @contact.email support@clustercockpit.org
// @license.name MIT License
// @license.url https://opensource.org/licenses/MIT
// @host localhost:8082
// @basePath /api/
// @securityDefinitions.apikey ApiKeyAuth
// @in header
// @name X-Auth-Token
// ErrorResponse model
type ErrorResponse struct {
// Statustext of Errorcode
Status string `json:"status"`
Error string `json:"error"` // Error Message
}
type APIMetricData struct {
Error *string `json:"error,omitempty"`
Data schema.FloatArray `json:"data,omitempty"`
@@ -59,14 +23,32 @@ type APIMetricData struct {
Max schema.Float `json:"max"`
}
func handleError(err error, statusCode int, rw http.ResponseWriter) {
// log.Warnf("REST ERROR : %s", err.Error())
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(statusCode)
json.NewEncoder(rw).Encode(ErrorResponse{
Status: http.StatusText(statusCode),
Error: err.Error(),
})
}
type APIQueryRequest struct {
Cluster string `json:"cluster"`
Queries []APIQuery `json:"queries"`
ForAllNodes []string `json:"for-all-nodes"`
From int64 `json:"from"`
To int64 `json:"to"`
WithStats bool `json:"with-stats"`
WithData bool `json:"with-data"`
WithPadding bool `json:"with-padding"`
}
type APIQueryResponse struct {
Queries []APIQuery `json:"queries,omitempty"`
Results [][]APIMetricData `json:"results"`
}
type APIQuery struct {
Type *string `json:"type,omitempty"`
SubType *string `json:"subtype,omitempty"`
Metric string `json:"metric"`
Hostname string `json:"host"`
Resolution int64 `json:"resolution"`
TypeIds []string `json:"type-ids,omitempty"`
SubTypeIds []string `json:"subtype-ids,omitempty"`
ScaleFactor schema.Float `json:"scale-by,omitempty"`
Aggregate bool `json:"aggreg"`
}
// TODO: Optimize this, just like the stats endpoint!
@@ -126,127 +108,6 @@ func (data *APIMetricData) PadDataWithNull(ms *MemoryStore, from, to int64, metr
}
}
// handleFree godoc
// @summary
// @tags free
// @description This endpoint allows the users to free the Buffers from the
// metric store. This endpoint offers the users to remove then systematically
// and also allows then to prune the data under node, if they do not want to
// remove the whole node.
// @produce json
// @param to query string false "up to timestamp"
// @success 200 {string} string "ok"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /free/ [post]
func HandleFree(rw http.ResponseWriter, r *http.Request) {
rawTo := r.URL.Query().Get("to")
if rawTo == "" {
handleError(errors.New("'to' is a required query parameter"), http.StatusBadRequest, rw)
return
}
to, err := strconv.ParseInt(rawTo, 10, 64)
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
}
// // TODO: lastCheckpoint might be modified by different go-routines.
// // Load it using the sync/atomic package?
// freeUpTo := lastCheckpoint.Unix()
// if to < freeUpTo {
// freeUpTo = to
// }
bodyDec := json.NewDecoder(r.Body)
var selectors [][]string
err = bodyDec.Decode(&selectors)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}
ms := GetMemoryStore()
n := 0
for _, sel := range selectors {
bn, err := ms.Free(sel, to)
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
}
n += bn
}
rw.WriteHeader(http.StatusOK)
fmt.Fprintf(rw, "buffers freed: %d\n", n)
}
// handleWrite godoc
// @summary Receive metrics in InfluxDB line-protocol
// @tags write
// @description Write data to the in-memory store in the InfluxDB line-protocol using [this format](https://github.com/ClusterCockpit/cc-specifications/blob/master/metrics/lineprotocol_alternative.md)
// @accept plain
// @produce json
// @param cluster query string false "If the lines in the body do not have a cluster tag, use this value instead."
// @success 200 {string} string "ok"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /write/ [post]
func HandleWrite(rw http.ResponseWriter, r *http.Request) {
bytes, err := io.ReadAll(r.Body)
rw.Header().Add("Content-Type", "application/json")
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
}
ms := GetMemoryStore()
dec := lineprotocol.NewDecoderWithBytes(bytes)
if err := decodeLine(dec, ms, r.URL.Query().Get("cluster")); err != nil {
log.Printf("/api/write error: %s", err.Error())
handleError(err, http.StatusBadRequest, rw)
return
}
rw.WriteHeader(http.StatusOK)
}
type APIQueryRequest struct {
Cluster string `json:"cluster"`
Queries []APIQuery `json:"queries"`
ForAllNodes []string `json:"for-all-nodes"`
From int64 `json:"from"`
To int64 `json:"to"`
WithStats bool `json:"with-stats"`
WithData bool `json:"with-data"`
WithPadding bool `json:"with-padding"`
}
type APIQueryResponse struct {
Queries []APIQuery `json:"queries,omitempty"`
Results [][]APIMetricData `json:"results"`
}
type APIQuery struct {
Type *string `json:"type,omitempty"`
SubType *string `json:"subtype,omitempty"`
Metric string `json:"metric"`
Hostname string `json:"host"`
Resolution int64 `json:"resolution"`
TypeIds []string `json:"type-ids,omitempty"`
SubTypeIds []string `json:"subtype-ids,omitempty"`
ScaleFactor schema.Float `json:"scale-by,omitempty"`
Aggregate bool `json:"aggreg"`
}
func FetchData(req APIQueryRequest) (*APIQueryResponse, error) {
req.WithData = true
req.WithData = true
@@ -354,65 +215,3 @@ func FetchData(req APIQueryRequest) (*APIQueryResponse, error) {
return &response, nil
}
// handleDebug godoc
// @summary Debug endpoint
// @tags debug
// @description This endpoint allows the users to print the content of
// nodes/clusters/metrics to review the state of the data.
// @produce json
// @param selector query string false "Selector"
// @success 200 {string} string "Debug dump"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /debug/ [post]
func HandleDebug(rw http.ResponseWriter, r *http.Request) {
raw := r.URL.Query().Get("selector")
rw.Header().Add("Content-Type", "application/json")
selector := []string{}
if len(raw) != 0 {
selector = strings.Split(raw, ":")
}
ms := GetMemoryStore()
if err := ms.DebugDump(bufio.NewWriter(rw), selector); err != nil {
handleError(err, http.StatusBadRequest, rw)
return
}
}
// handleHealthCheck godoc
// @summary HealthCheck endpoint
// @tags healthcheck
// @description This endpoint allows the users to check if a node is healthy
// @produce json
// @param selector query string false "Selector"
// @success 200 {string} string "Debug dump"
// @failure 400 {object} api.ErrorResponse "Bad Request"
// @failure 401 {object} api.ErrorResponse "Unauthorized"
// @failure 403 {object} api.ErrorResponse "Forbidden"
// @failure 500 {object} api.ErrorResponse "Internal Server Error"
// @security ApiKeyAuth
// @router /healthcheck/ [get]
func HandleHealthCheck(rw http.ResponseWriter, r *http.Request) {
rawCluster := r.URL.Query().Get("cluster")
rawNode := r.URL.Query().Get("node")
if rawCluster == "" || rawNode == "" {
handleError(errors.New("'cluster' and 'node' are required query parameter"), http.StatusBadRequest, rw)
return
}
rw.Header().Add("Content-Type", "application/json")
selector := []string{rawCluster, rawNode}
ms := GetMemoryStore()
if err := ms.HealthCheck(bufio.NewWriter(rw), selector); err != nil {
handleError(err, http.StatusBadRequest, rw)
return
}
}
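With the HTTP handlers moved out, what remains in this file is the in-process query API (APIQueryRequest, APIQuery, FetchData) that other cc-backend packages call directly. A minimal sketch of that path, assuming the store was initialized at startup and using placeholder cluster, host and metric names:

package main

import (
	"fmt"
	"time"

	"github.com/ClusterCockpit/cc-backend/internal/memorystore"
)

func main() {
	// GetMemoryStore aborts if the store was never initialized, so this
	// assumes memorystore.Init(...) already ran during startup.
	now := time.Now().Unix()
	req := memorystore.APIQueryRequest{
		Cluster:   "fritz", // placeholder cluster name
		From:      now - 3600,
		To:        now,
		WithStats: true,
		WithData:  true,
		Queries: []memorystore.APIQuery{
			{Metric: "cpu_load", Hostname: "f0101", Resolution: 60},
		},
	}
	resp, err := memorystore.FetchData(req)
	if err != nil {
		fmt.Println("query failed:", err)
		return
	}
	for i, res := range resp.Results {
		fmt.Printf("query %d returned %d metric series\n", i, len(res))
	}
}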


@@ -12,7 +12,6 @@ import (
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
"sync"
@@ -27,7 +26,7 @@ func Archiving(wg *sync.WaitGroup, ctx context.Context) {
defer wg.Done()
d, err := time.ParseDuration(Keys.Archive.Interval)
if err != nil {
log.Fatalf("[METRICSTORE]> error parsing archive interval duration: %v\n", err)
cclog.Fatalf("[METRICSTORE]> error parsing archive interval duration: %v\n", err)
}
if d <= 0 {
return
@@ -45,14 +44,14 @@ func Archiving(wg *sync.WaitGroup, ctx context.Context) {
return
case <-ticks:
t := time.Now().Add(-d)
log.Printf("[METRICSTORE]> start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339))
cclog.Printf("[METRICSTORE]> start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339))
n, err := ArchiveCheckpoints(Keys.Checkpoints.RootDir,
Keys.Archive.RootDir, t.Unix(), Keys.Archive.DeleteInstead)
if err != nil {
log.Printf("[METRICSTORE]> archiving failed: %s\n", err.Error())
cclog.Printf("[METRICSTORE]> archiving failed: %s\n", err.Error())
} else {
log.Printf("[METRICSTORE]> done: %d files zipped and moved to archive\n", n)
cclog.Printf("[METRICSTORE]> done: %d files zipped and moved to archive\n", n)
}
}
}
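Archiving above, like Checkpointing and Retention further down, follows the same interval-driven loop: parse a duration from the configuration, then do the work on every tick until the context is cancelled. A generic sketch of that pattern; the helper name and signature are illustrative and not part of cc-backend:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
)

// runPeriodic is a hypothetical helper mirroring the structure of
// Archiving/Checkpointing/Retention: an invalid or non-positive interval
// disables the task, otherwise work runs on every tick until ctx is done.
func runPeriodic(ctx context.Context, wg *sync.WaitGroup, interval string, work func(time.Time)) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		d, err := time.ParseDuration(interval)
		if err != nil {
			cclog.Errorf("invalid interval %q: %v", interval, err)
			return
		}
		if d <= 0 {
			return
		}
		ticker := time.NewTicker(d)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case t := <-ticker.C:
				work(t)
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	runPeriodic(ctx, &wg, "1s", func(t time.Time) {
		fmt.Println("tick at", t.Format(time.RFC3339))
	})
	time.Sleep(3 * time.Second)
	cancel()
	wg.Wait()
}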


@@ -10,7 +10,6 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"os"
"path"
"sort"
@@ -20,6 +19,7 @@ import (
"sync/atomic"
"time"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema"
"github.com/linkedin/goavro/v2"
)
@@ -72,7 +72,7 @@ func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) {
continue
}
log.Printf("error while checkpointing %#v: %s", workItem.selector, err.Error())
cclog.Errorf("error while checkpointing %#v: %s", workItem.selector, err.Error())
atomic.AddInt32(&errs, 1)
} else {
atomic.AddInt32(&n, 1)


@@ -7,10 +7,11 @@ package memorystore
import (
"context"
"log"
"slices"
"strconv"
"sync"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
)
func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
@@ -34,7 +35,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
// Fetch the frequency of the metric from the global configuration
freq, err := GetMetricFrequency(val.MetricName)
if err != nil {
log.Printf("Error fetching metric frequency: %s\n", err)
cclog.Errorf("Error fetching metric frequency: %s\n", err)
continue
}
@@ -59,7 +60,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
// If the Avro level is nil, create a new one
if avroLevel == nil {
log.Printf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName)
cclog.Errorf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName)
}
oldSelector = slices.Clone(selector)
}


@@ -12,7 +12,6 @@ import (
"errors"
"fmt"
"io/fs"
"log"
"os"
"path"
"path/filepath"
@@ -24,6 +23,7 @@ import (
"sync/atomic"
"time"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema"
"github.com/linkedin/goavro/v2"
)
@@ -54,7 +54,7 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
defer wg.Done()
d, err := time.ParseDuration(Keys.Checkpoints.Interval)
if err != nil {
log.Fatal(err)
cclog.Fatal(err)
}
if d <= 0 {
return
@@ -71,14 +71,14 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
case <-ctx.Done():
return
case <-ticks:
log.Printf("[METRICSTORE]> start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339))
cclog.Printf("[METRICSTORE]> start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339))
now := time.Now()
n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir,
lastCheckpoint.Unix(), now.Unix())
if err != nil {
log.Printf("[METRICSTORE]> checkpointing failed: %s\n", err.Error())
cclog.Printf("[METRICSTORE]> checkpointing failed: %s\n", err.Error())
} else {
log.Printf("[METRICSTORE]> done: %d checkpoint files created\n", n)
cclog.Printf("[METRICSTORE]> done: %d checkpoint files created\n", n)
lastCheckpoint = now
}
}
@@ -183,7 +183,7 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) {
continue
}
log.Printf("[METRICSTORE]> error while checkpointing %#v: %s", workItem.selector, err.Error())
cclog.Printf("[METRICSTORE]> error while checkpointing %#v: %s", workItem.selector, err.Error())
atomic.AddInt32(&errs, 1)
} else {
atomic.AddInt32(&n, 1)
@@ -318,7 +318,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) (
lvl := m.root.findLevelOrCreate(host[:], len(m.Metrics))
nn, err := lvl.fromCheckpoint(m, filepath.Join(dir, host[0], host[1]), from, extension)
if err != nil {
log.Fatalf("[METRICSTORE]> error while loading checkpoints: %s", err.Error())
cclog.Fatalf("[METRICSTORE]> error while loading checkpoints: %s", err.Error())
atomic.AddInt32(&errs, 1)
}
atomic.AddInt32(&n, int32(nn))
@@ -381,9 +381,9 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) {
// The directory does not exist, so create it using os.MkdirAll()
err := os.MkdirAll(dir, 0o755) // 0755 sets the permissions for the directory
if err != nil {
log.Fatalf("[METRICSTORE]> Error creating directory: %#v\n", err)
cclog.Fatalf("[METRICSTORE]> Error creating directory: %#v\n", err)
}
log.Printf("[METRICSTORE]> %#v Directory created successfully.\n", dir)
cclog.Printf("[METRICSTORE]> %#v Directory created successfully.\n", dir)
}
// Config read (replace with your actual config read)
@@ -402,7 +402,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) {
if found, err := checkFilesWithExtension(dir, fileFormat); err != nil {
return 0, fmt.Errorf("[METRICSTORE]> error checking files with extension: %v", err)
} else if found {
log.Printf("[METRICSTORE]> Loading %s files because fileformat is %s\n", fileFormat, fileFormat)
cclog.Printf("[METRICSTORE]> Loading %s files because fileformat is %s\n", fileFormat, fileFormat)
return m.FromCheckpoint(dir, from, fileFormat)
}
@@ -411,11 +411,11 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) {
if found, err := checkFilesWithExtension(dir, altFormat); err != nil {
return 0, fmt.Errorf("[METRICSTORE]> error checking files with extension: %v", err)
} else if found {
log.Printf("[METRICSTORE]> Loading %s files but fileformat is %s\n", altFormat, fileFormat)
cclog.Printf("[METRICSTORE]> Loading %s files but fileformat is %s\n", altFormat, fileFormat)
return m.FromCheckpoint(dir, from, altFormat)
}
log.Println("[METRICSTORE]> No valid checkpoint files found in the directory.")
cclog.Print("[METRICSTORE]> No valid checkpoint files found in the directory")
return 0, nil
}


@@ -6,12 +6,7 @@
package memorystore
import (
"bytes"
"encoding/json"
"fmt"
"github.com/ClusterCockpit/cc-backend/internal/config"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
)
var InternalCCMSFlag bool = false
@@ -93,17 +88,6 @@ type MetricConfig struct {
var Metrics map[string]MetricConfig
func InitMetricStore(rawConfig json.RawMessage) {
if rawConfig != nil {
config.Validate(configSchema, rawConfig)
dec := json.NewDecoder(bytes.NewReader(rawConfig))
// dec.DisallowUnknownFields()
if err := dec.Decode(&Keys); err != nil {
cclog.Abortf("[METRICSTORE]> Metric Store Config Init: Could not decode config file '%s'.\nError: %s\n", rawConfig, err.Error())
}
}
}
func GetMetricFrequency(metricName string) (int64, error) {
if metric, ok := Metrics[metricName]; ok {
return metric.Frequency, nil


@@ -8,10 +8,10 @@ package memorystore
import (
"context"
"fmt"
"log"
"sync"
"time"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema"
"github.com/influxdata/line-protocol/v2/lineprotocol"
"github.com/nats-io/nats.go"
@@ -118,8 +118,8 @@ func ReceiveNats(conf *(NatsConfig),
go func() {
for m := range msgs {
dec := lineprotocol.NewDecoderWithBytes(m.Data)
if err := decodeLine(dec, ms, clusterTag); err != nil {
log.Printf("error: %s\n", err.Error())
if err := DecodeLine(dec, ms, clusterTag); err != nil {
cclog.Printf("error: %s\n", err.Error())
}
}
@@ -133,8 +133,8 @@ func ReceiveNats(conf *(NatsConfig),
} else {
sub, err = nc.Subscribe(sc.SubscribeTo, func(m *nats.Msg) {
dec := lineprotocol.NewDecoderWithBytes(m.Data)
if err := decodeLine(dec, ms, clusterTag); err != nil {
log.Printf("error: %s\n", err.Error())
if err := DecodeLine(dec, ms, clusterTag); err != nil {
cclog.Printf("error: %s\n", err.Error())
}
})
}
@@ -142,7 +142,7 @@ func ReceiveNats(conf *(NatsConfig),
if err != nil {
return err
}
log.Printf("NATS subscription to '%s' on '%s' established\n", sc.SubscribeTo, conf.Address)
cclog.Printf("NATS subscription to '%s' on '%s' established\n", sc.SubscribeTo, conf.Address)
subs = append(subs, sub)
}
@@ -150,14 +150,14 @@ func ReceiveNats(conf *(NatsConfig),
for _, sub := range subs {
err = sub.Unsubscribe()
if err != nil {
log.Printf("NATS unsubscribe failed: %s", err.Error())
cclog.Printf("NATS unsubscribe failed: %s", err.Error())
}
}
close(msgs)
wg.Wait()
nc.Close()
log.Println("NATS connection closed")
cclog.Print("NATS connection closed")
return nil
}
@@ -182,7 +182,7 @@ func reorder(buf, prefix []byte) []byte {
// Decode lines using dec and make write calls to the MemoryStore.
// If a line is missing its cluster tag, use clusterDefault as default.
func decodeLine(dec *lineprotocol.Decoder,
func DecodeLine(dec *lineprotocol.Decoder,
ms *MemoryStore,
clusterDefault string,
) error {


@@ -6,9 +6,10 @@
package memorystore
import (
"bytes"
"context"
"encoding/json"
"errors"
"log"
"os"
"os/signal"
"runtime"
@@ -16,6 +17,8 @@ import (
"syscall"
"time"
"github.com/ClusterCockpit/cc-backend/internal/config"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/resampler"
"github.com/ClusterCockpit/cc-lib/runtimeEnv"
"github.com/ClusterCockpit/cc-lib/schema"
@@ -47,9 +50,18 @@ type MemoryStore struct {
root Level
}
func Init(wg *sync.WaitGroup) {
func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
startupTime := time.Now()
if rawConfig != nil {
config.Validate(configSchema, rawConfig)
dec := json.NewDecoder(bytes.NewReader(rawConfig))
// dec.DisallowUnknownFields()
if err := dec.Decode(&Keys); err != nil {
cclog.Abortf("[METRICSTORE]> Metric Store Config Init: Could not decode config file '%s'.\nError: %s\n", rawConfig, err.Error())
}
}
// Pass the config.MetricStoreKeys
InitMetrics(Metrics)
@@ -57,17 +69,17 @@ func Init(wg *sync.WaitGroup) {
d, err := time.ParseDuration(Keys.Checkpoints.Restore)
if err != nil {
log.Fatal(err)
cclog.Fatal(err)
}
restoreFrom := startupTime.Add(-d)
log.Printf("[METRICSTORE]> Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339))
cclog.Infof("[METRICSTORE]> Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339))
files, err := ms.FromCheckpointFiles(Keys.Checkpoints.RootDir, restoreFrom.Unix())
loadedData := ms.SizeInBytes() / 1024 / 1024 // In MB
if err != nil {
log.Fatalf("[METRICSTORE]> Loading checkpoints failed: %s\n", err.Error())
cclog.Fatalf("[METRICSTORE]> Loading checkpoints failed: %s\n", err.Error())
} else {
log.Printf("[METRICSTORE]> Checkpoints loaded (%d files, %d MB, that took %fs)\n", files, loadedData, time.Since(startupTime).Seconds())
cclog.Infof("[METRICSTORE]> Checkpoints loaded (%d files, %d MB, that took %fs)\n", files, loadedData, time.Since(startupTime).Seconds())
}
// Try to use less memory by forcing a GC run here and then
@@ -106,7 +118,7 @@ func Init(wg *sync.WaitGroup) {
// err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx)
err := ReceiveNats(nc, ms, 1, ctx)
if err != nil {
log.Fatal(err)
cclog.Fatal(err)
}
wg.Done()
}()
@@ -144,14 +156,14 @@ func InitMetrics(metrics map[string]MetricConfig) {
func GetMemoryStore() *MemoryStore {
if msInstance == nil {
log.Fatalf("[METRICSTORE]> MemoryStore not initialized!")
cclog.Fatalf("[METRICSTORE]> MemoryStore not initialized!")
}
return msInstance
}
func Shutdown() {
log.Printf("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir)
cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir)
var files int
var err error
@@ -165,9 +177,9 @@ func Shutdown() {
}
if err != nil {
log.Printf("[METRICSTORE]> Writing checkpoint failed: %s\n", err.Error())
cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s\n", err.Error())
}
log.Printf("[METRICSTORE]> Done! (%d files written)\n", files)
cclog.Infof("[METRICSTORE]> Done! (%d files written)\n", files)
// ms.PrintHeirarchy()
}
@@ -248,7 +260,7 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
defer wg.Done()
d, err := time.ParseDuration(Keys.RetentionInMemory)
if err != nil {
log.Fatal(err)
cclog.Fatal(err)
}
if d <= 0 {
return
@@ -267,12 +279,12 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
return
case <-ticks:
t := time.Now().Add(-d)
log.Printf("[METRICSTORE]> start freeing buffers (older than %s)...\n", t.Format(time.RFC3339))
cclog.Infof("[METRICSTORE]> start freeing buffers (older than %s)...\n", t.Format(time.RFC3339))
freed, err := ms.Free(nil, t.Unix())
if err != nil {
log.Printf("[METRICSTORE]> freeing up buffers failed: %s\n", err.Error())
cclog.Errorf("[METRICSTORE]> freeing up buffers failed: %s\n", err.Error())
} else {
log.Printf("[METRICSTORE]> done: %d buffers freed\n", freed)
cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freed)
}
}
}
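Finally, the config handling that used to live in InitMetricStore now happens inside Init itself, so callers hand over the raw metric-store JSON section together with the wait group. A rough caller sketch under that assumption; the config file name is a placeholder and the exact JSON keys are defined by the package's config schema, which is not shown in this diff:

package main

import (
	"encoding/json"
	"os"
	"sync"

	"github.com/ClusterCockpit/cc-backend/internal/memorystore"
)

func main() {
	// Assumed: the metric-store section of the configuration is available as
	// its own JSON file; in cc-backend it is extracted from the main config.
	raw, err := os.ReadFile("metricstore.json")
	if err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	// Init validates the raw JSON against the package's config schema,
	// decodes it into Keys, loads checkpoints from disk and starts the
	// NATS receiver shown above.
	memorystore.Init(json.RawMessage(raw), &wg)

	// ... serve requests ...

	// Shutdown writes the in-memory store back to checkpoint files on exit.
	memorystore.Shutdown()
}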