diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 12aa9104..9b8cd1b0 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -320,8 +320,13 @@ func runServer(ctx context.Context) error { mscfg := ccconf.GetPackageConfig("metric-store") if mscfg != nil { metricstore.Init(mscfg, &wg) + + // Inject repository as NodeProvider to break import cycle + ms := metricstore.GetMemoryStore() + jobRepo := repository.GetJobRepository() + ms.SetNodeProvider(jobRepo) } else { - cclog.Debug("Metric store configuration not found, skipping metricstore initialization") + return fmt.Errorf("missing metricstore configuration") } // Start archiver and task manager diff --git a/internal/metricstore/metricstore.go b/internal/metricstore/metricstore.go index 7c17cd88..37028654 100644 --- a/internal/metricstore/metricstore.go +++ b/internal/metricstore/metricstore.go @@ -45,6 +45,15 @@ var ( shutdownFunc context.CancelFunc ) +// NodeProvider provides information about nodes currently in use by running jobs. +// This interface allows metricstore to query job information without directly +// depending on the repository package, breaking the import cycle. +type NodeProvider interface { + // GetUsedNodes returns a map of cluster names to sorted lists of unique hostnames + // that are currently in use by jobs that started before the given timestamp. + GetUsedNodes(ts int64) (map[string][]string, error) +} + type Metric struct { Name string Value schema.Float @@ -52,8 +61,9 @@ type Metric struct { } type MemoryStore struct { - Metrics map[string]MetricConfig - root Level + Metrics map[string]MetricConfig + root Level + nodeProvider NodeProvider // Injected dependency for querying running jobs } func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) { @@ -196,6 +206,14 @@ func GetMemoryStore() *MemoryStore { return msInstance } +// SetNodeProvider sets the NodeProvider implementation for the MemoryStore. +// This must be called during initialization to provide job state information +// for selective buffer retention during Free operations. +// If not set, the Free function will fall back to freeing all buffers. +func (ms *MemoryStore) SetNodeProvider(provider NodeProvider) { + ms.nodeProvider = provider +} + func Shutdown() { if shutdownFunc != nil { shutdownFunc() @@ -263,13 +281,17 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) { } func Free(ms *MemoryStore, t time.Time) (int, error) { - // jobRepo := repository.GetJobRepository() - // excludeSelectors, err := jobRepo.GetUsedNodes(t.Unix()) - // if err != nil { - // return 0, err - // } + // If no NodeProvider is configured, free all buffers older than t + if ms.nodeProvider == nil { + return ms.Free(nil, t.Unix()) + } - excludeSelectors := make(map[string][]string, 0) + excludeSelectors, err := ms.nodeProvider.GetUsedNodes(t.Unix()) + if err != nil { + return 0, err + } + + // excludeSelectors := make(map[string][]string, 0) // excludeSelectors := map[string][]string{ // "alex": {"a0122", "a0123", "a0225"},