diff --git a/archive.go b/archive.go
index fb39cfa..62e6922 100644
--- a/archive.go
+++ b/archive.go
@@ -12,6 +12,7 @@ import (
 	"os"
 	"path"
 	"path/filepath"
+	"runtime"
 	"sort"
 	"strconv"
 	"strings"
@@ -59,7 +60,15 @@ type CheckpointFile struct {
 
 var ErrNoNewData error = errors.New("all data already archived")
 
-var NumWorkers int = 6
+var NumWorkers int = 4
+
+func init() {
+	maxWorkers := 10
+	NumWorkers = runtime.NumCPU()/2 + 1
+	if NumWorkers > maxWorkers {
+		NumWorkers = maxWorkers
+	}
+}
 
 // Metrics stored at the lowest 2 levels are not stored away (root and cluster)!
 // On a per-host basis a new JSON file is created. I have no idea if this will scale.
@@ -116,6 +125,11 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) {
 			dir:      dir,
 			selector: selectors[i],
 		}
+
+		// See comment in FromCheckpoint()
+		if i%NumWorkers == 0 {
+			runtime.GC()
+		}
 	}
 
 	close(work)
@@ -227,7 +241,7 @@ func (l *level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
 // Different host's data is loaded to memory in parallel.
 func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
 	var wg sync.WaitGroup
-	work := make(chan [2]string, NumWorkers*2)
+	work := make(chan [2]string, NumWorkers)
 	n, errs := int32(0), int32(0)
 
 	wg.Add(NumWorkers)
@@ -246,6 +260,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
 		}()
 	}
 
+	i := 0
 	clustersDir, err := os.ReadDir(dir)
 	for _, clusterDir := range clustersDir {
 		if !clusterDir.IsDir() {
@@ -265,6 +280,16 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
 			goto done
 		}
 
+		i++
+		if i%NumWorkers == 0 && i > 100 {
+			// Forcing garbage collection runs here regularly during the loading of checkpoints
+			// will decrease the total heap size after loading everything back to memory is done.
+			// While loading data, the heap will grow fast, so the GC target size will double
+			// almost always. By forcing GCs here, we can keep it growing more slowly so that
+			// at the end, less memory is wasted.
+			runtime.GC()
+		}
+
 		work <- [2]string{clusterDir.Name(), hostDir.Name()}
 	}
 }
diff --git a/cc-metric-store.go b/cc-metric-store.go
index 6845b06..b9bc135 100644
--- a/cc-metric-store.go
+++ b/cc-metric-store.go
@@ -124,6 +124,19 @@ func loadConfiguration(file string) Config {
 func intervals(wg *sync.WaitGroup, ctx context.Context) {
 	wg.Add(3)
 
+	// go func() {
+	// 	defer wg.Done()
+	// 	ticks := time.Tick(30 * time.Minute)
+	// 	for {
+	// 		select {
+	// 		case <-ctx.Done():
+	// 			return
+	// 		case <-ticks:
+	// 			runtime.GC()
+	// 		}
+	// 	}
+	// }()
+
 	go func() {
 		defer wg.Done()
 		d, err := time.ParseDuration(conf.RetentionInMemory)
@@ -241,12 +254,18 @@ func main() {
 	if err != nil {
 		log.Fatalf("Loading checkpoints failed: %s\n", err.Error())
 	} else {
-		log.Printf("Checkpoints loaded (%d files, %d MB, that took %dms)\n", files, loadedData, time.Since(startupTime).Milliseconds())
+		log.Printf("Checkpoints loaded (%d files, %d MB, that took %fs)\n", files, loadedData, time.Since(startupTime).Seconds())
 	}
 
+	// Try to use less memory by forcing a GC run here and then
+	// lowering the target percentage. The default of 100 means
+	// that only once the ratio of new allocations exceeds the
+	// previously active heap, a GC is triggered.
+	// Forcing a GC here will set the "previously active heap"
+	// to a minimum.
+	runtime.GC()
 
-	if loadedData > 1000 {
-		debug.SetGCPercent(20)
+	if loadedData > 1000 && os.Getenv("GOGC") == "" {
+		debug.SetGCPercent(10)
 	}
 
 	ctx, shutdown := context.WithCancel(context.Background())
diff --git a/scripts/send-fake-data.go b/scripts/send-fake-data.go
new file mode 100644
index 0000000..2fdc479
--- /dev/null
+++ b/scripts/send-fake-data.go
@@ -0,0 +1,105 @@
+package main
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"log"
+	"math"
+	"math/rand"
+	"net/http"
+	"time"
+)
+
+const token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJFZERTQSJ9.eyJ1c2VyIjoiYWRtaW4iLCJyb2xlcyI6WyJST0xFX0FETUlOIiwiUk9MRV9BTkFMWVNUIiwiUk9MRV9VU0VSIl19.d-3_3FZTsadPjDEdsWrrQ7nS0edMAR4zjl-eK7rJU3HziNBfI9PDHDIpJVHTNN5E5SlLGLFXctWyKAkwhXL-Dw"
+const ccmsurl = "http://localhost:8081/api/write"
+const cluster = "fakedev"
+const sockets = 2
+const cpus = 8
+const freq = 15 * time.Second
+
+var hosts = []string{"fake001", "fake002", "fake003", "fake004", "fake005"}
+var metrics = []struct {
+	Name     string
+	Type     string
+	AvgValue float64
+}{
+	{"flops_any", "cpu", 10.0},
+	{"mem_bw", "socket", 50.0},
+	{"ipc", "cpu", 1.25},
+	{"cpu_load", "node", 4},
+	{"mem_used", "node", 20},
+}
+
+var states = make([]float64, 0)
+
+func send(client *http.Client, t int64) {
+	msg := &bytes.Buffer{}
+
+	i := 0
+	for _, host := range hosts {
+		for _, metric := range metrics {
+			n := 1
+			if metric.Type == "socket" {
+				n = sockets
+			} else if metric.Type == "cpu" {
+				n = cpus
+			}
+
+			for j := 0; j < n; j++ {
+				fmt.Fprintf(msg, "%s,cluster=%s,host=%s,type=%s", metric.Name, cluster, host, metric.Type)
+				if metric.Type == "socket" {
+					fmt.Fprintf(msg, ",type-id=%d", j)
+				} else if metric.Type == "cpu" {
+					fmt.Fprintf(msg, ",type-id=%d", j)
+				}
+
+				x := metric.AvgValue + math.Sin(states[i])*(metric.AvgValue/10.)
+				states[i] += 0.1
+				fmt.Fprintf(msg, " value=%f ", x)
+
+				fmt.Fprintf(msg, "%d\n", t)
+				i++
+			}
+		}
+	}
+
+	req, _ := http.NewRequest(http.MethodPost, ccmsurl, msg)
+	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
+	res, err := client.Do(req)
+	if err != nil {
+		log.Print(err)
+		return
+	}
+	if res.StatusCode != http.StatusOK {
+		body, _ := io.ReadAll(res.Body)
+		log.Printf("%s: %s", res.Status, string(body))
+	}
+}
+
+func main() {
+	for range hosts {
+		for _, m := range metrics {
+			n := 1
+			if m.Type == "socket" {
+				n = sockets
+			} else if m.Type == "cpu" {
+				n = cpus
+			}
+
+			for i := 0; i < n; i++ {
+				states = append(states, rand.Float64()*100)
+			}
+		}
+	}
+
+	client := &http.Client{}
+
+	i := 0
+	for t := range time.Tick(freq) {
+		log.Printf("tick... (#%d)", i)
+		i++
+
+		send(client, t.Unix())
+	}
+}
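
Note on the GC tuning: the two tricks in this patch compose. Forced collections during bulk loading keep the pacer's heap target tied to the real live set while it grows, and the lower GC percentage afterwards keeps the steady-state heap close to the loaded data. The standalone sketch below condenses both patterns; it is illustrative only (loadItem, the item count, and the thresholds are made up for the example, mirroring the i > 100 and i%NumWorkers checks above):

package main

import (
	"fmt"
	"os"
	"runtime"
	"runtime/debug"
)

// loadItem is a hypothetical stand-in for reading one host's
// checkpoint file; each call allocates a burst of short-lived data.
func loadItem() {
	_ = make([]byte, 1<<20)
}

func main() {
	numItems, numWorkers := 500, 4

	for i := 0; i < numItems; i++ {
		// While bulk-loading, the heap grows fast, so the pacer
		// would keep doubling its target. An occasional forced
		// collection keeps the target near the actual live set.
		if i > 100 && i%numWorkers == 0 {
			runtime.GC()
		}
		loadItem()
	}

	// Once loading is done, reset the pacer's baseline (the
	// "previously active heap") and, unless the user set GOGC,
	// trigger future collections after only 10% of new
	// allocations instead of the default 100%.
	runtime.GC()
	if os.Getenv("GOGC") == "" {
		debug.SetGCPercent(10)
	}

	var ms runtime.MemStats
	runtime.ReadMemStats(&ms)
	fmt.Printf("heap in use after loading: %d KiB\n", ms.HeapInuse/1024)
}

The trade-off is CPU for memory: every forced run is a full collection cycle, which is presumably why the loop in FromCheckpoint() only triggers one every NumWorkers-th host instead of on every iteration.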
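
On the new scripts/send-fake-data.go: it exercises the write path end to end by POSTing one batch of InfluxDB line protocol for five fake hosts to /api/write every 15 seconds, using a per-series sine wave so that plots look alive, and a hard-coded admin JWT for authorization. Assuming a local instance is listening on port 8081 as hard-coded in ccmsurl, it can be started with go run scripts/send-fake-data.go; the generated lines have the shape flops_any,cluster=fakedev,host=fake001,type=cpu,type-id=0 value=10.500000 1640995200 (the timestamp here is an arbitrary example).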