From 869f956ed0bc0e78b6861af727d497bf2fcbe9ad Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Wed, 27 Jul 2022 10:46:13 +0200 Subject: [PATCH] Enable binary checkpoints --- .github/workflows/test.yml | 2 +- cmd/cc-metric-store/cc-metric-store.go | 151 +++++++++++++++++++------ go.mod | 2 + go.sum | 2 + internal/memstore/json-checkpoints.go | 5 +- 5 files changed, 127 insertions(+), 35 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 36bac91..2622ce6 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -7,7 +7,7 @@ jobs: - name: Install Go uses: actions/setup-go@v2 with: - go-version: 1.16.x + go-version: 1.18.x - name: Checkout code uses: actions/checkout@v2 - name: Build, Vet & Test diff --git a/cmd/cc-metric-store/cc-metric-store.go b/cmd/cc-metric-store/cc-metric-store.go index bb2606f..bf5feb3 100644 --- a/cmd/cc-metric-store/cc-metric-store.go +++ b/cmd/cc-metric-store/cc-metric-store.go @@ -6,11 +6,15 @@ import ( "encoding/json" "flag" "fmt" + "io" "log" "os" "os/signal" + "path/filepath" "runtime" "runtime/debug" + "strconv" + "strings" "sync" "syscall" "time" @@ -19,6 +23,7 @@ import ( "github.com/ClusterCockpit/cc-metric-store/internal/api/apiv1" "github.com/ClusterCockpit/cc-metric-store/internal/memstore" "github.com/ClusterCockpit/cc-metric-store/internal/types" + "github.com/golang/snappy" "github.com/google/gops/agent" "github.com/influxdata/line-protocol/v2/lineprotocol" ) @@ -69,9 +74,11 @@ type Config struct { JwtPublicKey string `json:"jwt-public-key"` HttpConfig *HttpConfig `json:"http-api"` Checkpoints struct { - Interval string `json:"interval"` - RootDir string `json:"directory"` - Restore string `json:"restore"` + Interval string `json:"interval"` + RootDir string `json:"directory"` + Restore string `json:"restore"` + Binary bool `json:"binary-enabled"` + BinaryCompressed bool `json:"binary-compressed"` } `json:"checkpoints"` Archive struct { Interval string `json:"interval"` @@ -104,18 +111,6 @@ func loadConfiguration(file string) Config { func intervals(wg *sync.WaitGroup, ctx context.Context) { wg.Add(3) - // go func() { - // defer wg.Done() - // ticks := time.Tick(30 * time.Minute) - // for { - // select { - // case <-ctx.Done(): - // return - // case <-ticks: - // runtime.GC() - // } - // } - // }() go func() { defer wg.Done() @@ -160,13 +155,21 @@ func intervals(wg *sync.WaitGroup, ctx context.Context) { case <-ticks: log.Printf("start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339)) now := time.Now() - n, err := memoryStore.ToJSONCheckpoint(conf.Checkpoints.RootDir, - lastCheckpoint.Unix(), now.Unix()) - if err != nil { - log.Printf("checkpointing failed: %s\n", err.Error()) + if conf.Checkpoints.Binary { + if err := makeBinaryCheckpoint(lastCheckpoint.Unix(), now.Unix()); err != nil { + log.Printf("checkpointing failed: %s\n", err.Error()) + } else { + log.Printf("done!") + } } else { - log.Printf("done: %d checkpoint files created\n", n) - lastCheckpoint = now + n, err := memoryStore.ToJSONCheckpoint(conf.Checkpoints.RootDir, + lastCheckpoint.Unix(), now.Unix()) + if err != nil { + log.Printf("checkpointing failed: %s\n", err.Error()) + } else { + log.Printf("done: %d checkpoint files created\n", n) + lastCheckpoint = now + } } } } @@ -225,12 +228,21 @@ func main() { restoreFrom := startupTime.Add(-d) log.Printf("Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339)) - files, err := memoryStore.FromJSONCheckpoint(conf.Checkpoints.RootDir, restoreFrom.Unix()) - loadedData := memoryStore.SizeInBytes() / 1024 / 1024 // In MB - if err != nil { - log.Fatalf("Loading checkpoints failed: %s\n", err.Error()) + if conf.Checkpoints.Binary { + if err := loadBinaryCheckpoints(restoreFrom.Unix()); err != nil { + log.Fatalf("Loading checkpoints failed: %s\n", err.Error()) + } else { + loadedData := memoryStore.SizeInBytes() / 1024 / 1024 // In MB + log.Printf("Checkpoints loaded (%d MB, that took %fs)\n", loadedData, time.Since(startupTime).Seconds()) + } } else { - log.Printf("Checkpoints loaded (%d files, %d MB, that took %fs)\n", files, loadedData, time.Since(startupTime).Seconds()) + files, err := memoryStore.FromJSONCheckpoint(conf.Checkpoints.RootDir, restoreFrom.Unix()) + if err != nil { + log.Fatalf("Loading checkpoints failed: %s\n", err.Error()) + } else { + loadedData := memoryStore.SizeInBytes() / 1024 / 1024 // In MB + log.Printf("Checkpoints loaded (%d files, %d MB, that took %fs)\n", files, loadedData, time.Since(startupTime).Seconds()) + } } // Try to use less memory by forcing a GC run here and then @@ -240,7 +252,7 @@ func main() { // Forcing a GC here will set the "previously active heap" // to a minumum. runtime.GC() - if loadedData > 1000 && os.Getenv("GOGC") == "" { + if os.Getenv("GOGC") == "" { debug.SetGCPercent(10) } @@ -303,10 +315,85 @@ func main() { wg.Wait() - log.Printf("Writing to '%s'...\n", conf.Checkpoints.RootDir) - files, err = memoryStore.ToJSONCheckpoint(conf.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix()) - if err != nil { - log.Printf("Writing checkpoint failed: %s\n", err.Error()) + log.Printf("Writing checkpoint...\n") + if conf.Checkpoints.Binary { + if err := makeBinaryCheckpoint(lastCheckpoint.Unix(), time.Now().Unix()+60); err != nil { + log.Printf("Writing checkpoint failed: %s\n", err.Error()) + } + log.Printf("Done!") + } else { + files, err := memoryStore.ToJSONCheckpoint(conf.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix()) + if err != nil { + log.Printf("Writing checkpoint failed: %s\n", err.Error()) + } + log.Printf("Done! (%d files written)\n", files) } - log.Printf("Done! (%d files written)\n", files) +} + +func loadBinaryCheckpoints(from int64) error { + dir, err := os.ReadDir(conf.Checkpoints.RootDir) + if err != nil { + return err + } + + for _, entry := range dir { + if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".data") { + continue + } + + t, err := strconv.ParseInt(strings.TrimSuffix(entry.Name(), ".data"), 10, 64) + if err != nil { + return err + } + + if t < from { + continue + } + + log.Printf("loading checkpoint file %s/%s...", conf.Checkpoints.RootDir, entry.Name()) + f, err := os.Open(filepath.Join(conf.Checkpoints.RootDir, entry.Name())) + if err != nil { + return err + } + + var reader io.Reader = bufio.NewReader(f) + if conf.Checkpoints.BinaryCompressed { + reader = snappy.NewReader(reader) + } + + if err := memoryStore.LoadCheckpoint(reader); err != nil { + return err + } + + if err := f.Close(); err != nil { + return err + } + } + + return nil +} + +func makeBinaryCheckpoint(from, to int64) error { + filename := filepath.Join(conf.Checkpoints.RootDir, fmt.Sprintf("%d.data", from)) + f, err := os.Create(filename) + if err != nil { + return err + } + defer f.Close() + + if conf.Checkpoints.BinaryCompressed { + compressor := snappy.NewBufferedWriter(f) + if err := memoryStore.SaveCheckpoint(from, to, compressor); err != nil { + return err + } + if err := compressor.Flush(); err != nil { + return err + } + } else { + if err := memoryStore.SaveCheckpoint(from, to, f); err != nil { + return err + } + } + + return nil } diff --git a/go.mod b/go.mod index d9ac179..f7ff511 100644 --- a/go.mod +++ b/go.mod @@ -4,9 +4,11 @@ go 1.18 require ( github.com/golang-jwt/jwt/v4 v4.0.0 + github.com/golang/snappy v0.0.4 github.com/google/gops v0.3.22 github.com/gorilla/mux v1.8.0 github.com/influxdata/line-protocol/v2 v2.2.0 + github.com/klauspost/compress v1.11.12 github.com/nats-io/nats.go v1.11.0 golang.org/x/sys v0.0.0-20210902050250-f475640dd07b ) diff --git a/go.sum b/go.sum index 17b80a9..bc05071 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,8 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= diff --git a/internal/memstore/json-checkpoints.go b/internal/memstore/json-checkpoints.go index 3a836f7..e68135e 100644 --- a/internal/memstore/json-checkpoints.go +++ b/internal/memstore/json-checkpoints.go @@ -417,7 +417,8 @@ func (l *Level) fromJSONCheckpoint(dir string, from int64, m *MemoryStore) (int, func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]string, error) { nums := map[string]int64{} for _, e := range direntries { - ts, err := strconv.ParseInt(strings.TrimSuffix(e.Name(), ".json"), 10, 64) + ts, err := strconv.ParseInt( + strings.TrimSuffix(strings.TrimSuffix(e.Name(), ".json"), ".data"), 10, 64) if err != nil { return nil, err } @@ -469,7 +470,7 @@ func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteIns cluster, host string } - numWorkers := 2 + numWorkers := 3 var wg sync.WaitGroup n, errs := int32(0), int32(0) work := make(chan workItem, numWorkers)