// Mirror of https://github.com/ClusterCockpit/cc-metric-store.git
// (synced 2025-11-04 02:35:08 +01:00); 279 lines, 6.5 KiB, Go.

package main
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"crypto/ed25519"
 | 
						|
	"encoding/base64"
 | 
						|
	"encoding/json"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"log"
 | 
						|
	"net/http"
 | 
						|
	"strconv"
 | 
						|
	"strings"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/golang-jwt/jwt/v4"
 | 
						|
	"github.com/gorilla/mux"
 | 
						|
)
 | 
						|
 | 
						|
// Example:
 | 
						|
//	{
 | 
						|
//		"metrics": ["flops_sp", "flops_dp"]
 | 
						|
//		"selectors": [["emmy", "host123", "cpu", "0"], ["emmy", "host123", "cpu", "1"]]
 | 
						|
//	}
 | 
						|
type ApiRequestBody struct {
 | 
						|
	Metrics   []string   `json:"metrics"`
 | 
						|
	Selectors []Selector `json:"selectors"`
 | 
						|
}
 | 
						|
 | 
						|
type ApiMetricData struct {
 | 
						|
	From int64   `json:"from"`
 | 
						|
	To   int64   `json:"to"`
 | 
						|
	Data []Float `json:"data"`
 | 
						|
}
 | 
						|
 | 
						|
type ApiStatsData struct {
 | 
						|
	From    int64 `json:"from"`
 | 
						|
	To      int64 `json:"to"`
 | 
						|
	Samples int   `json:"samples"`
 | 
						|
	Avg     Float `json:"avg"`
 | 
						|
	Min     Float `json:"min"`
 | 
						|
	Max     Float `json:"max"`
 | 
						|
}
 | 
						|
 | 
						|
func handleTimeseries(rw http.ResponseWriter, r *http.Request) {
 | 
						|
	vars := mux.Vars(r)
 | 
						|
	from, err := strconv.ParseInt(vars["from"], 10, 64)
 | 
						|
	if err != nil {
 | 
						|
		http.Error(rw, err.Error(), http.StatusBadRequest)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	to, err := strconv.ParseInt(vars["to"], 10, 64)
 | 
						|
	if err != nil {
 | 
						|
		http.Error(rw, err.Error(), http.StatusBadRequest)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	if r.Method != http.MethodPost {
 | 
						|
		http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	bodyDec := json.NewDecoder(r.Body)
 | 
						|
	var reqBody ApiRequestBody
 | 
						|
	err = bodyDec.Decode(&reqBody)
 | 
						|
	if err != nil {
 | 
						|
		http.Error(rw, err.Error(), http.StatusBadRequest)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	res := make([]map[string]ApiMetricData, 0, len(reqBody.Selectors))
 | 
						|
	for _, selector := range reqBody.Selectors {
 | 
						|
		metrics := make(map[string]ApiMetricData)
 | 
						|
		for _, metric := range reqBody.Metrics {
 | 
						|
			data, f, t, err := memoryStore.Read(selector, metric, from, to)
 | 
						|
			if err != nil {
 | 
						|
				http.Error(rw, err.Error(), http.StatusInternalServerError)
 | 
						|
				return
 | 
						|
			}
 | 
						|
 | 
						|
			metrics[metric] = ApiMetricData{
 | 
						|
				From: f,
 | 
						|
				To:   t,
 | 
						|
				Data: data,
 | 
						|
			}
 | 
						|
		}
 | 
						|
		res = append(res, metrics)
 | 
						|
	}
 | 
						|
 | 
						|
	rw.Header().Set("Content-Type", "application/json")
 | 
						|
	err = json.NewEncoder(rw).Encode(res)
 | 
						|
	if err != nil {
 | 
						|
		log.Println(err.Error())
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func handleStats(rw http.ResponseWriter, r *http.Request) {
 | 
						|
	vars := mux.Vars(r)
 | 
						|
	from, err := strconv.ParseInt(vars["from"], 10, 64)
 | 
						|
	if err != nil {
 | 
						|
		http.Error(rw, err.Error(), http.StatusBadRequest)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	to, err := strconv.ParseInt(vars["to"], 10, 64)
 | 
						|
	if err != nil {
 | 
						|
		http.Error(rw, err.Error(), http.StatusBadRequest)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	if r.Method != http.MethodPost {
 | 
						|
		http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	bodyDec := json.NewDecoder(r.Body)
 | 
						|
	var reqBody ApiRequestBody
 | 
						|
	err = bodyDec.Decode(&reqBody)
 | 
						|
	if err != nil {
 | 
						|
		http.Error(rw, err.Error(), http.StatusBadRequest)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	res := make([]map[string]ApiStatsData, 0, len(reqBody.Selectors))
 | 
						|
	for _, selector := range reqBody.Selectors {
 | 
						|
		metrics := make(map[string]ApiStatsData)
 | 
						|
		for _, metric := range reqBody.Metrics {
 | 
						|
			stats, f, t, err := memoryStore.Stats(selector, metric, from, to)
 | 
						|
			if err != nil {
 | 
						|
				http.Error(rw, err.Error(), http.StatusInternalServerError)
 | 
						|
				return
 | 
						|
			}
 | 
						|
 | 
						|
			metrics[metric] = ApiStatsData{
 | 
						|
				From:    f,
 | 
						|
				To:      t,
 | 
						|
				Samples: stats.Samples,
 | 
						|
				Avg:     stats.Avg,
 | 
						|
				Min:     stats.Min,
 | 
						|
				Max:     stats.Max,
 | 
						|
			}
 | 
						|
		}
 | 
						|
		res = append(res, metrics)
 | 
						|
	}
 | 
						|
 | 
						|
	rw.Header().Set("Content-Type", "application/json")
 | 
						|
	err = json.NewEncoder(rw).Encode(res)
 | 
						|
	if err != nil {
 | 
						|
		log.Println(err.Error())
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func handleFree(rw http.ResponseWriter, r *http.Request) {
 | 
						|
	vars := mux.Vars(r)
 | 
						|
	to, err := strconv.ParseInt(vars["to"], 10, 64)
 | 
						|
	if err != nil {
 | 
						|
		http.Error(rw, err.Error(), http.StatusBadRequest)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	// TODO: lastCheckpoint might be modified by different go-routines.
 | 
						|
	// Load it using the sync/atomic package?
 | 
						|
	freeUpTo := lastCheckpoint.Unix()
 | 
						|
	if to < freeUpTo {
 | 
						|
		freeUpTo = to
 | 
						|
	}
 | 
						|
 | 
						|
	if r.Method != http.MethodPost {
 | 
						|
		http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	bodyDec := json.NewDecoder(r.Body)
 | 
						|
	var selectors []Selector
 | 
						|
	err = bodyDec.Decode(&selectors)
 | 
						|
	if err != nil {
 | 
						|
		http.Error(rw, err.Error(), http.StatusBadRequest)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	n := 0
 | 
						|
	for _, sel := range selectors {
 | 
						|
		bn, err := memoryStore.Free(sel, freeUpTo)
 | 
						|
		if err != nil {
 | 
						|
			http.Error(rw, err.Error(), http.StatusInternalServerError)
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		n += bn
 | 
						|
	}
 | 
						|
 | 
						|
	rw.WriteHeader(http.StatusOK)
 | 
						|
	rw.Write([]byte(fmt.Sprintf("buffers freed: %d\n", n)))
 | 
						|
}
 | 
						|
 | 
						|
func handlePeek(rw http.ResponseWriter, r *http.Request) {
 | 
						|
	vars := mux.Vars(r)
 | 
						|
	cluster := vars["cluster"]
 | 
						|
	res, err := memoryStore.Peek(cluster)
 | 
						|
	if err != nil {
 | 
						|
		http.Error(rw, err.Error(), http.StatusInternalServerError)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	rw.Header().Set("Content-Type", "application/json")
 | 
						|
	err = json.NewEncoder(rw).Encode(res)
 | 
						|
	if err != nil {
 | 
						|
		log.Println(err.Error())
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func authentication(next http.Handler, publicKey ed25519.PublicKey) http.Handler {
 | 
						|
	return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
 | 
						|
		authheader := r.Header.Get("Authorization")
 | 
						|
		if authheader == "" || !strings.HasPrefix(authheader, "Bearer ") {
 | 
						|
			http.Error(rw, "Use JWT Authentication", http.StatusUnauthorized)
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		// The actual token is ignored for now.
 | 
						|
		// In case expiration and so on are specified, the Parse function
 | 
						|
		// already returns an error for expired tokens.
 | 
						|
		_, err := jwt.Parse(authheader[len("Bearer "):], func(t *jwt.Token) (interface{}, error) {
 | 
						|
			if t.Method != jwt.SigningMethodEdDSA {
 | 
						|
				return nil, errors.New("only Ed25519/EdDSA supported")
 | 
						|
			}
 | 
						|
 | 
						|
			return publicKey, nil
 | 
						|
		})
 | 
						|
 | 
						|
		if err != nil {
 | 
						|
			http.Error(rw, err.Error(), http.StatusUnauthorized)
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		// Let request through...
 | 
						|
		next.ServeHTTP(rw, r)
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
func StartApiServer(address string, ctx context.Context) error {
 | 
						|
	r := mux.NewRouter()
 | 
						|
 | 
						|
	r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/timeseries", handleTimeseries)
 | 
						|
	r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/stats", handleStats)
 | 
						|
	r.HandleFunc("/api/{to:[0-9]+}/free", handleFree)
 | 
						|
	r.HandleFunc("/api/{cluster}/peek", handlePeek)
 | 
						|
 | 
						|
	server := &http.Server{
 | 
						|
		Handler:      r,
 | 
						|
		Addr:         address,
 | 
						|
		WriteTimeout: 15 * time.Second,
 | 
						|
		ReadTimeout:  15 * time.Second,
 | 
						|
	}
 | 
						|
 | 
						|
	if len(conf.JwtPublicKey) > 0 {
 | 
						|
		buf, err := base64.StdEncoding.DecodeString(conf.JwtPublicKey)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		publicKey := ed25519.PublicKey(buf)
 | 
						|
		server.Handler = authentication(server.Handler, publicKey)
 | 
						|
	}
 | 
						|
 | 
						|
	go func() {
 | 
						|
		log.Printf("API http endpoint listening on '%s'\n", address)
 | 
						|
		err := server.ListenAndServe()
 | 
						|
		if err != nil && err != http.ErrServerClosed {
 | 
						|
			log.Println(err)
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	for {
 | 
						|
		<-ctx.Done()
 | 
						|
		err := server.Shutdown(context.Background())
 | 
						|
		log.Println("API server shut down")
 | 
						|
		return err
 | 
						|
	}
 | 
						|
}
 |