diff --git a/.github/workflows/Release.yml b/.github/workflows/Release.yml index f75a167..f7143cb 100644 --- a/.github/workflows/Release.yml +++ b/.github/workflows/Release.yml @@ -27,7 +27,9 @@ jobs: # Use dnf to install development packages - name: Install development packages - run: dnf --assumeyes group install "Development Tools" "RPM Development Tools" + run: | + dnf --assumeyes group install "Development Tools" "RPM Development Tools" + dnf --assumeyes install wget openssl-devel diffutils delve which # Checkout git repository and submodules # fetch-depth must be 0 to use git describe @@ -41,9 +43,11 @@ jobs: # Use dnf to install build dependencies - name: Install build dependencies run: | - dnf --assumeyes install 'dnf-command(builddep)' - dnf --assumeyes install which - dnf --assumeyes builddep scripts/cc-metric-collector.spec + wget -q http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-1.18.2-1.module_el8.7.0+1173+5d37c0fd.x86_64.rpm \ + http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-bin-1.18.2-1.module_el8.7.0+1173+5d37c0fd.x86_64.rpm \ + http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-src-1.18.2-1.module_el8.7.0+1173+5d37c0fd.noarch.rpm \ + http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/go-toolset-1.18.2-1.module_el8.7.0+1173+5d37c0fd.x86_64.rpm + rpm -i go*.rpm - name: RPM build MetricCollector id: rpmbuild @@ -93,7 +97,7 @@ jobs: # Use dnf to install development packages - name: Install development packages - run: dnf --assumeyes --disableplugin=subscription-manager install rpm-build go-srpm-macros rpm-build-libs rpm-libs gcc make python38 git + run: dnf --assumeyes --disableplugin=subscription-manager install rpm-build go-srpm-macros rpm-build-libs rpm-libs gcc make python38 git wget openssl-devel diffutils delve which # Checkout git repository and submodules # fetch-depth must be 0 to use git describe @@ -106,7 +110,12 @@ jobs: # Use dnf to install build dependencies - name: Install build dependencies - run: dnf --assumeyes --disableplugin=subscription-manager builddep scripts/cc-metric-collector.spec + run: | + wget -q http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-1.18.2-1.module_el8.7.0+1173+5d37c0fd.x86_64.rpm \ + http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-bin-1.18.2-1.module_el8.7.0+1173+5d37c0fd.x86_64.rpm \ + http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-src-1.18.2-1.module_el8.7.0+1173+5d37c0fd.noarch.rpm \ + http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/go-toolset-1.18.2-1.module_el8.7.0+1173+5d37c0fd.x86_64.rpm + rpm -i go*.rpm - name: RPM build MetricCollector id: rpmbuild diff --git a/.github/workflows/runonce.yml b/.github/workflows/runonce.yml index a12e002..2036179 100644 --- a/.github/workflows/runonce.yml +++ b/.github/workflows/runonce.yml @@ -31,56 +31,4 @@ jobs: run: make - name: Run MetricCollector once - run: ./cc-metric-collector --once --config .github/ci-config.json - - # - # Job build-1-17 - # Build on latest Ubuntu using golang version 1.17 - # - build-1-17: - runs-on: ubuntu-latest - steps: - # See: https://github.com/marketplace/actions/checkout - # Checkout git repository and submodules - - name: Checkout - uses: actions/checkout@v2 - with: - submodules: recursive - - # See: https://github.com/marketplace/actions/setup-go-environment - - name: Setup Golang - uses: actions/setup-go@v3 - with: - go-version: '1.17.7' - - - name: Build MetricCollector - run: make - - - name: Run MetricCollector once - run: ./cc-metric-collector --once --config .github/ci-config.json - - # - # Job build-1-16 - # Build on latest Ubuntu using golang version 1.16 - # - build-1-16: - runs-on: ubuntu-latest - steps: - # See: https://github.com/marketplace/actions/checkout - # Checkout git repository and submodules - - name: Checkout - uses: actions/checkout@v2 - with: - submodules: recursive - - # See: https://github.com/marketplace/actions/setup-go-environment - - name: Setup Golang - uses: actions/setup-go@v3 - with: - go-version: '1.16.7' # The version AlmaLinux 8.5 uses - - - name: Build MetricCollector - run: make - - - name: Run MetricCollector once - run: ./cc-metric-collector --once --config .github/ci-config.json + run: ./cc-metric-collector --once --config .github/ci-config.json \ No newline at end of file diff --git a/collectors.json b/collectors.json index be3eea7..42cb833 100644 --- a/collectors.json +++ b/collectors.json @@ -12,6 +12,7 @@ "proc_total" ] }, + "memstat": {}, "netstat": { "include_devices": [ "enp5s0" @@ -33,5 +34,8 @@ "type-id": "1" } } + }, + "topprocs": { + "num_procs": 5 } } \ No newline at end of file diff --git a/collectors/cpufreqMetric.go b/collectors/cpufreqMetric.go index 3099900..e6a0081 100644 --- a/collectors/cpufreqMetric.go +++ b/collectors/cpufreqMetric.go @@ -63,7 +63,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error { m.meta = map[string]string{ "source": m.name, "group": "CPU", - "unit": "MHz", + "unit": "Hz", } // Loop for all CPU directories diff --git a/go.mod b/go.mod index a4aacb8..37e242f 100644 --- a/go.mod +++ b/go.mod @@ -3,16 +3,36 @@ module github.com/ClusterCockpit/cc-metric-collector go 1.17 require ( - github.com/ClusterCockpit/cc-units v0.0.0-20220318130935-92a0c6442220 + github.com/ClusterCockpit/cc-units v0.3.0 + github.com/ClusterCockpit/go-rocm-smi v0.3.0 github.com/NVIDIA/go-nvml v0.11.6-0 - github.com/PaesslerAG/gval v1.1.2 + github.com/PaesslerAG/gval v1.2.0 github.com/gorilla/mux v1.8.0 - github.com/influxdata/influxdb-client-go/v2 v2.8.1 + github.com/influxdata/influxdb-client-go/v2 v2.9.0 github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf - github.com/nats-io/nats.go v1.14.0 - github.com/prometheus/client_golang v1.12.1 + github.com/nats-io/nats.go v1.16.0 + github.com/prometheus/client_golang v1.12.2 github.com/stmcginnis/gofish v0.13.0 - golang.org/x/sys v0.0.0-20220412211240-33da011f77ad + golang.org/x/sys v0.0.0-20220622161953-175b2fd9d664 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/deepmap/oapi-codegen v1.8.2 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/prometheus/client_model v0.2.0 // indirect + github.com/prometheus/common v0.32.1 // indirect + github.com/prometheus/procfs v0.7.3 // indirect + github.com/shopspring/decimal v1.3.1 // indirect + golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b // indirect + golang.org/x/net v0.0.0-20210525063256-abc453219eb5 // indirect + google.golang.org/protobuf v1.26.0 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect ) require ( diff --git a/go.mod.1.17+ b/go.mod.1.17+ index b3dc6ca..5c15a61 100644 --- a/go.mod.1.17+ +++ b/go.mod.1.17+ @@ -3,14 +3,15 @@ module github.com/ClusterCockpit/cc-metric-collector go 1.17 require ( + github.com/ClusterCockpit/cc-units v0.3.0 + github.com/ClusterCockpit/go-rocm-smi v0.3.0 github.com/NVIDIA/go-nvml v0.11.6-0 - github.com/PaesslerAG/gval v1.1.2 + github.com/PaesslerAG/gval v1.2.0 github.com/gorilla/mux v1.8.0 - github.com/influxdata/influxdb-client-go/v2 v2.8.1 + github.com/influxdata/influxdb-client-go/v2 v2.9.0 github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf - github.com/nats-io/nats-server/v2 v2.8.0 // indirect - github.com/nats-io/nats.go v1.14.0 - github.com/prometheus/client_golang v1.12.1 + github.com/nats-io/nats.go v1.16.0 + github.com/prometheus/client_golang v1.12.2 github.com/stmcginnis/gofish v0.13.0 - golang.org/x/sys v0.0.0-20220412211240-33da011f77ad + golang.org/x/sys v0.0.0-20220622161953-175b2fd9d664 ) diff --git a/sinks/influxSink.go b/sinks/influxSink.go index 212647d..90f8da8 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -29,15 +29,23 @@ type InfluxSink struct { Password string `json:"password,omitempty"` Organization string `json:"organization,omitempty"` SSL bool `json:"ssl,omitempty"` - // Maximum number of points sent to server in single request. Default 100 + // Maximum number of points sent to server in single request. + // Default: 1000 BatchSize int `json:"batch_size,omitempty"` - // Interval, in which is buffer flushed if it has not been already written (by reaching batch size). Default 1s + // Time interval for delayed sending of metrics. + // If the buffers are already filled before the end of this interval, + // the metrics are sent without further delay. + // Default: 1s FlushInterval string `json:"flush_delay,omitempty"` + // Number of metrics that are dropped when buffer is full + // Default: 100 + DropRate int `json:"drop_rate,omitempty"` } - batch []*write.Point - flushTimer *time.Timer - flushDelay time.Duration - lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer + batch []*write.Point + flushTimer *time.Timer + flushDelay time.Duration + batchMutex sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer + flushTimerMutex sync.Mutex // Ensure only one flush timer is running } // connect connects to the InfluxDB server @@ -62,7 +70,10 @@ func (s *InfluxSink) connect() error { } else { auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password) } - cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database) + cclog.ComponentDebug(s.name, + "Using URI='"+uri+"'", + "Org='"+s.config.Organization+"'", + "Bucket='"+s.config.Database+"'") // Set influxDB client options clientOptions := influxdb2.DefaultOptions() @@ -93,47 +104,64 @@ func (s *InfluxSink) connect() error { func (s *InfluxSink) Write(m lp.CCMetric) error { - if len(s.batch) == 0 && s.flushDelay != 0 { - // This is the first write since the last flush, start the flushTimer! - if s.flushTimer != nil && s.flushTimer.Stop() { - cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?") - } - - // Run a batched flush for all lines that have arrived in the last flush delay interval + if s.flushDelay != 0 && s.flushTimerMutex.TryLock() { + // Run a batched flush for all metrics that arrived in the last flush delay interval + cclog.ComponentDebug(s.name, "Starting new flush timer") s.flushTimer = time.AfterFunc( s.flushDelay, func() { + defer s.flushTimerMutex.Unlock() + cclog.ComponentDebug(s.name, "Starting flush in flush timer") if err := s.Flush(); err != nil { - cclog.ComponentError(s.name, "flush failed:", err.Error()) + cclog.ComponentError(s.name, "Flush timer: flush failed:", err) } }) } + // Lock access to batch slice + s.batchMutex.Lock() + + // batch slice full, dropping oldest metric(s) + // e.g. when previous flushes failed and batch slice was not cleared + if len(s.batch) == s.config.BatchSize { + newSize := s.config.BatchSize - s.config.DropRate + + for i := 0; i < newSize; i++ { + s.batch[i] = s.batch[i+s.config.DropRate] + } + for i := newSize; i < s.config.BatchSize; i++ { + s.batch[i] = nil + } + s.batch = s.batch[:newSize] + cclog.ComponentError(s.name, "Batch slice full, dropping", s.config.DropRate, "oldest metric(s)") + } + // Append metric to batch slice p := m.ToPoint(s.meta_as_tags) - s.lock.Lock() s.batch = append(s.batch, p) - s.lock.Unlock() // Flush synchronously if "flush_delay" is zero - if s.flushDelay == 0 { - return s.Flush() - } - + // or // Flush if batch size is reached - if len(s.batch) == s.config.BatchSize { + if s.flushDelay == 0 || + len(s.batch) == s.config.BatchSize { + // Unlock access to batch slice + s.batchMutex.Unlock() return s.Flush() } + // Unlock access to batch slice + s.batchMutex.Unlock() return nil } // Flush sends all metrics buffered in batch slice to InfluxDB server func (s *InfluxSink) Flush() error { + cclog.ComponentDebug(s.name, "Flushing") // Lock access to batch slice - s.lock.Lock() - defer s.lock.Unlock() + s.batchMutex.Lock() + defer s.batchMutex.Unlock() // Nothing to do, batch slice is empty if len(s.batch) == 0 { @@ -143,7 +171,7 @@ func (s *InfluxSink) Flush() error { // Send metrics from batch slice err := s.writeApi.WritePoint(context.Background(), s.batch...) if err != nil { - cclog.ComponentError(s.name, "flush failed:", err.Error()) + cclog.ComponentError(s.name, "Flush(): Flush of", len(s.batch), "metrics failed:", err) return err } @@ -160,6 +188,9 @@ func (s *InfluxSink) Close() { cclog.ComponentDebug(s.name, "Closing InfluxDB connection") s.flushTimer.Stop() s.Flush() + if err := s.Flush(); err != nil { + cclog.ComponentError(s.name, "Close(): Flush failed:", err) + } s.client.Close() } @@ -169,31 +200,32 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { s.name = fmt.Sprintf("InfluxSink(%s)", name) // Set config default values - s.config.BatchSize = 100 + s.config.BatchSize = 1000 s.config.FlushInterval = "1s" + s.config.DropRate = 100 // Read config if len(config) > 0 { err := json.Unmarshal(config, &s.config) if err != nil { - return nil, err + return s, err } } if len(s.config.Host) == 0 { - return nil, errors.New("Missing host configuration required by InfluxSink") + return s, errors.New("Missing host configuration required by InfluxSink") } if len(s.config.Port) == 0 { - return nil, errors.New("Missing port configuration required by InfluxSink") + return s, errors.New("Missing port configuration required by InfluxSink") } if len(s.config.Database) == 0 { - return nil, errors.New("Missing database configuration required by InfluxSink") + return s, errors.New("Missing database configuration required by InfluxSink") } if len(s.config.Organization) == 0 { - return nil, errors.New("Missing organization configuration required by InfluxSink") + return s, errors.New("Missing organization configuration required by InfluxSink") } if len(s.config.Password) == 0 { - return nil, errors.New("Missing password configuration required by InfluxSink") + return s, errors.New("Missing password configuration required by InfluxSink") } // Create lookup map to use meta infos as tags in the output metric @@ -210,12 +242,24 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { } } + if !(s.config.BatchSize > 0) { + return s, fmt.Errorf("batch_size=%d in InfluxDB config must be > 0", s.config.BatchSize) + } + if !(s.config.DropRate > 0) { + return s, fmt.Errorf("drop_rate=%d in InfluxDB config must be > 0", s.config.DropRate) + } + if !(s.config.BatchSize > s.config.DropRate) { + return s, fmt.Errorf( + "batch_size=%d must be greater then drop_rate=%d in InfluxDB config", + s.config.BatchSize, s.config.DropRate) + } + // allocate batch slice s.batch = make([]*write.Point, 0, s.config.BatchSize) // Connect to InfluxDB server if err := s.connect(); err != nil { - return nil, fmt.Errorf("unable to connect: %v", err) + return s, fmt.Errorf("unable to connect: %v", err) } return s, nil } diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index f531f5d..6af8614 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -76,11 +76,17 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error { for name, raw := range rawConfigs { err = sm.AddOutput(name, raw) if err != nil { - cclog.ComponentError("SinkManager", err.Error()) + cclog.ComponentError("SinkManager", err) continue } } + // Check that at least one sink is running + if !(len(sm.sinks) > 0) { + cclog.ComponentError("SinkManager", "Found no usable sinks") + return fmt.Errorf("Found no usable sinks") + } + return nil }