diff --git a/internal/memorystore/api.go b/internal/memorystore/api.go index 1f7a531..b96dc1f 100644 --- a/internal/memorystore/api.go +++ b/internal/memorystore/api.go @@ -6,12 +6,18 @@ package memorystore import ( + "errors" "math" "github.com/ClusterCockpit/cc-lib/schema" "github.com/ClusterCockpit/cc-lib/util" ) +var ( + ErrInvalidTimeRange = errors.New("[METRICSTORE]> invalid time range: 'from' must be before 'to'") + ErrEmptyCluster = errors.New("[METRICSTORE]> cluster name cannot be empty") +) + type APIMetricData struct { Error *string `json:"error,omitempty"` Data schema.FloatArray `json:"data,omitempty"` @@ -109,10 +115,14 @@ func (data *APIMetricData) PadDataWithNull(ms *MemoryStore, from, to int64, metr } func FetchData(req APIQueryRequest) (*APIQueryResponse, error) { - req.WithData = true - req.WithData = true - req.WithData = true + if req.From > req.To { + return nil, ErrInvalidTimeRange + } + if req.Cluster == "" && req.ForAllNodes != nil { + return nil, ErrEmptyCluster + } + req.WithData = true ms := GetMemoryStore() response := APIQueryResponse{ diff --git a/internal/memorystore/archive.go b/internal/memorystore/archive.go index 56065aa..5019ee7 100644 --- a/internal/memorystore/archive.go +++ b/internal/memorystore/archive.go @@ -32,17 +32,14 @@ func Archiving(wg *sync.WaitGroup, ctx context.Context) { return } - ticks := func() <-chan time.Time { - if d <= 0 { - return nil - } - return time.NewTicker(d).C - }() + ticker := time.NewTicker(d) + defer ticker.Stop() + for { select { case <-ctx.Done(): return - case <-ticks: + case <-ticker.C: t := time.Now().Add(-d) cclog.Infof("[METRICSTORE]> start archiving checkpoints (older than %s)...", t.Format(time.RFC3339)) n, err := ArchiveCheckpoints(Keys.Checkpoints.RootDir, @@ -165,25 +162,33 @@ func archiveCheckpoints(dir string, archiveDir string, from int64, deleteInstead n := 0 for _, checkpoint := range files { - filename := filepath.Join(dir, checkpoint) - r, err := os.Open(filename) + // Use closure to ensure file is closed immediately after use, + // avoiding file descriptor leak from defer in loop + err := func() error { + filename := filepath.Join(dir, checkpoint) + r, err := os.Open(filename) + if err != nil { + return err + } + defer r.Close() + + w, err := zw.Create(checkpoint) + if err != nil { + return err + } + + if _, err = io.Copy(w, r); err != nil { + return err + } + + if err = os.Remove(filename); err != nil { + return err + } + return nil + }() if err != nil { return n, err } - defer r.Close() - - w, err := zw.Create(checkpoint) - if err != nil { - return n, err - } - - if _, err = io.Copy(w, r); err != nil { - return n, err - } - - if err = os.Remove(filename); err != nil { - return n, err - } n += 1 } diff --git a/internal/memorystore/avroCheckpoint.go b/internal/memorystore/avroCheckpoint.go index 4d36151..42e5f62 100644 --- a/internal/memorystore/avroCheckpoint.go +++ b/internal/memorystore/avroCheckpoint.go @@ -24,9 +24,8 @@ import ( "github.com/linkedin/goavro/v2" ) -var NumAvroWorkers int = 4 +var NumAvroWorkers int = DefaultAvroWorkers var startUp bool = true -var ErrNoNewData error = errors.New("no data in the pool") func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) { levels := make([]*AvroLevel, 0) @@ -464,19 +463,15 @@ func generateRecord(data map[string]schema.Float) map[string]any { } func correctKey(key string) string { - // Replace any invalid characters in the key - // For example, replace spaces with underscores - key = strings.ReplaceAll(key, ":", "___") - key = strings.ReplaceAll(key, ".", "__") - + key = strings.ReplaceAll(key, "_", "_0x5F_") + key = strings.ReplaceAll(key, ":", "_0x3A_") + key = strings.ReplaceAll(key, ".", "_0x2E_") return key } func ReplaceKey(key string) string { - // Replace any invalid characters in the key - // For example, replace spaces with underscores - key = strings.ReplaceAll(key, "___", ":") - key = strings.ReplaceAll(key, "__", ".") - + key = strings.ReplaceAll(key, "_0x2E_", ".") + key = strings.ReplaceAll(key, "_0x3A_", ":") + key = strings.ReplaceAll(key, "_0x5F_", "_") return key } diff --git a/internal/memorystore/avroHelper.go b/internal/memorystore/avroHelper.go index 64e5706..a6f6c9b 100644 --- a/internal/memorystore/avroHelper.go +++ b/internal/memorystore/avroHelper.go @@ -42,7 +42,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { metricName := "" for _, selectorName := range val.Selector { - metricName += selectorName + Delimiter + metricName += selectorName + SelectorDelimiter } metricName += val.MetricName @@ -54,7 +54,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { var selector []string selector = append(selector, val.Cluster, val.Node, strconv.FormatInt(freq, 10)) - if !testEq(oldSelector, selector) { + if !stringSlicesEqual(oldSelector, selector) { // Get the Avro level for the metric avroLevel = avroStore.root.findAvroLevelOrCreate(selector) @@ -71,7 +71,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { }() } -func testEq(a, b []string) bool { +func stringSlicesEqual(a, b []string) bool { if len(a) != len(b) { return false } diff --git a/internal/memorystore/avroStruct.go b/internal/memorystore/avroStruct.go index cc8005c..bde9e02 100644 --- a/internal/memorystore/avroStruct.go +++ b/internal/memorystore/avroStruct.go @@ -13,12 +13,11 @@ import ( var ( LineProtocolMessages = make(chan *AvroStruct) - Delimiter = "ZZZZZ" + // SelectorDelimiter separates hierarchical selector components in metric names for Avro encoding + SelectorDelimiter = "_SEL_" ) -// CheckpointBufferMinutes should always be in minutes. -// Its controls the amount of data to hold for given amount of time. -var CheckpointBufferMinutes = 3 +var CheckpointBufferMinutes = DefaultCheckpointBufferMin type AvroStruct struct { MetricName string @@ -73,7 +72,7 @@ func (l *AvroLevel) findAvroLevelOrCreate(selector []string) *AvroLevel { } } - // The level does not exist, take write lock for unqiue access: + // The level does not exist, take write lock for unique access: l.lock.Lock() // While this thread waited for the write lock, another thread // could have created the child node. diff --git a/internal/memorystore/buffer.go b/internal/memorystore/buffer.go index cd2fd8f..55be2ad 100644 --- a/internal/memorystore/buffer.go +++ b/internal/memorystore/buffer.go @@ -12,15 +12,12 @@ import ( "github.com/ClusterCockpit/cc-lib/schema" ) -// Default buffer capacity. -// `buffer.data` will only ever grow up to it's capacity and a new link +// BufferCap is the default buffer capacity. +// buffer.data will only ever grow up to its capacity and a new link // in the buffer chain will be created if needed so that no copying // of data or reallocation needs to happen on writes. -const ( - BufferCap int = 512 -) +const BufferCap int = DefaultBufferCapacity -// So that we can reuse allocations var bufferPool sync.Pool = sync.Pool{ New: func() any { return &buffer{ @@ -75,7 +72,6 @@ func (b *buffer) write(ts int64, value schema.Float) (*buffer, error) { newbuf := newBuffer(ts, b.frequency) newbuf.prev = b b.next = newbuf - b.close() b = newbuf idx = 0 } @@ -103,8 +99,6 @@ func (b *buffer) firstWrite() int64 { return b.start + (b.frequency / 2) } -func (b *buffer) close() {} - // Return all known values from `from` to `to`. Gaps of information are represented as NaN. // Simple linear interpolation is done between the two neighboring cells if possible. // If values at the start or end are missing, instead of NaN values, the second and thrid @@ -139,8 +133,6 @@ func (b *buffer) read(from, to int64, data []schema.Float) ([]schema.Float, int6 data[i] += schema.NaN } else if t < b.start { data[i] += schema.NaN - // } else if b.data[idx].IsNaN() { - // data[i] += interpolate(idx, b.data) } else { data[i] += b.data[idx] } diff --git a/internal/memorystore/checkpoint.go b/internal/memorystore/checkpoint.go index e19cbf7..c676977 100644 --- a/internal/memorystore/checkpoint.go +++ b/internal/memorystore/checkpoint.go @@ -28,15 +28,10 @@ import ( "github.com/linkedin/goavro/v2" ) -// File operation constants const ( - // CheckpointFilePerms defines default permissions for checkpoint files CheckpointFilePerms = 0o644 - // CheckpointDirPerms defines default permissions for checkpoint directories - CheckpointDirPerms = 0o755 - // GCTriggerInterval determines how often GC is forced during checkpoint loading - // GC is triggered every GCTriggerInterval*NumWorkers loaded hosts - GCTriggerInterval = 100 + CheckpointDirPerms = 0o755 + GCTriggerInterval = DefaultGCTriggerInterval ) // Whenever changed, update MarshalJSON as well! @@ -71,17 +66,14 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { return } - ticks := func() <-chan time.Time { - if d <= 0 { - return nil - } - return time.NewTicker(d).C - }() + ticker := time.NewTicker(d) + defer ticker.Stop() + for { select { case <-ctx.Done(): return - case <-ticks: + case <-ticker.C: cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", lastCheckpoint.Format(time.RFC3339)) now := time.Now() n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir, @@ -98,33 +90,23 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { } else { go func() { defer wg.Done() - d, _ := time.ParseDuration("1m") select { case <-ctx.Done(): return case <-time.After(time.Duration(CheckpointBufferMinutes) * time.Minute): - // This is the first tick untill we collect the data for given minutes. GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, false) - // log.Printf("Checkpointing %d avro files", count) - } - ticks := func() <-chan time.Time { - if d <= 0 { - return nil - } - return time.NewTicker(d).C - }() + ticker := time.NewTicker(DefaultAvroCheckpointInterval) + defer ticker.Stop() for { select { case <-ctx.Done(): return - case <-ticks: - // Regular ticks of 1 minute to write data. + case <-ticker.C: GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, false) - // log.Printf("Checkpointing %d avro files", count) } } }() @@ -329,7 +311,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) ( lvl := m.root.findLevelOrCreate(host[:], len(m.Metrics)) nn, err := lvl.fromCheckpoint(m, filepath.Join(dir, host[0], host[1]), from, extension) if err != nil { - cclog.Fatalf("[METRICSTORE]> error while loading checkpoints: %s", err.Error()) + cclog.Errorf("[METRICSTORE]> error while loading checkpoints for %s/%s: %s", host[0], host[1], err.Error()) atomic.AddInt32(&errs, 1) } atomic.AddInt32(&n, int32(nn)) @@ -506,8 +488,8 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { for key, floatArray := range metricsData { metricName := ReplaceKey(key) - if strings.Contains(metricName, Delimiter) { - subString := strings.Split(metricName, Delimiter) + if strings.Contains(metricName, SelectorDelimiter) { + subString := strings.Split(metricName, SelectorDelimiter) lvl := l @@ -557,12 +539,10 @@ func (l *Level) createBuffer(m *MemoryStore, metricName string, floatArray schem next: nil, archived: true, } - b.close() minfo, ok := m.Metrics[metricName] if !ok { return nil - // return errors.New("Unkown metric: " + name) } prev := l.metrics[minfo.offset] @@ -616,17 +596,15 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error { b := &buffer{ frequency: metric.Frequency, start: metric.Start, - data: metric.Data[0:n:n], // Space is wasted here :( + data: metric.Data[0:n:n], prev: nil, next: nil, archived: true, } - b.close() minfo, ok := m.Metrics[name] if !ok { continue - // return errors.New("Unkown metric: " + name) } prev := l.metrics[minfo.offset] diff --git a/internal/memorystore/config.go b/internal/memorystore/config.go index 8196ed6..fbd6234 100644 --- a/internal/memorystore/config.go +++ b/internal/memorystore/config.go @@ -7,6 +7,16 @@ package memorystore import ( "fmt" + "time" +) + +const ( + DefaultMaxWorkers = 10 + DefaultBufferCapacity = 512 + DefaultGCTriggerInterval = 100 + DefaultAvroWorkers = 4 + DefaultCheckpointBufferMin = 3 + DefaultAvroCheckpointInterval = time.Minute ) var InternalCCMSFlag bool = false @@ -14,7 +24,7 @@ var InternalCCMSFlag bool = false type MetricStoreConfig struct { // Number of concurrent workers for checkpoint and archive operations. // If not set or 0, defaults to min(runtime.NumCPU()/2+1, 10) - NumWorkers int `json:"num-workers"` + NumWorkers int `json:"num-workers"` Checkpoints struct { FileFormat string `json:"file-format"` Interval string `json:"interval"` @@ -31,20 +41,6 @@ type MetricStoreConfig struct { RootDir string `json:"directory"` DeleteInstead bool `json:"delete-instead"` } `json:"archive"` - Nats []*NatsConfig `json:"nats"` -} - -type NatsConfig struct { - // Address of the nats server - Address string `json:"address"` - - // Username/Password, optional - Username string `json:"username"` - Password string `json:"password"` - - // Creds file path - Credsfilepath string `json:"creds-file-path"` - Subscriptions []struct { // Channel name SubscribeTo string `json:"subscribe-to"` diff --git a/internal/memorystore/level.go b/internal/memorystore/level.go index aaa1210..f3b3d3f 100644 --- a/internal/memorystore/level.go +++ b/internal/memorystore/level.go @@ -46,7 +46,7 @@ func (l *Level) findLevelOrCreate(selector []string, nMetrics int) *Level { } } - // The level does not exist, take write lock for unqiue access: + // The level does not exist, take write lock for unique access: l.lock.Lock() // While this thread waited for the write lock, another thread // could have created the child node. diff --git a/internal/memorystore/lineprotocol.go b/internal/memorystore/lineprotocol.go index 2bbd7ee..87d3b9e 100644 --- a/internal/memorystore/lineprotocol.go +++ b/internal/memorystore/lineprotocol.go @@ -11,113 +11,31 @@ import ( "sync" "time" + "github.com/ClusterCockpit/cc-backend/pkg/nats" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" "github.com/ClusterCockpit/cc-lib/schema" "github.com/influxdata/line-protocol/v2/lineprotocol" - "github.com/nats-io/nats.go" ) -// Each connection is handled in it's own goroutine. This is a blocking function. -// func ReceiveRaw(ctx context.Context, -// listener net.Listener, -// handleLine func(*lineprotocol.Decoder, string) error, -// ) error { -// var wg sync.WaitGroup - -// wg.Add(1) -// go func() { -// defer wg.Done() -// <-ctx.Done() -// if err := listener.Close(); err != nil { -// log.Printf("listener.Close(): %s", err.Error()) -// } -// }() - -// for { -// conn, err := listener.Accept() -// if err != nil { -// if errors.Is(err, net.ErrClosed) { -// break -// } - -// log.Printf("listener.Accept(): %s", err.Error()) -// } - -// wg.Add(2) -// go func() { -// defer wg.Done() -// defer conn.Close() - -// dec := lineprotocol.NewDecoder(conn) -// connctx, cancel := context.WithCancel(context.Background()) -// defer cancel() -// go func() { -// defer wg.Done() -// select { -// case <-connctx.Done(): -// conn.Close() -// case <-ctx.Done(): -// conn.Close() -// } -// }() - -// if err := handleLine(dec, "default"); err != nil { -// if errors.Is(err, net.ErrClosed) { -// return -// } - -// log.Printf("%s: %s", conn.RemoteAddr().String(), err.Error()) -// errmsg := make([]byte, 128) -// errmsg = append(errmsg, `error: `...) -// errmsg = append(errmsg, err.Error()...) -// errmsg = append(errmsg, '\n') -// conn.Write(errmsg) -// } -// }() -// } - -// wg.Wait() -// return nil -// } - -// ReceiveNats connects to a nats server and subscribes to "updates". This is a -// blocking function. handleLine will be called for each line recieved via -// nats. Send `true` through the done channel for gracefull termination. -func ReceiveNats(conf *(NatsConfig), - ms *MemoryStore, +func ReceiveNats(ms *MemoryStore, workers int, ctx context.Context, ) error { - var opts []nats.Option - if conf.Username != "" && conf.Password != "" { - opts = append(opts, nats.UserInfo(conf.Username, conf.Password)) - } - - if conf.Credsfilepath != "" { - opts = append(opts, nats.UserCredentials(conf.Credsfilepath)) - } - - nc, err := nats.Connect(conf.Address, opts...) - if err != nil { - return err - } - defer nc.Close() + nc := nats.GetClient() var wg sync.WaitGroup - var subs []*nats.Subscription - msgs := make(chan *nats.Msg, workers*2) + msgs := make(chan []byte, workers*2) - for _, sc := range conf.Subscriptions { + for _, sc := range Keys.Subscriptions { clusterTag := sc.ClusterTag - var sub *nats.Subscription if workers > 1 { wg.Add(workers) for range workers { go func() { for m := range msgs { - dec := lineprotocol.NewDecoderWithBytes(m.Data) + dec := lineprotocol.NewDecoderWithBytes(m) if err := DecodeLine(dec, ms, clusterTag); err != nil { cclog.Errorf("error: %s", err.Error()) } @@ -127,37 +45,24 @@ func ReceiveNats(conf *(NatsConfig), }() } - sub, err = nc.Subscribe(sc.SubscribeTo, func(m *nats.Msg) { - msgs <- m + nc.Subscribe(sc.SubscribeTo, func(subject string, data []byte) { + msgs <- data }) } else { - sub, err = nc.Subscribe(sc.SubscribeTo, func(m *nats.Msg) { - dec := lineprotocol.NewDecoderWithBytes(m.Data) + nc.Subscribe(sc.SubscribeTo, func(subject string, data []byte) { + dec := lineprotocol.NewDecoderWithBytes(data) if err := DecodeLine(dec, ms, clusterTag); err != nil { cclog.Errorf("error: %s", err.Error()) } }) } - - if err != nil { - return err - } - cclog.Infof("NATS subscription to '%s' on '%s' established", sc.SubscribeTo, conf.Address) - subs = append(subs, sub) + cclog.Infof("NATS subscription to '%s' established", sc.SubscribeTo) } <-ctx.Done() - for _, sub := range subs { - err = sub.Unsubscribe() - if err != nil { - cclog.Errorf("NATS unsubscribe failed: %s", err.Error()) - } - } close(msgs) wg.Wait() - nc.Close() - cclog.Print("NATS connection closed") return nil } @@ -266,8 +171,6 @@ func DecodeLine(dec *lineprotocol.Decoder, case "stype-id": subTypeBuf = append(subTypeBuf, val...) default: - // Ignore unkown tags (cc-metric-collector might send us a unit for example that we do not need) - // return fmt.Errorf("unkown tag: '%s' (value: '%s')", string(key), string(val)) } } diff --git a/internal/memorystore/memorystore.go b/internal/memorystore/memorystore.go index 3e372f3..259a86e 100644 --- a/internal/memorystore/memorystore.go +++ b/internal/memorystore/memorystore.go @@ -44,8 +44,6 @@ var ( shutdownFunc context.CancelFunc ) - - type Metric struct { Name string Value schema.Float @@ -71,8 +69,7 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { // Set NumWorkers from config or use default if Keys.NumWorkers <= 0 { - maxWorkers := 10 - Keys.NumWorkers = min(runtime.NumCPU()/2+1, maxWorkers) + Keys.NumWorkers = min(runtime.NumCPU()/2+1, DefaultMaxWorkers) } cclog.Debugf("[METRICSTORE]> Using %d workers for checkpoint/archive operations\n", Keys.NumWorkers) @@ -144,20 +141,9 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { // Store the shutdown function for later use by Shutdown() shutdownFunc = shutdown - if Keys.Nats != nil { - for _, natsConf := range Keys.Nats { - // TODO: When multiple nats configs share a URL, do a single connect. - wg.Add(1) - nc := natsConf - go func() { - // err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx) - err := ReceiveNats(nc, ms, 1, ctx) - if err != nil { - cclog.Fatal(err) - } - wg.Done() - }() - } + err = ReceiveNats(ms, 1, ctx) + if err != nil { + cclog.Fatal(err) } } @@ -244,18 +230,18 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { return } - ticks := func() <-chan time.Time { - d := d / 2 - if d <= 0 { - return nil - } - return time.NewTicker(d).C - }() + tickInterval := d / 2 + if tickInterval <= 0 { + return + } + ticker := time.NewTicker(tickInterval) + defer ticker.Stop() + for { select { case <-ctx.Done(): return - case <-ticks: + case <-ticker.C: t := time.Now().Add(-d) cclog.Infof("[METRICSTORE]> start freeing buffers (older than %s)...\n", t.Format(time.RFC3339)) freed, err := ms.Free(nil, t.Unix()) @@ -332,7 +318,7 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, reso minfo, ok := m.Metrics[metric] if !ok { - return nil, 0, 0, 0, errors.New("[METRICSTORE]> unkown metric: " + metric) + return nil, 0, 0, 0, errors.New("[METRICSTORE]> unknown metric: " + metric) } n, data := 0, make([]schema.Float, (to-from)/minfo.Frequency+1) diff --git a/internal/memorystore/stats.go b/internal/memorystore/stats.go index 91b1f2c..b2cb539 100644 --- a/internal/memorystore/stats.go +++ b/internal/memorystore/stats.go @@ -77,7 +77,7 @@ func (m *MemoryStore) Stats(selector util.Selector, metric string, from, to int6 minfo, ok := m.Metrics[metric] if !ok { - return nil, 0, 0, errors.New("unkown metric: " + metric) + return nil, 0, 0, errors.New("unknown metric: " + metric) } n, samples := 0, 0