diff --git a/internal/api/memorystore.go b/internal/api/memorystore.go
new file mode 100644
index 0000000..4d598ee
--- /dev/null
+++ b/internal/api/memorystore.go
@@ -0,0 +1,177 @@
+// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package api
+
+import (
+	"bufio"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"strconv"
+	"strings"
+
+	"github.com/ClusterCockpit/cc-backend/internal/memorystore"
+	cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
+
+	"github.com/influxdata/line-protocol/v2/lineprotocol"
+)
+
+// freeMetrics godoc
+// @summary Free buffers in the metric store
+// @tags free
+// @description This endpoint allows the users to free the buffers from the
+// metric store. It lets them remove buffers systematically and also allows
+// them to prune the data under a node, if they do not want to
+// remove the whole node.
+// @produce json
+// @param to query string false "up to timestamp"
+// @success 200 {string} string "ok"
+// @failure 400 {object} api.ErrorResponse "Bad Request"
+// @failure 401 {object} api.ErrorResponse "Unauthorized"
+// @failure 403 {object} api.ErrorResponse "Forbidden"
+// @failure 500 {object} api.ErrorResponse "Internal Server Error"
+// @security ApiKeyAuth
+// @router /free/ [post]
+func freeMetrics(rw http.ResponseWriter, r *http.Request) {
+	rawTo := r.URL.Query().Get("to")
+	if rawTo == "" {
+		handleError(errors.New("'to' is a required query parameter"), http.StatusBadRequest, rw)
+		return
+	}
+
+	to, err := strconv.ParseInt(rawTo, 10, 64)
+	if err != nil {
+		handleError(err, http.StatusInternalServerError, rw)
+		return
+	}
+
+	// // TODO: lastCheckpoint might be modified by different go-routines.
+	// // Load it using the sync/atomic package?
+	// freeUpTo := lastCheckpoint.Unix()
+	// if to < freeUpTo {
+	// 	freeUpTo = to
+	// }
+
+	bodyDec := json.NewDecoder(r.Body)
+	var selectors [][]string
+	err = bodyDec.Decode(&selectors)
+	if err != nil {
+		http.Error(rw, err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	ms := memorystore.GetMemoryStore()
+	n := 0
+	for _, sel := range selectors {
+		bn, err := ms.Free(sel, to)
+		if err != nil {
+			handleError(err, http.StatusInternalServerError, rw)
+			return
+		}
+
+		n += bn
+	}
+
+	rw.WriteHeader(http.StatusOK)
+	fmt.Fprintf(rw, "buffers freed: %d\n", n)
+}
+
+// writeMetrics godoc
+// @summary Receive metrics in InfluxDB line-protocol
+// @tags write
+// @description Write data to the in-memory store in the InfluxDB line-protocol using [this format](https://github.com/ClusterCockpit/cc-specifications/blob/master/metrics/lineprotocol_alternative.md)
+
+// @accept plain
+// @produce json
+// @param cluster query string false "If the lines in the body do not have a cluster tag, use this value instead."
+// @success 200 {string} string "ok"
+// @failure 400 {object} api.ErrorResponse "Bad Request"
+// @failure 401 {object} api.ErrorResponse "Unauthorized"
+// @failure 403 {object} api.ErrorResponse "Forbidden"
+// @failure 500 {object} api.ErrorResponse "Internal Server Error"
+// @security ApiKeyAuth
+// @router /write/ [post]
+func writeMetrics(rw http.ResponseWriter, r *http.Request) {
+	bytes, err := io.ReadAll(r.Body)
+	rw.Header().Add("Content-Type", "application/json")
+	if err != nil {
+		handleError(err, http.StatusInternalServerError, rw)
+		return
+	}
+
+	ms := memorystore.GetMemoryStore()
+	dec := lineprotocol.NewDecoderWithBytes(bytes)
+	if err := memorystore.DecodeLine(dec, ms, r.URL.Query().Get("cluster")); err != nil {
+		cclog.Errorf("/api/write error: %s", err.Error())
+		handleError(err, http.StatusBadRequest, rw)
+		return
+	}
+	rw.WriteHeader(http.StatusOK)
+}
+
+// debugMetrics godoc
+// @summary Debug endpoint
+// @tags debug
+// @description This endpoint allows the users to print the content of
+// nodes/clusters/metrics to review the state of the data.
+// @produce json
+// @param selector query string false "Selector"
+// @success 200 {string} string "Debug dump"
+// @failure 400 {object} api.ErrorResponse "Bad Request"
+// @failure 401 {object} api.ErrorResponse "Unauthorized"
+// @failure 403 {object} api.ErrorResponse "Forbidden"
+// @failure 500 {object} api.ErrorResponse "Internal Server Error"
+// @security ApiKeyAuth
+// @router /debug/ [post]
+func debugMetrics(rw http.ResponseWriter, r *http.Request) {
+	raw := r.URL.Query().Get("selector")
+	rw.Header().Add("Content-Type", "application/json")
+	selector := []string{}
+	if len(raw) != 0 {
+		selector = strings.Split(raw, ":")
+	}
+
+	ms := memorystore.GetMemoryStore()
+	if err := ms.DebugDump(bufio.NewWriter(rw), selector); err != nil {
+		handleError(err, http.StatusBadRequest, rw)
+		return
+	}
+}
+
+// metricsHealth godoc
+// @summary HealthCheck endpoint
+// @tags healthcheck
+// @description This endpoint allows the users to check if a node is healthy
+// @produce json
+// @param selector query string false "Selector"
+// @success 200 {string} string "Health check"
+// @failure 400 {object} api.ErrorResponse "Bad Request"
+// @failure 401 {object} api.ErrorResponse "Unauthorized"
+// @failure 403 {object} api.ErrorResponse "Forbidden"
+// @failure 500 {object} api.ErrorResponse "Internal Server Error"
+// @security ApiKeyAuth
+// @router /healthcheck/ [get]
+func metricsHealth(rw http.ResponseWriter, r *http.Request) {
+	rawCluster := r.URL.Query().Get("cluster")
+	rawNode := r.URL.Query().Get("node")
+
+	if rawCluster == "" || rawNode == "" {
+		handleError(errors.New("'cluster' and 'node' are required query parameters"), http.StatusBadRequest, rw)
+		return
+	}
+
+	rw.Header().Add("Content-Type", "application/json")
+
+	selector := []string{rawCluster, rawNode}
+
+	ms := memorystore.GetMemoryStore()
+	if err := ms.HealthCheck(bufio.NewWriter(rw), selector); err != nil {
+		handleError(err, http.StatusBadRequest, rw)
+		return
+	}
+}
diff --git a/internal/api/rest.go b/internal/api/rest.go
index fcadc90..1c6b601 100644
--- a/internal/api/rest.go
+++ b/internal/api/rest.go
@@ -15,7 +15,6 @@ import (
 	"github.com/ClusterCockpit/cc-backend/internal/auth"
 	"github.com/ClusterCockpit/cc-backend/internal/config"
-	"github.com/ClusterCockpit/cc-backend/internal/memorystore"
 	"github.com/ClusterCockpit/cc-backend/internal/repository"
 	cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema" @@ -98,15 +97,11 @@ func (api *RestApi) MountUserApiRoutes(r *mux.Router) { func (api *RestApi) MountMetricStoreApiRoutes(r *mux.Router) { // REST API Uses TokenAuth - r.HandleFunc("/api/free", memorystore.HandleFree).Methods(http.MethodPost) - r.HandleFunc("/api/write", memorystore.HandleWrite).Methods(http.MethodPost) - r.HandleFunc("/api/debug", memorystore.HandleDebug).Methods(http.MethodGet) - r.HandleFunc("/api/healthcheck", memorystore.HandleHealthCheck).Methods(http.MethodGet) - // Refactor - r.HandleFunc("/api/free/", memorystore.HandleFree).Methods(http.MethodPost) - r.HandleFunc("/api/write/", memorystore.HandleWrite).Methods(http.MethodPost) - r.HandleFunc("/api/debug/", memorystore.HandleDebug).Methods(http.MethodGet) - r.HandleFunc("/api/healthcheck/", memorystore.HandleHealthCheck).Methods(http.MethodGet) + // Refactor ?? + r.HandleFunc("/api/free", freeMetrics).Methods(http.MethodPost) + r.HandleFunc("/api/write", writeMetrics).Methods(http.MethodPost) + r.HandleFunc("/api/debug", debugMetrics).Methods(http.MethodGet) + r.HandleFunc("/api/healthcheck", metricsHealth).Methods(http.MethodGet) } func (api *RestApi) MountConfigApiRoutes(r *mux.Router) { @@ -269,7 +264,7 @@ func (api *RestApi) putMachineState(rw http.ResponseWriter, r *http.Request) { cluster := vars["cluster"] host := vars["host"] dir := filepath.Join(api.MachineStateDir, cluster) - if err := os.MkdirAll(dir, 0755); err != nil { + if err := os.MkdirAll(dir, 0o755); err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return } diff --git a/internal/memorystore/api.go b/internal/memorystore/api.go index 6382090..1f7a531 100644 --- a/internal/memorystore/api.go +++ b/internal/memorystore/api.go @@ -1,53 +1,17 @@ // Copyright (C) NHR@FAU, University Erlangen-Nuremberg. -// All rights reserved. +// All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. 
package memorystore import ( - "bufio" - "encoding/json" - "errors" - "fmt" - "io" - "log" "math" - "net/http" - "strconv" - "strings" "github.com/ClusterCockpit/cc-lib/schema" "github.com/ClusterCockpit/cc-lib/util" - - "github.com/influxdata/line-protocol/v2/lineprotocol" ) -// @title cc-metric-store REST API -// @version 1.0.0 -// @description API for cc-metric-store - -// @contact.name ClusterCockpit Project -// @contact.url https://clustercockpit.org -// @contact.email support@clustercockpit.org - -// @license.name MIT License -// @license.url https://opensource.org/licenses/MIT - -// @host localhost:8082 -// @basePath /api/ - -// @securityDefinitions.apikey ApiKeyAuth -// @in header -// @name X-Auth-Token - -// ErrorResponse model -type ErrorResponse struct { - // Statustext of Errorcode - Status string `json:"status"` - Error string `json:"error"` // Error Message -} - type APIMetricData struct { Error *string `json:"error,omitempty"` Data schema.FloatArray `json:"data,omitempty"` @@ -59,14 +23,32 @@ type APIMetricData struct { Max schema.Float `json:"max"` } -func handleError(err error, statusCode int, rw http.ResponseWriter) { - // log.Warnf("REST ERROR : %s", err.Error()) - rw.Header().Add("Content-Type", "application/json") - rw.WriteHeader(statusCode) - json.NewEncoder(rw).Encode(ErrorResponse{ - Status: http.StatusText(statusCode), - Error: err.Error(), - }) +type APIQueryRequest struct { + Cluster string `json:"cluster"` + Queries []APIQuery `json:"queries"` + ForAllNodes []string `json:"for-all-nodes"` + From int64 `json:"from"` + To int64 `json:"to"` + WithStats bool `json:"with-stats"` + WithData bool `json:"with-data"` + WithPadding bool `json:"with-padding"` +} + +type APIQueryResponse struct { + Queries []APIQuery `json:"queries,omitempty"` + Results [][]APIMetricData `json:"results"` +} + +type APIQuery struct { + Type *string `json:"type,omitempty"` + SubType *string `json:"subtype,omitempty"` + Metric string `json:"metric"` + Hostname string `json:"host"` + Resolution int64 `json:"resolution"` + TypeIds []string `json:"type-ids,omitempty"` + SubTypeIds []string `json:"subtype-ids,omitempty"` + ScaleFactor schema.Float `json:"scale-by,omitempty"` + Aggregate bool `json:"aggreg"` } // TODO: Optimize this, just like the stats endpoint! @@ -126,127 +108,6 @@ func (data *APIMetricData) PadDataWithNull(ms *MemoryStore, from, to int64, metr } } -// handleFree godoc -// @summary -// @tags free -// @description This endpoint allows the users to free the Buffers from the -// metric store. This endpoint offers the users to remove then systematically -// and also allows then to prune the data under node, if they do not want to -// remove the whole node. -// @produce json -// @param to query string false "up to timestamp" -// @success 200 {string} string "ok" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 403 {object} api.ErrorResponse "Forbidden" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /free/ [post] -func HandleFree(rw http.ResponseWriter, r *http.Request) { - rawTo := r.URL.Query().Get("to") - if rawTo == "" { - handleError(errors.New("'to' is a required query parameter"), http.StatusBadRequest, rw) - return - } - - to, err := strconv.ParseInt(rawTo, 10, 64) - if err != nil { - handleError(err, http.StatusInternalServerError, rw) - return - } - - // // TODO: lastCheckpoint might be modified by different go-routines. 
- // // Load it using the sync/atomic package? - // freeUpTo := lastCheckpoint.Unix() - // if to < freeUpTo { - // freeUpTo = to - // } - - bodyDec := json.NewDecoder(r.Body) - var selectors [][]string - err = bodyDec.Decode(&selectors) - if err != nil { - http.Error(rw, err.Error(), http.StatusBadRequest) - return - } - - ms := GetMemoryStore() - n := 0 - for _, sel := range selectors { - bn, err := ms.Free(sel, to) - if err != nil { - handleError(err, http.StatusInternalServerError, rw) - return - } - - n += bn - } - - rw.WriteHeader(http.StatusOK) - fmt.Fprintf(rw, "buffers freed: %d\n", n) -} - -// handleWrite godoc -// @summary Receive metrics in InfluxDB line-protocol -// @tags write -// @description Write data to the in-memory store in the InfluxDB line-protocol using [this format](https://github.com/ClusterCockpit/cc-specifications/blob/master/metrics/lineprotocol_alternative.md) - -// @accept plain -// @produce json -// @param cluster query string false "If the lines in the body do not have a cluster tag, use this value instead." -// @success 200 {string} string "ok" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 403 {object} api.ErrorResponse "Forbidden" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /write/ [post] -func HandleWrite(rw http.ResponseWriter, r *http.Request) { - bytes, err := io.ReadAll(r.Body) - rw.Header().Add("Content-Type", "application/json") - if err != nil { - handleError(err, http.StatusInternalServerError, rw) - return - } - - ms := GetMemoryStore() - dec := lineprotocol.NewDecoderWithBytes(bytes) - if err := decodeLine(dec, ms, r.URL.Query().Get("cluster")); err != nil { - log.Printf("/api/write error: %s", err.Error()) - handleError(err, http.StatusBadRequest, rw) - return - } - rw.WriteHeader(http.StatusOK) -} - -type APIQueryRequest struct { - Cluster string `json:"cluster"` - Queries []APIQuery `json:"queries"` - ForAllNodes []string `json:"for-all-nodes"` - From int64 `json:"from"` - To int64 `json:"to"` - WithStats bool `json:"with-stats"` - WithData bool `json:"with-data"` - WithPadding bool `json:"with-padding"` -} - -type APIQueryResponse struct { - Queries []APIQuery `json:"queries,omitempty"` - Results [][]APIMetricData `json:"results"` -} - -type APIQuery struct { - Type *string `json:"type,omitempty"` - SubType *string `json:"subtype,omitempty"` - Metric string `json:"metric"` - Hostname string `json:"host"` - Resolution int64 `json:"resolution"` - TypeIds []string `json:"type-ids,omitempty"` - SubTypeIds []string `json:"subtype-ids,omitempty"` - ScaleFactor schema.Float `json:"scale-by,omitempty"` - Aggregate bool `json:"aggreg"` -} - func FetchData(req APIQueryRequest) (*APIQueryResponse, error) { req.WithData = true req.WithData = true @@ -354,65 +215,3 @@ func FetchData(req APIQueryRequest) (*APIQueryResponse, error) { return &response, nil } - -// handleDebug godoc -// @summary Debug endpoint -// @tags debug -// @description This endpoint allows the users to print the content of -// nodes/clusters/metrics to review the state of the data. 
-// @produce json -// @param selector query string false "Selector" -// @success 200 {string} string "Debug dump" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 403 {object} api.ErrorResponse "Forbidden" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /debug/ [post] -func HandleDebug(rw http.ResponseWriter, r *http.Request) { - raw := r.URL.Query().Get("selector") - rw.Header().Add("Content-Type", "application/json") - selector := []string{} - if len(raw) != 0 { - selector = strings.Split(raw, ":") - } - - ms := GetMemoryStore() - if err := ms.DebugDump(bufio.NewWriter(rw), selector); err != nil { - handleError(err, http.StatusBadRequest, rw) - return - } -} - -// handleHealthCheck godoc -// @summary HealthCheck endpoint -// @tags healthcheck -// @description This endpoint allows the users to check if a node is healthy -// @produce json -// @param selector query string false "Selector" -// @success 200 {string} string "Debug dump" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 403 {object} api.ErrorResponse "Forbidden" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /healthcheck/ [get] -func HandleHealthCheck(rw http.ResponseWriter, r *http.Request) { - rawCluster := r.URL.Query().Get("cluster") - rawNode := r.URL.Query().Get("node") - - if rawCluster == "" || rawNode == "" { - handleError(errors.New("'cluster' and 'node' are required query parameter"), http.StatusBadRequest, rw) - return - } - - rw.Header().Add("Content-Type", "application/json") - - selector := []string{rawCluster, rawNode} - - ms := GetMemoryStore() - if err := ms.HealthCheck(bufio.NewWriter(rw), selector); err != nil { - handleError(err, http.StatusBadRequest, rw) - return - } -} diff --git a/internal/memorystore/archive.go b/internal/memorystore/archive.go index 0b65869..6483e74 100644 --- a/internal/memorystore/archive.go +++ b/internal/memorystore/archive.go @@ -12,7 +12,6 @@ import ( "errors" "fmt" "io" - "log" "os" "path/filepath" "sync" @@ -27,7 +26,7 @@ func Archiving(wg *sync.WaitGroup, ctx context.Context) { defer wg.Done() d, err := time.ParseDuration(Keys.Archive.Interval) if err != nil { - log.Fatalf("[METRICSTORE]> error parsing archive interval duration: %v\n", err) + cclog.Fatalf("[METRICSTORE]> error parsing archive interval duration: %v\n", err) } if d <= 0 { return @@ -45,14 +44,14 @@ func Archiving(wg *sync.WaitGroup, ctx context.Context) { return case <-ticks: t := time.Now().Add(-d) - log.Printf("[METRICSTORE]> start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339)) + cclog.Printf("[METRICSTORE]> start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339)) n, err := ArchiveCheckpoints(Keys.Checkpoints.RootDir, Keys.Archive.RootDir, t.Unix(), Keys.Archive.DeleteInstead) if err != nil { - log.Printf("[METRICSTORE]> archiving failed: %s\n", err.Error()) + cclog.Printf("[METRICSTORE]> archiving failed: %s\n", err.Error()) } else { - log.Printf("[METRICSTORE]> done: %d files zipped and moved to archive\n", n) + cclog.Printf("[METRICSTORE]> done: %d files zipped and moved to archive\n", n) } } } diff --git a/internal/memorystore/avroCheckpoint.go b/internal/memorystore/avroCheckpoint.go index 8c82364..3642186 100644 --- a/internal/memorystore/avroCheckpoint.go +++ 
b/internal/memorystore/avroCheckpoint.go @@ -10,7 +10,6 @@ import ( "encoding/json" "errors" "fmt" - "log" "os" "path" "sort" @@ -20,6 +19,7 @@ import ( "sync/atomic" "time" + cclog "github.com/ClusterCockpit/cc-lib/ccLogger" "github.com/ClusterCockpit/cc-lib/schema" "github.com/linkedin/goavro/v2" ) @@ -72,7 +72,7 @@ func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) { continue } - log.Printf("error while checkpointing %#v: %s", workItem.selector, err.Error()) + cclog.Errorf("error while checkpointing %#v: %s", workItem.selector, err.Error()) atomic.AddInt32(&errs, 1) } else { atomic.AddInt32(&n, 1) diff --git a/internal/memorystore/avroHelper.go b/internal/memorystore/avroHelper.go index dadacdc..64e5706 100644 --- a/internal/memorystore/avroHelper.go +++ b/internal/memorystore/avroHelper.go @@ -7,10 +7,11 @@ package memorystore import ( "context" - "log" "slices" "strconv" "sync" + + cclog "github.com/ClusterCockpit/cc-lib/ccLogger" ) func DataStaging(wg *sync.WaitGroup, ctx context.Context) { @@ -34,7 +35,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { // Fetch the frequency of the metric from the global configuration freq, err := GetMetricFrequency(val.MetricName) if err != nil { - log.Printf("Error fetching metric frequency: %s\n", err) + cclog.Errorf("Error fetching metric frequency: %s\n", err) continue } @@ -59,7 +60,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { // If the Avro level is nil, create a new one if avroLevel == nil { - log.Printf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName) + cclog.Errorf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName) } oldSelector = slices.Clone(selector) } diff --git a/internal/memorystore/checkpoint.go b/internal/memorystore/checkpoint.go index e649ee0..fe09b9e 100644 --- a/internal/memorystore/checkpoint.go +++ b/internal/memorystore/checkpoint.go @@ -12,7 +12,6 @@ import ( "errors" "fmt" "io/fs" - "log" "os" "path" "path/filepath" @@ -24,6 +23,7 @@ import ( "sync/atomic" "time" + cclog "github.com/ClusterCockpit/cc-lib/ccLogger" "github.com/ClusterCockpit/cc-lib/schema" "github.com/linkedin/goavro/v2" ) @@ -54,7 +54,7 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { defer wg.Done() d, err := time.ParseDuration(Keys.Checkpoints.Interval) if err != nil { - log.Fatal(err) + cclog.Fatal(err) } if d <= 0 { return @@ -71,14 +71,14 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { case <-ctx.Done(): return case <-ticks: - log.Printf("[METRICSTORE]> start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339)) + cclog.Printf("[METRICSTORE]> start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339)) now := time.Now() n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), now.Unix()) if err != nil { - log.Printf("[METRICSTORE]> checkpointing failed: %s\n", err.Error()) + cclog.Printf("[METRICSTORE]> checkpointing failed: %s\n", err.Error()) } else { - log.Printf("[METRICSTORE]> done: %d checkpoint files created\n", n) + cclog.Printf("[METRICSTORE]> done: %d checkpoint files created\n", n) lastCheckpoint = now } } @@ -183,7 +183,7 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) { continue } - log.Printf("[METRICSTORE]> error while checkpointing %#v: %s", workItem.selector, err.Error()) + cclog.Printf("[METRICSTORE]> error while 
checkpointing %#v: %s", workItem.selector, err.Error()) atomic.AddInt32(&errs, 1) } else { atomic.AddInt32(&n, 1) @@ -318,7 +318,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) ( lvl := m.root.findLevelOrCreate(host[:], len(m.Metrics)) nn, err := lvl.fromCheckpoint(m, filepath.Join(dir, host[0], host[1]), from, extension) if err != nil { - log.Fatalf("[METRICSTORE]> error while loading checkpoints: %s", err.Error()) + cclog.Fatalf("[METRICSTORE]> error while loading checkpoints: %s", err.Error()) atomic.AddInt32(&errs, 1) } atomic.AddInt32(&n, int32(nn)) @@ -381,9 +381,9 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { // The directory does not exist, so create it using os.MkdirAll() err := os.MkdirAll(dir, 0o755) // 0755 sets the permissions for the directory if err != nil { - log.Fatalf("[METRICSTORE]> Error creating directory: %#v\n", err) + cclog.Fatalf("[METRICSTORE]> Error creating directory: %#v\n", err) } - log.Printf("[METRICSTORE]> %#v Directory created successfully.\n", dir) + cclog.Printf("[METRICSTORE]> %#v Directory created successfully.\n", dir) } // Config read (replace with your actual config read) @@ -402,7 +402,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { if found, err := checkFilesWithExtension(dir, fileFormat); err != nil { return 0, fmt.Errorf("[METRICSTORE]> error checking files with extension: %v", err) } else if found { - log.Printf("[METRICSTORE]> Loading %s files because fileformat is %s\n", fileFormat, fileFormat) + cclog.Printf("[METRICSTORE]> Loading %s files because fileformat is %s\n", fileFormat, fileFormat) return m.FromCheckpoint(dir, from, fileFormat) } @@ -411,11 +411,11 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { if found, err := checkFilesWithExtension(dir, altFormat); err != nil { return 0, fmt.Errorf("[METRICSTORE]> error checking files with extension: %v", err) } else if found { - log.Printf("[METRICSTORE]> Loading %s files but fileformat is %s\n", altFormat, fileFormat) + cclog.Printf("[METRICSTORE]> Loading %s files but fileformat is %s\n", altFormat, fileFormat) return m.FromCheckpoint(dir, from, altFormat) } - log.Println("[METRICSTORE]> No valid checkpoint files found in the directory.") + cclog.Print("[METRICSTORE]> No valid checkpoint files found in the directory") return 0, nil } diff --git a/internal/memorystore/config.go b/internal/memorystore/config.go index 3bbca27..7baef97 100644 --- a/internal/memorystore/config.go +++ b/internal/memorystore/config.go @@ -6,12 +6,7 @@ package memorystore import ( - "bytes" - "encoding/json" "fmt" - - "github.com/ClusterCockpit/cc-backend/internal/config" - cclog "github.com/ClusterCockpit/cc-lib/ccLogger" ) var InternalCCMSFlag bool = false @@ -93,17 +88,6 @@ type MetricConfig struct { var Metrics map[string]MetricConfig -func InitMetricStore(rawConfig json.RawMessage) { - if rawConfig != nil { - config.Validate(configSchema, rawConfig) - dec := json.NewDecoder(bytes.NewReader(rawConfig)) - // dec.DisallowUnknownFields() - if err := dec.Decode(&Keys); err != nil { - cclog.Abortf("[METRICSTORE]> Metric Store Config Init: Could not decode config file '%s'.\nError: %s\n", rawConfig, err.Error()) - } - } -} - func GetMetricFrequency(metricName string) (int64, error) { if metric, ok := Metrics[metricName]; ok { return metric.Frequency, nil diff --git a/internal/memorystore/lineprotocol.go b/internal/memorystore/lineprotocol.go index 5265848..607fff3 100644 
--- a/internal/memorystore/lineprotocol.go +++ b/internal/memorystore/lineprotocol.go @@ -8,10 +8,10 @@ package memorystore import ( "context" "fmt" - "log" "sync" "time" + cclog "github.com/ClusterCockpit/cc-lib/ccLogger" "github.com/ClusterCockpit/cc-lib/schema" "github.com/influxdata/line-protocol/v2/lineprotocol" "github.com/nats-io/nats.go" @@ -118,8 +118,8 @@ func ReceiveNats(conf *(NatsConfig), go func() { for m := range msgs { dec := lineprotocol.NewDecoderWithBytes(m.Data) - if err := decodeLine(dec, ms, clusterTag); err != nil { - log.Printf("error: %s\n", err.Error()) + if err := DecodeLine(dec, ms, clusterTag); err != nil { + cclog.Printf("error: %s\n", err.Error()) } } @@ -133,8 +133,8 @@ func ReceiveNats(conf *(NatsConfig), } else { sub, err = nc.Subscribe(sc.SubscribeTo, func(m *nats.Msg) { dec := lineprotocol.NewDecoderWithBytes(m.Data) - if err := decodeLine(dec, ms, clusterTag); err != nil { - log.Printf("error: %s\n", err.Error()) + if err := DecodeLine(dec, ms, clusterTag); err != nil { + cclog.Printf("error: %s\n", err.Error()) } }) } @@ -142,7 +142,7 @@ func ReceiveNats(conf *(NatsConfig), if err != nil { return err } - log.Printf("NATS subscription to '%s' on '%s' established\n", sc.SubscribeTo, conf.Address) + cclog.Printf("NATS subscription to '%s' on '%s' established\n", sc.SubscribeTo, conf.Address) subs = append(subs, sub) } @@ -150,14 +150,14 @@ func ReceiveNats(conf *(NatsConfig), for _, sub := range subs { err = sub.Unsubscribe() if err != nil { - log.Printf("NATS unsubscribe failed: %s", err.Error()) + cclog.Printf("NATS unsubscribe failed: %s", err.Error()) } } close(msgs) wg.Wait() nc.Close() - log.Println("NATS connection closed") + cclog.Print("NATS connection closed") return nil } @@ -182,7 +182,7 @@ func reorder(buf, prefix []byte) []byte { // Decode lines using dec and make write calls to the MemoryStore. // If a line is missing its cluster tag, use clusterDefault as default. 
-func decodeLine(dec *lineprotocol.Decoder, +func DecodeLine(dec *lineprotocol.Decoder, ms *MemoryStore, clusterDefault string, ) error { diff --git a/internal/memorystore/memorystore.go b/internal/memorystore/memorystore.go index 11b4a1c..ed0b4c8 100644 --- a/internal/memorystore/memorystore.go +++ b/internal/memorystore/memorystore.go @@ -6,9 +6,10 @@ package memorystore import ( + "bytes" "context" + "encoding/json" "errors" - "log" "os" "os/signal" "runtime" @@ -16,6 +17,8 @@ import ( "syscall" "time" + "github.com/ClusterCockpit/cc-backend/internal/config" + cclog "github.com/ClusterCockpit/cc-lib/ccLogger" "github.com/ClusterCockpit/cc-lib/resampler" "github.com/ClusterCockpit/cc-lib/runtimeEnv" "github.com/ClusterCockpit/cc-lib/schema" @@ -47,9 +50,18 @@ type MemoryStore struct { root Level } -func Init(wg *sync.WaitGroup) { +func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { startupTime := time.Now() + if rawConfig != nil { + config.Validate(configSchema, rawConfig) + dec := json.NewDecoder(bytes.NewReader(rawConfig)) + // dec.DisallowUnknownFields() + if err := dec.Decode(&Keys); err != nil { + cclog.Abortf("[METRICSTORE]> Metric Store Config Init: Could not decode config file '%s'.\nError: %s\n", rawConfig, err.Error()) + } + } + // Pass the config.MetricStoreKeys InitMetrics(Metrics) @@ -57,17 +69,17 @@ func Init(wg *sync.WaitGroup) { d, err := time.ParseDuration(Keys.Checkpoints.Restore) if err != nil { - log.Fatal(err) + cclog.Fatal(err) } restoreFrom := startupTime.Add(-d) - log.Printf("[METRICSTORE]> Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339)) + cclog.Infof("[METRICSTORE]> Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339)) files, err := ms.FromCheckpointFiles(Keys.Checkpoints.RootDir, restoreFrom.Unix()) loadedData := ms.SizeInBytes() / 1024 / 1024 // In MB if err != nil { - log.Fatalf("[METRICSTORE]> Loading checkpoints failed: %s\n", err.Error()) + cclog.Fatalf("[METRICSTORE]> Loading checkpoints failed: %s\n", err.Error()) } else { - log.Printf("[METRICSTORE]> Checkpoints loaded (%d files, %d MB, that took %fs)\n", files, loadedData, time.Since(startupTime).Seconds()) + cclog.Infof("[METRICSTORE]> Checkpoints loaded (%d files, %d MB, that took %fs)\n", files, loadedData, time.Since(startupTime).Seconds()) } // Try to use less memory by forcing a GC run here and then @@ -106,7 +118,7 @@ func Init(wg *sync.WaitGroup) { // err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx) err := ReceiveNats(nc, ms, 1, ctx) if err != nil { - log.Fatal(err) + cclog.Fatal(err) } wg.Done() }() @@ -144,14 +156,14 @@ func InitMetrics(metrics map[string]MetricConfig) { func GetMemoryStore() *MemoryStore { if msInstance == nil { - log.Fatalf("[METRICSTORE]> MemoryStore not initialized!") + cclog.Fatalf("[METRICSTORE]> MemoryStore not initialized!") } return msInstance } func Shutdown() { - log.Printf("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir) + cclog.Infof("[METRICSTORE]> Writing to '%s'...\n", Keys.Checkpoints.RootDir) var files int var err error @@ -165,9 +177,9 @@ func Shutdown() { } if err != nil { - log.Printf("[METRICSTORE]> Writing checkpoint failed: %s\n", err.Error()) + cclog.Errorf("[METRICSTORE]> Writing checkpoint failed: %s\n", err.Error()) } - log.Printf("[METRICSTORE]> Done! (%d files written)\n", files) + cclog.Infof("[METRICSTORE]> Done! 
(%d files written)\n", files) // ms.PrintHeirarchy() } @@ -248,7 +260,7 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { defer wg.Done() d, err := time.ParseDuration(Keys.RetentionInMemory) if err != nil { - log.Fatal(err) + cclog.Fatal(err) } if d <= 0 { return @@ -267,12 +279,12 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { return case <-ticks: t := time.Now().Add(-d) - log.Printf("[METRICSTORE]> start freeing buffers (older than %s)...\n", t.Format(time.RFC3339)) + cclog.Infof("[METRICSTORE]> start freeing buffers (older than %s)...\n", t.Format(time.RFC3339)) freed, err := ms.Free(nil, t.Unix()) if err != nil { - log.Printf("[METRICSTORE]> freeing up buffers failed: %s\n", err.Error()) + cclog.Errorf("[METRICSTORE]> freeing up buffers failed: %s\n", err.Error()) } else { - log.Printf("[METRICSTORE]> done: %d buffers freed\n", freed) + cclog.Infof("[METRICSTORE]> done: %d buffers freed\n", freed) } } }
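
Note (not part of the patch): a minimal client-side sketch of how the relocated endpoints are intended to be called after this refactor. The server address, token value, cluster/host names, and the example metric line are assumptions for illustration only; the X-Auth-Token header, the ?cluster= parameter on /api/write, and the [][]string selector body plus ?to= parameter on /api/free follow the handlers and swagger annotations above.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

func main() {
	// Assumed values for illustration: server address, auth token,
	// cluster/host names and the metric line itself.
	base := "http://localhost:8082/api"
	token := "<jwt>"

	// POST /api/write: body is InfluxDB line protocol. If the lines carry no
	// cluster tag, the ?cluster= query parameter is used instead (see writeMetrics).
	line := fmt.Sprintf("flops_any,hostname=node001,type=node value=42.0 %d\n", time.Now().Unix())
	req, _ := http.NewRequest(http.MethodPost, base+"/write?cluster=testcluster", bytes.NewBufferString(line))
	req.Header.Set("X-Auth-Token", token)
	if resp, err := http.DefaultClient.Do(req); err == nil {
		resp.Body.Close()
	}

	// POST /api/free?to=<unix ts>: body is a JSON array of selectors
	// ([][]string); freeMetrics frees the buffers under each selector up to 'to'.
	selectors := [][]string{{"testcluster", "node001"}}
	body, _ := json.Marshal(selectors)
	req, _ = http.NewRequest(http.MethodPost,
		fmt.Sprintf("%s/free?to=%d", base, time.Now().Add(-24*time.Hour).Unix()),
		bytes.NewBuffer(body))
	req.Header.Set("X-Auth-Token", token)
	req.Header.Set("Content-Type", "application/json")
	if resp, err := http.DefaultClient.Do(req); err == nil {
		resp.Body.Close()
	}
}
```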