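// cc-metric-store/cc-metric-store.go — Go, 350 lines, 8.1 KiB.
// Captured from a repository web view (Raw / Permalink / Normal View / History);
// the date lines are presumably per-line blame timestamps.
// 2021-06-09 06:03:31 +02:00
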
package main
import (
"bufio"
// 2021-09-08 09:08:51 +02:00
"context"
// 2021-06-09 06:03:31 +02:00
"encoding/json"
// 2021-12-15 10:58:03 +01:00
"flag"
// 2022-03-08 09:27:44 +01:00
"fmt"
// 2022-04-01 14:01:43 +02:00
"io"
// 2021-06-09 06:03:31 +02:00
"log"
"os"
// 2021-08-20 11:45:34 +02:00
"os/signal"
// 2022-03-09 14:21:03 +01:00
"runtime"
"runtime/debug"
// 2021-08-20 12:54:11 +02:00
"sync"
// 2021-08-20 11:45:34 +02:00
"syscall"
"time"
// 2022-02-21 10:00:29 +01:00
"github.com/google/gops/agent"
// 2021-06-09 06:03:31 +02:00
)
// 2022-03-08 09:27:44 +01:00

// AggregationStrategy selects how values of the same timestep that stem from
// different cpus/sockets/... are combined over the hierarchy.
// It is about aggregation across sources, not over time!
type AggregationStrategy int

const (
	// NoAggregation is selected by the JSON string "" (and by JSON null).
	NoAggregation AggregationStrategy = iota
	// SumAggregation is selected by the JSON string "sum".
	SumAggregation
	// AvgAggregation is selected by the JSON string "avg".
	AvgAggregation
)

// UnmarshalJSON implements json.Unmarshaler for AggregationStrategy.
//
// The input must be a JSON string: "" maps to NoAggregation, "sum" to
// SumAggregation and "avg" to AvgAggregation. JSON null leaves the decoded
// string empty and therefore also yields NoAggregation. Any other string, or
// a non-string JSON value, returns an error and leaves *as untouched.
func (as *AggregationStrategy) UnmarshalJSON(data []byte) error {
	var name string
	if err := json.Unmarshal(data, &name); err != nil {
		return err
	}

	var parsed AggregationStrategy
	switch name {
	case "sum":
		parsed = SumAggregation
	case "avg":
		parsed = AvgAggregation
	case "":
		parsed = NoAggregation
	default:
		return fmt.Errorf("invalid aggregation strategy: %#v", name)
	}

	*as = parsed
	return nil
}
type MetricConfig struct {
// Interval in seconds at which measurements will arive.
Frequency int64 `json:"frequency"`
// Can be 'sum', 'avg' or null. Describes how to aggregate metrics from the same timestep over the hierarchy.
2022-03-08 09:27:44 +01:00
Aggregation AggregationStrategy `json:"aggregation"`
// Private, used internally...
offset int
}
// HttpConfig configures the HTTP API server (the "http-api" section of the
// configuration file).
type HttpConfig struct {
	// Address to bind to, for example "0.0.0.0:8081".
	Address string `json:"address"`

	// If not the empty string, use https with this as the certificate file.
	CertFile string `json:"https-cert-file"`

	// If not the empty string, use https with this as the key file.
	KeyFile string `json:"https-key-file"`
}
// NatsConfig describes one NATS connection (one element of the "nats" list in
// the configuration file) and the channels to subscribe to on it.
type NatsConfig struct {
	// Address of the nats server.
	Address string `json:"address"`

	// Username/Password, optional.
	Username string `json:"username"`
	Password string `json:"password"`

	// Subscriptions lists the channels to listen on.
	Subscriptions []struct {
		// Channel name.
		SubscribeTo string `json:"subscribe-to"`

		// Allow lines without a cluster tag, use this as default, optional.
		ClusterTag string `json:"cluster-tag"`
	} `json:"subscriptions"`
}
// 2021-08-20 11:45:34 +02:00

type Config struct {
Metrics map[string]MetricConfig `json:"metrics"`
RetentionInMemory string `json:"retention-in-memory"`
2022-02-22 14:03:45 +01:00
Nats []*NatsConfig `json:"nats"`
JwtPublicKey string `json:"jwt-public-key"`
HttpConfig *HttpConfig `json:"http-api"`
Checkpoints struct {
Interval string `json:"interval"`
RootDir string `json:"directory"`
Restore string `json:"restore"`
} `json:"checkpoints"`
Archive struct {
Interval string `json:"interval"`
RootDir string `json:"directory"`
DeleteInstead bool `json:"delete-instead"`
} `json:"archive"`
2022-04-01 14:01:43 +02:00
Debug struct {
EnableGops bool `json:"gops"`
DumpToFile string `json:"dump-to-file"`
} `json:"debug"`
2021-06-09 06:03:31 +02:00
}
var conf Config
var memoryStore *MemoryStore = nil
var lastCheckpoint time.Time
2021-06-09 06:03:31 +02:00
2022-04-01 14:01:43 +02:00
var debugDumpLock sync.Mutex
var debugDump io.Writer = io.Discard
// 2021-06-09 06:03:31 +02:00

func loadConfiguration(file string) Config {
var config Config
configFile, err := os.Open(file)
if err != nil {
2022-02-21 10:00:29 +01:00
log.Fatal(err)
2021-06-09 06:03:31 +02:00
}
defer configFile.Close()
2022-02-21 10:00:29 +01:00
dec := json.NewDecoder(configFile)
dec.DisallowUnknownFields()
if err := dec.Decode(&config); err != nil {
log.Fatal(err)
}
2021-06-09 06:03:31 +02:00
return config
}
func intervals(wg *sync.WaitGroup, ctx context.Context) {
2021-09-13 13:40:39 +02:00
wg.Add(3)
2022-03-24 10:31:11 +01:00
// go func() {
// defer wg.Done()
// ticks := time.Tick(30 * time.Minute)
// for {
// select {
// case <-ctx.Done():
// return
// case <-ticks:
// runtime.GC()
// }
// }
// }()
go func() {
defer wg.Done()
d, err := time.ParseDuration(conf.RetentionInMemory)
if err != nil {
log.Fatal(err)
}
2021-09-13 13:40:39 +02:00
if d <= 0 {
return
}
ticks := time.Tick(d / 2)
for {
select {
case <-ctx.Done():
return
case <-ticks:
t := time.Now().Add(-d)
2021-12-15 10:23:21 +01:00
log.Printf("start freeing buffers (older than %s)...\n", t.Format(time.RFC3339))
2022-03-08 09:27:44 +01:00
freed, err := memoryStore.Free(nil, t.Unix())
if err != nil {
2021-12-15 10:23:21 +01:00
log.Printf("freeing up buffers failed: %s\n", err.Error())
} else {
2021-12-15 10:23:21 +01:00
log.Printf("done: %d buffers freed\n", freed)
}
}
}
}()
lastCheckpoint = time.Now()
go func() {
defer wg.Done()
d, err := time.ParseDuration(conf.Checkpoints.Interval)
if err != nil {
log.Fatal(err)
}
2021-09-13 13:40:39 +02:00
if d <= 0 {
return
}
ticks := time.Tick(d)
for {
select {
case <-ctx.Done():
return
case <-ticks:
2021-12-15 10:23:21 +01:00
log.Printf("start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339))
now := time.Now()
n, err := memoryStore.ToCheckpoint(conf.Checkpoints.RootDir,
lastCheckpoint.Unix(), now.Unix())
if err != nil {
2021-12-15 10:23:21 +01:00
log.Printf("checkpointing failed: %s\n", err.Error())
} else {
2021-12-15 10:23:21 +01:00
log.Printf("done: %d checkpoint files created\n", n)
lastCheckpoint = now
}
}
}
}()
2021-09-13 13:40:39 +02:00
go func() {
defer wg.Done()
d, err := time.ParseDuration(conf.Archive.Interval)
if err != nil {
log.Fatal(err)
}
2021-09-13 13:40:39 +02:00
if d <= 0 {
return
}
2021-09-13 13:40:39 +02:00
ticks := time.Tick(d)
for {
select {
case <-ctx.Done():
return
case <-ticks:
t := time.Now().Add(-d)
2021-12-15 10:23:21 +01:00
log.Printf("start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339))
n, err := ArchiveCheckpoints(conf.Checkpoints.RootDir, conf.Archive.RootDir, t.Unix(), conf.Archive.DeleteInstead)
2021-09-13 13:40:39 +02:00
if err != nil {
2021-12-15 10:23:21 +01:00
log.Printf("archiving failed: %s\n", err.Error())
2021-09-13 13:40:39 +02:00
} else {
2021-12-15 10:23:21 +01:00
log.Printf("done: %d files zipped and moved to archive\n", n)
2021-09-13 13:40:39 +02:00
}
}
}
}()
}
// 2021-08-20 11:45:34 +02:00

func main() {
2021-12-15 10:58:03 +01:00
var configFile string
2022-02-21 10:00:29 +01:00
var enableGopsAgent bool
2021-12-15 10:58:03 +01:00
flag.StringVar(&configFile, "config", "./config.json", "configuration file")
2022-02-21 10:00:29 +01:00
flag.BoolVar(&enableGopsAgent, "gops", false, "Listen via github.com/google/gops/agent")
2021-12-15 10:58:03 +01:00
flag.Parse()
2022-04-01 14:01:43 +02:00
startupTime := time.Now()
conf = loadConfiguration(configFile)
memoryStore = NewMemoryStore(conf.Metrics)
if enableGopsAgent || conf.Debug.EnableGops {
2022-02-21 10:00:29 +01:00
if err := agent.Listen(agent.Options{}); err != nil {
log.Fatal(err)
}
}
2022-04-01 14:01:43 +02:00
if conf.Debug.DumpToFile != "" {
f, err := os.Create(conf.Debug.DumpToFile)
if err != nil {
log.Fatal(err)
}
debugDump = f
}
d, err := time.ParseDuration(conf.Checkpoints.Restore)
if err != nil {
log.Fatal(err)
}
restoreFrom := startupTime.Add(-d)
log.Printf("Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339))
files, err := memoryStore.FromCheckpoint(conf.Checkpoints.RootDir, restoreFrom.Unix())
loadedData := memoryStore.SizeInBytes() / 1024 / 1024 // In MB
if err != nil {
log.Fatalf("Loading checkpoints failed: %s\n", err.Error())
} else {
2022-03-24 10:31:11 +01:00
log.Printf("Checkpoints loaded (%d files, %d MB, that took %fs)\n", files, loadedData, time.Since(startupTime).Seconds())
2021-06-09 06:03:31 +02:00
}
2022-03-24 10:31:11 +01:00
// Try to use less memory by forcing a GC run here and then
// lowering the target percentage. The default of 100 means
// that only once the ratio of new allocations execeds the
// previously active heap, a GC is triggered.
// Forcing a GC here will set the "previously active heap"
// to a minumum.
2022-03-09 14:21:03 +01:00
runtime.GC()
2022-03-24 10:31:11 +01:00
if loadedData > 1000 && os.Getenv("GOGC") == "" {
debug.SetGCPercent(10)
}
2021-09-08 09:08:51 +02:00
ctx, shutdown := context.WithCancel(context.Background())
var wg sync.WaitGroup
2021-08-20 11:45:34 +02:00
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1)
2021-08-20 11:45:34 +02:00
go func() {
for {
sig := <-sigs
if sig == syscall.SIGUSR1 {
2022-03-31 14:17:27 +02:00
memoryStore.DebugDump(bufio.NewWriter(os.Stdout), nil)
continue
}
log.Println("Shuting down...")
shutdown()
}
2021-08-20 12:54:11 +02:00
}()
intervals(&wg, ctx)
2021-10-12 13:26:54 +02:00
wg.Add(1)
2021-08-20 12:54:11 +02:00
go func() {
err := StartApiServer(ctx, conf.HttpConfig)
if err != nil {
log.Fatal(err)
}
2021-08-20 12:54:11 +02:00
wg.Done()
2021-08-20 11:45:34 +02:00
}()
if conf.Nats != nil {
2022-02-22 14:03:45 +01:00
for _, natsConf := range conf.Nats {
// TODO: When multiple nats configs share a URL, do a single connect.
wg.Add(1)
nc := natsConf
go func() {
// err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx)
err := ReceiveNats(nc, decodeLine, 1, ctx)
2021-10-12 13:26:54 +02:00
2022-02-22 14:03:45 +01:00
if err != nil {
log.Fatal(err)
}
wg.Done()
}()
}
2021-10-12 13:26:54 +02:00
}
2021-08-20 12:54:11 +02:00
wg.Wait()
log.Printf("Writing to '%s'...\n", conf.Checkpoints.RootDir)
files, err = memoryStore.ToCheckpoint(conf.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
if err != nil {
log.Printf("Writing checkpoint failed: %s\n", err.Error())
}
log.Printf("Done! (%d files written)\n", files)
2022-04-01 14:01:43 +02:00
if closer, ok := debugDump.(io.Closer); ok {
if err := closer.Close(); err != nil {
log.Printf("error: %s", err.Error())
}
}
2021-06-09 06:03:31 +02:00
}