mirror of
https://github.com/ClusterCockpit/cc-metric-store.git
synced 2025-01-14 00:09:20 +01:00
Some more heap/gc tuning
This commit is contained in:
parent
3eba385e97
commit
aca13c1769
29
archive.go
29
archive.go
@ -12,6 +12,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@ -59,7 +60,15 @@ type CheckpointFile struct {
|
|||||||
|
|
||||||
var ErrNoNewData error = errors.New("all data already archived")
|
var ErrNoNewData error = errors.New("all data already archived")
|
||||||
|
|
||||||
var NumWorkers int = 6
|
var NumWorkers int = 4
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
maxWorkers := 10
|
||||||
|
NumWorkers = runtime.NumCPU()/2 + 1
|
||||||
|
if NumWorkers > maxWorkers {
|
||||||
|
NumWorkers = maxWorkers
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Metrics stored at the lowest 2 levels are not stored away (root and cluster)!
|
// Metrics stored at the lowest 2 levels are not stored away (root and cluster)!
|
||||||
// On a per-host basis a new JSON file is created. I have no idea if this will scale.
|
// On a per-host basis a new JSON file is created. I have no idea if this will scale.
|
||||||
@ -116,6 +125,11 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) {
|
|||||||
dir: dir,
|
dir: dir,
|
||||||
selector: selectors[i],
|
selector: selectors[i],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// See comment in FromCheckpoint()
|
||||||
|
if i%NumWorkers == 0 {
|
||||||
|
runtime.GC()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
close(work)
|
close(work)
|
||||||
@ -227,7 +241,7 @@ func (l *level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
|
|||||||
// Different host's data is loaded to memory in parallel.
|
// Different host's data is loaded to memory in parallel.
|
||||||
func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
|
func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
work := make(chan [2]string, NumWorkers*2)
|
work := make(chan [2]string, NumWorkers)
|
||||||
n, errs := int32(0), int32(0)
|
n, errs := int32(0), int32(0)
|
||||||
|
|
||||||
wg.Add(NumWorkers)
|
wg.Add(NumWorkers)
|
||||||
@ -246,6 +260,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
i := 0
|
||||||
clustersDir, err := os.ReadDir(dir)
|
clustersDir, err := os.ReadDir(dir)
|
||||||
for _, clusterDir := range clustersDir {
|
for _, clusterDir := range clustersDir {
|
||||||
if !clusterDir.IsDir() {
|
if !clusterDir.IsDir() {
|
||||||
@ -265,6 +280,16 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
|
|||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
|
|
||||||
|
i++
|
||||||
|
if i%NumWorkers == 0 && i > 100 {
|
||||||
|
// Forcing garbage collection runs here regulary during the loading of checkpoints
|
||||||
|
// will decrease the total heap size after loading everything back to memory is done.
|
||||||
|
// While loading data, the heap will grow fast, so the GC target size will double
|
||||||
|
// almost always. By forcing GCs here, we can keep it growing more slowly so that
|
||||||
|
// at the end, less memory is wasted.
|
||||||
|
runtime.GC()
|
||||||
|
}
|
||||||
|
|
||||||
work <- [2]string{clusterDir.Name(), hostDir.Name()}
|
work <- [2]string{clusterDir.Name(), hostDir.Name()}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -124,6 +124,19 @@ func loadConfiguration(file string) Config {
|
|||||||
|
|
||||||
func intervals(wg *sync.WaitGroup, ctx context.Context) {
|
func intervals(wg *sync.WaitGroup, ctx context.Context) {
|
||||||
wg.Add(3)
|
wg.Add(3)
|
||||||
|
// go func() {
|
||||||
|
// defer wg.Done()
|
||||||
|
// ticks := time.Tick(30 * time.Minute)
|
||||||
|
// for {
|
||||||
|
// select {
|
||||||
|
// case <-ctx.Done():
|
||||||
|
// return
|
||||||
|
// case <-ticks:
|
||||||
|
// runtime.GC()
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
d, err := time.ParseDuration(conf.RetentionInMemory)
|
d, err := time.ParseDuration(conf.RetentionInMemory)
|
||||||
@ -241,12 +254,18 @@ func main() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Loading checkpoints failed: %s\n", err.Error())
|
log.Fatalf("Loading checkpoints failed: %s\n", err.Error())
|
||||||
} else {
|
} else {
|
||||||
log.Printf("Checkpoints loaded (%d files, %d MB, that took %dms)\n", files, loadedData, time.Since(startupTime).Milliseconds())
|
log.Printf("Checkpoints loaded (%d files, %d MB, that took %fs)\n", files, loadedData, time.Since(startupTime).Seconds())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Try to use less memory by forcing a GC run here and then
|
||||||
|
// lowering the target percentage. The default of 100 means
|
||||||
|
// that only once the ratio of new allocations execeds the
|
||||||
|
// previously active heap, a GC is triggered.
|
||||||
|
// Forcing a GC here will set the "previously active heap"
|
||||||
|
// to a minumum.
|
||||||
runtime.GC()
|
runtime.GC()
|
||||||
if loadedData > 1000 {
|
if loadedData > 1000 && os.Getenv("GOGC") == "" {
|
||||||
debug.SetGCPercent(20)
|
debug.SetGCPercent(10)
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, shutdown := context.WithCancel(context.Background())
|
ctx, shutdown := context.WithCancel(context.Background())
|
||||||
|
105
scripts/send-fake-data.go
Normal file
105
scripts/send-fake-data.go
Normal file
@ -0,0 +1,105 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"math"
|
||||||
|
"math/rand"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJFZERTQSJ9.eyJ1c2VyIjoiYWRtaW4iLCJyb2xlcyI6WyJST0xFX0FETUlOIiwiUk9MRV9BTkFMWVNUIiwiUk9MRV9VU0VSIl19.d-3_3FZTsadPjDEdsWrrQ7nS0edMAR4zjl-eK7rJU3HziNBfI9PDHDIpJVHTNN5E5SlLGLFXctWyKAkwhXL-Dw"
|
||||||
|
const ccmsurl = "http://localhost:8081/api/write"
|
||||||
|
const cluster = "fakedev"
|
||||||
|
const sockets = 2
|
||||||
|
const cpus = 8
|
||||||
|
const freq = 15 * time.Second
|
||||||
|
|
||||||
|
var hosts = []string{"fake001", "fake002", "fake003", "fake004", "fake005"}
|
||||||
|
var metrics = []struct {
|
||||||
|
Name string
|
||||||
|
Type string
|
||||||
|
AvgValue float64
|
||||||
|
}{
|
||||||
|
{"flops_any", "cpu", 10.0},
|
||||||
|
{"mem_bw", "socket", 50.0},
|
||||||
|
{"ipc", "cpu", 1.25},
|
||||||
|
{"cpu_load", "node", 4},
|
||||||
|
{"mem_used", "node", 20},
|
||||||
|
}
|
||||||
|
|
||||||
|
var states = make([]float64, 0)
|
||||||
|
|
||||||
|
func send(client *http.Client, t int64) {
|
||||||
|
msg := &bytes.Buffer{}
|
||||||
|
|
||||||
|
i := 0
|
||||||
|
for _, host := range hosts {
|
||||||
|
for _, metric := range metrics {
|
||||||
|
n := 1
|
||||||
|
if metric.Type == "socket" {
|
||||||
|
n = sockets
|
||||||
|
} else if metric.Type == "cpu" {
|
||||||
|
n = cpus
|
||||||
|
}
|
||||||
|
|
||||||
|
for j := 0; j < n; j++ {
|
||||||
|
fmt.Fprintf(msg, "%s,cluster=%s,host=%s,type=%s", metric.Name, cluster, host, metric.Type)
|
||||||
|
if metric.Type == "socket" {
|
||||||
|
fmt.Fprintf(msg, ",type-id=%d", j)
|
||||||
|
} else if metric.Type == "cpu" {
|
||||||
|
fmt.Fprintf(msg, ",type-id=%d", j)
|
||||||
|
}
|
||||||
|
|
||||||
|
x := metric.AvgValue + math.Sin(states[i])*(metric.AvgValue/10.)
|
||||||
|
states[i] += 0.1
|
||||||
|
fmt.Fprintf(msg, " value=%f ", x)
|
||||||
|
|
||||||
|
fmt.Fprintf(msg, "%d\n", t)
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
req, _ := http.NewRequest(http.MethodPost, ccmsurl, msg)
|
||||||
|
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
|
||||||
|
res, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
log.Print(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if res.StatusCode != http.StatusOK {
|
||||||
|
body, _ := io.ReadAll(res.Body)
|
||||||
|
log.Printf("%s: %s", res.Status, string(body))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
for range hosts {
|
||||||
|
for _, m := range metrics {
|
||||||
|
n := 1
|
||||||
|
if m.Type == "socket" {
|
||||||
|
n = sockets
|
||||||
|
} else if m.Type == "cpu" {
|
||||||
|
n = cpus
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
states = append(states, rand.Float64()*100)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
client := &http.Client{}
|
||||||
|
|
||||||
|
i := 0
|
||||||
|
for t := range time.Tick(freq) {
|
||||||
|
log.Printf("tick... (#%d)", i)
|
||||||
|
i++
|
||||||
|
|
||||||
|
send(client, t.Unix())
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user