From f34e10cfd9aff3acf9b150600a802bfd85a4d0b4 Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Thu, 23 Oct 2025 17:58:17 +0200 Subject: [PATCH 1/8] Schema for metric store --- configs/config-demo.json | 17 +- internal/memorystore/buffer.go | 3 + internal/memorystore/configSchema.go | 245 ++++++++------------------- internal/memorystore/memorystore.go | 3 + startDemo.sh | 15 ++ 5 files changed, 102 insertions(+), 181 deletions(-) diff --git a/configs/config-demo.json b/configs/config-demo.json index 92e5b47..197aee0 100644 --- a/configs/config-demo.json +++ b/configs/config-demo.json @@ -34,9 +34,9 @@ { "name": "fritz", "metricDataRepository": { - "kind": "cc-metric-store", + "kind": "cc-metric-store-internal", "url": "http://localhost:8082", - "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJFZERTQSJ9" + "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJFZERTQSJ9.eyJ1c2VyIjoiYWRtaW4iLCJyb2xlcyI6WyJST0xFX0FETUlOIiwiUk9MRV9BTkFMWVNUIiwiUk9MRV9VU0VSIl19.d-3_3FZTsadPjDEdsWrrQ7nS0edMAR4zjl-eK7rJU3HziNBfI9PDHDIpJVHTNN5E5SlLGLFXctWyKAkwhXL-Dw" }, "filterRanges": { "numNodes": { @@ -56,9 +56,9 @@ { "name": "alex", "metricDataRepository": { - "kind": "cc-metric-store", + "kind": "cc-metric-store-internal", "url": "http://localhost:8082", - "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJFZERTQSJ9" + "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJFZERTQSJ9.eyJ1c2VyIjoiYWRtaW4iLCJyb2xlcyI6WyJST0xFX0FETUlOIiwiUk9MRV9BTkFMWVNUIiwiUk9MRV9VU0VSIl19.d-3_3FZTsadPjDEdsWrrQ7nS0edMAR4zjl-eK7rJU3HziNBfI9PDHDIpJVHTNN5E5SlLGLFXctWyKAkwhXL-Dw" }, "filterRanges": { "numNodes": { @@ -78,15 +78,16 @@ ], "metric-store": { "checkpoints": { - "file-format": "avro", - "interval": "2h", + "file-format": "json", + "interval": "1h", "directory": "./var/checkpoints", "restore": "48h" }, "archive": { - "interval": "2h", + "interval": "1h", "directory": "./var/archive" }, "retention-in-memory": "48h" - } + }, + "ui-file": "./configs/uiConfig.json" } \ No newline at end of file diff --git a/internal/memorystore/buffer.go b/internal/memorystore/buffer.go index a942d0a..413b04e 100644 --- a/internal/memorystore/buffer.go +++ b/internal/memorystore/buffer.go @@ -7,6 +7,7 @@ package memorystore import ( "errors" + "fmt" "sync" "github.com/ClusterCockpit/cc-lib/schema" @@ -187,6 +188,8 @@ func (b *buffer) read(from, to int64, data []schema.Float) ([]schema.Float, int6 i++ } + fmt.Printf("Given From : %d, To: %d\n", from, to) + return data[:i], from, t, nil } diff --git a/internal/memorystore/configSchema.go b/internal/memorystore/configSchema.go index f548ac6..133ba58 100644 --- a/internal/memorystore/configSchema.go +++ b/internal/memorystore/configSchema.go @@ -6,185 +6,84 @@ package memorystore const configSchema = `{ - "type": "object", - "properties": { - "jobList": { - "description": "Job list defaults. Applies to user- and jobs views.", - "type": "object", - "properties": { - "usePaging": { - "description": "If classic paging is used instead of continuous scrolling by default.", - "type": "boolean" - }, - "showFootprint": { - "description": "If footprint bars are shown as first column by default.", - "type": "boolean" - } - } - }, - "nodeList": { - "description": "Node list defaults. Applies to node list view.", - "type": "object", - "properties": { - "usePaging": { - "description": "If classic paging is used instead of continuous scrolling by default.", - "type": "boolean" - } - } - }, - "jobView": { - "description": "Job view defaults.", - "type": "object", - "properties": { - "showPolarPlot": { - "description": "If the job metric footprints polar plot is shown by default.", - "type": "boolean" - }, - "showFootprint": { - "description": "If the annotated job metric footprint bars are shown by default.", - "type": "boolean" - }, - "showRoofline": { - "description": "If the job roofline plot is shown by default.", - "type": "boolean" - }, - "showStatTable": { - "description": "If the job metric statistics table is shown by default.", - "type": "boolean" - } - } - }, - "metricConfig": { - "description": "Global initial metric selections for primary views of all clusters.", - "type": "object", - "properties": { - "jobListMetrics": { - "description": "Initial metrics shown for new users in job lists (User and jobs view).", - "type": "array", - "items": { - "type": "string", - "minItems": 1 - } - }, - "jobViewPlotMetrics": { - "description": "Initial metrics shown for new users as job view metric plots.", - "type": "array", - "items": { - "type": "string", - "minItems": 1 - } - }, - "jobViewTableMetrics": { - "description": "Initial metrics shown for new users in job view statistics table.", - "type": "array", - "items": { - "type": "string", - "minItems": 1 - } - }, - "clusters": { - "description": "Overrides for global defaults by cluster and subcluster.", - "type": "array", - "items": { + "type": "object", + "description": "Configuration specific to built-in metric-store.", + "properties": { + "checkpoints": { + "description": "Configuration for checkpointing the metrics within metric-store", "type": "object", "properties": { - "name": { - "description": "The name of the cluster." - }, - "jobListMetrics": { - "description": "Initial metrics shown for new users in job lists (User and jobs view) for subcluster.", - "type": "array", - "items": { - "type": "string", - "minItems": 1 + "file-format": { + "description": "Specify the type of checkpoint file. There are 2 variants: 'avro' and 'json'. If nothing is specified, 'avro' is default.", + "type": "string" + }, + "interval": { + "description": "Interval at which the metrics should be checkpointed.", + "type": "string" + }, + "directory": { + "description": "Specify the parent directy in which the checkpointed files should be placed.", + "type": "string" + }, + "restore": { + "description": "When cc-backend starts up, look for checkpointed files that are less than X hours old and load metrics from these selected checkpoint files.", + "type": "string" } - }, - "jobViewPlotMetrics": { - "description": "Initial metrics shown for new users as job view timeplots for subcluster.", - "type": "array", - "items": { - "type": "string", - "minItems": 1 - } - }, - "jobViewTableMetrics": { - "description": "Initial metrics shown for new users in job view statistics table for subcluster.", - "type": "array", - "items": { - "type": "string", - "minItems": 1 - } - }, - "subClusters": { - "description": "The array of overrides per subcluster.", - "type": "array", - "items": { - "type": "object", - "properties": { - "name": { - "description": "The name of the subcluster.", - "type": "string" - }, - "jobListMetrics": { - "description": "Initial metrics shown for new users in job lists (User and jobs view) for subcluster.", - "type": "array", - "items": { - "type": "string", - "minItems": 1 - } - }, - "jobViewPlotMetrics": { - "description": "Initial metrics shown for new users as job view timeplots for subcluster.", - "type": "array", - "items": { - "type": "string", - "minItems": 1 - } - }, - "jobViewTableMetrics": { - "description": "Initial metrics shown for new users in job view statistics table for subcluster.", - "type": "array", - "items": { - "type": "string", - "minItems": 1 - } - } - }, - "required": ["name"], - "minItems": 1 - } - } - }, - "required": ["name", "subClusters"], - "minItems": 1 - } - } - } - }, - "plotConfiguration": { - "description": "Initial settings for plot render options.", - "type": "object", - "properties": { - "colorBackground": { - "description": "If the metric plot backgrounds are initially colored by threshold limits.", - "type": "boolean" + } }, - "plotsPerRow": { - "description": "How many plots are initially rendered in per row. Applies to job, single node, and analysis views.", - "type": "integer" + "archive": { + "description": "Configuration for archiving the already checkpointed files.", + "type": "object", + "properties": { + "interval": { + "description": "Interval at which the checkpointed files should be archived.", + "type": "string" + }, + "directory": { + "description": "Specify the parent directy in which the archived files should be placed.", + "type": "string" + } + } }, - "lineWidth": { - "description": "Initial thickness of rendered plotlines. Applies to metric plot, job compare plot and roofline.", - "type": "integer" - }, - "colorScheme": { - "description": "Initial colorScheme to be used for metric plots.", - "type": "array", - "items": { + "retention-in-memory": { + "description": "Keep the metrics within memory for given time interval. Retention for X hours, then the metrics would be freed.", "type": "string" - } + }, + "nats": { + "description": "Configuration for accepting published data through NATS.", + "type": "object", + "properties": { + "address": { + "description": "Address of the NATS server.", + "type": "string" + }, + "username": { + "description": "Optional: If configured with username/password method.", + "type": "string" + }, + "password": { + "description": "Optional: If configured with username/password method.", + "type": "string" + }, + "creds-file-path": { + "description": "Optional: If configured with Credential File method. Path to your NATS cred file.", + "type": "string" + }, + "subscriptions": { + "description": "Array of various subscriptions. Allows to subscibe to different subjects and publishers.", + "type": "object", + "properties": { + "subscribe-to": { + "description": "Channel name", + "type": "string" + }, + "cluster-tag": { + "description": "Optional: Allow lines without a cluster tag, use this as default", + "type": "string" + } + } + } + } } - } } - } }` diff --git a/internal/memorystore/memorystore.go b/internal/memorystore/memorystore.go index d76b83b..552390e 100644 --- a/internal/memorystore/memorystore.go +++ b/internal/memorystore/memorystore.go @@ -10,6 +10,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "os" "os/signal" "runtime" @@ -386,6 +387,8 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, reso n, data := 0, make([]schema.Float, (to-from)/minfo.Frequency+1) + fmt.Printf("Requested From : %d, To: %d\n", from, to) + err := m.root.findBuffers(selector, minfo.offset, func(b *buffer) error { cdata, cfrom, cto, err := b.read(from, to, data) if err != nil { diff --git a/startDemo.sh b/startDemo.sh index 6817dc8..160ac63 100755 --- a/startDemo.sh +++ b/startDemo.sh @@ -1,5 +1,7 @@ #!/bin/sh +# rm -rf var + if [ -d './var' ]; then echo 'Directory ./var already exists! Skipping initialization.' ./cc-backend -server -dev @@ -14,6 +16,19 @@ else ./cc-backend -migrate-db ./cc-backend -dev -init-db -add-user demo:admin,api:demo + + # Generate JWT and extract only the token value + JWT=$(./cc-backend -jwt demo | grep -oP "(?<=JWT: Successfully generated JWT for user 'demo': ).*") + + # Replace the existing JWT in test_ccms_write_api.sh with the new one + if [ -n "$JWT" ]; then + sed -i "1s|^JWT=.*|JWT=\"$JWT\"|" test_ccms_write_api.sh + echo "✅ Updated JWT in test_ccms_write_api.sh" + else + echo "❌ Failed to generate JWT for demo user" + exit 1 + fi + ./cc-backend -server -dev fi From 0920286b4c727266172ff027ddf1d99f4dbde14d Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Thu, 23 Oct 2025 17:58:56 +0200 Subject: [PATCH 2/8] Clean up --- startDemo.sh | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/startDemo.sh b/startDemo.sh index 160ac63..af72a80 100755 --- a/startDemo.sh +++ b/startDemo.sh @@ -16,18 +16,6 @@ else ./cc-backend -migrate-db ./cc-backend -dev -init-db -add-user demo:admin,api:demo - - # Generate JWT and extract only the token value - JWT=$(./cc-backend -jwt demo | grep -oP "(?<=JWT: Successfully generated JWT for user 'demo': ).*") - - # Replace the existing JWT in test_ccms_write_api.sh with the new one - if [ -n "$JWT" ]; then - sed -i "1s|^JWT=.*|JWT=\"$JWT\"|" test_ccms_write_api.sh - echo "✅ Updated JWT in test_ccms_write_api.sh" - else - echo "❌ Failed to generate JWT for demo user" - exit 1 - fi ./cc-backend -server -dev From 856ccbb969b82fb418ce33effa44ecc6a09d79f5 Mon Sep 17 00:00:00 2001 From: Michael Panzlaff Date: Mon, 27 Oct 2025 14:50:08 +0100 Subject: [PATCH 3/8] Fix wrong memorystore nats schema --- internal/memorystore/configSchema.go | 72 +++++++++++++++------------- 1 file changed, 39 insertions(+), 33 deletions(-) diff --git a/internal/memorystore/configSchema.go b/internal/memorystore/configSchema.go index 133ba58..a40b95e 100644 --- a/internal/memorystore/configSchema.go +++ b/internal/memorystore/configSchema.go @@ -51,39 +51,45 @@ const configSchema = `{ }, "nats": { "description": "Configuration for accepting published data through NATS.", - "type": "object", - "properties": { - "address": { - "description": "Address of the NATS server.", - "type": "string" - }, - "username": { - "description": "Optional: If configured with username/password method.", - "type": "string" - }, - "password": { - "description": "Optional: If configured with username/password method.", - "type": "string" - }, - "creds-file-path": { - "description": "Optional: If configured with Credential File method. Path to your NATS cred file.", - "type": "string" - }, - "subscriptions": { - "description": "Array of various subscriptions. Allows to subscibe to different subjects and publishers.", - "type": "object", - "properties": { - "subscribe-to": { - "description": "Channel name", - "type": "string" - }, - "cluster-tag": { - "description": "Optional: Allow lines without a cluster tag, use this as default", - "type": "string" - } - } - } - } + "type": "array", + "items": { + "type": "object", + "properties": { + "address": { + "description": "Address of the NATS server.", + "type": "string" + }, + "username": { + "description": "Optional: If configured with username/password method.", + "type": "string" + }, + "password": { + "description": "Optional: If configured with username/password method.", + "type": "string" + }, + "creds-file-path": { + "description": "Optional: If configured with Credential File method. Path to your NATS cred file.", + "type": "string" + }, + "subscriptions": { + "description": "Array of various subscriptions. Allows to subscibe to different subjects and publishers.", + "type": "array", + "items": { + "type": "object", + "properties": { + "subscribe-to": { + "description": "Channel name", + "type": "string" + }, + "cluster-tag": { + "description": "Optional: Allow lines without a cluster tag, use this as default", + "type": "string" + } + } + } + } + } + } } } }` From 44e98e8f2fd7c73018cee82317d50e7214484882 Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Mon, 27 Oct 2025 20:44:40 +0100 Subject: [PATCH 4/8] Fix to avro reader --- configs/config-demo.json | 2 +- internal/memorystore/avroCheckpoint.go | 46 ++++++++++++++++++++++++++ internal/memorystore/buffer.go | 3 -- internal/memorystore/checkpoint.go | 26 +++++++++++++++ internal/memorystore/memorystore.go | 3 -- 5 files changed, 73 insertions(+), 7 deletions(-) diff --git a/configs/config-demo.json b/configs/config-demo.json index 197aee0..7daff78 100644 --- a/configs/config-demo.json +++ b/configs/config-demo.json @@ -78,7 +78,7 @@ ], "metric-store": { "checkpoints": { - "file-format": "json", + "file-format": "avro", "interval": "1h", "directory": "./var/checkpoints", "restore": "48h" diff --git a/internal/memorystore/avroCheckpoint.go b/internal/memorystore/avroCheckpoint.go index 3642186..563c2aa 100644 --- a/internal/memorystore/avroCheckpoint.go +++ b/internal/memorystore/avroCheckpoint.go @@ -28,6 +28,52 @@ var NumAvroWorkers int = 4 var ErrNoNewData error = errors.New("no data in the pool") +func UpdateAvroFile(f *os.File, insertCount int64) error { + filePath := f.Name() + f.Close() // close the original handle immediately + + // Reopen fresh for reading + readFile, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to reopen file for reading: %v", err) + } + defer readFile.Close() + + reader, err := goavro.NewOCFReader(readFile) + if err != nil { + return fmt.Errorf("failed to create OCF reader: %v", err) + } + + codec := reader.Codec() + + // Now reopen again for appending + appendFile, err := os.OpenFile(filePath, os.O_RDWR|os.O_APPEND, 0o644) + if err != nil { + return fmt.Errorf("failed to reopen file for appending: %v", err) + } + defer appendFile.Close() + + recordList := make([]map[string]any, insertCount) + for i := range recordList { + recordList[i] = make(map[string]any) + } + + writer, err := goavro.NewOCFWriter(goavro.OCFConfig{ + W: appendFile, + Codec: codec, + CompressionName: goavro.CompressionDeflateLabel, + }) + if err != nil { + return fmt.Errorf("failed to create OCF writer: %v", err) + } + + if err := writer.Append(recordList); err != nil { + return fmt.Errorf("failed to append record: %v", err) + } + + return nil +} + func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) { levels := make([]*AvroLevel, 0) selectors := make([][]string, 0) diff --git a/internal/memorystore/buffer.go b/internal/memorystore/buffer.go index 413b04e..a942d0a 100644 --- a/internal/memorystore/buffer.go +++ b/internal/memorystore/buffer.go @@ -7,7 +7,6 @@ package memorystore import ( "errors" - "fmt" "sync" "github.com/ClusterCockpit/cc-lib/schema" @@ -188,8 +187,6 @@ func (b *buffer) read(from, to int64, data []schema.Float) ([]schema.Float, int6 i++ } - fmt.Printf("Given From : %d, To: %d\n", from, to) - return data[:i], from, t, nil } diff --git a/internal/memorystore/checkpoint.go b/internal/memorystore/checkpoint.go index fe09b9e..465bb0d 100644 --- a/internal/memorystore/checkpoint.go +++ b/internal/memorystore/checkpoint.go @@ -492,6 +492,32 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { return nil } + interval, err := time.ParseDuration(Keys.Checkpoints.Interval) + if err != nil { + fmt.Printf("error while parsing interval: %#v", err) + } + + now := time.Now().Unix() + cutOff := time.Unix(fromTimestamp, 0).Add(interval).Unix() + + newCount := (min(now, cutOff) - fromTimestamp) / resolution + + if recordCounter < newCount { + // fmt.Printf("Record Count: %d, Required Count: %d\n", recordCounter, newCount) + + insertCount := newCount - recordCounter + for range insertCount { + for key := range metricsData { + metricsData[key] = append(metricsData[key], schema.ConvertToFloat(0.0)) + } + } + + err := UpdateAvroFile(f, insertCount) + if err != nil { + fmt.Printf("error while inserting blanks into avro: %s\n", err) + } + } + for key, floatArray := range metricsData { metricName := ReplaceKey(key) diff --git a/internal/memorystore/memorystore.go b/internal/memorystore/memorystore.go index 552390e..d76b83b 100644 --- a/internal/memorystore/memorystore.go +++ b/internal/memorystore/memorystore.go @@ -10,7 +10,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "os" "os/signal" "runtime" @@ -387,8 +386,6 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, reso n, data := 0, make([]schema.Float, (to-from)/minfo.Frequency+1) - fmt.Printf("Requested From : %d, To: %d\n", from, to) - err := m.root.findBuffers(selector, minfo.offset, func(b *buffer) error { cdata, cfrom, cto, err := b.read(from, to, data) if err != nil { From ea7660ddb3ed0be0d89b9ea4d05201029ecf5296 Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Tue, 28 Oct 2025 08:50:33 +0100 Subject: [PATCH 5/8] Revert "Fix wrong memorystore nats schema" This reverts commit 856ccbb969b82fb418ce33effa44ecc6a09d79f5. --- internal/memorystore/configSchema.go | 72 +++++++++++++--------------- 1 file changed, 33 insertions(+), 39 deletions(-) diff --git a/internal/memorystore/configSchema.go b/internal/memorystore/configSchema.go index a40b95e..133ba58 100644 --- a/internal/memorystore/configSchema.go +++ b/internal/memorystore/configSchema.go @@ -51,45 +51,39 @@ const configSchema = `{ }, "nats": { "description": "Configuration for accepting published data through NATS.", - "type": "array", - "items": { - "type": "object", - "properties": { - "address": { - "description": "Address of the NATS server.", - "type": "string" - }, - "username": { - "description": "Optional: If configured with username/password method.", - "type": "string" - }, - "password": { - "description": "Optional: If configured with username/password method.", - "type": "string" - }, - "creds-file-path": { - "description": "Optional: If configured with Credential File method. Path to your NATS cred file.", - "type": "string" - }, - "subscriptions": { - "description": "Array of various subscriptions. Allows to subscibe to different subjects and publishers.", - "type": "array", - "items": { - "type": "object", - "properties": { - "subscribe-to": { - "description": "Channel name", - "type": "string" - }, - "cluster-tag": { - "description": "Optional: Allow lines without a cluster tag, use this as default", - "type": "string" - } - } - } - } - } - } + "type": "object", + "properties": { + "address": { + "description": "Address of the NATS server.", + "type": "string" + }, + "username": { + "description": "Optional: If configured with username/password method.", + "type": "string" + }, + "password": { + "description": "Optional: If configured with username/password method.", + "type": "string" + }, + "creds-file-path": { + "description": "Optional: If configured with Credential File method. Path to your NATS cred file.", + "type": "string" + }, + "subscriptions": { + "description": "Array of various subscriptions. Allows to subscibe to different subjects and publishers.", + "type": "object", + "properties": { + "subscribe-to": { + "description": "Channel name", + "type": "string" + }, + "cluster-tag": { + "description": "Optional: Allow lines without a cluster tag, use this as default", + "type": "string" + } + } + } + } } } }` From 2287586700be1b207d8177ca82dbb86d4030f286 Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Tue, 28 Oct 2025 08:53:43 +0100 Subject: [PATCH 6/8] Revert avro files writing logic --- internal/memorystore/avroCheckpoint.go | 46 -------------------------- internal/memorystore/checkpoint.go | 26 --------------- 2 files changed, 72 deletions(-) diff --git a/internal/memorystore/avroCheckpoint.go b/internal/memorystore/avroCheckpoint.go index 563c2aa..3642186 100644 --- a/internal/memorystore/avroCheckpoint.go +++ b/internal/memorystore/avroCheckpoint.go @@ -28,52 +28,6 @@ var NumAvroWorkers int = 4 var ErrNoNewData error = errors.New("no data in the pool") -func UpdateAvroFile(f *os.File, insertCount int64) error { - filePath := f.Name() - f.Close() // close the original handle immediately - - // Reopen fresh for reading - readFile, err := os.Open(filePath) - if err != nil { - return fmt.Errorf("failed to reopen file for reading: %v", err) - } - defer readFile.Close() - - reader, err := goavro.NewOCFReader(readFile) - if err != nil { - return fmt.Errorf("failed to create OCF reader: %v", err) - } - - codec := reader.Codec() - - // Now reopen again for appending - appendFile, err := os.OpenFile(filePath, os.O_RDWR|os.O_APPEND, 0o644) - if err != nil { - return fmt.Errorf("failed to reopen file for appending: %v", err) - } - defer appendFile.Close() - - recordList := make([]map[string]any, insertCount) - for i := range recordList { - recordList[i] = make(map[string]any) - } - - writer, err := goavro.NewOCFWriter(goavro.OCFConfig{ - W: appendFile, - Codec: codec, - CompressionName: goavro.CompressionDeflateLabel, - }) - if err != nil { - return fmt.Errorf("failed to create OCF writer: %v", err) - } - - if err := writer.Append(recordList); err != nil { - return fmt.Errorf("failed to append record: %v", err) - } - - return nil -} - func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) { levels := make([]*AvroLevel, 0) selectors := make([][]string, 0) diff --git a/internal/memorystore/checkpoint.go b/internal/memorystore/checkpoint.go index 465bb0d..fe09b9e 100644 --- a/internal/memorystore/checkpoint.go +++ b/internal/memorystore/checkpoint.go @@ -492,32 +492,6 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { return nil } - interval, err := time.ParseDuration(Keys.Checkpoints.Interval) - if err != nil { - fmt.Printf("error while parsing interval: %#v", err) - } - - now := time.Now().Unix() - cutOff := time.Unix(fromTimestamp, 0).Add(interval).Unix() - - newCount := (min(now, cutOff) - fromTimestamp) / resolution - - if recordCounter < newCount { - // fmt.Printf("Record Count: %d, Required Count: %d\n", recordCounter, newCount) - - insertCount := newCount - recordCounter - for range insertCount { - for key := range metricsData { - metricsData[key] = append(metricsData[key], schema.ConvertToFloat(0.0)) - } - } - - err := UpdateAvroFile(f, insertCount) - if err != nil { - fmt.Printf("error while inserting blanks into avro: %s\n", err) - } - } - for key, floatArray := range metricsData { metricName := ReplaceKey(key) From 3c1a7e0171f8e4241e26fae691af52ef0b6eef7a Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Tue, 28 Oct 2025 09:42:28 +0100 Subject: [PATCH 7/8] Fixed the behavior of avro write to old files --- internal/memorystore/avroCheckpoint.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/internal/memorystore/avroCheckpoint.go b/internal/memorystore/avroCheckpoint.go index 3642186..4d36151 100644 --- a/internal/memorystore/avroCheckpoint.go +++ b/internal/memorystore/avroCheckpoint.go @@ -25,7 +25,7 @@ import ( ) var NumAvroWorkers int = 4 - +var startUp bool = true var ErrNoNewData error = errors.New("no data in the pool") func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) { @@ -96,6 +96,9 @@ func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) { if errs > 0 { return int(n), fmt.Errorf("%d errors happend while creating avro checkpoints (%d successes)", errs, n) } + + startUp = false + return int(n), nil } @@ -143,6 +146,10 @@ func getTimestamp(dir string) int64 { interval, _ := time.ParseDuration(Keys.Checkpoints.Interval) updateTime := time.Unix(maxTS, 0).Add(interval).Add(time.Duration(CheckpointBufferMinutes-1) * time.Minute).Unix() + if startUp { + return 0 + } + if updateTime < time.Now().Unix() { return 0 } From 2287f4493aae730faea9ddb47969e927d3d30498 Mon Sep 17 00:00:00 2001 From: Michael Panzlaff Date: Tue, 28 Oct 2025 13:14:33 +0100 Subject: [PATCH 8/8] Reapply "Fix wrong memorystore nats schema" This reverts commit ea7660ddb3ed0be0d89b9ea4d05201029ecf5296. --- internal/memorystore/configSchema.go | 64 +++++++++++++++------------- 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/internal/memorystore/configSchema.go b/internal/memorystore/configSchema.go index 133ba58..2616edc 100644 --- a/internal/memorystore/configSchema.go +++ b/internal/memorystore/configSchema.go @@ -51,35 +51,41 @@ const configSchema = `{ }, "nats": { "description": "Configuration for accepting published data through NATS.", - "type": "object", - "properties": { - "address": { - "description": "Address of the NATS server.", - "type": "string" - }, - "username": { - "description": "Optional: If configured with username/password method.", - "type": "string" - }, - "password": { - "description": "Optional: If configured with username/password method.", - "type": "string" - }, - "creds-file-path": { - "description": "Optional: If configured with Credential File method. Path to your NATS cred file.", - "type": "string" - }, - "subscriptions": { - "description": "Array of various subscriptions. Allows to subscibe to different subjects and publishers.", - "type": "object", - "properties": { - "subscribe-to": { - "description": "Channel name", - "type": "string" - }, - "cluster-tag": { - "description": "Optional: Allow lines without a cluster tag, use this as default", - "type": "string" + "type": "array", + "items": { + "type": "object", + "properties": { + "address": { + "description": "Address of the NATS server.", + "type": "string" + }, + "username": { + "description": "Optional: If configured with username/password method.", + "type": "string" + }, + "password": { + "description": "Optional: If configured with username/password method.", + "type": "string" + }, + "creds-file-path": { + "description": "Optional: If configured with Credential File method. Path to your NATS cred file.", + "type": "string" + }, + "subscriptions": { + "description": "Array of various subscriptions. Allows to subscibe to different subjects and publishers.", + "type": "array", + "items": { + "type": "object", + "properties": { + "subscribe-to": { + "description": "Channel name", + "type": "string" + }, + "cluster-tag": { + "description": "Optional: Allow lines without a cluster tag, use this as default", + "type": "string" + } + } } } }