mirror of
https://github.com/ClusterCockpit/cc-metric-store.git
synced 2026-03-13 03:57:30 +01:00
Port to newest cclib. Use metricstore as library.
This commit is contained in:
25
cmd/cc-metric-store/cli.go
Normal file
25
cmd/cc-metric-store/cli.go
Normal file
@@ -0,0 +1,25 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved. This file is part of cc-metric-store.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package main provides the entry point for the ClusterCockpit metric store server.
|
||||
// This file defines all command-line flags and their default values.
|
||||
package main
|
||||
|
||||
import "flag"
|
||||
|
||||
var (
|
||||
flagGops, flagVersion, flagDev, flagLogDateTime bool
|
||||
flagConfigFile, flagLogLevel string
|
||||
)
|
||||
|
||||
func cliInit() {
|
||||
flag.BoolVar(&flagGops, "gops", false, "Listen via github.com/google/gops/agent (for debugging)")
|
||||
flag.BoolVar(&flagDev, "dev", false, "Enable development component: Swagger UI")
|
||||
flag.BoolVar(&flagVersion, "version", false, "Show version information and exit")
|
||||
flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages")
|
||||
flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`")
|
||||
flag.StringVar(&flagLogLevel, "loglevel", "warn", "Sets the logging level: `[debug, info, warn (default), err, crit]`")
|
||||
flag.Parse()
|
||||
}
|
||||
@@ -1,32 +1,25 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved.
|
||||
// All rights reserved. This file is part of cc-metric-store.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/api"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/avro"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
|
||||
ccconf "github.com/ClusterCockpit/cc-lib/v2/ccConfig"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||
"github.com/ClusterCockpit/cc-lib/v2/nats"
|
||||
"github.com/ClusterCockpit/cc-lib/v2/runtime"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/config"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/memorystore"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/runtimeEnv"
|
||||
"github.com/google/gops/agent"
|
||||
httpSwagger "github.com/swaggo/http-swagger"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -35,147 +28,148 @@ var (
|
||||
version string
|
||||
)
|
||||
|
||||
func main() {
|
||||
var configFile string
|
||||
var enableGopsAgent, flagVersion, flagDev bool
|
||||
flag.StringVar(&configFile, "config", "./config.json", "configuration file")
|
||||
flag.BoolVar(&enableGopsAgent, "gops", false, "Listen via github.com/google/gops/agent")
|
||||
flag.BoolVar(&flagDev, "dev", false, "Enable development Swagger UI component")
|
||||
flag.BoolVar(&flagVersion, "version", false, "Show version information and exit")
|
||||
flag.Parse()
|
||||
func printVersion() {
|
||||
fmt.Printf("Version:\t%s\n", version)
|
||||
fmt.Printf("Git hash:\t%s\n", commit)
|
||||
fmt.Printf("Build time:\t%s\n", date)
|
||||
}
|
||||
|
||||
if flagVersion {
|
||||
fmt.Printf("Version:\t%s\n", version)
|
||||
fmt.Printf("Git hash:\t%s\n", commit)
|
||||
fmt.Printf("Build time:\t%s\n", date)
|
||||
os.Exit(0)
|
||||
func initGops() error {
|
||||
if !flagGops && !config.Keys.Debug.EnableGops {
|
||||
return nil
|
||||
}
|
||||
|
||||
startupTime := time.Now()
|
||||
config.Init(configFile)
|
||||
memorystore.Init(config.Keys.Metrics)
|
||||
ms := memorystore.GetMemoryStore()
|
||||
if err := agent.Listen(agent.Options{}); err != nil {
|
||||
return fmt.Errorf("starting gops agent: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if enableGopsAgent || config.Keys.Debug.EnableGops {
|
||||
if err := agent.Listen(agent.Options{}); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
func initConfiguration() error {
|
||||
ccconf.Init(flagConfigFile)
|
||||
|
||||
cfg := ccconf.GetPackageConfig("main")
|
||||
if cfg == nil {
|
||||
return fmt.Errorf("main configuration must be present")
|
||||
}
|
||||
|
||||
d, err := time.ParseDuration(config.Keys.Checkpoints.Restore)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
config.Init(cfg)
|
||||
return nil
|
||||
}
|
||||
|
||||
func initSubsystems() error {
|
||||
// Initialize nats client
|
||||
natsConfig := ccconf.GetPackageConfig("nats")
|
||||
if err := nats.Init(natsConfig); err != nil {
|
||||
cclog.Warnf("initializing (optional) nats client: %s", err.Error())
|
||||
}
|
||||
nats.Connect()
|
||||
|
||||
restoreFrom := startupTime.Add(-d)
|
||||
log.Printf("Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339))
|
||||
files, err := ms.FromCheckpointFiles(config.Keys.Checkpoints.RootDir, restoreFrom.Unix())
|
||||
loadedData := ms.SizeInBytes() / 1024 / 1024 // In MB
|
||||
if err != nil {
|
||||
log.Fatalf("Loading checkpoints failed: %s\n", err.Error())
|
||||
} else {
|
||||
log.Printf("Checkpoints loaded (%d files, %d MB, that took %fs)\n", files, loadedData, time.Since(startupTime).Seconds())
|
||||
}
|
||||
|
||||
// Try to use less memory by forcing a GC run here and then
|
||||
// lowering the target percentage. The default of 100 means
|
||||
// that only once the ratio of new allocations execeds the
|
||||
// previously active heap, a GC is triggered.
|
||||
// Forcing a GC here will set the "previously active heap"
|
||||
// to a minumum.
|
||||
runtime.GC()
|
||||
if loadedData > 1000 && os.Getenv("GOGC") == "" {
|
||||
debug.SetGCPercent(10)
|
||||
}
|
||||
|
||||
ctx, shutdown := context.WithCancel(context.Background())
|
||||
return nil
|
||||
}
|
||||
|
||||
func runServer(ctx context.Context) error {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(4)
|
||||
|
||||
memorystore.Retention(&wg, ctx)
|
||||
memorystore.Checkpointing(&wg, ctx)
|
||||
memorystore.Archiving(&wg, ctx)
|
||||
avro.DataStaging(&wg, ctx)
|
||||
|
||||
r := http.NewServeMux()
|
||||
api.MountRoutes(r)
|
||||
|
||||
if flagDev {
|
||||
log.Print("Enable Swagger UI!")
|
||||
r.HandleFunc("GET /swagger/", httpSwagger.Handler(
|
||||
httpSwagger.URL("http://"+config.Keys.HttpConfig.Address+"/swagger/doc.json")))
|
||||
}
|
||||
|
||||
server := &http.Server{
|
||||
Handler: r,
|
||||
Addr: config.Keys.HttpConfig.Address,
|
||||
WriteTimeout: 30 * time.Second,
|
||||
ReadTimeout: 30 * time.Second,
|
||||
}
|
||||
|
||||
// Start http or https server
|
||||
listener, err := net.Listen("tcp", config.Keys.HttpConfig.Address)
|
||||
if err != nil {
|
||||
log.Fatalf("starting http listener failed: %v", err)
|
||||
}
|
||||
|
||||
if config.Keys.HttpConfig.CertFile != "" && config.Keys.HttpConfig.KeyFile != "" {
|
||||
cert, err := tls.LoadX509KeyPair(config.Keys.HttpConfig.CertFile, config.Keys.HttpConfig.KeyFile)
|
||||
if err != nil {
|
||||
log.Fatalf("loading X509 keypair failed: %v", err)
|
||||
}
|
||||
listener = tls.NewListener(listener, &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
CipherSuites: []uint16{
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
|
||||
},
|
||||
MinVersion: tls.VersionTLS12,
|
||||
PreferServerCipherSuites: true,
|
||||
})
|
||||
fmt.Printf("HTTPS server listening at %s...\n", config.Keys.HttpConfig.Address)
|
||||
// Initialize metric store if configuration is provided
|
||||
mscfg := ccconf.GetPackageConfig("metric-store")
|
||||
if mscfg != nil {
|
||||
metricstore.Init(mscfg, &wg)
|
||||
} else {
|
||||
fmt.Printf("HTTP server listening at %s...\n", config.Keys.HttpConfig.Address)
|
||||
return fmt.Errorf("missing metricstore configuration")
|
||||
}
|
||||
|
||||
// Initialize HTTP server
|
||||
srv, err := NewServer(version, commit, date)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating server: %w", err)
|
||||
}
|
||||
|
||||
// Channel to collect errors from server
|
||||
errChan := make(chan error, 1)
|
||||
|
||||
// Start HTTP server
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if err = server.Serve(listener); err != nil && err != http.ErrServerClosed {
|
||||
log.Fatalf("starting server failed: %v", err)
|
||||
if err := srv.Start(ctx); err != nil {
|
||||
errChan <- err
|
||||
}
|
||||
}()
|
||||
|
||||
// Handle shutdown signals
|
||||
wg.Add(1)
|
||||
sigs := make(chan os.Signal, 1)
|
||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
<-sigs
|
||||
runtimeEnv.SystemdNotifiy(false, "Shutting down ...")
|
||||
server.Shutdown(context.Background())
|
||||
shutdown()
|
||||
memorystore.Shutdown()
|
||||
select {
|
||||
case <-sigs:
|
||||
cclog.Info("Shutdown signal received")
|
||||
case <-ctx.Done():
|
||||
}
|
||||
|
||||
runtime.SystemdNotify(false, "Shutting down ...")
|
||||
srv.Shutdown(ctx)
|
||||
}()
|
||||
|
||||
if config.Keys.Nats != nil {
|
||||
for _, natsConf := range config.Keys.Nats {
|
||||
// TODO: When multiple nats configs share a URL, do a single connect.
|
||||
wg.Add(1)
|
||||
nc := natsConf
|
||||
go func() {
|
||||
// err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx)
|
||||
err := api.ReceiveNats(nc, ms, 1, ctx)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
runtime.SystemdNotify(true, "running")
|
||||
|
||||
// Wait for completion or errors
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(errChan)
|
||||
}()
|
||||
|
||||
// Wait for either server startup error or shutdown completion
|
||||
if err := <-errChan; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
runtimeEnv.SystemdNotifiy(true, "running")
|
||||
wg.Wait()
|
||||
log.Print("Graceful shutdown completed!")
|
||||
cclog.Print("Graceful shutdown completed!")
|
||||
return nil
|
||||
}
|
||||
|
||||
func run() error {
|
||||
cliInit()
|
||||
|
||||
if flagVersion {
|
||||
printVersion()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Initialize logger
|
||||
cclog.Init(flagLogLevel, flagLogDateTime)
|
||||
|
||||
// Initialize gops agent
|
||||
if err := initGops(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Initialize subsystems in dependency order:
|
||||
// 1. Load configuration from config.json
|
||||
// 2. Initialize subsystems like nats
|
||||
|
||||
// Load configuration
|
||||
if err := initConfiguration(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Initialize subsystems (nats, etc.)
|
||||
if err := initSubsystems(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Run server with context
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
return runServer(ctx)
|
||||
}
|
||||
|
||||
func main() {
|
||||
if err := run(); err != nil {
|
||||
cclog.Error(err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
142
cmd/cc-metric-store/server.go
Normal file
142
cmd/cc-metric-store/server.go
Normal file
@@ -0,0 +1,142 @@
|
||||
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
|
||||
// All rights reserved. This file is part of cc-metric-store.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package main provides the entry point for the ClusterCockpit metric store server.
|
||||
// This file contains HTTP server setup, routing configuration.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
|
||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||
"github.com/ClusterCockpit/cc-lib/v2/nats"
|
||||
"github.com/ClusterCockpit/cc-lib/v2/runtime"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/api"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/config"
|
||||
httpSwagger "github.com/swaggo/http-swagger"
|
||||
)
|
||||
|
||||
// Server encapsulates the HTTP server state and dependencies
|
||||
type Server struct {
|
||||
router *http.ServeMux
|
||||
server *http.Server
|
||||
}
|
||||
|
||||
// NewServer creates and initializes a new Server instance
|
||||
func NewServer(version, commit, buildDate string) (*Server, error) {
|
||||
s := &Server{
|
||||
router: http.NewServeMux(),
|
||||
}
|
||||
|
||||
if err := s.init(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *Server) init() error {
|
||||
api.MountRoutes(s.router)
|
||||
|
||||
if flagDev {
|
||||
cclog.Print("Enable Swagger UI!")
|
||||
s.router.HandleFunc("GET /swagger/", httpSwagger.Handler(
|
||||
httpSwagger.URL("http://"+config.Keys.Address+"/swagger/doc.json")))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Server timeout defaults (in seconds)
|
||||
const (
|
||||
defaultReadTimeout = 30
|
||||
defaultWriteTimeout = 30
|
||||
)
|
||||
|
||||
func (s *Server) Start(ctx context.Context) error {
|
||||
// Use configurable timeouts with defaults
|
||||
readTimeout := time.Duration(defaultReadTimeout) * time.Second
|
||||
writeTimeout := time.Duration(defaultWriteTimeout) * time.Second
|
||||
|
||||
s.server = &http.Server{
|
||||
ReadTimeout: readTimeout,
|
||||
WriteTimeout: writeTimeout,
|
||||
Handler: s.router,
|
||||
Addr: config.Keys.Address,
|
||||
}
|
||||
|
||||
// Start http or https server
|
||||
listener, err := net.Listen("tcp", config.Keys.Address)
|
||||
if err != nil {
|
||||
return fmt.Errorf("starting listener on '%s': %w", config.Keys.Address, err)
|
||||
}
|
||||
|
||||
if config.Keys.CertFile != "" && config.Keys.KeyFile != "" {
|
||||
cert, err := tls.LoadX509KeyPair(
|
||||
config.Keys.CertFile, config.Keys.KeyFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("loading X509 keypair (check 'https-cert-file' and 'https-key-file' in config.json): %w", err)
|
||||
}
|
||||
listener = tls.NewListener(listener, &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
CipherSuites: []uint16{
|
||||
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
|
||||
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
|
||||
},
|
||||
MinVersion: tls.VersionTLS12,
|
||||
PreferServerCipherSuites: true,
|
||||
})
|
||||
cclog.Infof("HTTPS server listening at %s...", config.Keys.Address)
|
||||
} else {
|
||||
cclog.Infof("HTTP server listening at %s...", config.Keys.Address)
|
||||
}
|
||||
|
||||
// Because this program will want to bind to a privileged port (like 80), the listener must
|
||||
// be established first, then the user can be changed, and after that,
|
||||
// the actual http server can be started.
|
||||
if err := runtime.DropPrivileges(config.Keys.Group, config.Keys.User); err != nil {
|
||||
return fmt.Errorf("dropping privileges: %w", err)
|
||||
}
|
||||
|
||||
// Handle context cancellation for graceful shutdown
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
if err := s.server.Shutdown(shutdownCtx); err != nil {
|
||||
cclog.Errorf("Server shutdown error: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
if err = s.server.Serve(listener); err != nil && err != http.ErrServerClosed {
|
||||
return fmt.Errorf("server failed: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) Shutdown(ctx context.Context) {
|
||||
// Create a shutdown context with timeout
|
||||
shutdownCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
nc := nats.GetClient()
|
||||
if nc != nil {
|
||||
nc.Close()
|
||||
}
|
||||
|
||||
// First shut down the server gracefully (waiting for all ongoing requests)
|
||||
if err := s.server.Shutdown(shutdownCtx); err != nil {
|
||||
cclog.Errorf("Server shutdown error: %v", err)
|
||||
}
|
||||
|
||||
// Archive all the metric store data
|
||||
metricstore.Shutdown()
|
||||
}
|
||||
Reference in New Issue
Block a user