diff --git a/cc-metric-store.go b/cmd/cc-metric-store/main.go similarity index 61% rename from cc-metric-store.go rename to cmd/cc-metric-store/main.go index 8e69c95..5d0f1a3 100644 --- a/cc-metric-store.go +++ b/cmd/cc-metric-store/main.go @@ -3,9 +3,7 @@ package main import ( "bufio" "context" - "encoding/json" "flag" - "fmt" "io" "log" "os" @@ -16,119 +14,22 @@ import ( "syscall" "time" + "github.com/ClusterCockpit/cc-metric-store/internal/api" + "github.com/ClusterCockpit/cc-metric-store/internal/config" + "github.com/ClusterCockpit/cc-metric-store/internal/memstore" "github.com/google/gops/agent" ) -// For aggregation over multiple values at different cpus/sockets/..., not time! -type AggregationStrategy int - -const ( - NoAggregation AggregationStrategy = iota - SumAggregation - AvgAggregation +var ( + conf config.Config + memoryStore *memstore.MemoryStore = nil + lastCheckpoint time.Time ) -func (as *AggregationStrategy) UnmarshalJSON(data []byte) error { - var str string - if err := json.Unmarshal(data, &str); err != nil { - return err - } - - switch str { - case "": - *as = NoAggregation - case "sum": - *as = SumAggregation - case "avg": - *as = AvgAggregation - default: - return fmt.Errorf("invalid aggregation strategy: %#v", str) - } - return nil -} - -type MetricConfig struct { - // Interval in seconds at which measurements will arive. - Frequency int64 `json:"frequency"` - - // Can be 'sum', 'avg' or null. Describes how to aggregate metrics from the same timestep over the hierarchy. - Aggregation AggregationStrategy `json:"aggregation"` - - // Private, used internally... - offset int -} - -type HttpConfig struct { - // Address to bind to, for example "0.0.0.0:8081" - Address string `json:"address"` - - // If not the empty string, use https with this as the certificate file - CertFile string `json:"https-cert-file"` - - // If not the empty string, use https with this as the key file - KeyFile string `json:"https-key-file"` -} - -type NatsConfig struct { - // Address of the nats server - Address string `json:"address"` - - // Username/Password, optional - Username string `json:"username"` - Password string `json:"password"` - - Subscriptions []struct { - // Channel name - SubscribeTo string `json:"subscribe-to"` - - // Allow lines without a cluster tag, use this as default, optional - ClusterTag string `json:"cluster-tag"` - } `json:"subscriptions"` -} - -type Config struct { - Metrics map[string]MetricConfig `json:"metrics"` - RetentionInMemory string `json:"retention-in-memory"` - Nats []*NatsConfig `json:"nats"` - JwtPublicKey string `json:"jwt-public-key"` - HttpConfig *HttpConfig `json:"http-api"` - Checkpoints struct { - Interval string `json:"interval"` - RootDir string `json:"directory"` - Restore string `json:"restore"` - } `json:"checkpoints"` - Archive struct { - Interval string `json:"interval"` - RootDir string `json:"directory"` - DeleteInstead bool `json:"delete-instead"` - } `json:"archive"` - Debug struct { - EnableGops bool `json:"gops"` - DumpToFile string `json:"dump-to-file"` - } `json:"debug"` -} - -var conf Config -var memoryStore *MemoryStore = nil -var lastCheckpoint time.Time - -var debugDumpLock sync.Mutex -var debugDump io.Writer = io.Discard - -func loadConfiguration(file string) Config { - var config Config - configFile, err := os.Open(file) - if err != nil { - log.Fatal(err) - } - defer configFile.Close() - dec := json.NewDecoder(configFile) - dec.DisallowUnknownFields() - if err := dec.Decode(&config); err != nil { - log.Fatal(err) - } - return config -} +var ( + debugDumpLock sync.Mutex + debugDump io.Writer = io.Discard +) func intervals(wg *sync.WaitGroup, ctx context.Context) { wg.Add(3) @@ -222,7 +123,7 @@ func intervals(wg *sync.WaitGroup, ctx context.Context) { case <-ticks: t := time.Now().Add(-d) log.Printf("start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339)) - n, err := ArchiveCheckpoints(conf.Checkpoints.RootDir, conf.Archive.RootDir, t.Unix(), conf.Archive.DeleteInstead) + n, err := memstore.ArchiveCheckpoints(conf.Checkpoints.RootDir, conf.Archive.RootDir, t.Unix(), conf.Archive.DeleteInstead) if err != nil { log.Printf("archiving failed: %s\n", err.Error()) } else { @@ -241,8 +142,8 @@ func main() { flag.Parse() startupTime := time.Now() - conf = loadConfiguration(configFile) - memoryStore = NewMemoryStore(conf.Metrics) + conf = config.LoadConfiguration(configFile) + memoryStore = memstore.NewMemoryStore(conf.Metrics) if enableGopsAgent || conf.Debug.EnableGops { if err := agent.Listen(agent.Options{}); err != nil { @@ -298,7 +199,7 @@ func main() { continue } - log.Println("Shuting down...") + log.Println("Shutting down...") shutdown() } }() @@ -308,7 +209,7 @@ func main() { wg.Add(1) go func() { - err := StartApiServer(ctx, conf.HttpConfig) + err := api.StartApiServer(ctx, conf.HttpConfig) if err != nil { log.Fatal(err) } @@ -322,8 +223,7 @@ func main() { nc := natsConf go func() { // err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx) - err := ReceiveNats(nc, decodeLine, 1, ctx) - + err := api.ReceiveNats(nc, decodeLine, 1, ctx) if err != nil { log.Fatal(err) } diff --git a/go.mod b/go.mod index 41b6df0..8bfedc2 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,21 @@ module github.com/ClusterCockpit/cc-metric-store -go 1.16 +go 1.19 require ( github.com/golang-jwt/jwt/v4 v4.0.0 - github.com/golang/protobuf v1.5.2 // indirect github.com/google/gops v0.3.22 github.com/gorilla/mux v1.8.0 github.com/influxdata/line-protocol/v2 v2.2.0 - github.com/nats-io/nats-server/v2 v2.2.6 // indirect github.com/nats-io/nats.go v1.11.0 ) + +require ( + github.com/golang/protobuf v1.5.2 // indirect + github.com/nats-io/nats-server/v2 v2.2.6 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b // indirect + golang.org/x/sys v0.0.0-20210902050250-f475640dd07b // indirect + google.golang.org/protobuf v1.26.0 // indirect +) diff --git a/go.sum b/go.sum index 479c8f2..17b80a9 100644 --- a/go.sum +++ b/go.sum @@ -1,15 +1,12 @@ -github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= +github.com/frankban/quicktest v1.13.0 h1:yNZif1OkDfNoDfb9zZa9aXIpejNR4F23Wely0c+Qdqk= github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU= github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= -github.com/go-ole/go-ole v1.2.6-0.20210915003542-8b1f7f90f6b1 h1:4dntyT+x6QTOSCIrgczbQ+ockAEha0cfxD5Wi0iCzjY= github.com/go-ole/go-ole v1.2.6-0.20210915003542-8b1f7f90f6b1/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= -github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= -github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-jwt/jwt/v4 v4.0.0 h1:RAqyYixv1p7uEnocuy8P1nru5wprCh/MH2BIlW5z5/o= github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= @@ -25,24 +22,27 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gops v0.3.22 h1:lyvhDxfPLHAOR2xIYwjPhN387qHxyU21Sk9sz/GhmhQ= github.com/google/gops v0.3.22/go.mod h1:7diIdLsqpCihPSX3fQagksT/Ku/y4RL9LHTlKyEUDl8= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/influxdata/line-protocol-corpus v0.0.0-20210519164801-ca6fa5da0184/go.mod h1:03nmhxzZ7Xk2pdG+lmMd7mHDfeVOYFyhOgwO61qWU98= +github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937 h1:MHJNQ+p99hFATQm6ORoLmpUCF7ovjwEFshs/NHzAbig= github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937/go.mod h1:BKR9c0uHSmRgM/se9JhFHtTT7JTO67X23MtKMHtZcpo= github.com/influxdata/line-protocol/v2 v2.0.0-20210312151457-c52fdecb625a/go.mod h1:6+9Xt5Sq1rWx+glMgxhcg2c0DUaehK+5TDcPZ76GypY= github.com/influxdata/line-protocol/v2 v2.1.0/go.mod h1:QKw43hdUBg3GTk2iC3iyCxksNj7PX9aUSeYOYE/ceHY= github.com/influxdata/line-protocol/v2 v2.2.0 h1:UPmAqE15Hw5zu9E10SYhoXVLWnEJkWnuCbaCiRsA3c0= github.com/influxdata/line-protocol/v2 v2.2.0/go.mod h1:DmB3Cnh+3oxmG6LOBIxce4oaL4CPj3OmMPgvauXh+tM= -github.com/keybase/go-ps v0.0.0-20190827175125-91aafc93ba19 h1:WjT3fLi9n8YWh/Ih8Q1LHAPsTqGddPcHqscN+PJ3i68= github.com/keybase/go-ps v0.0.0-20190827175125-91aafc93ba19/go.mod h1:hY+WOq6m2FpbvyrI93sMaypsttvaIL5nhVR92dTMUcQ= github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk= github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= @@ -61,15 +61,11 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/shirou/gopsutil/v3 v3.21.9 h1:Vn4MUz2uXhqLSiCbGFRc0DILbMVLAY92DSkT8bsYrHg= github.com/shirou/gopsutil/v3 v3.21.9/go.mod h1:YWp/H8Qs5fVmf17v7JNZzA0mPJ+mS2e9JdiUF9LlKzQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= -github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2biQ= github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcyE6boqnA8= -github.com/xlab/treeprint v1.1.0 h1:G/1DjNkPpfZCFt9CSh6b5/nY4VimlbHF3Rh4obvtzDk= github.com/xlab/treeprint v1.1.0/go.mod h1:gj5Gd3gPdKtR1ikdDK6fnFLdmIS0X30kTTuNd/WEJu0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -81,7 +77,6 @@ golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210902050250-f475640dd07b h1:S7hKs0Flbq0bbc9xgYt4stIEG1zNDFqyrPwAX2Wj/sE= @@ -92,6 +87,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -106,5 +102,4 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -rsc.io/goversion v1.2.0 h1:SPn+NLTiAG7w30IRK/DKp1BjvpWabYgxlLp/+kx5J8w= rsc.io/goversion v1.2.0/go.mod h1:Eih9y/uIBS3ulggl7KNJ09xGSLcuNaLgmvvqa07sgfo= diff --git a/api.go b/internal/api/api.go similarity index 98% rename from api.go rename to internal/api/api.go index dbe72cb..987f2c6 100644 --- a/api.go +++ b/internal/api/api.go @@ -1,4 +1,4 @@ -package main +package api import ( "bufio" @@ -262,15 +262,18 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) { if query.SubType != nil { for _, subTypeId := range query.SubTypeIds { sels = append(sels, Selector{ - {String: req.Cluster}, {String: query.Hostname}, + {String: req.Cluster}, + {String: query.Hostname}, {String: *query.Type + typeId}, - {String: *query.SubType + subTypeId}}) + {String: *query.SubType + subTypeId}, + }) } } else { sels = append(sels, Selector{ {String: req.Cluster}, {String: query.Hostname}, - {String: *query.Type + typeId}}) + {String: *query.Type + typeId}, + }) } } } @@ -347,7 +350,6 @@ func authentication(next http.Handler, publicKey ed25519.PublicKey) http.Handler return publicKey, nil }) - if err != nil { http.Error(rw, err.Error(), http.StatusUnauthorized) return diff --git a/lineprotocol.go b/internal/api/lineprotocol.go similarity index 92% rename from lineprotocol.go rename to internal/api/lineprotocol.go index b2e1692..9814463 100644 --- a/lineprotocol.go +++ b/internal/api/lineprotocol.go @@ -1,4 +1,4 @@ -package main +package api import ( "context" @@ -9,15 +9,17 @@ import ( "sync" "time" + "github.com/ClusterCockpit/cc-metric-store/internal/config" + "github.com/ClusterCockpit/cc-metric-store/internal/memstore" + "github.com/ClusterCockpit/cc-metric-store/internal/util" "github.com/influxdata/line-protocol/v2/lineprotocol" "github.com/nats-io/nats.go" ) type Metric struct { Name string - Value Float - - mc MetricConfig + Value util.Float + mc config.MetricConfig } // Currently unused, could be used to send messages via raw TCP. @@ -84,7 +86,7 @@ func ReceiveRaw(ctx context.Context, listener net.Listener, handleLine func(*lin // Connect to a nats server and subscribe to "updates". This is a blocking // function. handleLine will be called for each line recieved via nats. // Send `true` through the done channel for gracefull termination. -func ReceiveNats(conf *NatsConfig, handleLine func(*lineprotocol.Decoder, string) error, workers int, ctx context.Context) error { +func ReceiveNats(conf *config.NatsConfig, handleLine func(*lineprotocol.Decoder, string) error, workers int, ctx context.Context) error { var opts []nats.Option if conf.Username != "" && conf.Password != "" { opts = append(opts, nats.UserInfo(conf.Username, conf.Password)) @@ -175,7 +177,7 @@ func reorder(buf, prefix []byte) []byte { // Decode lines using dec and make write calls to the MemoryStore. // If a line is missing its cluster tag, use clusterDefault as default. -func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error { +func decodeLine(dec *lineprotocol.Decoder, memoryStore *memstore.MemoryStore, clusterDefault string) error { // Reduce allocations in loop: t := time.Now() metric, metricBuf := Metric{}, make([]byte, 0, 16) @@ -292,11 +294,11 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error { } if val.Kind() == lineprotocol.Float { - metric.Value = Float(val.FloatV()) + metric.Value = util.Float(val.FloatV()) } else if val.Kind() == lineprotocol.Int { - metric.Value = Float(val.IntV()) + metric.Value = util.Float(val.IntV()) } else if val.Kind() == lineprotocol.Uint { - metric.Value = Float(val.UintV()) + metric.Value = util.Float(val.UintV()) } else { return fmt.Errorf("unsupported value type in message: %s", val.Kind().String()) } diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..54b64e0 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,112 @@ +package config + +import ( + "encoding/json" + "fmt" + "log" + "os" +) + +// For aggregation over multiple values at different cpus/sockets/..., not time! +type AggregationStrategy int + +const ( + NoAggregation AggregationStrategy = iota + SumAggregation + AvgAggregation +) + +func (as *AggregationStrategy) UnmarshalJSON(data []byte) error { + var str string + if err := json.Unmarshal(data, &str); err != nil { + return err + } + + switch str { + case "": + *as = NoAggregation + case "sum": + *as = SumAggregation + case "avg": + *as = AvgAggregation + default: + return fmt.Errorf("invalid aggregation strategy: %#v", str) + } + return nil +} + +type MetricConfig struct { + // Interval in seconds at which measurements will arive. + Frequency int64 `json:"frequency"` + + // Can be 'sum', 'avg' or null. Describes how to aggregate metrics from the same timestep over the hierarchy. + Aggregation AggregationStrategy `json:"aggregation"` + + // Private, used internally... + offset int +} + +type HttpConfig struct { + // Address to bind to, for example "0.0.0.0:8081" + Address string `json:"address"` + + // If not the empty string, use https with this as the certificate file + CertFile string `json:"https-cert-file"` + + // If not the empty string, use https with this as the key file + KeyFile string `json:"https-key-file"` +} + +type NatsConfig struct { + // Address of the nats server + Address string `json:"address"` + + // Username/Password, optional + Username string `json:"username"` + Password string `json:"password"` + + Subscriptions []struct { + // Channel name + SubscribeTo string `json:"subscribe-to"` + + // Allow lines without a cluster tag, use this as default, optional + ClusterTag string `json:"cluster-tag"` + } `json:"subscriptions"` +} + +type Config struct { + Metrics map[string]MetricConfig `json:"metrics"` + HttpConfig *HttpConfig `json:"http-api"` + Checkpoints struct { + Interval string `json:"interval"` + RootDir string `json:"directory"` + Restore string `json:"restore"` + } `json:"checkpoints"` + Debug struct { + DumpToFile string `json:"dump-to-file"` + EnableGops bool `json:"gops"` + } `json:"debug"` + RetentionInMemory string `json:"retention-in-memory"` + JwtPublicKey string `json:"jwt-public-key"` + Archive struct { + Interval string `json:"interval"` + RootDir string `json:"directory"` + DeleteInstead bool `json:"delete-instead"` + } `json:"archive"` + Nats []*NatsConfig `json:"nats"` +} + +func LoadConfiguration(file string) Config { + var config Config + configFile, err := os.Open(file) + if err != nil { + log.Fatal(err) + } + defer configFile.Close() + dec := json.NewDecoder(configFile) + dec.DisallowUnknownFields() + if err := dec.Decode(&config); err != nil { + log.Fatal(err) + } + return config +} diff --git a/archive.go b/internal/memstore/archive.go similarity index 99% rename from archive.go rename to internal/memstore/archive.go index 302afb3..e1654f7 100644 --- a/archive.go +++ b/internal/memstore/archive.go @@ -1,4 +1,4 @@ -package main +package memstore import ( "archive/zip" diff --git a/memstore.go b/internal/memstore/memstore.go similarity index 91% rename from memstore.go rename to internal/memstore/memstore.go index 79b3f05..99b6295 100644 --- a/memstore.go +++ b/internal/memstore/memstore.go @@ -1,9 +1,13 @@ -package main +package memstore import ( "errors" "sync" "unsafe" + + "github.com/ClusterCockpit/cc-metric-store/internal/api" + "github.com/ClusterCockpit/cc-metric-store/internal/config" + "github.com/ClusterCockpit/cc-metric-store/internal/util" ) // Default buffer capacity. @@ -18,7 +22,7 @@ const ( var bufferPool sync.Pool = sync.Pool{ New: func() interface{} { return &buffer{ - data: make([]Float, 0, BUFFER_CAP), + data: make([]util.Float, 0, BUFFER_CAP), } }, } @@ -33,11 +37,11 @@ var ( // If `cap(data)` is reached, a new buffer is created and // becomes the new head of a buffer list. type buffer struct { - frequency int64 // Time between two "slots" - start int64 // Timestamp of when `data[0]` was written. - data []Float // The slice should never reallocacte as `cap(data)` is respected. - prev, next *buffer // `prev` contains older data, `next` newer data. - archived bool // If true, this buffer is already archived + frequency int64 // Time between two "slots" + start int64 // Timestamp of when `data[0]` was written. + data []util.Float // The slice should never reallocacte as `cap(data)` is respected. + prev, next *buffer // `prev` contains older data, `next` newer data. + archived bool // If true, this buffer is already archived closed bool /* @@ -66,7 +70,7 @@ func newBuffer(ts, freq int64) *buffer { // Otherwise, the existing buffer is returnd. // Normaly, only "newer" data should be written, but if the value would // end up in the same buffer anyways it is allowed. -func (b *buffer) write(ts int64, value Float) (*buffer, error) { +func (b *buffer) write(ts int64, value util.Float) (*buffer, error) { if ts < b.start { return nil, errors.New("cannot write value to buffer from past") } @@ -90,7 +94,7 @@ func (b *buffer) write(ts int64, value Float) (*buffer, error) { // Fill up unwritten slots with NaN for i := len(b.data); i < idx; i++ { - b.data = append(b.data, NaN) + b.data = append(b.data, util.NaN) } b.data = append(b.data, value) @@ -154,7 +158,7 @@ func (b *buffer) close() { // This function goes back the buffer chain if `from` is older than the currents buffer start. // The loaded values are added to `data` and `data` is returned, possibly with a shorter length. // If `data` is not long enough to hold all values, this function will panic! -func (b *buffer) read(from, to int64, data []Float) ([]Float, int64, int64, error) { +func (b *buffer) read(from, to int64, data []util.Float) ([]util.Float, int64, int64, error) { if from < b.firstWrite() { if b.prev != nil { return b.prev.read(from, to, data) @@ -178,9 +182,9 @@ func (b *buffer) read(from, to int64, data []Float) ([]Float, int64, int64, erro if b.next == nil || to <= b.next.start { break } - data[i] += NaN + data[i] += util.NaN } else if t < b.start { - data[i] += NaN + data[i] += util.NaN // } else if b.data[idx].IsNaN() { // data[i] += interpolate(idx, b.data) } else { @@ -335,7 +339,7 @@ func (l *level) sizeInBytes() int64 { for _, b := range l.metrics { if b != nil { - size += b.count() * int64(unsafe.Sizeof(Float(0))) + size += b.count() * int64(unsafe.Sizeof(util.Float(0))) } } @@ -348,12 +352,12 @@ func (l *level) sizeInBytes() int64 { type MemoryStore struct { root level // root of the tree structure - metrics map[string]MetricConfig + metrics map[string]config.MetricConfig } // Return a new, initialized instance of a MemoryStore. // Will panic if values in the metric configurations are invalid. -func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore { +func NewMemoryStore(metrics map[string]config.MetricConfig) *MemoryStore { offset := 0 for key, config := range metrics { if config.Frequency == 0 { @@ -379,7 +383,7 @@ func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore { // Write all values in `metrics` to the level specified by `selector` for time `ts`. // Look at `findLevelOrCreate` for how selectors work. -func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error { +func (m *MemoryStore) Write(selector []string, ts int64, metrics []api.Metric) error { var ok bool for i, metric := range metrics { if metric.mc.Frequency == 0 { @@ -399,7 +403,7 @@ func (m *MemoryStore) GetLevel(selector []string) *level { } // Assumes that `minfo` in `metrics` is filled in! -func (m *MemoryStore) WriteToLevel(l *level, selector []string, ts int64, metrics []Metric) error { +func (m *MemoryStore) WriteToLevel(l *level, selector []string, ts int64, metrics []api.Metric) error { l = l.findLevelOrCreate(selector, len(m.metrics)) l.lock.Lock() defer l.lock.Unlock() diff --git a/selector.go b/internal/memstore/selector.go similarity index 99% rename from selector.go rename to internal/memstore/selector.go index 25fe209..7bc498a 100644 --- a/selector.go +++ b/internal/memstore/selector.go @@ -1,4 +1,4 @@ -package main +package memstore import ( "encoding/json" diff --git a/float.go b/internal/util/float.go similarity index 92% rename from float.go rename to internal/util/float.go index eae2d98..603bc91 100644 --- a/float.go +++ b/internal/util/float.go @@ -1,4 +1,4 @@ -package main +package util import ( "math" @@ -11,8 +11,10 @@ import ( // we have to use our own type which implements encoding/json.Marshaler itself. type Float float64 -var NaN Float = Float(math.NaN()) -var nullAsBytes []byte = []byte("null") +var ( + NaN Float = Float(math.NaN()) + nullAsBytes []byte = []byte("null") +) func (f Float) IsNaN() bool { return math.IsNaN(float64(f)) @@ -55,7 +57,6 @@ func (fa FloatArray) MarshalJSON() ([]byte, error) { buf = append(buf, `null`...) } else { buf = strconv.AppendFloat(buf, float64(fa[i]), 'f', 3, 64) - } } buf = append(buf, ']') diff --git a/stats.go b/internal/util/stats.go similarity index 99% rename from stats.go rename to internal/util/stats.go index 510891b..8e0f41f 100644 --- a/stats.go +++ b/internal/util/stats.go @@ -1,4 +1,4 @@ -package main +package util import ( "errors" @@ -94,7 +94,6 @@ func (m *MemoryStore) Stats(selector Selector, metric string, from, to int64) (* n += 1 return nil }) - if err != nil { return nil, 0, 0, err } diff --git a/lineprotocol_test.go b/lineprotocol_test.go deleted file mode 100644 index a6df786..0000000 --- a/lineprotocol_test.go +++ /dev/null @@ -1,144 +0,0 @@ -package main - -import ( - "bytes" - "log" - "strconv" - "testing" - - "github.com/influxdata/line-protocol/v2/lineprotocol" -) - -const TestDataClassicFormat string = ` -m1,cluster=ctest,hostname=htest1,type=node value=1 123456789 -m2,cluster=ctest,hostname=htest1,type=node value=2 123456789 -m3,hostname=htest2,type=node value=3 123456789 -m4,cluster=ctest,hostname=htest2,type=core,type-id=1 value=4 123456789 -m4,cluster=ctest,hostname=htest2,type-id=2,type=core value=5 123456789 -` - -const BenchmarkLineBatch string = ` -nm1,cluster=ctest,hostname=htest1,type=node value=123.0 123456789 -nm2,cluster=ctest,hostname=htest1,type=node value=123.0 123456789 -nm3,cluster=ctest,hostname=htest1,type=node value=123.0 123456789 -nm4,cluster=ctest,hostname=htest1,type=node value=123.0 123456789 -nm5,cluster=ctest,hostname=htest1,type=node value=123.0 123456789 -nm6,cluster=ctest,hostname=htest1,type=node value=123.0 123456789 -nm7,cluster=ctest,hostname=htest1,type=node value=123.0 123456789 -nm8,cluster=ctest,hostname=htest1,type=node value=123.0 123456789 -nm9,cluster=ctest,hostname=htest1,type=node value=123.0 123456789 -cm1,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789 -cm2,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789 -cm3,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789 -cm4,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789 -cm5,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789 -cm6,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789 -cm7,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789 -cm8,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789 -cm9,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789 -cm1,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789 -cm2,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789 -cm3,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789 -cm4,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789 -cm5,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789 -cm6,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789 -cm7,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789 -cm8,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789 -cm9,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789 -cm1,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789 -cm2,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789 -cm3,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789 -cm4,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789 -cm5,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789 -cm6,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789 -cm7,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789 -cm8,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789 -cm9,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789 -cm1,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 -cm2,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 -cm3,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 -cm4,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 -cm5,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 -cm6,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 -cm7,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 -cm8,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 -cm9,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 -` - -func TestLineprotocolDecoder(t *testing.T) { - prevMemoryStore := memoryStore - t.Cleanup(func() { - memoryStore = prevMemoryStore - }) - - memoryStore = NewMemoryStore(map[string]MetricConfig{ - "m1": {Frequency: 1}, - "m2": {Frequency: 1}, - "m3": {Frequency: 1}, - "m4": {Frequency: 1}, - }) - - dec := lineprotocol.NewDecoderWithBytes([]byte(TestDataClassicFormat)) - if err := decodeLine(dec, "ctest"); err != nil { - log.Fatal(err) - } - - // memoryStore.DebugDump(bufio.NewWriter(os.Stderr)) - - h1 := memoryStore.GetLevel([]string{"ctest", "htest1"}) - h1b1 := h1.metrics[memoryStore.metrics["m1"].offset] - h1b2 := h1.metrics[memoryStore.metrics["m2"].offset] - if h1b1.data[0] != 1.0 || h1b2.data[0] != 2.0 { - log.Fatal() - } - - h2 := memoryStore.GetLevel([]string{"ctest", "htest2"}) - h2b3 := h2.metrics[memoryStore.metrics["m3"].offset] - if h2b3.data[0] != 3.0 { - log.Fatal() - } - - h2c1 := memoryStore.GetLevel([]string{"ctest", "htest2", "core1"}) - h2c1b4 := h2c1.metrics[memoryStore.metrics["m4"].offset] - h2c2 := memoryStore.GetLevel([]string{"ctest", "htest2", "core2"}) - h2c2b4 := h2c2.metrics[memoryStore.metrics["m4"].offset] - if h2c1b4.data[0] != 4.0 || h2c2b4.data[0] != 5.0 { - log.Fatal() - } -} - -func BenchmarkLineprotocolDecoder(b *testing.B) { - b.StopTimer() - memoryStore = NewMemoryStore(map[string]MetricConfig{ - "nm1": {Frequency: 1}, - "nm2": {Frequency: 1}, - "nm3": {Frequency: 1}, - "nm4": {Frequency: 1}, - "nm5": {Frequency: 1}, - "nm6": {Frequency: 1}, - "nm7": {Frequency: 1}, - "nm8": {Frequency: 1}, - "nm9": {Frequency: 1}, - "cm1": {Frequency: 1}, - "cm2": {Frequency: 1}, - "cm3": {Frequency: 1}, - "cm4": {Frequency: 1}, - "cm5": {Frequency: 1}, - "cm6": {Frequency: 1}, - "cm7": {Frequency: 1}, - "cm8": {Frequency: 1}, - "cm9": {Frequency: 1}, - }) - - for i := 0; i < b.N; i++ { - data := []byte(BenchmarkLineBatch) - data = bytes.ReplaceAll(data, []byte("123456789"), []byte(strconv.Itoa(i+123456789))) - dec := lineprotocol.NewDecoderWithBytes(data) - - b.StartTimer() - if err := decodeLine(dec, "ctest"); err != nil { - b.Fatal(err) - } - b.StopTimer() - } -} diff --git a/memoryStore_test.go.orig b/memoryStore_test.go.orig deleted file mode 100644 index b64c85b..0000000 --- a/memoryStore_test.go.orig +++ /dev/null @@ -1,504 +0,0 @@ -package main - -import ( - "fmt" - "log" - "math" - "testing" - - "github.com/ClusterCockpit/cc-metric-store/lineprotocol" -) - -var testMetrics [][]lineprotocol.Metric = [][]lineprotocol.Metric{ - {{"flops", 100.5}, {"mem_bw", 2088.67}}, - {{"flops", 180.5}, {"mem_bw", 4078.32}, {"mem_capacity", 1020}}, - {{"flops", 980.5}, {"mem_bw", 9078.32}, {"mem_capacity", 5010}}, - {{"flops", 940.5}, {"mem_bw", 9278.32}, {"mem_capacity", 6010}}, - {{"flops", 930.5}, {"mem_bw", 9378.32}, {"mem_capacity", 7010}}, - {{"flops", 980.5}, {"mem_bw", 9478.32}, {"mem_capacity", 8010}}, - {{"flops", 980.5}, {"mem_bw", 9478.32}, {"mem_capacity", 8010}}, - {{"flops", 980.5}, {"mem_bw", 9478.32}, {"mem_capacity", 8010}}, - {{"flops", 970.5}, {"mem_bw", 9178.32}, {"mem_capacity", 2010}}, - {{"flops", 970.5}, {"mem_bw", 9178.32}, {"mem_capacity", 2010}}} - -var testMetricsAlt [][]lineprotocol.Metric = [][]lineprotocol.Metric{ - {{"flops", 120.5}, {"mem_bw", 2080.67}}, - {{"flops", 130.5}, {"mem_bw", 4071.32}, {"mem_capacity", 1120}}, - {{"flops", 940.5}, {"mem_bw", 9072.32}, {"mem_capacity", 5210}}, - {{"flops", 950.5}, {"mem_bw", 9273.32}, {"mem_capacity", 6310}}, - {{"flops", 960.5}, {"mem_bw", 9374.32}, {"mem_capacity", 7410}}, - {{"flops", 970.5}, {"mem_bw", 9475.32}, {"mem_capacity", 8510}}, - {{"flops", 990.5}, {"mem_bw", 9476.32}, {"mem_capacity", 8610}}, - {{"flops", 910.5}, {"mem_bw", 9477.32}, {"mem_capacity", 8710}}, - {{"flops", 920.5}, {"mem_bw", 9178.32}, {"mem_capacity", 2810}}, - {{"flops", 930.5}, {"mem_bw", 9179.32}, {"mem_capacity", 2910}}} - -func dumpStoreBuffer(s *storeBuffer) { - log.Printf("Start TS %d\n", s.start) - ctr := 0 - - for _, val := range s.store { - fmt.Printf("%f\t", val) - ctr++ - - if ctr == 10 { - fmt.Printf("\n") - ctr = 0 - } - } -} - -func printMemStore(m *MemoryStore) { - log.Println("########################") - log.Printf("Frequency %d, Metrics %d Slots %d\n", - m.frequency, m.numMetrics, m.numSlots) - log.Println("##Offsets") - for key, val := range m.offsets { - log.Printf("\t%s = %d\n", key, val) - } - log.Println("##Containers") - for key, c := range m.containers { - log.Printf("ID %s\n", key) - log.Println("###current") - dumpStoreBuffer(c.current) - log.Println("###next") - dumpStoreBuffer(c.next) - } - log.Println("########################") -} - -//############################ -//#### Whitebox tests ######## -//############################ -func TestAddMetricSimple(t *testing.T) { - key := "m1220" - m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) - // printMemStore(m) - - m.AddMetrics(key, 1584022800, testMetrics[0]) - m.AddMetrics(key, 1584022890, testMetrics[1]) - - want := testMetrics[0][0].Value - got := m.containers[key].current.store[0] - if got != want { - t.Errorf("Want %f got %f\n", want, got) - } - - want = testMetrics[1][2].Value - got = m.containers[key].current.store[21] - if got != want { - t.Errorf("Want %f got %f\n", want, got) - } - // printMemStore(m) -} - -func TestAddMetricReplace(t *testing.T) { - key := "m1220" - m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) - // printMemStore(m) - - m.AddMetrics(key, 1584022800, testMetrics[0]) - m.AddMetrics(key, 1584022800, testMetrics[1]) - - want := testMetrics[1][0].Value - got := m.containers[key].current.store[0] - if got != want { - t.Errorf("Want %f got %f\n", want, got) - } - - m.AddMetrics(key, 1584022850, testMetrics[0]) - want = testMetrics[0][0].Value - got = m.containers[key].current.store[0] - if got != want { - t.Errorf("Want %f got %f\n", want, got) - } - - m.AddMetrics(key, 1584022860, testMetrics[1]) - want = testMetrics[0][0].Value - got = m.containers[key].current.store[0] - if got != want { - t.Errorf("Want %f got %f\n", want, got) - } - // printMemStore(m) -} - -func TestAddMetricSwitch(t *testing.T) { - key := "m1220" - m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) - // printMemStore(m) - - m.AddMetrics(key, 1584023000, testMetrics[0]) - m.AddMetrics(key, 1584023580, testMetrics[1]) - - want := testMetrics[1][2].Value - got := m.containers[key].current.store[29] - if got != want { - t.Errorf("Want %f got %f\n", want, got) - } - - m.AddMetrics(key, 1584023600, testMetrics[2]) - want = testMetrics[2][2].Value - got = m.containers[key].current.store[20] - if got != want { - t.Errorf("Want %f got %f\n", want, got) - } - - // printMemStore(m) -} - -//############################ -//#### Blackbox tests ######## -//############################ - -func TestAddMetricOutOfBounds(t *testing.T) { - key := "m1220" - m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 30, 60) - - err := m.AddMetrics(key, 1584023000, testMetrics[0]) - if err != nil { - t.Errorf("Got error 1584023000\n") - } - err = m.AddMetrics(key, 1584026600, testMetrics[0]) - if err == nil { - t.Errorf("Got no error 1584026600\n") - } - err = m.AddMetrics(key, 1584021580, testMetrics[1]) - if err == nil { - t.Errorf("Got no error 1584021580\n") - } - err = m.AddMetrics(key, 1584024580, testMetrics[1]) - if err != nil { - t.Errorf("Got error 1584024580\n") - } - err = m.AddMetrics(key, 1584091580, testMetrics[1]) - if err == nil { - t.Errorf("Got no error 1584091580\n") - } - err = m.AddMetrics(key, 1584024780, testMetrics[0]) - if err != nil { - t.Errorf("Got error 1584024780\n") - } -} - -func TestGetMetricPlainCurrent(t *testing.T) { - key := "m1220" - m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) - - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) - } - - // printMemStore(m) - val, tsFrom, err := m.GetMetric(key, "flops", 1584023000, 1584023560) - - if err != nil { - t.Errorf("Got error\n") - } - if tsFrom != 1584023000 { - t.Errorf("Start ts differs: %d\n", tsFrom) - } - if len(val) != 9 { - t.Errorf("Want 9. Got %d\n", len(val)) - } - if val[0] != 100.5 { - t.Errorf("Want 100.5 Got %f\n", val[0]) - } - if val[8] != 970.5 { - t.Errorf("Want 970.5 Got %f\n", val[9]) - } -} - -func TestGetMetricPlainNext(t *testing.T) { - key := "m1220" - m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) - - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) - } - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i]) - } - - // printMemStore(m) - val, tsFrom, err := m.GetMetric(key, "flops", 1584023000, 1584023560) - - if err != nil { - t.Errorf("Got error\n") - } - if tsFrom != 1584023000 { - t.Errorf("Start ts differs: %d\n", tsFrom) - } - if len(val) != 9 { - t.Errorf("Want 9. Got %d\n", len(val)) - } - if val[0] != 100.5 { - t.Errorf("Want 100.5 Got %f\n", val[0]) - } - if val[8] != 970.5 { - t.Errorf("Want 970.5 Got %f\n", val[9]) - } -} - -func TestGetMetricGap(t *testing.T) { - key := "m1220" - m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) - - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023000+i*120), testMetrics[i]) - } - - val, tsFrom, err := m.GetMetric(key, "flops", 1584023000, 1584023600) - - if err != nil { - t.Errorf("Got error\n") - } - if tsFrom != 1584023000 { - t.Errorf("Start ts differs: %d\n", tsFrom) - } - if len(val) != 10 { - t.Errorf("Want 10. Got %d\n", len(val)) - } - if val[0] != 100.5 { - t.Errorf("Want 100.5 Got %f\n", val[0]) - } - if !math.IsNaN(float64(val[1])) { - t.Errorf("Want NaN Got %f\n", val[1]) - } - if val[0] != 100.5 { - t.Errorf("Want 100.5 Got %f\n", val[0]) - } - - // fmt.Println(val) -} - -func TestGetMetricSplit(t *testing.T) { - key := "m1220" - m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) - - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) - } - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i]) - } - - // printMemStore(m) - - val, tsFrom, err := m.GetMetric(key, "flops", 1584023200, 1584023860) - - if err != nil { - t.Errorf("Got error\n") - } - if tsFrom != 1584023200 { - t.Errorf("Start ts differs: %d\n", tsFrom) - } - if len(val) != 11 { - t.Errorf("Want 11. Got %d\n", len(val)) - } - if val[0] != 940.5 { - t.Errorf("Want 940.5 Got %f\n", val[0]) - } - if val[10] != 950.5 { - t.Errorf("Want 950.5 Got %f\n", val[0]) - } -} - -func TestGetMetricExceedNext(t *testing.T) { - key := "m1220" - m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) - - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) - } - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i]) - } - - // printMemStore(m) - - val, tsFrom, err := m.GetMetric(key, "flops", 1584022800, 1584023400) - - if err != nil { - t.Errorf("Got error\n") - } - if tsFrom != 1584023000 { - t.Errorf("Start ts differs: %d\n", tsFrom) - } - if len(val) != 6 { - t.Errorf("Want 6. Got %d\n", len(val)) - } - if val[0] != 100.5 { - t.Errorf("Want 100.5 Got %f\n", val[0]) - } - if val[5] != 980.5 { - t.Errorf("Want 980.5 Got %f\n", val[5]) - } -} - -func TestGetMetricExceedNextSplit(t *testing.T) { - key := "m1220" - m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) - - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) - } - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i]) - } - - // printMemStore(m) - - val, tsFrom, err := m.GetMetric(key, "flops", 1584022800, 1584023900) - - if err != nil { - t.Errorf("Got error\n") - } - if tsFrom != 1584023000 { - t.Errorf("Start ts differs: %d\n", tsFrom) - } - if len(val) != 15 { - t.Errorf("Want 14. Got %d\n", len(val)) - } - if val[0] != 100.5 { - t.Errorf("Want 100.5 Got %f\n", val[0]) - } - if val[14] != 960.5 { - t.Errorf("Want 960.5 Got %f\n", val[13]) - } -} - -func TestGetMetricExceedCurrent(t *testing.T) { - key := "m1220" - m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) - - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) - } - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i]) - } - - // printMemStore(m) - - val, tsFrom, err := m.GetMetric(key, "flops", 1584023800, 1584027900) - - if err != nil { - t.Errorf("Got error\n") - } - if tsFrom != 1584023800 { - t.Errorf("Start ts differs: %d\n", tsFrom) - } - if len(val) != 7 { - t.Errorf("Want 6. Got %d\n", len(val)) - } - if val[0] != 950.5 { - t.Errorf("Want 950.5 Got %f\n", val[0]) - } - if val[6] != 930.5 { - t.Errorf("Want 930.5 Got %f\n", val[5]) - } -} - -func TestGetMetricExceedCurrentSplit(t *testing.T) { - key := "m1220" - m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) - - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) - } - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i]) - } - - // printMemStore(m) - - val, tsFrom, err := m.GetMetric(key, "flops", 1584023120, 1584027900) - - if err != nil { - t.Errorf("Got error\n") - } - if tsFrom != 1584023120 { - t.Errorf("Start ts differs: %d\n", tsFrom) - } - if len(val) != 18 { - t.Errorf("Want 18. Got %d\n", len(val)) - } - if val[0] != 980.5 { - t.Errorf("Want 950.5 Got %f\n", val[0]) - } - if val[17] != 930.5 { - t.Errorf("Want 930.5 Got %f\n", val[17]) - } -} - -func TestGetMetricExceedBoth(t *testing.T) { - key := "m1220" - m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) - - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) - } - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i]) - } - - // printMemStore(m) - - val, tsFrom, err := m.GetMetric(key, "flops", 1584022800, 1584027900) - - if err != nil { - t.Errorf("Got error\n") - } - if tsFrom != 1584023000 { - t.Errorf("Start ts differs: %d\n", tsFrom) - } - if len(val) != 20 { - t.Errorf("Want 20. Got %d\n", len(val)) - } - if val[0] != 100.5 { - t.Errorf("Want 950.5 Got %f\n", val[0]) - } - if val[19] != 930.5 { - t.Errorf("Want 930.5 Got %f\n", val[17]) - } -} - -func TestGetMetricOutUpper(t *testing.T) { - key := "m1220" - m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) - - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) - } - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i]) - } - - // printMemStore(m) - - _, _, err := m.GetMetric(key, "flops", 1584032800, 1584037900) - - if err == nil { - t.Errorf("Got no error\n") - } -} - -func TestGetMetricOutLower(t *testing.T) { - key := "m1220" - m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) - - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) - } - for i := 0; i < len(testMetrics); i++ { - m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i]) - } - - // printMemStore(m) - - _, _, err := m.GetMetric(key, "flops", 1584002800, 1584007900) - - if err == nil { - t.Errorf("Got no error\n") - } -} diff --git a/memstore_test.go b/memstore_test.go deleted file mode 100644 index 8821e7a..0000000 --- a/memstore_test.go +++ /dev/null @@ -1,512 +0,0 @@ -package main - -import ( - "fmt" - "math" - "math/rand" - "sync" - "testing" -) - -func TestMemoryStoreBasics(t *testing.T) { - frequency := int64(10) - start, count := int64(100), int64(5000) - store := NewMemoryStore(map[string]MetricConfig{ - "a": {Frequency: frequency}, - "b": {Frequency: frequency * 2}, - }) - - for i := int64(0); i < count; i++ { - err := store.Write([]string{"testhost"}, start+i*frequency, []Metric{ - {Name: "a", Value: Float(i)}, - {Name: "b", Value: Float(i / 2)}, - }) - if err != nil { - t.Error(err) - return - } - } - - sel := Selector{{String: "testhost"}} - adata, from, to, err := store.Read(sel, "a", start, start+count*frequency) - if err != nil || from != start || to != start+count*frequency { - t.Error(err) - return - } - bdata, _, _, err := store.Read(sel, "b", start, start+count*frequency) - if err != nil { - t.Error(err) - return - } - - if len(adata) != int(count) || len(bdata) != int(count/2) { - t.Error("unexpected count of returned values") - return - } - - for i := 0; i < int(count); i++ { - if adata[i] != Float(i) { - t.Errorf("incorrect value for metric a (%f vs. %f)", adata[i], Float(i)) - return - } - } - - for i := 0; i < int(count/2); i++ { - if bdata[i] != Float(i) && bdata[i] != Float(i-1) { - t.Errorf("incorrect value for metric b (%f) at index %d", bdata[i], i) - return - } - } - -} - -func TestMemoryStoreTooMuchWrites(t *testing.T) { - frequency := int64(10) - count := BUFFER_CAP*3 + 10 - store := NewMemoryStore(map[string]MetricConfig{ - "a": {Frequency: frequency}, - "b": {Frequency: frequency * 2}, - "c": {Frequency: frequency / 2}, - "d": {Frequency: frequency * 3}, - }) - - start := int64(100) - for i := 0; i < count; i++ { - if err := store.Write([]string{"test"}, start+int64(i)*frequency, []Metric{ - {Name: "a", Value: Float(i)}, - {Name: "b", Value: Float(i / 2)}, - {Name: "c", Value: Float(i * 2)}, - {Name: "d", Value: Float(i / 3)}, - }); err != nil { - t.Fatal(err) - } - } - - end := start + int64(count)*frequency - data, from, to, err := store.Read(Selector{{String: "test"}}, "a", start, end) - if len(data) != count || from != start || to != end || err != nil { - t.Fatalf("a: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) - } - - data, from, to, err = store.Read(Selector{{String: "test"}}, "b", start, end) - if len(data) != count/2 || from != start || to != end || err != nil { - t.Fatalf("b: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) - } - - data, from, to, err = store.Read(Selector{{String: "test"}}, "c", start, end) - if len(data) != count*2-1 || from != start || to != end-frequency/2 || err != nil { - t.Fatalf("c: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) - } - - data, from, to, err = store.Read(Selector{{String: "test"}}, "d", start, end) - if len(data) != count/3+1 || from != start || to != end+frequency*2 || err != nil { - t.Errorf("expected: err=nil, from=%d, to=%d, len(data)=%d\n", start, end+frequency*2, count/3) - t.Fatalf("d: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) - } -} - -func TestMemoryStoreOutOfBounds(t *testing.T) { - count := 2000 - toffset := 1000 - store := NewMemoryStore(map[string]MetricConfig{ - "a": {Frequency: 60}, - }) - - for i := 0; i < count; i++ { - if err := store.Write([]string{"cluster", "host", "cpu"}, int64(toffset+i*60), []Metric{ - {Name: "a", Value: Float(i)}, - }); err != nil { - t.Fatal(err) - } - } - - sel := Selector{{String: "cluster"}, {String: "host"}, {String: "cpu"}} - data, from, to, err := store.Read(sel, "a", 500, int64(toffset+count*60+500)) - if err != nil { - t.Fatal(err) - } - - if from/60 != int64(toffset)/60 || to/60 != int64(toffset+count*60)/60 { - t.Fatalf("Got %d-%d, expected %d-%d", from, to, toffset, toffset+count*60) - } - - if len(data) != count || data[0] != 0 || data[len(data)-1] != Float((count-1)) { - t.Fatalf("Wrong data (got: %d, %f, %f, expected: %d, %f, %f)", - len(data), data[0], data[len(data)-1], count, 0., Float(count-1)) - } - - testfrom, testlen := int64(100000000), int64(10000) - data, from, to, err = store.Read(sel, "a", testfrom, testfrom+testlen) - if len(data) != 0 || from != testfrom || to != testfrom || err != nil { - t.Fatal("Unexpected data returned when reading range after valid data") - } - - testfrom, testlen = 0, 10 - data, from, to, err = store.Read(sel, "a", testfrom, testfrom+testlen) - if len(data) != 0 || from/60 != int64(toffset)/60 || to/60 != int64(toffset)/60 || err != nil { - t.Fatal("Unexpected data returned when reading range before valid data") - } -} - -func TestMemoryStoreMissingDatapoints(t *testing.T) { - count := 3000 - store := NewMemoryStore(map[string]MetricConfig{ - "a": {Frequency: 1}, - }) - - for i := 0; i < count; i++ { - if i%3 != 0 { - continue - } - - err := store.Write([]string{"testhost"}, int64(i), []Metric{ - {Name: "a", Value: Float(i)}, - }) - if err != nil { - t.Error(err) - return - } - } - - sel := Selector{{String: "testhost"}} - adata, _, _, err := store.Read(sel, "a", 0, int64(count)) - if err != nil { - t.Error(err) - return - } - - if len(adata) != count-2 { - t.Error("unexpected len") - return - } - - for i := 0; i < count-2; i++ { - if i%3 == 0 { - if adata[i] != Float(i) { - t.Error("unexpected value") - return - } - } else { - if !math.IsNaN(float64(adata[i])) { - t.Errorf("NaN expected (i = %d, value = %f)\n", i, adata[i]) - return - } - } - } -} - -func TestMemoryStoreAggregation(t *testing.T) { - count := 3000 - store := NewMemoryStore(map[string]MetricConfig{ - "a": {Frequency: 1, Aggregation: SumAggregation}, - }) - - for i := 0; i < count; i++ { - err := store.Write([]string{"host0", "cpu0"}, int64(i), []Metric{ - {Name: "a", Value: Float(i) / 2.}, - }) - if err != nil { - t.Error(err) - return - } - - err = store.Write([]string{"host0", "cpu1"}, int64(i), []Metric{ - {Name: "a", Value: Float(i) * 2.}, - }) - if err != nil { - t.Error(err) - return - } - } - - adata, from, to, err := store.Read(Selector{{String: "host0"}}, "a", int64(0), int64(count)) - if err != nil { - t.Error(err) - return - } - - if len(adata) != count || from != 0 || to != int64(count) { - t.Error("unexpected length or time range of returned data") - return - } - - for i := 0; i < count; i++ { - expected := Float(i)/2. + Float(i)*2. - if adata[i] != expected { - t.Errorf("expected: %f, got: %f", expected, adata[i]) - return - } - } -} - -func TestMemoryStoreStats(t *testing.T) { - count := 3000 - store := NewMemoryStore(map[string]MetricConfig{ - "a": {Frequency: 1}, - "b": {Frequency: 1, Aggregation: AvgAggregation}, - }) - - sel1 := []string{"cluster", "host1"} - sel2 := []string{"cluster", "host2", "left"} - sel3 := []string{"cluster", "host2", "right"} - - samples := 0 - asum, amin, amax := 0., math.MaxFloat32, -math.MaxFloat32 - bsum, bmin, bmax := 0., math.MaxFloat32, -math.MaxFloat32 - - for i := 0; i < count; i++ { - if i%5 == 0 { - // Skip some writes, test if samples is calculated correctly - continue - } - - samples += 1 - a := float64(rand.Int()%100 - 50) - asum += a - amin = math.Min(amin, a) - amax = math.Max(amax, a) - b := float64(rand.Int()%100 - 50) - bsum += b * 2 - bmin = math.Min(bmin, b) - bmax = math.Max(bmax, b) - - store.Write(sel1, int64(i), []Metric{ - {Name: "a", Value: Float(a)}, - }) - store.Write(sel2, int64(i), []Metric{ - {Name: "b", Value: Float(b)}, - }) - store.Write(sel3, int64(i), []Metric{ - {Name: "b", Value: Float(b)}, - }) - } - - stats, from, to, err := store.Stats(Selector{{String: "cluster"}, {String: "host1"}}, "a", 0, int64(count)) - if err != nil { - t.Fatal(err) - } - - if from != 1 || to != int64(count) || stats.Samples != samples { - t.Fatalf("unexpected: from=%d, to=%d, stats.Samples=%d (expected samples=%d)\n", from, to, stats.Samples, samples) - } - - if stats.Avg != Float(asum/float64(samples)) || stats.Min != Float(amin) || stats.Max != Float(amax) { - t.Fatalf("wrong stats: %#v\n", stats) - } - - stats, from, to, err = store.Stats(Selector{{String: "cluster"}, {String: "host2"}}, "b", 0, int64(count)) - if err != nil { - t.Fatal(err) - } - - if from != 1 || to != int64(count) || stats.Samples != samples*2 { - t.Fatalf("unexpected: from=%d, to=%d, stats.Samples=%d (expected samples=%d)\n", from, to, stats.Samples, samples*2) - } - - if stats.Avg != Float(bsum/float64(samples*2)) || stats.Min != Float(bmin) || stats.Max != Float(bmax) { - t.Fatalf("wrong stats: %#v (expected: avg=%f, min=%f, max=%f)\n", stats, bsum/float64(samples*2), bmin, bmax) - } -} - -func TestMemoryStoreArchive(t *testing.T) { - store1 := NewMemoryStore(map[string]MetricConfig{ - "a": {Frequency: 1}, - "b": {Frequency: 1}, - }) - - count := 2000 - for i := 0; i < count; i++ { - err := store1.Write([]string{"cluster", "host", "cpu0"}, 100+int64(i), []Metric{ - {Name: "a", Value: Float(i)}, - {Name: "b", Value: Float(i * 2)}, - }) - if err != nil { - t.Error(err) - return - } - } - - // store1.DebugDump(bufio.NewWriter(os.Stdout)) - - archiveRoot := t.TempDir() - _, err := store1.ToCheckpoint(archiveRoot, 100, 100+int64(count/2)) - if err != nil { - t.Error(err) - return - } - - _, err = store1.ToCheckpoint(archiveRoot, 100+int64(count/2), 100+int64(count)) - if err != nil { - t.Error(err) - return - } - - store2 := NewMemoryStore(map[string]MetricConfig{ - "a": {Frequency: 1}, - "b": {Frequency: 1}, - }) - n, err := store2.FromCheckpoint(archiveRoot, 100) - if err != nil { - t.Error(err) - return - } - - sel := Selector{{String: "cluster"}, {String: "host"}, {String: "cpu0"}} - adata, from, to, err := store2.Read(sel, "a", 100, int64(100+count)) - if err != nil { - t.Error(err) - return - } - - if n != 2 || len(adata) != count || from != 100 || to != int64(100+count) { - t.Errorf("unexpected: n=%d, len=%d, from=%d, to=%d\n", n, len(adata), from, to) - return - } - - for i := 0; i < count; i++ { - expected := Float(i) - if adata[i] != expected { - t.Errorf("expected: %f, got: %f", expected, adata[i]) - } - } -} - -func TestMemoryStoreFree(t *testing.T) { - store := NewMemoryStore(map[string]MetricConfig{ - "a": {Frequency: 1}, - "b": {Frequency: 2}, - }) - - count := 3000 - sel := []string{"cluster", "host", "1"} - for i := 0; i < count; i++ { - err := store.Write(sel, int64(i), []Metric{ - {Name: "a", Value: Float(i)}, - {Name: "b", Value: Float(i)}, - }) - if err != nil { - t.Fatal(err) - } - } - - n, err := store.Free([]string{"cluster", "host"}, int64(BUFFER_CAP*2)+100) - if err != nil { - t.Fatal(err) - } - - if n != 3 { - t.Fatal("two buffers expected to be released") - } - - adata, from, to, err := store.Read(Selector{{String: "cluster"}, {String: "host"}, {String: "1"}}, "a", 0, int64(count)) - if err != nil { - t.Fatal(err) - } - - if from != int64(BUFFER_CAP*2) || to != int64(count) || len(adata) != count-2*BUFFER_CAP { - t.Fatalf("unexpected values from call to `Read`: from=%d, to=%d, len=%d", from, to, len(adata)) - } - - // bdata, from, to, err := store.Read(Selector{{String: "cluster"}, {String: "host"}, {String: "1"}}, "b", 0, int64(count)) - // if err != nil { - // t.Fatal(err) - // } - - // if from != int64(BUFFER_CAP*2) || to != int64(count) || len(bdata) != (count-2*BUFFER_CAP)/2 { - // t.Fatalf("unexpected values from call to `Read`: from=%d (expected: %d), to=%d (expected: %d), len=%d (expected: %d)", - // from, BUFFER_CAP*2, to, count, len(bdata), (count-2*BUFFER_CAP)/2) - // } - - if adata[0] != Float(BUFFER_CAP*2) || adata[len(adata)-1] != Float(count-1) { - t.Fatal("wrong values") - } -} - -func BenchmarkMemoryStoreConcurrentWrites(b *testing.B) { - frequency := int64(5) - count := b.N - goroutines := 4 - store := NewMemoryStore(map[string]MetricConfig{ - "a": {Frequency: frequency}, - }) - - var wg sync.WaitGroup - wg.Add(goroutines) - - for g := 0; g < goroutines; g++ { - go func(g int) { - host := fmt.Sprintf("host%d", g) - for i := 0; i < count; i++ { - store.Write([]string{"cluster", host, "cpu0"}, int64(i)*frequency, []Metric{ - {Name: "a", Value: Float(i)}, - }) - } - wg.Done() - }(g) - } - - wg.Wait() - b.StopTimer() - - for g := 0; g < goroutines; g++ { - host := fmt.Sprintf("host%d", g) - sel := Selector{{String: "cluster"}, {String: host}, {String: "cpu0"}} - adata, _, _, err := store.Read(sel, "a", 0, int64(count)*frequency) - if err != nil { - b.Error(err) - return - } - - if len(adata) != count { - b.Error("unexpected count") - return - } - - for i := 0; i < count; i++ { - expected := Float(i) - if adata[i] != expected { - b.Error("incorrect value for metric a") - return - } - } - } -} - -func BenchmarkMemoryStoreAggregation(b *testing.B) { - b.StopTimer() - count := 2000 - store := NewMemoryStore(map[string]MetricConfig{ - "flops_any": {Frequency: 1, Aggregation: AvgAggregation}, - }) - - sel := []string{"testcluster", "host123", "cpu0"} - for i := 0; i < count; i++ { - sel[2] = "cpu0" - err := store.Write(sel, int64(i), []Metric{ - {Name: "flops_any", Value: Float(i)}, - }) - if err != nil { - b.Fatal(err) - } - - sel[2] = "cpu1" - err = store.Write(sel, int64(i), []Metric{ - {Name: "flops_any", Value: Float(i)}, - }) - if err != nil { - b.Fatal(err) - } - } - - b.StartTimer() - for n := 0; n < b.N; n++ { - data, from, to, err := store.Read(Selector{{String: "testcluster"}, {String: "host123"}}, "flops_any", 0, int64(count)) - if err != nil { - b.Fatal(err) - } - - if len(data) != count || from != 0 || to != int64(count) { - b.Fatal() - } - } -}