diff --git a/.gitignore b/.gitignore index 66fd13c..c8e6d39 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ *.dll *.so *.dylib +/cc-metric-store # Test binary, built with `go test -c` *.test diff --git a/api.go b/api.go new file mode 100644 index 0000000..faefe10 --- /dev/null +++ b/api.go @@ -0,0 +1,104 @@ +package main + +import ( + "context" + "encoding/json" + "log" + "net/http" + "strconv" + "time" + + "github.com/gorilla/mux" +) + +type HostData struct { + Host string `json:"host"` + Start int64 `json:"start"` + Data []float64 `json:"data"` +} + +type MetricData struct { + Hosts []HostData `json:"hosts"` +} + +type TimeseriesNodeResponse map[string]MetricData + +func handleTimeseriesNode(rw http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + cluster := vars["cluster"] + from, err := strconv.ParseInt(vars["from"], 10, 64) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + to, err := strconv.ParseInt(vars["to"], 10, 64) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + + values := r.URL.Query() + hosts := values["host"] + metrics := values["metric"] + if len(hosts) < 1 || len(metrics) < 1 { + http.Error(rw, "no hosts or metrics specified", http.StatusBadRequest) + return + } + + response := TimeseriesNodeResponse{} + store := metricStores["node"] + for _, metric := range metrics { + hostsdata := []HostData{} + for _, host := range hosts { + key := cluster + ":" + host + data, start, err := store.GetMetric(key, metric, from, to) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + + hostsdata = append(hostsdata, HostData{ + Host: host, + Start: start, + Data: data, + }) + } + response[metric] = MetricData{ + Hosts: hostsdata, + } + } + + rw.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(rw).Encode(response) + if err != nil { + log.Println(err.Error()) + } +} + +func StartApiServer(address string, done chan bool) error { + r := mux.NewRouter() + + r.HandleFunc("/api/{cluster}/timeseries/node/{from:[0-9]+}/{to:[0-9]+}", handleTimeseriesNode) + + server := &http.Server{ + Handler: r, + Addr: address, + WriteTimeout: 15 * time.Second, + ReadTimeout: 15 * time.Second, + } + + go func() { + log.Printf("API http endpoint listening on '%s'\n", address) + err := server.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + log.Println(err) + } + }() + + for { + _ = <-done + err := server.Shutdown(context.Background()) + log.Println("API server shut down") + return err + } +} diff --git a/go.mod b/go.mod index c130b71..9339fad 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.16 require ( github.com/golang/protobuf v1.5.2 // indirect + github.com/gorilla/mux v1.8.0 github.com/nats-io/nats-server/v2 v2.2.6 // indirect github.com/nats-io/nats.go v1.11.0 ) diff --git a/go.sum b/go.sum index 128881f..18f2209 100644 --- a/go.sum +++ b/go.sum @@ -11,6 +11,8 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk= github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= diff --git a/lineprotocol/receiver.go b/lineprotocol/receiver.go index 041292f..7209936 100644 --- a/lineprotocol/receiver.go +++ b/lineprotocol/receiver.go @@ -89,10 +89,10 @@ func ReceiveNats(address string, handleLine func(line *Line), done chan bool) er return err } + log.Printf("NATS subscription to 'updates' on '%s' established\n", address) for { - stop := <-done - if stop { - return nil - } + _ = <-done + log.Println("NATS connection closed") + return nil } } diff --git a/metric-store.go b/metric-store.go index 8ebff1c..38ec923 100644 --- a/metric-store.go +++ b/metric-store.go @@ -7,6 +7,7 @@ import ( "log" "os" "os/signal" + "sync" "syscall" "github.com/ClusterCockpit/cc-metric-store/lineprotocol" @@ -61,7 +62,7 @@ func buildKey(line *lineprotocol.Line) (string, error) { } func handleLine(line *lineprotocol.Line) { - // log.Printf("line: %v\n", line) + log.Printf("line: %v (t=%d)\n", line, line.Ts.Unix()) store := metricStores[line.Measurement] key, err := buildKey(line) @@ -85,10 +86,22 @@ func main() { go func() { _ = <-sigs done <- true + close(done) + log.Println("shuting down") + }() + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + StartApiServer(":8080", done) + wg.Done() }() err := lineprotocol.ReceiveNats("nats://localhost:4222", handleLine, done) if err != nil { log.Fatal(err) } + + wg.Wait() }