2021-06-09 06:03:31 +02:00
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
2021-11-22 17:04:09 +01:00
|
|
|
"bufio"
|
2021-09-08 09:08:51 +02:00
|
|
|
"context"
|
2021-06-09 06:03:31 +02:00
|
|
|
"encoding/json"
|
2021-12-15 10:58:03 +01:00
|
|
|
"flag"
|
2022-03-08 09:27:44 +01:00
|
|
|
"fmt"
|
2021-06-09 06:03:31 +02:00
|
|
|
"log"
|
|
|
|
"os"
|
2021-08-20 11:45:34 +02:00
|
|
|
"os/signal"
|
2021-08-20 12:54:11 +02:00
|
|
|
"sync"
|
2021-08-20 11:45:34 +02:00
|
|
|
"syscall"
|
2021-08-31 15:17:36 +02:00
|
|
|
"time"
|
2022-02-21 10:00:29 +01:00
|
|
|
|
|
|
|
"github.com/google/gops/agent"
|
2021-06-09 06:03:31 +02:00
|
|
|
)
|
|
|
|
|
2022-03-08 09:27:44 +01:00
|
|
|
// AggregationStrategy selects how several values of one metric are combined
// into a single value. It is for aggregation over multiple values at
// different cpus/sockets/..., not over time!
type AggregationStrategy int

const (
	// NoAggregation is the zero value; in JSON it is selected by the empty
	// string or by null.
	NoAggregation AggregationStrategy = iota
	// SumAggregation is selected by the JSON string "sum".
	SumAggregation
	// AvgAggregation is selected by the JSON string "avg".
	AvgAggregation
)

// UnmarshalJSON implements json.Unmarshaler.
//
// data must be a JSON string or null. The accepted strings are "", "sum" and
// "avg". Any other string, or a JSON value that is not a string, yields an
// error and leaves the receiver unchanged.
func (as *AggregationStrategy) UnmarshalJSON(data []byte) error {
	// Decoding JSON null into a string is a no-op, so null leaves str empty
	// and therefore ends up as NoAggregation below.
	var str string
	if err := json.Unmarshal(data, &str); err != nil {
		return err
	}

	switch str {
	case "":
		*as = NoAggregation
	case "sum":
		*as = SumAggregation
	case "avg":
		*as = AvgAggregation
	default:
		return fmt.Errorf("invalid aggregation strategy: %#v", str)
	}
	return nil
}
|
|
|
|
|
2021-08-31 15:17:36 +02:00
|
|
|
type MetricConfig struct {
|
2022-02-04 08:46:14 +01:00
|
|
|
// Interval in seconds at which measurements will arive.
|
|
|
|
Frequency int64 `json:"frequency"`
|
|
|
|
|
|
|
|
// Can be 'sum', 'avg' or null. Describes how to aggregate metrics from the same timestep over the hierarchy.
|
2022-03-08 09:27:44 +01:00
|
|
|
Aggregation AggregationStrategy `json:"aggregation"`
|
|
|
|
|
|
|
|
// Private, used internally...
|
|
|
|
offset int
|
2022-02-04 08:46:14 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// HttpConfig holds the settings found under the "http-api" key of the
// configuration file. main passes it to StartApiServer.
type HttpConfig struct {
	// Address to bind to, for example "0.0.0.0:8081"
	Address string `json:"address"`

	// If not the empty string, use https with this as the certificate file
	CertFile string `json:"https-cert-file"`

	// If not the empty string, use https with this as the key file
	KeyFile string `json:"https-key-file"`
}
|
|
|
|
|
2022-02-04 08:46:14 +01:00
|
|
|
// NatsConfig describes one entry of the "nats" list in the configuration
// file. main starts one ReceiveNats call per entry.
type NatsConfig struct {
	// Address of the nats server
	Address string `json:"address"`

	// Username/Password, optional
	Username string `json:"username"`
	Password string `json:"password"`

	// Channels to subscribe to on this server.
	Subscriptions []struct {
		// Channel name
		SubscribeTo string `json:"subscribe-to"`

		// Allow lines without a cluster tag, use this as default, optional
		ClusterTag string `json:"cluster-tag"`
	} `json:"subscriptions"`
}
|
|
|
|
|
2021-08-20 11:45:34 +02:00
|
|
|
type Config struct {
|
2021-09-13 12:28:33 +02:00
|
|
|
Metrics map[string]MetricConfig `json:"metrics"`
|
2022-01-31 10:52:30 +01:00
|
|
|
RetentionInMemory string `json:"retention-in-memory"`
|
2022-02-22 14:03:45 +01:00
|
|
|
Nats []*NatsConfig `json:"nats"`
|
2021-09-20 09:27:31 +02:00
|
|
|
JwtPublicKey string `json:"jwt-public-key"`
|
2022-02-04 08:46:14 +01:00
|
|
|
HttpConfig *HttpConfig `json:"http-api"`
|
2021-09-13 12:28:33 +02:00
|
|
|
Checkpoints struct {
|
2022-01-31 10:52:30 +01:00
|
|
|
Interval string `json:"interval"`
|
2021-09-13 12:28:33 +02:00
|
|
|
RootDir string `json:"directory"`
|
2022-01-31 10:52:30 +01:00
|
|
|
Restore string `json:"restore"`
|
2021-09-13 12:28:33 +02:00
|
|
|
} `json:"checkpoints"`
|
|
|
|
Archive struct {
|
2022-01-31 10:52:30 +01:00
|
|
|
Interval string `json:"interval"`
|
2021-09-13 12:28:33 +02:00
|
|
|
RootDir string `json:"directory"`
|
|
|
|
} `json:"archive"`
|
2021-06-09 06:03:31 +02:00
|
|
|
}
|
|
|
|
|
2021-08-31 15:17:36 +02:00
|
|
|
var conf Config
|
|
|
|
var memoryStore *MemoryStore = nil
|
2021-09-07 09:28:41 +02:00
|
|
|
var lastCheckpoint time.Time
|
2021-06-09 06:03:31 +02:00
|
|
|
|
|
|
|
func loadConfiguration(file string) Config {
|
|
|
|
var config Config
|
|
|
|
configFile, err := os.Open(file)
|
|
|
|
if err != nil {
|
2022-02-21 10:00:29 +01:00
|
|
|
log.Fatal(err)
|
2021-06-09 06:03:31 +02:00
|
|
|
}
|
2021-09-20 09:27:31 +02:00
|
|
|
defer configFile.Close()
|
2022-02-21 10:00:29 +01:00
|
|
|
dec := json.NewDecoder(configFile)
|
|
|
|
dec.DisallowUnknownFields()
|
|
|
|
if err := dec.Decode(&config); err != nil {
|
|
|
|
log.Fatal(err)
|
|
|
|
}
|
2021-06-09 06:03:31 +02:00
|
|
|
return config
|
|
|
|
}
|
|
|
|
|
2021-09-13 12:28:33 +02:00
|
|
|
func intervals(wg *sync.WaitGroup, ctx context.Context) {
|
2021-09-13 13:40:39 +02:00
|
|
|
wg.Add(3)
|
2021-09-13 12:28:33 +02:00
|
|
|
go func() {
|
|
|
|
defer wg.Done()
|
2022-01-31 10:52:30 +01:00
|
|
|
d, err := time.ParseDuration(conf.RetentionInMemory)
|
|
|
|
if err != nil {
|
|
|
|
log.Fatal(err)
|
|
|
|
}
|
2021-09-13 13:40:39 +02:00
|
|
|
if d <= 0 {
|
|
|
|
return
|
|
|
|
}
|
2022-01-31 10:52:30 +01:00
|
|
|
|
2021-09-13 12:28:33 +02:00
|
|
|
ticks := time.Tick(d / 2)
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
case <-ticks:
|
|
|
|
t := time.Now().Add(-d)
|
2021-12-15 10:23:21 +01:00
|
|
|
log.Printf("start freeing buffers (older than %s)...\n", t.Format(time.RFC3339))
|
2022-03-08 09:27:44 +01:00
|
|
|
freed, err := memoryStore.Free(nil, t.Unix())
|
2021-09-13 12:28:33 +02:00
|
|
|
if err != nil {
|
2021-12-15 10:23:21 +01:00
|
|
|
log.Printf("freeing up buffers failed: %s\n", err.Error())
|
2021-09-13 12:28:33 +02:00
|
|
|
} else {
|
2021-12-15 10:23:21 +01:00
|
|
|
log.Printf("done: %d buffers freed\n", freed)
|
2021-09-13 12:28:33 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
lastCheckpoint = time.Now()
|
|
|
|
go func() {
|
|
|
|
defer wg.Done()
|
2022-01-31 10:52:30 +01:00
|
|
|
d, err := time.ParseDuration(conf.Checkpoints.Interval)
|
|
|
|
if err != nil {
|
|
|
|
log.Fatal(err)
|
|
|
|
}
|
2021-09-13 13:40:39 +02:00
|
|
|
if d <= 0 {
|
|
|
|
return
|
|
|
|
}
|
2022-01-31 10:52:30 +01:00
|
|
|
|
2021-09-13 12:28:33 +02:00
|
|
|
ticks := time.Tick(d)
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
case <-ticks:
|
2021-12-15 10:23:21 +01:00
|
|
|
log.Printf("start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339))
|
2021-09-13 12:28:33 +02:00
|
|
|
now := time.Now()
|
|
|
|
n, err := memoryStore.ToCheckpoint(conf.Checkpoints.RootDir,
|
|
|
|
lastCheckpoint.Unix(), now.Unix())
|
|
|
|
if err != nil {
|
2021-12-15 10:23:21 +01:00
|
|
|
log.Printf("checkpointing failed: %s\n", err.Error())
|
2021-09-13 12:28:33 +02:00
|
|
|
} else {
|
2021-12-15 10:23:21 +01:00
|
|
|
log.Printf("done: %d checkpoint files created\n", n)
|
2021-09-13 12:28:33 +02:00
|
|
|
lastCheckpoint = now
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2021-09-13 13:40:39 +02:00
|
|
|
go func() {
|
|
|
|
defer wg.Done()
|
2022-01-31 10:52:30 +01:00
|
|
|
d, err := time.ParseDuration(conf.Archive.Interval)
|
|
|
|
if err != nil {
|
|
|
|
log.Fatal(err)
|
|
|
|
}
|
2021-09-13 13:40:39 +02:00
|
|
|
if d <= 0 {
|
|
|
|
return
|
|
|
|
}
|
2022-01-31 10:52:30 +01:00
|
|
|
|
2021-09-13 13:40:39 +02:00
|
|
|
ticks := time.Tick(d)
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
case <-ticks:
|
|
|
|
t := time.Now().Add(-d)
|
2021-12-15 10:23:21 +01:00
|
|
|
log.Printf("start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339))
|
|
|
|
n, err := ArchiveCheckpoints(conf.Checkpoints.RootDir, conf.Archive.RootDir, t.Unix())
|
2021-09-13 13:40:39 +02:00
|
|
|
if err != nil {
|
2021-12-15 10:23:21 +01:00
|
|
|
log.Printf("archiving failed: %s\n", err.Error())
|
2021-09-13 13:40:39 +02:00
|
|
|
} else {
|
2021-12-15 10:23:21 +01:00
|
|
|
log.Printf("done: %d files zipped and moved to archive\n", n)
|
2021-09-13 13:40:39 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
2021-09-13 12:28:33 +02:00
|
|
|
}
|
|
|
|
|
2021-08-20 11:45:34 +02:00
|
|
|
func main() {
|
2021-12-15 10:58:03 +01:00
|
|
|
var configFile string
|
2022-02-21 10:00:29 +01:00
|
|
|
var enableGopsAgent bool
|
2021-12-15 10:58:03 +01:00
|
|
|
flag.StringVar(&configFile, "config", "./config.json", "configuration file")
|
2022-02-21 10:00:29 +01:00
|
|
|
flag.BoolVar(&enableGopsAgent, "gops", false, "Listen via github.com/google/gops/agent")
|
2021-12-15 10:58:03 +01:00
|
|
|
flag.Parse()
|
|
|
|
|
2022-02-21 10:00:29 +01:00
|
|
|
if enableGopsAgent {
|
|
|
|
if err := agent.Listen(agent.Options{}); err != nil {
|
|
|
|
log.Fatal(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-31 15:17:36 +02:00
|
|
|
startupTime := time.Now()
|
2021-12-15 10:58:03 +01:00
|
|
|
conf = loadConfiguration(configFile)
|
2021-08-31 15:17:36 +02:00
|
|
|
memoryStore = NewMemoryStore(conf.Metrics)
|
|
|
|
|
2022-01-31 10:52:30 +01:00
|
|
|
d, err := time.ParseDuration(conf.Checkpoints.Restore)
|
|
|
|
if err != nil {
|
|
|
|
log.Fatal(err)
|
|
|
|
}
|
|
|
|
|
2022-01-31 16:34:42 +01:00
|
|
|
restoreFrom := startupTime.Add(-d)
|
2021-11-26 09:51:18 +01:00
|
|
|
log.Printf("Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339))
|
2021-09-13 12:28:33 +02:00
|
|
|
files, err := memoryStore.FromCheckpoint(conf.Checkpoints.RootDir, restoreFrom.Unix())
|
|
|
|
if err != nil {
|
|
|
|
log.Fatalf("Loading checkpoints failed: %s\n", err.Error())
|
|
|
|
} else {
|
2021-12-01 12:30:01 +01:00
|
|
|
log.Printf("Checkpoints loaded (%d files, that took %dms)\n", files, time.Since(startupTime).Milliseconds())
|
2021-06-09 06:03:31 +02:00
|
|
|
}
|
|
|
|
|
2021-09-08 09:08:51 +02:00
|
|
|
ctx, shutdown := context.WithCancel(context.Background())
|
|
|
|
|
2021-08-31 15:17:36 +02:00
|
|
|
var wg sync.WaitGroup
|
2021-08-20 11:45:34 +02:00
|
|
|
sigs := make(chan os.Signal, 1)
|
2021-11-22 17:04:09 +01:00
|
|
|
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1)
|
2021-08-20 11:45:34 +02:00
|
|
|
go func() {
|
2021-11-22 17:04:09 +01:00
|
|
|
for {
|
|
|
|
sig := <-sigs
|
|
|
|
if sig == syscall.SIGUSR1 {
|
|
|
|
memoryStore.DebugDump(bufio.NewWriter(os.Stdout))
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Println("Shuting down...")
|
|
|
|
shutdown()
|
|
|
|
}
|
2021-08-20 12:54:11 +02:00
|
|
|
}()
|
|
|
|
|
2021-09-13 12:28:33 +02:00
|
|
|
intervals(&wg, ctx)
|
2021-09-07 09:28:41 +02:00
|
|
|
|
2021-10-12 13:26:54 +02:00
|
|
|
wg.Add(1)
|
2021-08-20 12:54:11 +02:00
|
|
|
|
|
|
|
go func() {
|
2022-02-04 08:46:14 +01:00
|
|
|
err := StartApiServer(ctx, conf.HttpConfig)
|
2021-08-31 15:17:36 +02:00
|
|
|
if err != nil {
|
|
|
|
log.Fatal(err)
|
|
|
|
}
|
2021-08-20 12:54:11 +02:00
|
|
|
wg.Done()
|
2021-08-20 11:45:34 +02:00
|
|
|
}()
|
|
|
|
|
2022-02-04 08:46:14 +01:00
|
|
|
if conf.Nats != nil {
|
2022-02-22 14:03:45 +01:00
|
|
|
for _, natsConf := range conf.Nats {
|
|
|
|
// TODO: When multiple nats configs share a URL, do a single connect.
|
|
|
|
wg.Add(1)
|
|
|
|
nc := natsConf
|
|
|
|
go func() {
|
|
|
|
// err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx)
|
|
|
|
err := ReceiveNats(nc, decodeLine, 1, ctx)
|
2021-10-12 13:26:54 +02:00
|
|
|
|
2022-02-22 14:03:45 +01:00
|
|
|
if err != nil {
|
|
|
|
log.Fatal(err)
|
|
|
|
}
|
|
|
|
wg.Done()
|
|
|
|
}()
|
|
|
|
}
|
2021-10-12 13:26:54 +02:00
|
|
|
}
|
2021-08-20 12:54:11 +02:00
|
|
|
|
|
|
|
wg.Wait()
|
2021-08-31 15:17:36 +02:00
|
|
|
|
2021-09-13 12:28:33 +02:00
|
|
|
log.Printf("Writing to '%s'...\n", conf.Checkpoints.RootDir)
|
|
|
|
files, err = memoryStore.ToCheckpoint(conf.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("Writing checkpoint failed: %s\n", err.Error())
|
2021-08-31 15:17:36 +02:00
|
|
|
}
|
2021-09-13 12:28:33 +02:00
|
|
|
log.Printf("Done! (%d files written)\n", files)
|
2021-06-09 06:03:31 +02:00
|
|
|
}
|