Run go fix

This commit is contained in:
2026-02-27 14:40:26 +01:00
parent a1db8263d7
commit a418abc7d5
8 changed files with 22 additions and 36 deletions

View File

@@ -302,7 +302,7 @@ func (api *RestAPI) runTagger(rw http.ResponseWriter, r *http.Request) {
rw.Header().Set("Content-Type", "text/plain") rw.Header().Set("Content-Type", "text/plain")
rw.WriteHeader(http.StatusOK) rw.WriteHeader(http.StatusOK)
if _, err := rw.Write([]byte(fmt.Sprintf("Tagger %s started", name))); err != nil { if _, err := rw.Write(fmt.Appendf(nil, "Tagger %s started", name)); err != nil {
cclog.Errorf("Failed to write response: %v", err) cclog.Errorf("Failed to write response: %v", err)
} }
} }

View File

@@ -501,9 +501,7 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
var wg sync.WaitGroup var wg sync.WaitGroup
for range numWorkers { for range numWorkers {
wg.Add(1) wg.Go(func() {
go func() {
defer wg.Done()
for jobPath := range jobPaths { for jobPath := range jobPaths {
job, err := loadJobMeta(filepath.Join(jobPath, "meta.json")) job, err := loadJobMeta(filepath.Join(jobPath, "meta.json"))
if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) { if err != nil && !errors.Is(err, &jsonschema.ValidationError{}) {
@@ -529,7 +527,7 @@ func (fsa *FsArchive) Iter(loadMetricData bool) <-chan JobContainer {
ch <- JobContainer{Meta: job, Data: nil} ch <- JobContainer{Meta: job, Data: nil}
} }
} }
}() })
} }
clustersDir, err := os.ReadDir(fsa.path) clustersDir, err := os.ReadDir(fsa.path)

View File

@@ -821,9 +821,7 @@ func (s3a *S3Archive) Iter(loadMetricData bool) <-chan JobContainer {
var wg sync.WaitGroup var wg sync.WaitGroup
for range numWorkers { for range numWorkers {
wg.Add(1) wg.Go(func() {
go func() {
defer wg.Done()
for metaKey := range metaKeys { for metaKey := range metaKeys {
result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{ result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(s3a.bucket), Bucket: aws.String(s3a.bucket),
@@ -859,7 +857,7 @@ func (s3a *S3Archive) Iter(loadMetricData bool) <-chan JobContainer {
ch <- JobContainer{Meta: job, Data: nil} ch <- JobContainer{Meta: job, Data: nil}
} }
} }
}() })
} }
for _, cluster := range s3a.clusters { for _, cluster := range s3a.clusters {

View File

@@ -576,9 +576,7 @@ func (sa *SqliteArchive) Iter(loadMetricData bool) <-chan JobContainer {
var wg sync.WaitGroup var wg sync.WaitGroup
for range numWorkers { for range numWorkers {
wg.Add(1) wg.Go(func() {
go func() {
defer wg.Done()
for row := range jobRows { for row := range jobRows {
job, err := DecodeJobMeta(bytes.NewReader(row.metaBlob)) job, err := DecodeJobMeta(bytes.NewReader(row.metaBlob))
if err != nil { if err != nil {
@@ -617,7 +615,7 @@ func (sa *SqliteArchive) Iter(loadMetricData bool) <-chan JobContainer {
ch <- JobContainer{Meta: job, Data: nil} ch <- JobContainer{Meta: job, Data: nil}
} }
} }
}() })
} }
for { for {

View File

@@ -49,9 +49,7 @@ func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
// runWorker takes simple values to configure what it does // runWorker takes simple values to configure what it does
func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) { func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) {
wg.Add(1) wg.Go(func() {
go func() {
defer wg.Done()
d, err := time.ParseDuration(interval) d, err := time.ParseDuration(interval)
if err != nil { if err != nil {
@@ -85,7 +83,7 @@ func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mod
} }
} }
} }
}() })
} }
var ErrNoNewArchiveData error = errors.New("all data already archived") var ErrNoNewArchiveData error = errors.New("all data already archived")

View File

@@ -96,9 +96,7 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
ms := GetMemoryStore() ms := GetMemoryStore()
wg.Add(1) wg.Go(func() {
go func() {
defer wg.Done()
d, err := time.ParseDuration(Keys.Checkpoints.Interval) d, err := time.ParseDuration(Keys.Checkpoints.Interval)
if err != nil { if err != nil {
@@ -149,7 +147,7 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
} }
} }
} }
}() })
} }
// MarshalJSON provides optimized JSON encoding for CheckpointMetrics. // MarshalJSON provides optimized JSON encoding for CheckpointMetrics.

View File

@@ -320,9 +320,7 @@ func Shutdown() {
func Retention(wg *sync.WaitGroup, ctx context.Context) { func Retention(wg *sync.WaitGroup, ctx context.Context) {
ms := GetMemoryStore() ms := GetMemoryStore()
wg.Add(1) wg.Go(func() {
go func() {
defer wg.Done()
d, err := time.ParseDuration(Keys.RetentionInMemory) d, err := time.ParseDuration(Keys.RetentionInMemory)
if err != nil { if err != nil {
cclog.Fatal(err) cclog.Fatal(err)
@@ -361,7 +359,7 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
state.mu.Unlock() state.mu.Unlock()
} }
} }
}() })
} }
// MemoryUsageTracker starts a background goroutine that monitors memory usage. // MemoryUsageTracker starts a background goroutine that monitors memory usage.
@@ -382,9 +380,7 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) { func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
ms := GetMemoryStore() ms := GetMemoryStore()
wg.Add(1) wg.Go(func() {
go func() {
defer wg.Done()
d := DefaultMemoryUsageTrackerInterval d := DefaultMemoryUsageTrackerInterval
if d <= 0 { if d <= 0 {
@@ -470,7 +466,7 @@ func MemoryUsageTracker(wg *sync.WaitGroup, ctx context.Context) {
} }
} }
} }
}() })
} }
// Free removes metric data older than the given time while preserving data for active nodes. // Free removes metric data older than the given time while preserving data for active nodes.

View File

@@ -65,6 +65,7 @@ import (
"math" "math"
"os" "os"
"path" "path"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@@ -114,9 +115,7 @@ type walFileState struct {
// and appends binary WAL records to per-host current.wal files. // and appends binary WAL records to per-host current.wal files.
// Also handles WAL rotation requests from the checkpoint goroutine. // Also handles WAL rotation requests from the checkpoint goroutine.
func WALStaging(wg *sync.WaitGroup, ctx context.Context) { func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
wg.Add(1) wg.Go(func() {
go func() {
defer wg.Done()
if Keys.Checkpoints.FileFormat == "json" { if Keys.Checkpoints.FileFormat == "json" {
return return
@@ -220,7 +219,7 @@ func WALStaging(wg *sync.WaitGroup, ctx context.Context) {
processRotate(req) processRotate(req)
} }
} }
}() })
} }
// RotateWALFiles sends rotation requests for the given host directories // RotateWALFiles sends rotation requests for the given host directories
@@ -478,11 +477,12 @@ func joinSelector(sel []string) string {
if len(sel) == 0 { if len(sel) == 0 {
return "" return ""
} }
result := sel[0] var result strings.Builder
result.WriteString(sel[0])
for i := 1; i < len(sel); i++ { for i := 1; i < len(sel); i++ {
result += "\x00" + sel[i] result.WriteString("\x00" + sel[i])
} }
return result return result.String()
} }
// ToCheckpointWAL writes binary snapshot files for all hosts in parallel. // ToCheckpointWAL writes binary snapshot files for all hosts in parallel.