Introduce nodeprovider interface to break import cycle

This commit is contained in:
2026-01-15 12:20:11 +01:00
parent 7c78407c49
commit 9c92a7796b
2 changed files with 36 additions and 9 deletions

View File

@@ -320,8 +320,13 @@ func runServer(ctx context.Context) error {
mscfg := ccconf.GetPackageConfig("metric-store")
if mscfg != nil {
metricstore.Init(mscfg, &wg)
// Inject repository as NodeProvider to break import cycle
ms := metricstore.GetMemoryStore()
jobRepo := repository.GetJobRepository()
ms.SetNodeProvider(jobRepo)
} else {
cclog.Debug("Metric store configuration not found, skipping metricstore initialization")
return fmt.Errorf("missing metricstore configuration")
}
// Start archiver and task manager

View File

@@ -45,6 +45,15 @@ var (
shutdownFunc context.CancelFunc
)
// NodeProvider provides information about nodes currently in use by running jobs.
// This interface allows metricstore to query job information without directly
// depending on the repository package, breaking the import cycle.
type NodeProvider interface {
// GetUsedNodes returns a map of cluster names to sorted lists of unique hostnames
// that are currently in use by jobs that started before the given timestamp.
GetUsedNodes(ts int64) (map[string][]string, error)
}
type Metric struct {
Name string
Value schema.Float
@@ -54,6 +63,7 @@ type Metric struct {
type MemoryStore struct {
Metrics map[string]MetricConfig
root Level
nodeProvider NodeProvider // Injected dependency for querying running jobs
}
func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
@@ -196,6 +206,14 @@ func GetMemoryStore() *MemoryStore {
return msInstance
}
// SetNodeProvider sets the NodeProvider implementation for the MemoryStore.
// This must be called during initialization to provide job state information
// for selective buffer retention during Free operations.
// If not set, the Free function will fall back to freeing all buffers.
func (ms *MemoryStore) SetNodeProvider(provider NodeProvider) {
ms.nodeProvider = provider
}
func Shutdown() {
if shutdownFunc != nil {
shutdownFunc()
@@ -263,13 +281,17 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
}
func Free(ms *MemoryStore, t time.Time) (int, error) {
// jobRepo := repository.GetJobRepository()
// excludeSelectors, err := jobRepo.GetUsedNodes(t.Unix())
// if err != nil {
// return 0, err
// }
// If no NodeProvider is configured, free all buffers older than t
if ms.nodeProvider == nil {
return ms.Free(nil, t.Unix())
}
excludeSelectors := make(map[string][]string, 0)
excludeSelectors, err := ms.nodeProvider.GetUsedNodes(t.Unix())
if err != nil {
return 0, err
}
// excludeSelectors := make(map[string][]string, 0)
// excludeSelectors := map[string][]string{
// "alex": {"a0122", "a0123", "a0225"},