diff --git a/collectors/collectorManager.go b/collectors/collectorManager.go index 52e91e7..f91db20 100644 --- a/collectors/collectorManager.go +++ b/collectors/collectorManager.go @@ -115,6 +115,7 @@ func (cm *collectorManager) Start() { for _, c := range cm.collectors { c.Close() } + close(cm.done) cclog.ComponentDebug("CollectorManager", "DONE") } @@ -154,6 +155,8 @@ func (cm *collectorManager) AddOutput(output chan lp.CCMetric) { func (cm *collectorManager) Close() { cclog.ComponentDebug("CollectorManager", "CLOSE") cm.done <- true + // wait for close of channel cm.done + <-cm.done } // New creates a new initialized metric collector manager diff --git a/collectors/gpfsMetric.go b/collectors/gpfsMetric.go index bc1852b..53db1c2 100644 --- a/collectors/gpfsMetric.go +++ b/collectors/gpfsMetric.go @@ -130,14 +130,21 @@ func (m *GpfsCollector) Read(interval time.Duration, output chan lp.CCMetric) { continue } - timestampInt, err := strconv.ParseInt(key_value["_t_"]+key_value["_tu_"], 10, 64) - timestamp := time.UnixMicro(timestampInt) + sec, err := strconv.ParseInt(key_value["_t_"], 10, 64) if err != nil { fmt.Fprintf(os.Stderr, - "GpfsCollector.Read(): Failed to convert time stamp '%s': %s\n", - key_value["_t_"]+key_value["_tu_"], err.Error()) + "GpfsCollector.Read(): Failed to convert seconds to int '%s': %v\n", + key_value["_t_"], err) continue } + msec, err := strconv.ParseInt(key_value["_tu_"], 10, 64) + if err != nil { + fmt.Fprintf(os.Stderr, + "GpfsCollector.Read(): Failed to convert micro seconds to int '%s': %v\n", + key_value["_tu_"], err) + continue + } + timestamp := time.Unix(sec, msec*1000) // bytes read bytesRead, err := strconv.ParseInt(key_value["_br_"], 10, 64) diff --git a/go.mod b/go.mod index da4f3ea..0789f7e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ClusterCockpit/cc-metric-collector -go 1.17 +go 1.16 require ( github.com/NVIDIA/go-nvml v0.11.1-0 @@ -12,14 +12,7 @@ require ( ) require ( - github.com/deepmap/oapi-codegen v1.8.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/nats-io/nats-server/v2 v2.7.0 // indirect - github.com/nats-io/nkeys v0.3.0 // indirect - github.com/nats-io/nuid v1.0.1 // indirect - github.com/pkg/errors v0.9.1 // indirect - golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect - golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect google.golang.org/protobuf v1.27.1 // indirect - gopkg.in/yaml.v2 v2.3.0 // indirect ) diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go index e75e77d..96a2f05 100644 --- a/internal/metricRouter/metricRouter.go +++ b/internal/metricRouter/metricRouter.go @@ -101,6 +101,7 @@ func (r *metricRouter) StartTimer() { for { select { case <-r.timerdone: + close(r.timerdone) cclog.ComponentDebug("MetricRouter", "TIMER DONE") return case t := <-m: @@ -195,6 +196,7 @@ func (r *metricRouter) Start() { // Router manager is done done := func() { + close(r.done) cclog.ComponentDebug("MetricRouter", "DONE") } @@ -257,9 +259,13 @@ func (r *metricRouter) AddOutput(output chan lp.CCMetric) { func (r *metricRouter) Close() { cclog.ComponentDebug("MetricRouter", "CLOSE") r.done <- true + // wait for close of channel r.done + <-r.done if r.config.IntervalStamp { cclog.ComponentDebug("MetricRouter", "TIMER CLOSE") r.timerdone <- true + // wait for close of channel r.timerdone + <-r.timerdone } } diff --git a/internal/multiChanTicker/multiChanTicker.go b/internal/multiChanTicker/multiChanTicker.go index a9394ab..e0eca43 100644 --- a/internal/multiChanTicker/multiChanTicker.go +++ b/internal/multiChanTicker/multiChanTicker.go @@ -23,6 +23,7 @@ func (t *multiChanTicker) Init(duration time.Duration) { t.done = make(chan bool) go func() { done := func() { + close(t.done) cclog.ComponentDebug("MultiChanTicker", "DONE") } for { @@ -52,6 +53,8 @@ func (t *multiChanTicker) AddChannel(channel chan time.Time) { func (t *multiChanTicker) Close() { cclog.ComponentDebug("MultiChanTicker", "CLOSE") t.done <- true + // wait for close of channel t.done + <-t.done } func NewTicker(duration time.Duration) MultiChanTicker { diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index b4b3dc5..8d2872a 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -81,6 +81,7 @@ func (sm *sinkManager) Start() { s.Close() } + close(sm.done) cclog.ComponentDebug("SinkManager", "DONE") } @@ -149,6 +150,8 @@ func (sm *sinkManager) AddOutput(rawConfig json.RawMessage) error { func (sm *sinkManager) Close() { cclog.ComponentDebug("SinkManager", "CLOSE") sm.done <- true + // wait for close of channel sm.done + <-sm.done } // New creates a new initialized sink manager