Mirror of https://github.com/ClusterCockpit/cc-metric-store.git (synced 2024-11-13 22:47:25 +01:00)

commit fcc8eac2d5 (parent b2528f958c)

    Restructure and Cleanup

    Compiles
@@ -4,7 +4,6 @@ import (
 	"bufio"
 	"context"
 	"flag"
-	"io"
 	"log"
 	"os"
 	"os/signal"
@@ -20,120 +19,6 @@ import (
 	"github.com/google/gops/agent"
 )

-var (
-	conf           config.Config
-	ms             *memorystore.MemoryStore = nil
-	lastCheckpoint time.Time
-)
-
-var (
-	debugDumpLock sync.Mutex
-	debugDump     io.Writer = io.Discard
-)
-
-func intervals(wg *sync.WaitGroup, ctx context.Context) {
-	wg.Add(3)
-	// go func() {
-	// 	defer wg.Done()
-	// 	ticks := time.Tick(30 * time.Minute)
-	// 	for {
-	// 		select {
-	// 		case <-ctx.Done():
-	// 			return
-	// 		case <-ticks:
-	// 			runtime.GC()
-	// 		}
-	// 	}
-	// }()
-
-	go func() {
-		defer wg.Done()
-		d, err := time.ParseDuration(conf.RetentionInMemory)
-		if err != nil {
-			log.Fatal(err)
-		}
-		if d <= 0 {
-			return
-		}
-
-		ticks := time.Tick(d / 2)
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			case <-ticks:
-				t := time.Now().Add(-d)
-				log.Printf("start freeing buffers (older than %s)...\n", t.Format(time.RFC3339))
-				freed, err := ms.Free(nil, t.Unix())
-				if err != nil {
-					log.Printf("freeing up buffers failed: %s\n", err.Error())
-				} else {
-					log.Printf("done: %d buffers freed\n", freed)
-				}
-			}
-		}
-	}()
-
-	lastCheckpoint = time.Now()
-	go func() {
-		defer wg.Done()
-		d, err := time.ParseDuration(conf.Checkpoints.Interval)
-		if err != nil {
-			log.Fatal(err)
-		}
-		if d <= 0 {
-			return
-		}
-
-		ticks := time.Tick(d)
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			case <-ticks:
-				log.Printf("start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339))
-				now := time.Now()
-				n, err := ms.ToCheckpoint(conf.Checkpoints.RootDir,
-					lastCheckpoint.Unix(), now.Unix())
-				if err != nil {
-					log.Printf("checkpointing failed: %s\n", err.Error())
-				} else {
-					log.Printf("done: %d checkpoint files created\n", n)
-					lastCheckpoint = now
-				}
-			}
-		}
-	}()
-
-	go func() {
-		defer wg.Done()
-		d, err := time.ParseDuration(conf.Archive.Interval)
-		if err != nil {
-			log.Fatal(err)
-		}
-		if d <= 0 {
-			return
-		}
-
-		ticks := time.Tick(d)
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			case <-ticks:
-				t := time.Now().Add(-d)
-				log.Printf("start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339))
-				n, err := memorystore.ArchiveCheckpoints(conf.Checkpoints.RootDir, conf.Archive.RootDir, t.Unix(), conf.Archive.DeleteInstead)
-				if err != nil {
-					log.Printf("archiving failed: %s\n", err.Error())
-				} else {
-					log.Printf("done: %d files zipped and moved to archive\n", n)
-				}
-			}
-		}
-	}()
-}
-
 func main() {
 	var configFile string
 	var enableGopsAgent bool
@@ -142,33 +27,24 @@ func main() {
 	flag.Parse()

 	startupTime := time.Now()
-	conf = config.LoadConfiguration(configFile)
-	memorystore.Init(conf.Metrics)
-	ms = memorystore.GetMemoryStore()
+	config.Init(configFile)
+	memorystore.Init(config.Keys.Metrics)
+	ms := memorystore.GetMemoryStore()

-	if enableGopsAgent || conf.Debug.EnableGops {
+	if enableGopsAgent || config.Keys.Debug.EnableGops {
 		if err := agent.Listen(agent.Options{}); err != nil {
 			log.Fatal(err)
 		}
 	}

-	if conf.Debug.DumpToFile != "" {
-		f, err := os.Create(conf.Debug.DumpToFile)
-		if err != nil {
-			log.Fatal(err)
-		}
-
-		debugDump = f
-	}
-
-	d, err := time.ParseDuration(conf.Checkpoints.Restore)
+	d, err := time.ParseDuration(config.Keys.Checkpoints.Restore)
 	if err != nil {
 		log.Fatal(err)
 	}

 	restoreFrom := startupTime.Add(-d)
 	log.Printf("Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339))
-	files, err := ms.FromCheckpoint(conf.Checkpoints.RootDir, restoreFrom.Unix())
+	files, err := ms.FromCheckpoint(config.Keys.Checkpoints.RootDir, restoreFrom.Unix())
 	loadedData := ms.SizeInBytes() / 1024 / 1024 // In MB
 	if err != nil {
 		log.Fatalf("Loading checkpoints failed: %s\n", err.Error())
@@ -205,20 +81,24 @@ func main() {
 		}
 	}()

-	intervals(&wg, ctx)
+	wg.Add(3)
+
+	memorystore.Retention(&wg, ctx)
+	memorystore.Checkpointing(&wg, ctx)
+	memorystore.Archiving(&wg, ctx)

 	wg.Add(1)

 	go func() {
-		err := api.StartApiServer(ctx, conf.HttpConfig)
+		err := api.StartApiServer(ctx, config.Keys.HttpConfig)
 		if err != nil {
 			log.Fatal(err)
 		}
 		wg.Done()
 	}()

-	if conf.Nats != nil {
-		for _, natsConf := range conf.Nats {
+	if config.Keys.Nats != nil {
+		for _, natsConf := range config.Keys.Nats {
 			// TODO: When multiple nats configs share a URL, do a single connect.
 			wg.Add(1)
 			nc := natsConf
@@ -234,17 +114,5 @@ func main() {
 	}

 	wg.Wait()
-
-	log.Printf("Writing to '%s'...\n", conf.Checkpoints.RootDir)
-	files, err = ms.ToCheckpoint(conf.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
-	if err != nil {
-		log.Printf("Writing checkpoint failed: %s\n", err.Error())
-	}
-	log.Printf("Done! (%d files written)\n", files)
-
-	if closer, ok := debugDump.(io.Closer); ok {
-		if err := closer.Close(); err != nil {
-			log.Printf("error: %s", err.Error())
-		}
-	}
+	memorystore.Shutdown()
 }
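Read as a whole, the restructured main() boils down to the sequence below. This is a condensed sketch assembled from the added lines above, not the literal file: flag parsing, signal handling and the NATS receivers are omitted, the context.WithCancel wiring and the internal/api import path are assumptions, and "config.json" is a placeholder for the -config flag value.

    package main

    import (
        "context"
        "log"
        "sync"
        "time"

        "github.com/ClusterCockpit/cc-metric-store/internal/api"
        "github.com/ClusterCockpit/cc-metric-store/internal/config"
        "github.com/ClusterCockpit/cc-metric-store/internal/memorystore"
    )

    func main() {
        // Configuration and store setup, as in the diff above.
        config.Init("config.json") // placeholder; normally taken from the -config flag
        memorystore.Init(config.Keys.Metrics)
        ms := memorystore.GetMemoryStore()

        // Restore checkpoints written within the configured window.
        d, err := time.ParseDuration(config.Keys.Checkpoints.Restore)
        if err != nil {
            log.Fatal(err)
        }
        restoreFrom := time.Now().Add(-d)
        if _, err := ms.FromCheckpoint(config.Keys.Checkpoints.RootDir, restoreFrom.Unix()); err != nil {
            log.Fatalf("Loading checkpoints failed: %s\n", err.Error())
        }

        // Assumed: a cancellable context; the real main ties this to OS signals.
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        // The periodic tasks now live in the memorystore package.
        var wg sync.WaitGroup
        wg.Add(3)
        memorystore.Retention(&wg, ctx)
        memorystore.Checkpointing(&wg, ctx)
        memorystore.Archiving(&wg, ctx)

        wg.Add(1)
        go func() {
            defer wg.Done()
            if err := api.StartApiServer(ctx, config.Keys.HttpConfig); err != nil {
                log.Fatal(err)
            }
        }()

        wg.Wait()
        memorystore.Shutdown()
    }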
@@ -27,9 +27,9 @@ import (

 type ApiMetricData struct {
 	Error *string         `json:"error,omitempty"`
+	Data  util.FloatArray `json:"data,omitempty"`
 	From  int64           `json:"from"`
 	To    int64           `json:"to"`
-	Data  util.FloatArray `json:"data,omitempty"`
 	Avg   util.Float      `json:"avg"`
 	Min   util.Float      `json:"min"`
 	Max   util.Float      `json:"max"`
@@ -73,8 +73,8 @@ func (data *ApiMetricData) ScaleBy(f util.Float) {
 	}
 }

-func (data *ApiMetricData) PadDataWithNull(from, to int64, metric string) {
-	minfo, ok := memoryStore.metrics[metric]
+func (data *ApiMetricData) PadDataWithNull(ms *memorystore.MemoryStore, from, to int64, metric string) {
+	minfo, ok := ms.Metrics[metric]
 	if !ok {
 		return
 	}
@@ -105,12 +105,12 @@ func handleFree(rw http.ResponseWriter, r *http.Request) {
 		return
 	}

-	// TODO: lastCheckpoint might be modified by different go-routines.
-	// Load it using the sync/atomic package?
-	freeUpTo := lastCheckpoint.Unix()
-	if to < freeUpTo {
-		freeUpTo = to
-	}
+	// // TODO: lastCheckpoint might be modified by different go-routines.
+	// // Load it using the sync/atomic package?
+	// freeUpTo := lastCheckpoint.Unix()
+	// if to < freeUpTo {
+	// 	freeUpTo = to
+	// }

 	if r.Method != http.MethodPost {
 		http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed)
@@ -125,9 +125,10 @@ func handleFree(rw http.ResponseWriter, r *http.Request) {
 		return
 	}

+	ms := memorystore.GetMemoryStore()
 	n := 0
 	for _, sel := range selectors {
-		bn, err := memoryStore.Free(sel, freeUpTo)
+		bn, err := ms.Free(sel, to)
 		if err != nil {
 			http.Error(rw, err.Error(), http.StatusInternalServerError)
 			return
@@ -137,7 +138,7 @@ func handleFree(rw http.ResponseWriter, r *http.Request) {
 	}

 	rw.WriteHeader(http.StatusOK)
-	rw.Write([]byte(fmt.Sprintf("buffers freed: %d\n", n)))
+	fmt.Fprintf(rw, "buffers freed: %d\n", n)
 }

 func handleWrite(rw http.ResponseWriter, r *http.Request) {
@@ -153,26 +154,9 @@ func handleWrite(rw http.ResponseWriter, r *http.Request) {
 		return
 	}

-	if debugDump != io.Discard {
-		now := time.Now()
-		msg := make([]byte, 0, 512)
-		msg = append(msg, "\n--- local unix time: "...)
-		msg = strconv.AppendInt(msg, now.Unix(), 10)
-		msg = append(msg, " ---\n"...)
-
-		debugDumpLock.Lock()
-		defer debugDumpLock.Unlock()
-		if _, err := debugDump.Write(msg); err != nil {
-			log.Printf("error while writing to debug dump: %s", err.Error())
-		}
-		if _, err := debugDump.Write(bytes); err != nil {
-			log.Printf("error while writing to debug dump: %s", err.Error())
-		}
-		return
-	}
-
+	ms := memorystore.GetMemoryStore()
 	dec := lineprotocol.NewDecoderWithBytes(bytes)
-	if err := decodeLine(dec, r.URL.Query().Get("cluster")); err != nil {
+	if err := decodeLine(dec, ms, r.URL.Query().Get("cluster")); err != nil {
 		log.Printf("/api/write error: %s", err.Error())
 		http.Error(rw, err.Error(), http.StatusBadRequest)
 		return
@@ -182,13 +166,13 @@ func handleWrite(rw http.ResponseWriter, r *http.Request) {

 type ApiQueryRequest struct {
 	Cluster     string     `json:"cluster"`
+	Queries     []ApiQuery `json:"queries"`
+	ForAllNodes []string   `json:"for-all-nodes"`
 	From        int64      `json:"from"`
 	To          int64      `json:"to"`
 	WithStats   bool       `json:"with-stats"`
 	WithData    bool       `json:"with-data"`
 	WithPadding bool       `json:"with-padding"`
-	Queries     []ApiQuery `json:"queries"`
-	ForAllNodes []string   `json:"for-all-nodes"`
 }

 type ApiQueryResponse struct {
@@ -197,19 +181,19 @@ type ApiQueryResponse struct {
 }

 type ApiQuery struct {
+	Type        *string    `json:"type,omitempty"`
+	SubType     *string    `json:"subtype,omitempty"`
 	Metric      string     `json:"metric"`
 	Hostname    string     `json:"host"`
-	Aggregate   bool       `json:"aggreg"`
-	ScaleFactor Float      `json:"scale-by,omitempty"`
-	Type        *string    `json:"type,omitempty"`
 	TypeIds     []string   `json:"type-ids,omitempty"`
-	SubType     *string    `json:"subtype,omitempty"`
 	SubTypeIds  []string   `json:"subtype-ids,omitempty"`
+	ScaleFactor util.Float `json:"scale-by,omitempty"`
+	Aggregate   bool       `json:"aggreg"`
 }

 func handleQuery(rw http.ResponseWriter, r *http.Request) {
 	var err error
-	var req ApiQueryRequest = ApiQueryRequest{WithStats: true, WithData: true, WithPadding: true}
+	req := ApiQueryRequest{WithStats: true, WithData: true, WithPadding: true}
 	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
 		http.Error(rw, err.Error(), http.StatusBadRequest)
 		return
@@ -235,29 +219,29 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) {
 	}

 	for _, query := range req.Queries {
-		sels := make([]Selector, 0, 1)
+		sels := make([]util.Selector, 0, 1)
 		if query.Aggregate || query.Type == nil {
-			sel := Selector{{String: req.Cluster}, {String: query.Hostname}}
+			sel := util.Selector{{String: req.Cluster}, {String: query.Hostname}}
 			if query.Type != nil {
 				if len(query.TypeIds) == 1 {
-					sel = append(sel, SelectorElement{String: *query.Type + query.TypeIds[0]})
+					sel = append(sel, util.SelectorElement{String: *query.Type + query.TypeIds[0]})
 				} else {
 					ids := make([]string, len(query.TypeIds))
 					for i, id := range query.TypeIds {
 						ids[i] = *query.Type + id
 					}
-					sel = append(sel, SelectorElement{Group: ids})
+					sel = append(sel, util.SelectorElement{Group: ids})
 				}

 				if query.SubType != nil {
 					if len(query.SubTypeIds) == 1 {
-						sel = append(sel, SelectorElement{String: *query.SubType + query.SubTypeIds[0]})
+						sel = append(sel, util.SelectorElement{String: *query.SubType + query.SubTypeIds[0]})
 					} else {
 						ids := make([]string, len(query.SubTypeIds))
 						for i, id := range query.SubTypeIds {
 							ids[i] = *query.SubType + id
 						}
-						sel = append(sel, SelectorElement{Group: ids})
+						sel = append(sel, util.SelectorElement{Group: ids})
 					}
 				}
 			}
@@ -266,7 +250,7 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) {
 			for _, typeId := range query.TypeIds {
 				if query.SubType != nil {
 					for _, subTypeId := range query.SubTypeIds {
-						sels = append(sels, Selector{
+						sels = append(sels, util.Selector{
 							{String: req.Cluster},
 							{String: query.Hostname},
 							{String: *query.Type + typeId},
@@ -274,7 +258,7 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) {
 						})
 					}
 				} else {
-					sels = append(sels, Selector{
+					sels = append(sels, util.Selector{
 						{String: req.Cluster},
 						{String: query.Hostname},
 						{String: *query.Type + typeId},
@@ -289,7 +273,7 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) {
 		res := make([]ApiMetricData, 0, len(sels))
 		for _, sel := range sels {
 			data := ApiMetricData{}
-			data.Data, data.From, data.To, err = memoryStore.Read(sel, query.Metric, req.From, req.To)
+			data.Data, data.From, data.To, err = ms.Read(sel, query.Metric, req.From, req.To)
 			// log.Printf("data: %#v, %#v, %#v, %#v", data.Data, data.From, data.To, err)
 			if err != nil {
 				msg := err.Error()
@@ -305,7 +289,7 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) {
 				data.ScaleBy(query.ScaleFactor)
 			}
 			if req.WithPadding {
-				data.PadDataWithNull(req.From, req.To, query.Metric)
+				data.PadDataWithNull(ms, req.From, req.To, query.Metric)
 			}
 			if !req.WithData {
 				data.Data = nil
@@ -324,6 +308,20 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) {
 	}
 }

+func handleDebug(rw http.ResponseWriter, r *http.Request) {
+	raw := r.URL.Query().Get("selector")
+	selector := []string{}
+	if len(raw) != 0 {
+		selector = strings.Split(raw, ":")
+	}
+
+	ms := memorystore.GetMemoryStore()
+	if err := ms.DebugDump(bufio.NewWriter(rw), selector); err != nil {
+		rw.WriteHeader(http.StatusBadRequest)
+		rw.Write([]byte(err.Error()))
+	}
+}
+
 func authentication(next http.Handler, publicKey ed25519.PublicKey) http.Handler {
 	cacheLock := sync.RWMutex{}
 	cache := map[string]*jwt.Token{}
@@ -375,18 +373,7 @@ func StartApiServer(ctx context.Context, httpConfig *config.HttpConfig) error {
 	r.HandleFunc("/api/free", handleFree)
 	r.HandleFunc("/api/write", handleWrite)
 	r.HandleFunc("/api/query", handleQuery)
-	r.HandleFunc("/api/debug", func(rw http.ResponseWriter, r *http.Request) {
-		raw := r.URL.Query().Get("selector")
-		selector := []string{}
-		if len(raw) != 0 {
-			selector = strings.Split(raw, ":")
-		}
-
-		if err := memoryStore.DebugDump(bufio.NewWriter(rw), selector); err != nil {
-			rw.WriteHeader(http.StatusBadRequest)
-			rw.Write([]byte(err.Error()))
-		}
-	})
+	r.HandleFunc("/api/debug", handleDebug)

 	server := &http.Server{
 		Handler: r,
@@ -395,8 +382,8 @@ func StartApiServer(ctx context.Context, httpConfig *config.HttpConfig) error {
 		ReadTimeout: 30 * time.Second,
 	}

-	if len(conf.JwtPublicKey) > 0 {
-		buf, err := base64.StdEncoding.DecodeString(conf.JwtPublicKey)
+	if len(config.Keys.JwtPublicKey) > 0 {
+		buf, err := base64.StdEncoding.DecodeString(config.Keys.JwtPublicKey)
 		if err != nil {
 			return err
 		}
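For orientation, a minimal client-side query against the request structs above could look like the following sketch. The cluster, host, and metric names are placeholders, the listen address is an assumption depending on the HTTP config, and the JWT authentication wrapped around the handlers is ignored; only the /api/query route and the JSON field names come from the diff.

    package main

    import (
        "bytes"
        "encoding/json"
        "fmt"
        "net/http"
        "time"
    )

    func main() {
        now := time.Now().Unix()
        // Field names follow the JSON tags of ApiQueryRequest / ApiQuery above.
        reqBody := map[string]interface{}{
            "cluster":      "testcluster", // placeholder
            "from":         now - 3600,
            "to":           now,
            "with-stats":   true,
            "with-data":    true,
            "with-padding": true,
            "queries": []map[string]interface{}{
                {"metric": "flops_any", "host": "node001"}, // placeholders
            },
        }
        buf, _ := json.Marshal(reqBody)

        // Address is an assumption; it depends on the configured HTTP listen address.
        resp, err := http.Post("http://localhost:8080/api/query", "application/json", bytes.NewReader(buf))
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()
        fmt.Println(resp.Status)
    }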
@@ -191,7 +191,7 @@ func decodeLine(dec *lineprotocol.Decoder,
 	// cluster and host. By using `WriteToLevel` (level = host), we do not need
 	// to take the root- and cluster-level lock as often.
 	var lvl *memorystore.Level = nil
-	var prevCluster, prevHost string = "", ""
+	prevCluster, prevHost := "", ""

 	var ok bool
 	for dec.Next() {
@@ -96,8 +96,9 @@ type Config struct {
 	Nats []*NatsConfig `json:"nats"`
 }

-func LoadConfiguration(file string) Config {
-	var config Config
+var Keys Config
+
+func Init(file string) {
 	configFile, err := os.Open(file)
 	if err != nil {
 		log.Fatal(err)
@@ -105,8 +106,7 @@ func LoadConfiguration(file string) Config {
 	defer configFile.Close()
 	dec := json.NewDecoder(configFile)
 	dec.DisallowUnknownFields()
-	if err := dec.Decode(&config); err != nil {
+	if err := dec.Decode(&Keys); err != nil {
 		log.Fatal(err)
 	}
-	return config
 }
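The hunk above replaces LoadConfiguration, which returned a Config by value, with Init plus a package-level Keys variable, so callers read configuration through config.Keys instead of threading a Config through main. A minimal sketch of the resulting call pattern, with the file name as a placeholder:

    package main

    import "github.com/ClusterCockpit/cc-metric-store/internal/config"

    func main() {
        // Populate the package-level config.Keys once at startup.
        config.Init("config.json") // path is a placeholder

        // Any package can now read settings directly, e.g. the checkpoint root directory.
        _ = config.Keys.Checkpoints.RootDir
    }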
@ -3,470 +3,57 @@ package memorystore
|
|||||||
import (
|
import (
|
||||||
"archive/zip"
|
"archive/zip"
|
||||||
"bufio"
|
"bufio"
|
||||||
"encoding/json"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/fs"
|
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
|
||||||
"sort"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-metric-store/internal/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Whenever changed, update MarshalJSON as well!
|
func Archiving(wg *sync.WaitGroup, ctx context.Context) {
|
||||||
type CheckpointMetrics struct {
|
go func() {
|
||||||
Frequency int64 `json:"frequency"`
|
defer wg.Done()
|
||||||
Start int64 `json:"start"`
|
d, err := time.ParseDuration(config.Keys.Archive.Interval)
|
||||||
Data []Float `json:"data"`
|
if err != nil {
|
||||||
}
|
log.Fatal(err)
|
||||||
|
|
||||||
// As `Float` implements a custom MarshalJSON() function,
|
|
||||||
// serializing an array of such types has more overhead
|
|
||||||
// than one would assume (because of extra allocations, interfaces and so on).
|
|
||||||
func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error) {
|
|
||||||
buf := make([]byte, 0, 128+len(cm.Data)*8)
|
|
||||||
buf = append(buf, `{"frequency":`...)
|
|
||||||
buf = strconv.AppendInt(buf, cm.Frequency, 10)
|
|
||||||
buf = append(buf, `,"start":`...)
|
|
||||||
buf = strconv.AppendInt(buf, cm.Start, 10)
|
|
||||||
buf = append(buf, `,"data":[`...)
|
|
||||||
for i, x := range cm.Data {
|
|
||||||
if i != 0 {
|
|
||||||
buf = append(buf, ',')
|
|
||||||
}
|
}
|
||||||
if x.IsNaN() {
|
if d <= 0 {
|
||||||
buf = append(buf, `null`...)
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ticks := func() <-chan time.Time {
|
||||||
|
if d <= 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return time.NewTicker(d).C
|
||||||
|
}()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticks:
|
||||||
|
t := time.Now().Add(-d)
|
||||||
|
log.Printf("start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339))
|
||||||
|
n, err := ArchiveCheckpoints(config.Keys.Checkpoints.RootDir, config.Keys.Archive.RootDir, t.Unix(), config.Keys.Archive.DeleteInstead)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("archiving failed: %s\n", err.Error())
|
||||||
} else {
|
} else {
|
||||||
buf = strconv.AppendFloat(buf, float64(x), 'f', 1, 32)
|
log.Printf("done: %d files zipped and moved to archive\n", n)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
buf = append(buf, `]}`...)
|
}
|
||||||
return buf, nil
|
}()
|
||||||
}
|
|
||||||
|
|
||||||
type CheckpointFile struct {
|
|
||||||
From int64 `json:"from"`
|
|
||||||
To int64 `json:"to"`
|
|
||||||
Metrics map[string]*CheckpointMetrics `json:"metrics"`
|
|
||||||
Children map[string]*CheckpointFile `json:"children"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var ErrNoNewData error = errors.New("all data already archived")
|
var ErrNoNewData error = errors.New("all data already archived")
|
||||||
|
|
||||||
var NumWorkers int = 4
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
maxWorkers := 10
|
|
||||||
NumWorkers = runtime.NumCPU()/2 + 1
|
|
||||||
if NumWorkers > maxWorkers {
|
|
||||||
NumWorkers = maxWorkers
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Metrics stored at the lowest 2 levels are not stored away (root and cluster)!
|
|
||||||
// On a per-host basis a new JSON file is created. I have no idea if this will scale.
|
|
||||||
// The good thing: Only a host at a time is locked, so this function can run
|
|
||||||
// in parallel to writes/reads.
|
|
||||||
func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) {
|
|
||||||
levels := make([]*Level, 0)
|
|
||||||
selectors := make([][]string, 0)
|
|
||||||
m.root.lock.RLock()
|
|
||||||
for sel1, l1 := range m.root.children {
|
|
||||||
l1.lock.RLock()
|
|
||||||
for sel2, l2 := range l1.children {
|
|
||||||
levels = append(levels, l2)
|
|
||||||
selectors = append(selectors, []string{sel1, sel2})
|
|
||||||
}
|
|
||||||
l1.lock.RUnlock()
|
|
||||||
}
|
|
||||||
m.root.lock.RUnlock()
|
|
||||||
|
|
||||||
type workItem struct {
|
|
||||||
level *Level
|
|
||||||
dir string
|
|
||||||
selector []string
|
|
||||||
}
|
|
||||||
|
|
||||||
n, errs := int32(0), int32(0)
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(NumWorkers)
|
|
||||||
work := make(chan workItem, NumWorkers*2)
|
|
||||||
for worker := 0; worker < NumWorkers; worker++ {
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
for workItem := range work {
|
|
||||||
if err := workItem.level.toCheckpoint(workItem.dir, from, to, m); err != nil {
|
|
||||||
if err == ErrNoNewData {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("error while checkpointing %#v: %s", workItem.selector, err.Error())
|
|
||||||
atomic.AddInt32(&errs, 1)
|
|
||||||
} else {
|
|
||||||
atomic.AddInt32(&n, 1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < len(levels); i++ {
|
|
||||||
dir := path.Join(dir, path.Join(selectors[i]...))
|
|
||||||
work <- workItem{
|
|
||||||
level: levels[i],
|
|
||||||
dir: dir,
|
|
||||||
selector: selectors[i],
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
close(work)
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
if errs > 0 {
|
|
||||||
return int(n), fmt.Errorf("%d errors happend while creating checkpoints (%d successes)", errs, n)
|
|
||||||
}
|
|
||||||
return int(n), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *Level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFile, error) {
|
|
||||||
l.lock.RLock()
|
|
||||||
defer l.lock.RUnlock()
|
|
||||||
|
|
||||||
retval := &CheckpointFile{
|
|
||||||
From: from,
|
|
||||||
To: to,
|
|
||||||
Metrics: make(map[string]*CheckpointMetrics),
|
|
||||||
Children: make(map[string]*CheckpointFile),
|
|
||||||
}
|
|
||||||
|
|
||||||
for metric, minfo := range m.Metrics {
|
|
||||||
b := l.metrics[minfo.offset]
|
|
||||||
if b == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
allArchived := true
|
|
||||||
b.iterFromTo(from, to, func(b *buffer) error {
|
|
||||||
if !b.archived {
|
|
||||||
allArchived = false
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
if allArchived {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
data := make([]Float, (to-from)/b.frequency+1)
|
|
||||||
data, start, end, err := b.read(from, to, data)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := int((end - start) / b.frequency); i < len(data); i++ {
|
|
||||||
data[i] = NaN
|
|
||||||
}
|
|
||||||
|
|
||||||
retval.Metrics[metric] = &CheckpointMetrics{
|
|
||||||
Frequency: b.frequency,
|
|
||||||
Start: start,
|
|
||||||
Data: data,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for name, child := range l.children {
|
|
||||||
val, err := child.toCheckpointFile(from, to, m)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if val != nil {
|
|
||||||
retval.Children[name] = val
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(retval.Children) == 0 && len(retval.Metrics) == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return retval, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
|
|
||||||
cf, err := l.toCheckpointFile(from, to, m)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if cf == nil {
|
|
||||||
return ErrNoNewData
|
|
||||||
}
|
|
||||||
|
|
||||||
filepath := path.Join(dir, fmt.Sprintf("%d.json", from))
|
|
||||||
f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0o644)
|
|
||||||
if err != nil && os.IsNotExist(err) {
|
|
||||||
err = os.MkdirAll(dir, 0o755)
|
|
||||||
if err == nil {
|
|
||||||
f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0o644)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
|
|
||||||
bw := bufio.NewWriter(f)
|
|
||||||
if err = json.NewEncoder(bw).Encode(cf); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return bw.Flush()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Metrics stored at the lowest 2 levels are not loaded (root and cluster)!
|
|
||||||
// This function can only be called once and before the very first write or read.
|
|
||||||
// Different host's data is loaded to memory in parallel.
|
|
||||||
func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
work := make(chan [2]string, NumWorkers)
|
|
||||||
n, errs := int32(0), int32(0)
|
|
||||||
|
|
||||||
wg.Add(NumWorkers)
|
|
||||||
for worker := 0; worker < NumWorkers; worker++ {
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
for host := range work {
|
|
||||||
lvl := m.root.findLevelOrCreate(host[:], len(m.Metrics))
|
|
||||||
nn, err := lvl.fromCheckpoint(filepath.Join(dir, host[0], host[1]), from, m)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("error while loading checkpoints: %s", err.Error())
|
|
||||||
atomic.AddInt32(&errs, 1)
|
|
||||||
}
|
|
||||||
atomic.AddInt32(&n, int32(nn))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
i := 0
|
|
||||||
clustersDir, err := os.ReadDir(dir)
|
|
||||||
for _, clusterDir := range clustersDir {
|
|
||||||
if !clusterDir.IsDir() {
|
|
||||||
err = errors.New("expected only directories at first level of checkpoints/ directory")
|
|
||||||
goto done
|
|
||||||
}
|
|
||||||
|
|
||||||
hostsDir, e := os.ReadDir(filepath.Join(dir, clusterDir.Name()))
|
|
||||||
if e != nil {
|
|
||||||
err = e
|
|
||||||
goto done
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, hostDir := range hostsDir {
|
|
||||||
if !hostDir.IsDir() {
|
|
||||||
err = errors.New("expected only directories at second level of checkpoints/ directory")
|
|
||||||
goto done
|
|
||||||
}
|
|
||||||
|
|
||||||
i++
|
|
||||||
if i%NumWorkers == 0 && i > 100 {
|
|
||||||
// Forcing garbage collection runs here regulary during the loading of checkpoints
|
|
||||||
// will decrease the total heap size after loading everything back to memory is done.
|
|
||||||
// While loading data, the heap will grow fast, so the GC target size will double
|
|
||||||
// almost always. By forcing GCs here, we can keep it growing more slowly so that
|
|
||||||
// at the end, less memory is wasted.
|
|
||||||
runtime.GC()
|
|
||||||
}
|
|
||||||
|
|
||||||
work <- [2]string{clusterDir.Name(), hostDir.Name()}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
done:
|
|
||||||
close(work)
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return int(n), err
|
|
||||||
}
|
|
||||||
|
|
||||||
if errs > 0 {
|
|
||||||
return int(n), fmt.Errorf("%d errors happend while creating checkpoints (%d successes)", errs, n)
|
|
||||||
}
|
|
||||||
return int(n), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
|
|
||||||
for name, metric := range cf.Metrics {
|
|
||||||
n := len(metric.Data)
|
|
||||||
b := &buffer{
|
|
||||||
frequency: metric.Frequency,
|
|
||||||
start: metric.Start,
|
|
||||||
data: metric.Data[0:n:n], // Space is wasted here :(
|
|
||||||
prev: nil,
|
|
||||||
next: nil,
|
|
||||||
archived: true,
|
|
||||||
}
|
|
||||||
b.close()
|
|
||||||
|
|
||||||
minfo, ok := m.Metrics[name]
|
|
||||||
if !ok {
|
|
||||||
continue
|
|
||||||
// return errors.New("Unkown metric: " + name)
|
|
||||||
}
|
|
||||||
|
|
||||||
prev := l.metrics[minfo.offset]
|
|
||||||
if prev == nil {
|
|
||||||
l.metrics[minfo.offset] = b
|
|
||||||
} else {
|
|
||||||
if prev.start > b.start {
|
|
||||||
return errors.New("wooops")
|
|
||||||
}
|
|
||||||
|
|
||||||
b.prev = prev
|
|
||||||
prev.next = b
|
|
||||||
}
|
|
||||||
l.metrics[minfo.offset] = b
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(cf.Children) > 0 && l.children == nil {
|
|
||||||
l.children = make(map[string]*Level)
|
|
||||||
}
|
|
||||||
|
|
||||||
for sel, childCf := range cf.Children {
|
|
||||||
child, ok := l.children[sel]
|
|
||||||
if !ok {
|
|
||||||
child = &Level{
|
|
||||||
metrics: make([]*buffer, len(m.Metrics)),
|
|
||||||
children: nil,
|
|
||||||
}
|
|
||||||
l.children[sel] = child
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := child.loadFile(childCf, m); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *Level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, error) {
|
|
||||||
direntries, err := os.ReadDir(dir)
|
|
||||||
if err != nil {
|
|
||||||
if os.IsNotExist(err) {
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
jsonFiles := make([]fs.DirEntry, 0)
|
|
||||||
filesLoaded := 0
|
|
||||||
for _, e := range direntries {
|
|
||||||
if e.IsDir() {
|
|
||||||
child := &Level{
|
|
||||||
metrics: make([]*buffer, len(m.Metrics)),
|
|
||||||
children: make(map[string]*Level),
|
|
||||||
}
|
|
||||||
|
|
||||||
files, err := child.fromCheckpoint(path.Join(dir, e.Name()), from, m)
|
|
||||||
filesLoaded += files
|
|
||||||
if err != nil {
|
|
||||||
return filesLoaded, err
|
|
||||||
}
|
|
||||||
|
|
||||||
l.children[e.Name()] = child
|
|
||||||
} else if strings.HasSuffix(e.Name(), ".json") {
|
|
||||||
jsonFiles = append(jsonFiles, e)
|
|
||||||
} else {
|
|
||||||
return filesLoaded, errors.New("unexpected file: " + dir + "/" + e.Name())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
files, err := findFiles(jsonFiles, from, true)
|
|
||||||
if err != nil {
|
|
||||||
return filesLoaded, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, filename := range files {
|
|
||||||
f, err := os.Open(path.Join(dir, filename))
|
|
||||||
if err != nil {
|
|
||||||
return filesLoaded, err
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
|
|
||||||
br := bufio.NewReader(f)
|
|
||||||
cf := &CheckpointFile{}
|
|
||||||
if err = json.NewDecoder(br).Decode(cf); err != nil {
|
|
||||||
return filesLoaded, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if cf.To != 0 && cf.To < from {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = l.loadFile(cf, m); err != nil {
|
|
||||||
return filesLoaded, err
|
|
||||||
}
|
|
||||||
|
|
||||||
filesLoaded += 1
|
|
||||||
}
|
|
||||||
|
|
||||||
return filesLoaded, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// This will probably get very slow over time!
|
|
||||||
// A solution could be some sort of an index file in which all other files
|
|
||||||
// and the timespan they contain is listed.
|
|
||||||
func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]string, error) {
|
|
||||||
nums := map[string]int64{}
|
|
||||||
for _, e := range direntries {
|
|
||||||
ts, err := strconv.ParseInt(strings.TrimSuffix(e.Name(), ".json"), 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
nums[e.Name()] = ts
|
|
||||||
}
|
|
||||||
|
|
||||||
sort.Slice(direntries, func(i, j int) bool {
|
|
||||||
a, b := direntries[i], direntries[j]
|
|
||||||
return nums[a.Name()] < nums[b.Name()]
|
|
||||||
})
|
|
||||||
|
|
||||||
filenames := make([]string, 0)
|
|
||||||
for i := 0; i < len(direntries); i++ {
|
|
||||||
e := direntries[i]
|
|
||||||
ts1 := nums[e.Name()]
|
|
||||||
|
|
||||||
if findMoreRecentFiles && t <= ts1 || i == len(direntries)-1 {
|
|
||||||
filenames = append(filenames, e.Name())
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
enext := direntries[i+1]
|
|
||||||
ts2 := nums[enext.Name()]
|
|
||||||
|
|
||||||
if findMoreRecentFiles {
|
|
||||||
if ts1 < t && t < ts2 {
|
|
||||||
filenames = append(filenames, e.Name())
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if ts2 < t {
|
|
||||||
filenames = append(filenames, e.Name())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return filenames, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ZIP all checkpoint files older than `from` together and write them to the `archiveDir`,
|
// ZIP all checkpoint files older than `from` together and write them to the `archiveDir`,
|
||||||
// deleting them from the `checkpointsDir`.
|
// deleting them from the `checkpointsDir`.
|
||||||
func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteInstead bool) (int, error) {
|
func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteInstead bool) (int, error) {
|
||||||
|
@@ -34,21 +34,13 @@ var (
 // If `cap(data)` is reached, a new buffer is created and
 // becomes the new head of a buffer list.
 type buffer struct {
-	frequency  int64        // Time between two "slots"
-	start      int64        // Timestamp of when `data[0]` was written.
-	data       []util.Float // The slice should never reallocacte as `cap(data)` is respected.
-	prev, next *buffer      // `prev` contains older data, `next` newer data.
-	archived   bool         // If true, this buffer is already archived
+	prev      *buffer
+	next      *buffer
+	data      []util.Float
+	frequency int64
+	start     int64
+	archived  bool
 	closed    bool
-
-	/*
-		statisticts struct {
-			samples int
-			min     Float
-			max     Float
-			avg     Float
-		}
-	*/
 }

 func newBuffer(ts, freq int64) *buffer {
@@ -163,8 +155,8 @@ func (b *buffer) read(from, to int64, data []util.Float) ([]util.Float, int64, i
 		from = b.firstWrite()
 	}

-	var i int = 0
-	var t int64 = from
+	i := 0
+	t := from
 	for ; t < to; t += b.frequency {
 		idx := int((t - b.start) / b.frequency)
 		if idx >= cap(b.data) {
internal/memorystore/checkpoint.go (new file, 501 lines)
@@ -0,0 +1,501 @@
|
package memorystore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io/fs"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"path/filepath"
|
||||||
|
"runtime"
|
||||||
|
"sort"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-metric-store/internal/config"
|
||||||
|
"github.com/ClusterCockpit/cc-metric-store/internal/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Whenever changed, update MarshalJSON as well!
|
||||||
|
type CheckpointMetrics struct {
|
||||||
|
Data []util.Float `json:"data"`
|
||||||
|
Frequency int64 `json:"frequency"`
|
||||||
|
Start int64 `json:"start"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CheckpointFile struct {
|
||||||
|
Metrics map[string]*CheckpointMetrics `json:"metrics"`
|
||||||
|
Children map[string]*CheckpointFile `json:"children"`
|
||||||
|
From int64 `json:"from"`
|
||||||
|
To int64 `json:"to"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var lastCheckpoint time.Time
|
||||||
|
|
||||||
|
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
||||||
|
lastCheckpoint = time.Now()
|
||||||
|
ms := GetMemoryStore()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
d, err := time.ParseDuration(config.Keys.Checkpoints.Interval)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
if d <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ticks := func() <-chan time.Time {
|
||||||
|
if d <= 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return time.NewTicker(d).C
|
||||||
|
}()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticks:
|
||||||
|
log.Printf("start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339))
|
||||||
|
now := time.Now()
|
||||||
|
n, err := ms.ToCheckpoint(config.Keys.Checkpoints.RootDir,
|
||||||
|
lastCheckpoint.Unix(), now.Unix())
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("checkpointing failed: %s\n", err.Error())
|
||||||
|
} else {
|
||||||
|
log.Printf("done: %d checkpoint files created\n", n)
|
||||||
|
lastCheckpoint = now
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// As `Float` implements a custom MarshalJSON() function,
|
||||||
|
// serializing an array of such types has more overhead
|
||||||
|
// than one would assume (because of extra allocations, interfaces and so on).
|
||||||
|
func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error) {
|
||||||
|
buf := make([]byte, 0, 128+len(cm.Data)*8)
|
||||||
|
buf = append(buf, `{"frequency":`...)
|
||||||
|
buf = strconv.AppendInt(buf, cm.Frequency, 10)
|
||||||
|
buf = append(buf, `,"start":`...)
|
||||||
|
buf = strconv.AppendInt(buf, cm.Start, 10)
|
||||||
|
buf = append(buf, `,"data":[`...)
|
||||||
|
for i, x := range cm.Data {
|
||||||
|
if i != 0 {
|
||||||
|
buf = append(buf, ',')
|
||||||
|
}
|
||||||
|
if x.IsNaN() {
|
||||||
|
buf = append(buf, `null`...)
|
||||||
|
} else {
|
||||||
|
buf = strconv.AppendFloat(buf, float64(x), 'f', 1, 32)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
buf = append(buf, `]}`...)
|
||||||
|
return buf, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Metrics stored at the lowest 2 levels are not stored away (root and cluster)!
|
||||||
|
// On a per-host basis a new JSON file is created. I have no idea if this will scale.
|
||||||
|
// The good thing: Only a host at a time is locked, so this function can run
|
||||||
|
// in parallel to writes/reads.
|
||||||
|
func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) {
|
||||||
|
levels := make([]*Level, 0)
|
||||||
|
selectors := make([][]string, 0)
|
||||||
|
m.root.lock.RLock()
|
||||||
|
for sel1, l1 := range m.root.children {
|
||||||
|
l1.lock.RLock()
|
||||||
|
for sel2, l2 := range l1.children {
|
||||||
|
levels = append(levels, l2)
|
||||||
|
selectors = append(selectors, []string{sel1, sel2})
|
||||||
|
}
|
||||||
|
l1.lock.RUnlock()
|
||||||
|
}
|
||||||
|
m.root.lock.RUnlock()
|
||||||
|
|
||||||
|
type workItem struct {
|
||||||
|
level *Level
|
||||||
|
dir string
|
||||||
|
selector []string
|
||||||
|
}
|
||||||
|
|
||||||
|
n, errs := int32(0), int32(0)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(NumWorkers)
|
||||||
|
work := make(chan workItem, NumWorkers*2)
|
||||||
|
for worker := 0; worker < NumWorkers; worker++ {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
for workItem := range work {
|
||||||
|
if err := workItem.level.toCheckpoint(workItem.dir, from, to, m); err != nil {
|
||||||
|
if err == ErrNoNewData {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("error while checkpointing %#v: %s", workItem.selector, err.Error())
|
||||||
|
atomic.AddInt32(&errs, 1)
|
||||||
|
} else {
|
||||||
|
atomic.AddInt32(&n, 1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < len(levels); i++ {
|
||||||
|
dir := path.Join(dir, path.Join(selectors[i]...))
|
||||||
|
work <- workItem{
|
||||||
|
level: levels[i],
|
||||||
|
dir: dir,
|
||||||
|
selector: selectors[i],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
close(work)
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
if errs > 0 {
|
||||||
|
return int(n), fmt.Errorf("%d errors happend while creating checkpoints (%d successes)", errs, n)
|
||||||
|
}
|
||||||
|
return int(n), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFile, error) {
|
||||||
|
l.lock.RLock()
|
||||||
|
defer l.lock.RUnlock()
|
||||||
|
|
||||||
|
retval := &CheckpointFile{
|
||||||
|
From: from,
|
||||||
|
To: to,
|
||||||
|
Metrics: make(map[string]*CheckpointMetrics),
|
||||||
|
Children: make(map[string]*CheckpointFile),
|
||||||
|
}
|
||||||
|
|
||||||
|
for metric, minfo := range m.Metrics {
|
||||||
|
b := l.metrics[minfo.Offset]
|
||||||
|
if b == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
allArchived := true
|
||||||
|
b.iterFromTo(from, to, func(b *buffer) error {
|
||||||
|
if !b.archived {
|
||||||
|
allArchived = false
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if allArchived {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
data := make([]util.Float, (to-from)/b.frequency+1)
|
||||||
|
data, start, end, err := b.read(from, to, data)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := int((end - start) / b.frequency); i < len(data); i++ {
|
||||||
|
data[i] = util.NaN
|
||||||
|
}
|
||||||
|
|
||||||
|
retval.Metrics[metric] = &CheckpointMetrics{
|
||||||
|
Frequency: b.frequency,
|
||||||
|
Start: start,
|
||||||
|
Data: data,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, child := range l.children {
|
||||||
|
val, err := child.toCheckpointFile(from, to, m)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if val != nil {
|
||||||
|
retval.Children[name] = val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(retval.Children) == 0 && len(retval.Metrics) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return retval, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
|
||||||
|
cf, err := l.toCheckpointFile(from, to, m)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if cf == nil {
|
||||||
|
return ErrNoNewData
|
||||||
|
}
|
||||||
|
|
||||||
|
filepath := path.Join(dir, fmt.Sprintf("%d.json", from))
|
||||||
|
f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0o644)
|
||||||
|
if err != nil && os.IsNotExist(err) {
|
||||||
|
err = os.MkdirAll(dir, 0o755)
|
||||||
|
if err == nil {
|
||||||
|
f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0o644)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
bw := bufio.NewWriter(f)
|
||||||
|
if err = json.NewEncoder(bw).Encode(cf); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return bw.Flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Metrics stored at the lowest 2 levels are not loaded (root and cluster)!
|
||||||
|
// This function can only be called once and before the very first write or read.
|
||||||
|
// Different host's data is loaded to memory in parallel.
|
||||||
|
func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
work := make(chan [2]string, NumWorkers)
|
||||||
|
n, errs := int32(0), int32(0)
|
||||||
|
|
||||||
|
wg.Add(NumWorkers)
|
||||||
|
for worker := 0; worker < NumWorkers; worker++ {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for host := range work {
|
||||||
|
lvl := m.root.findLevelOrCreate(host[:], len(m.Metrics))
|
||||||
|
nn, err := lvl.fromCheckpoint(filepath.Join(dir, host[0], host[1]), from, m)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("error while loading checkpoints: %s", err.Error())
|
||||||
|
atomic.AddInt32(&errs, 1)
|
||||||
|
}
|
||||||
|
atomic.AddInt32(&n, int32(nn))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
i := 0
|
||||||
|
clustersDir, err := os.ReadDir(dir)
|
||||||
|
for _, clusterDir := range clustersDir {
|
||||||
|
if !clusterDir.IsDir() {
|
||||||
|
err = errors.New("expected only directories at first level of checkpoints/ directory")
|
||||||
|
goto done
|
||||||
|
}
|
||||||
|
|
||||||
|
hostsDir, e := os.ReadDir(filepath.Join(dir, clusterDir.Name()))
|
||||||
|
if e != nil {
|
||||||
|
err = e
|
||||||
|
goto done
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, hostDir := range hostsDir {
|
||||||
|
if !hostDir.IsDir() {
|
||||||
|
err = errors.New("expected only directories at second level of checkpoints/ directory")
|
||||||
|
goto done
|
||||||
|
}
|
||||||
|
|
||||||
|
i++
|
||||||
|
if i%NumWorkers == 0 && i > 100 {
|
||||||
|
// Forcing garbage collection runs here regulary during the loading of checkpoints
|
||||||
|
// will decrease the total heap size after loading everything back to memory is done.
|
||||||
|
// While loading data, the heap will grow fast, so the GC target size will double
|
||||||
|
// almost always. By forcing GCs here, we can keep it growing more slowly so that
|
||||||
|
// at the end, less memory is wasted.
|
||||||
|
runtime.GC()
|
||||||
|
}
|
||||||
|
|
||||||
|
work <- [2]string{clusterDir.Name(), hostDir.Name()}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
done:
|
||||||
|
close(work)
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return int(n), err
|
||||||
|
}
|
||||||
|
|
||||||
|
if errs > 0 {
|
||||||
|
return int(n), fmt.Errorf("%d errors happend while creating checkpoints (%d successes)", errs, n)
|
||||||
|
}
|
||||||
|
return int(n), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
|
||||||
|
for name, metric := range cf.Metrics {
|
||||||
|
n := len(metric.Data)
|
||||||
|
b := &buffer{
|
||||||
|
frequency: metric.Frequency,
|
||||||
|
start: metric.Start,
|
||||||
|
data: metric.Data[0:n:n], // Space is wasted here :(
|
||||||
|
prev: nil,
|
||||||
|
next: nil,
|
||||||
|
archived: true,
|
||||||
|
}
|
||||||
|
b.close()
|
||||||
|
|
||||||
|
minfo, ok := m.Metrics[name]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
// return errors.New("Unkown metric: " + name)
|
||||||
|
}
|
||||||
|
|
||||||
|
prev := l.metrics[minfo.Offset]
|
||||||
|
if prev == nil {
|
||||||
|
l.metrics[minfo.Offset] = b
|
||||||
|
} else {
|
||||||
|
if prev.start > b.start {
|
||||||
|
return errors.New("wooops")
|
||||||
|
}
|
||||||
|
|
||||||
|
b.prev = prev
|
||||||
|
prev.next = b
|
||||||
|
}
|
||||||
|
l.metrics[minfo.Offset] = b
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(cf.Children) > 0 && l.children == nil {
|
||||||
|
l.children = make(map[string]*Level)
|
||||||
|
}
|
||||||
|
|
||||||
|
for sel, childCf := range cf.Children {
|
||||||
|
child, ok := l.children[sel]
|
||||||
|
if !ok {
|
||||||
|
child = &Level{
|
||||||
|
metrics: make([]*buffer, len(m.Metrics)),
|
||||||
|
children: nil,
|
||||||
|
}
|
||||||
|
l.children[sel] = child
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := child.loadFile(childCf, m); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, error) {
|
||||||
|
direntries, err := os.ReadDir(dir)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonFiles := make([]fs.DirEntry, 0)
|
||||||
|
filesLoaded := 0
|
||||||
|
for _, e := range direntries {
|
||||||
|
if e.IsDir() {
|
||||||
|
child := &Level{
|
||||||
|
metrics: make([]*buffer, len(m.Metrics)),
|
||||||
|
children: make(map[string]*Level),
|
||||||
|
}
|
||||||
|
|
||||||
|
files, err := child.fromCheckpoint(path.Join(dir, e.Name()), from, m)
|
||||||
|
filesLoaded += files
|
||||||
|
if err != nil {
|
||||||
|
return filesLoaded, err
|
||||||
|
}
|
||||||
|
|
||||||
|
l.children[e.Name()] = child
|
||||||
|
} else if strings.HasSuffix(e.Name(), ".json") {
|
||||||
|
jsonFiles = append(jsonFiles, e)
|
||||||
|
} else {
|
||||||
|
return filesLoaded, errors.New("unexpected file: " + dir + "/" + e.Name())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
files, err := findFiles(jsonFiles, from, true)
|
||||||
|
if err != nil {
|
||||||
|
return filesLoaded, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, filename := range files {
|
||||||
|
f, err := os.Open(path.Join(dir, filename))
|
||||||
|
if err != nil {
|
||||||
|
return filesLoaded, err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
br := bufio.NewReader(f)
|
||||||
|
cf := &CheckpointFile{}
|
||||||
|
if err = json.NewDecoder(br).Decode(cf); err != nil {
|
||||||
|
return filesLoaded, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if cf.To != 0 && cf.To < from {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = l.loadFile(cf, m); err != nil {
|
||||||
|
return filesLoaded, err
|
||||||
|
}
|
||||||
|
|
||||||
|
filesLoaded += 1
|
||||||
|
}
|
||||||
|
|
||||||
|
return filesLoaded, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// This will probably get very slow over time!
// A solution could be some sort of an index file in which all other files
// and the timespan they contain are listed.
func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]string, error) {
	nums := map[string]int64{}
	for _, e := range direntries {
		ts, err := strconv.ParseInt(strings.TrimSuffix(e.Name(), ".json"), 10, 64)
		if err != nil {
			return nil, err
		}
		nums[e.Name()] = ts
	}

	sort.Slice(direntries, func(i, j int) bool {
		a, b := direntries[i], direntries[j]
		return nums[a.Name()] < nums[b.Name()]
	})

	filenames := make([]string, 0)
	for i := 0; i < len(direntries); i++ {
		e := direntries[i]
		ts1 := nums[e.Name()]

		if findMoreRecentFiles && t <= ts1 || i == len(direntries)-1 {
			filenames = append(filenames, e.Name())
			continue
		}

		enext := direntries[i+1]
		ts2 := nums[enext.Name()]

		if findMoreRecentFiles {
			if ts1 < t && t < ts2 {
				filenames = append(filenames, e.Name())
			}
		} else {
			if ts2 < t {
				filenames = append(filenames, e.Name())
			}
		}
	}

	return filenames, nil
}
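
The comment above sketches a possible fix for this linear scan: an index that records, for every checkpoint file, the timespan it covers. Below is only a rough illustration of that idea; the type and function names are invented, nothing here is part of the commit, and it assumes the `sort` package already imported by this file.

// Hypothetical sketch (not part of this commit): an index mapping each
// checkpoint file to the time range it covers, so that lookups can use a
// binary search instead of parsing every filename on every load.
type indexEntry struct {
	File string `json:"file"` // e.g. "1700000000.json"
	From int64  `json:"from"` // first timestamp contained in the file
	To   int64  `json:"to"`   // last timestamp contained in the file
}

// An index.json per directory could hold a slice of indexEntry sorted by From.
// All files that may contain data newer than `from` are then found in O(log n).
func findFilesIndexed(entries []indexEntry, from int64) []string {
	i := sort.Search(len(entries), func(i int) bool { return entries[i].To >= from })
	names := make([]string, 0, len(entries)-i)
	for ; i < len(entries); i++ {
		names = append(names, entries[i].File)
	}
	return names
}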

@ -41,7 +41,7 @@ func (l *Level) debugDump(m *MemoryStore, w *bufio.Writer, lvlname string, buf [
 	depth += 1
 	objitems := 0
 	for name, mc := range m.Metrics {
-		if b := l.metrics[mc.offset]; b != nil {
+		if b := l.metrics[mc.Offset]; b != nil {
 			for i := 0; i < depth; i++ {
 				buf = append(buf, '\t')
 			}

@ -12,9 +12,9 @@ import (
 // Can be both a leaf or an inner node. In this tree structure, inner nodes can
 // also hold data (in `metrics`).
 type Level struct {
-	children map[string]*Level
-	metrics  []*buffer
 	lock     sync.RWMutex
+	metrics  []*buffer         // Every level can store metrics.
+	children map[string]*Level // Lower levels.
 }
 
 // Find the correct level for the given selector, creating it if
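
The `Level` comment above describes a tree in which every node, not only the leaves, may carry metric buffers. The following in-package sketch shows how a selector path maps onto nested levels; the level names ("alex", "a0101", "cpu0") are made up and the snippet is not part of the commit.

// Illustration only: a selector is walked one map lookup per element.
root := &Level{children: map[string]*Level{
	"alex": { // cluster level; may itself hold aggregated metrics
		children: map[string]*Level{
			"a0101": { // host level
				children: map[string]*Level{
					"cpu0": {}, // leaf level; per-CPU buffers live in `metrics`
				},
			},
		},
	},
}}
leaf := root.findLevel([]string{"alex", "a0101", "cpu0"})
_ = leaf // inner levels ("alex", "a0101") can store data as well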

@ -126,7 +126,7 @@ func (l *Level) findLevel(selector []string) *Level {
 	return lvl.findLevel(selector[1:])
 }
 
-func (l *Level) findBuffers(selector Selector, offset int, f func(b *buffer) error) error {
+func (l *Level) findBuffers(selector util.Selector, offset int, f func(b *buffer) error) error {
 	l.lock.RLock()
 	defer l.lock.RUnlock()
 

@ -1,9 +1,12 @@
 package memorystore
 
 import (
+	"context"
 	"errors"
 	"log"
+	"runtime"
 	"sync"
+	"time"
 
 	"github.com/ClusterCockpit/cc-metric-store/internal/config"
 	"github.com/ClusterCockpit/cc-metric-store/internal/util"

@ -14,6 +17,16 @@ var (
 	msInstance *MemoryStore
 )
 
+var NumWorkers int = 4
+
+func init() {
+	maxWorkers := 10
+	NumWorkers = runtime.NumCPU()/2 + 1
+	if NumWorkers > maxWorkers {
+		NumWorkers = maxWorkers
+	}
+}
+
 type Metric struct {
 	Name  string
 	Value util.Float
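
Worked example of the sizing above: a host with 8 logical CPUs gets NumWorkers = 8/2 + 1 = 5, a host with 4 CPUs gets 3, and large hosts are capped at maxWorkers = 10 (a 32-CPU host would otherwise yield 17).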

@ -21,8 +34,8 @@ type Metric struct {
 }
 
 type MemoryStore struct {
+	root    Level // root of the tree structure
 	Metrics map[string]config.MetricConfig
-	root    Level
 }
 
 // Create a new, initialized instance of a MemoryStore.

@ -61,6 +74,54 @@ func GetMemoryStore() *MemoryStore {
 	return msInstance
 }
 
+func Shutdown() {
+	ms := GetMemoryStore()
+	log.Printf("Writing to '%s'...\n", config.Keys.Checkpoints.RootDir)
+	files, err := ms.ToCheckpoint(config.Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
+	if err != nil {
+		log.Printf("Writing checkpoint failed: %s\n", err.Error())
+	}
+	log.Printf("Done! (%d files written)\n", files)
+}
+
+func Retention(wg *sync.WaitGroup, ctx context.Context) {
+	ms := GetMemoryStore()
+
+	go func() {
+		defer wg.Done()
+		d, err := time.ParseDuration(config.Keys.RetentionInMemory)
+		if err != nil {
+			log.Fatal(err)
+		}
+		if d <= 0 {
+			return
+		}
+
+		ticks := func() <-chan time.Time {
+			d := d / 2
+			if d <= 0 {
+				return nil
+			}
+			return time.NewTicker(d).C
+		}()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticks:
+				t := time.Now().Add(-d)
+				log.Printf("start freeing buffers (older than %s)...\n", t.Format(time.RFC3339))
+				freed, err := ms.Free(nil, t.Unix())
+				if err != nil {
+					log.Printf("freeing up buffers failed: %s\n", err.Error())
+				} else {
+					log.Printf("done: %d buffers freed\n", freed)
+				}
+			}
+		}
+	}()
+}
+
 // Write all values in `metrics` to the level specified by `selector` for time `ts`.
 // Look at `findLevelOrCreate` for how selectors work.
 func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error {
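
The two comment lines above describe the Write entry point. A hedged usage sketch follows, based only on the signature and the Metric fields visible in this diff; the cluster, host, and metric names are invented, the metrics would have to exist in the store's configuration, and util.Float is assumed to convert from a float literal.

// Usage sketch (not part of this commit):
ms := GetMemoryStore()
err := ms.Write([]string{"alex", "a0101"}, time.Now().Unix(), []Metric{
	{Name: "flops_any", Value: util.Float(42.0)},
	{Name: "mem_used", Value: util.Float(31.7)},
})
if err != nil {
	log.Printf("write failed: %s", err.Error())
}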

@ -117,7 +178,7 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric
 // If the level does not hold the metric itself, the data will be aggregated recursively from the children.
 // The second and third return value are the actual from/to for the data. Those can be different from
 // the range asked for if no data was available.
-func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]util.Float, int64, int64, error) {
+func (m *MemoryStore) Read(selector util.Selector, metric string, from, to int64) ([]util.Float, int64, int64, error) {
 	if from > to {
 		return nil, 0, 0, errors.New("invalid time range")
 	}
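
Read's doc comment notes that the returned from/to may differ from the requested range. A hedged sketch of a call, reusing `ms` from the Write sketch above and assuming util.Selector is a slice of the util.SelectorElement type shown further down (names invented, not part of the commit):

// Usage sketch: read the last hour of one metric for one host.
sel := util.Selector{{String: "alex"}, {String: "a0101"}}
to := time.Now().Unix()
from := to - 3600
data, realFrom, realTo, err := ms.Read(sel, "flops_any", from, to)
if err != nil {
	log.Fatal(err)
}
// realFrom/realTo can be narrower than [from, to] if fewer samples exist.
log.Printf("%d samples covering [%d, %d]", len(data), realFrom, realTo)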

@ -66,7 +66,7 @@ func (b *buffer) stats(from, to int64) (Stats, int64, int64, error) {
 // Returns statistics for the requested metric on the selected node/level.
 // Data is aggregated to the selected level the same way as in `MemoryStore.Read`.
 // If `Stats.Samples` is zero, the statistics should not be considered as valid.
-func (m *MemoryStore) Stats(selector Selector, metric string, from, to int64) (*Stats, int64, int64, error) {
+func (m *MemoryStore) Stats(selector util.Selector, metric string, from, to int64) (*Stats, int64, int64, error) {
 	if from > to {
 		return nil, 0, 0, errors.New("invalid time range")
 	}
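
The doc comment above says statistics with Stats.Samples == 0 must not be trusted. A minimal sketch of that check, reusing sel/from/to from the Read sketch above; only the Samples field, which the comment confirms, is referenced, and no other Stats fields are assumed.

// Usage sketch (not part of this commit):
stats, sFrom, sTo, err := ms.Stats(sel, "flops_any", from, to)
if err != nil {
	log.Fatal(err)
}
if stats.Samples == 0 {
	log.Printf("no valid statistics for [%d, %d]", sFrom, sTo)
} else {
	log.Printf("statistics based on %d samples", stats.Samples)
}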

@ -1,4 +1,4 @@
-package memorystore
+package util
 
 import (
 	"encoding/json"

@ -6,9 +6,9 @@ import (
 )
 
 type SelectorElement struct {
-	Any    bool
 	String string
 	Group  []string
+	Any    bool
 }
 
 func (se *SelectorElement) UnmarshalJSON(input []byte) error {