mirror of
				https://github.com/ClusterCockpit/cc-metric-store.git
				synced 2025-11-04 10:45:07 +01:00 
			
		
		
		
	Use Golangs contextes
This commit is contained in:
		
							
								
								
									
										4
									
								
								api.go
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								api.go
									
									
									
									
									
								
							@@ -187,7 +187,7 @@ func handleFree(rw http.ResponseWriter, r *http.Request) {
 | 
			
		||||
	rw.Write([]byte(fmt.Sprintf("buffers freed: %d\n", n)))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func StartApiServer(address string, done chan bool) error {
 | 
			
		||||
func StartApiServer(address string, ctx context.Context) error {
 | 
			
		||||
	r := mux.NewRouter()
 | 
			
		||||
 | 
			
		||||
	r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/timeseries", handleTimeseries)
 | 
			
		||||
@@ -210,7 +210,7 @@ func StartApiServer(address string, done chan bool) error {
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		_ = <-done
 | 
			
		||||
		_ = <-ctx.Done()
 | 
			
		||||
		err := server.Shutdown(context.Background())
 | 
			
		||||
		log.Println("API server shut down")
 | 
			
		||||
		return err
 | 
			
		||||
 
 | 
			
		||||
@@ -65,7 +65,7 @@ func (l *level) toArchiveFile(from, to int64) (*ArchiveFile, error) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for metric, b := range l.metrics {
 | 
			
		||||
		data := make([]Float, (to-from)/b.frequency)
 | 
			
		||||
		data := make([]Float, (to-from)/b.frequency+1)
 | 
			
		||||
		data, start, end, err := b.read(from, to, data)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
 
 | 
			
		||||
@@ -2,6 +2,7 @@ package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bufio"
 | 
			
		||||
	"context"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"io"
 | 
			
		||||
	"log"
 | 
			
		||||
@@ -175,7 +176,7 @@ func ReceiveTCP(address string, handleLine func(line *Line), done chan bool) err
 | 
			
		||||
// Connect to a nats server and subscribe to "updates". This is a blocking
 | 
			
		||||
// function. handleLine will be called for each line recieved via nats.
 | 
			
		||||
// Send `true` through the done channel for gracefull termination.
 | 
			
		||||
func ReceiveNats(address string, handleLine func(line *Line), workers int, done chan bool) error {
 | 
			
		||||
func ReceiveNats(address string, handleLine func(line *Line), workers int, ctx context.Context) error {
 | 
			
		||||
	nc, err := nats.Connect(nats.DefaultURL)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
@@ -194,6 +195,14 @@ func ReceiveNats(address string, handleLine func(line *Line), workers int, done
 | 
			
		||||
 | 
			
		||||
			handleLine(line)
 | 
			
		||||
		})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		log.Printf("NATS subscription to 'updates' on '%s' established\n", address)
 | 
			
		||||
 | 
			
		||||
		_ = <-ctx.Done()
 | 
			
		||||
		err = sub.Unsubscribe()
 | 
			
		||||
	} else {
 | 
			
		||||
		msgs := make(chan *nats.Msg, 16)
 | 
			
		||||
		var wg sync.WaitGroup
 | 
			
		||||
@@ -210,14 +219,22 @@ func ReceiveNats(address string, handleLine func(line *Line), workers int, done
 | 
			
		||||
 | 
			
		||||
					handleLine(line)
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				wg.Done()
 | 
			
		||||
			}()
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		sub, err = nc.Subscribe("updates", func(m *nats.Msg) {
 | 
			
		||||
			msgs <- m
 | 
			
		||||
		})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		_ = <-done
 | 
			
		||||
		log.Printf("NATS subscription to 'updates' on '%s' established\n", address)
 | 
			
		||||
 | 
			
		||||
		_ = <-ctx.Done()
 | 
			
		||||
		err = sub.Unsubscribe()
 | 
			
		||||
		close(msgs)
 | 
			
		||||
		wg.Wait()
 | 
			
		||||
	}
 | 
			
		||||
@@ -226,12 +243,7 @@ func ReceiveNats(address string, handleLine func(line *Line), workers int, done
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	log.Printf("NATS subscription to 'updates' on '%s' established\n", address)
 | 
			
		||||
	for {
 | 
			
		||||
		_ = <-done
 | 
			
		||||
		sub.Unsubscribe()
 | 
			
		||||
		nc.Close()
 | 
			
		||||
		log.Println("NATS connection closed")
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	nc.Close()
 | 
			
		||||
	log.Println("NATS connection closed")
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1,6 +1,7 @@
 | 
			
		||||
package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"log"
 | 
			
		||||
@@ -62,7 +63,7 @@ func handleLine(line *Line) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ts := line.Ts.Unix()
 | 
			
		||||
	log.Printf("ts=%d, tags=%v\n", ts, selector)
 | 
			
		||||
	// log.Printf("ts=%d, tags=%v\n", ts, selector)
 | 
			
		||||
	err := memoryStore.Write(selector, ts, line.Fields)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Printf("error: %s\n", err.Error())
 | 
			
		||||
@@ -87,15 +88,15 @@ func main() {
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ctx, shutdown := context.WithCancel(context.Background())
 | 
			
		||||
 | 
			
		||||
	var wg sync.WaitGroup
 | 
			
		||||
	sigs := make(chan os.Signal, 1)
 | 
			
		||||
	done := make(chan bool, 1)
 | 
			
		||||
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
 | 
			
		||||
	go func() {
 | 
			
		||||
		_ = <-sigs
 | 
			
		||||
		log.Println("Shuting down...")
 | 
			
		||||
		done <- true
 | 
			
		||||
		close(done)
 | 
			
		||||
		shutdown()
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	lastCheckpoint = startupTime
 | 
			
		||||
@@ -106,7 +107,7 @@ func main() {
 | 
			
		||||
			ticks := time.Tick(d)
 | 
			
		||||
			for {
 | 
			
		||||
				select {
 | 
			
		||||
				case <-done:
 | 
			
		||||
				case <-ctx.Done():
 | 
			
		||||
					wg.Done()
 | 
			
		||||
					return
 | 
			
		||||
				case <-ticks:
 | 
			
		||||
@@ -137,7 +138,7 @@ func main() {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		err := StartApiServer(":8080", done)
 | 
			
		||||
		err := StartApiServer(":8080", ctx)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
@@ -145,7 +146,7 @@ func main() {
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		err := ReceiveNats(conf.Nats, handleLine, runtime.NumCPU()-1, done)
 | 
			
		||||
		err := ReceiveNats(conf.Nats, handleLine, runtime.NumCPU()-1, ctx)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user