Merge latest development changes (#80)

* Cleanup: Remove unused code

* Use Golang duration parser for 'interval' and 'duration'
 in main config

* Update handling of LIKWID headers. Download only if not already present in the system. Fixes #73

* Units with cc-units (#64)

* Add option to normalize units with cc-unit

* Add unit conversion to router

* Add option to change unit prefix in the router

* Add to MetricRouter README

* Add order of operations in router to README

* Use second add_tags/del_tags only if metric gets renamed

* Skip disks in DiskstatCollector that have size=0

* Check readability of sensor files in TempCollector

* Fix for --once option

* Rename `cpu` type to `hwthread` (#69)

* Rename 'cpu' type to 'hwthread' to avoid naming clashes with MetricStore and CC-Webfrontend

* Collectors in parallel (#74)

* Provide info to CollectorManager whether the collector can be executed in parallel with others

* Split serial and parallel collectors. Read in parallel first

* Update NvidiaCollector with new metrics, MIG and NvLink support (#75)

* CC topology module update (#76)

* Rename CPU to hardware thread, write some comments

* Do renaming in other parts

* Remove CpuList and SocketList function from metricCollector. Available in ccTopology

* Option to use MIG UUID as subtype-id in NvidiaCollector

* Option to use MIG slice name as subtype-id in NvidiaCollector

* MetricRouter: Fix JSON in README

* Fix for Github Action to really use the selected version

* Remove Ganglia installation in runonce Action and add Go 1.18

* Fix daemon options in init script

* Add separate go.mod files to use it with deprecated 1.16

* Minor updates for Makefiles

* fix string comparison

* AMD ROCm SMI collector (#77)

* Add collector for AMD ROCm SMI metrics

* Fix import path

* Fix imports

* Remove Board Number

* store GPU index explicitly

* Remove board number from description

* Use http instead of ftp to download likwid

* Fix serial number in rocmCollector

* Improved http sink (#78)

* automatic flush in NatsSink

* tweak default options of HttpSink

* shorter cirt. section and retries for HttpSink

* fix error handling

* Remove file added by mistake.

* Use http instead of ftp to download likwid

* Fix serial number in rocmCollector

Co-authored-by: Thomas Roehl <thomas.roehl@fau.de>

* Fix: When sending metrics failed the batch size could be exceeded

* Improved dropping of metrics failed to send

* Add memstats and topprocs metric

* Updated to latest modules

* Check that at least one sink is running

* Add drop rate, when send buffer is full

* Allow only one timer at a time

* Use mutex to ensure only on flush timer is running

* Fix for NvidiaCollector when devices are not in MiG mode

* Remove Golang version 1.16 an 1.17 from Action. Latest commits require Golang 1.18

* Use Golang 1.18 in Release action to build RPMs

* Change unit of CpufreqCollector to Hz. That's what the sysfs outputs

* Make wget quiet in Release action to reduce log size

Co-authored-by: Holger Obermaier <40787752+ho-ob@users.noreply.github.com>
Co-authored-by: Lou <lou.knauer@gmx.de>
This commit is contained in:
Thomas Gruber
2022-07-13 10:09:49 +02:00
committed by GitHub
parent 2adf9484a3
commit b3c27e0af5
8 changed files with 138 additions and 106 deletions

View File

@@ -29,15 +29,23 @@ type InfluxSink struct {
Password string `json:"password,omitempty"`
Organization string `json:"organization,omitempty"`
SSL bool `json:"ssl,omitempty"`
// Maximum number of points sent to server in single request. Default 100
// Maximum number of points sent to server in single request.
// Default: 1000
BatchSize int `json:"batch_size,omitempty"`
// Interval, in which is buffer flushed if it has not been already written (by reaching batch size). Default 1s
// Time interval for delayed sending of metrics.
// If the buffers are already filled before the end of this interval,
// the metrics are sent without further delay.
// Default: 1s
FlushInterval string `json:"flush_delay,omitempty"`
// Number of metrics that are dropped when buffer is full
// Default: 100
DropRate int `json:"drop_rate,omitempty"`
}
batch []*write.Point
flushTimer *time.Timer
flushDelay time.Duration
lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer
batch []*write.Point
flushTimer *time.Timer
flushDelay time.Duration
batchMutex sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer
flushTimerMutex sync.Mutex // Ensure only one flush timer is running
}
// connect connects to the InfluxDB server
@@ -62,7 +70,10 @@ func (s *InfluxSink) connect() error {
} else {
auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password)
}
cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database)
cclog.ComponentDebug(s.name,
"Using URI='"+uri+"'",
"Org='"+s.config.Organization+"'",
"Bucket='"+s.config.Database+"'")
// Set influxDB client options
clientOptions := influxdb2.DefaultOptions()
@@ -93,47 +104,64 @@ func (s *InfluxSink) connect() error {
func (s *InfluxSink) Write(m lp.CCMetric) error {
if len(s.batch) == 0 && s.flushDelay != 0 {
// This is the first write since the last flush, start the flushTimer!
if s.flushTimer != nil && s.flushTimer.Stop() {
cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?")
}
// Run a batched flush for all lines that have arrived in the last flush delay interval
if s.flushDelay != 0 && s.flushTimerMutex.TryLock() {
// Run a batched flush for all metrics that arrived in the last flush delay interval
cclog.ComponentDebug(s.name, "Starting new flush timer")
s.flushTimer = time.AfterFunc(
s.flushDelay,
func() {
defer s.flushTimerMutex.Unlock()
cclog.ComponentDebug(s.name, "Starting flush in flush timer")
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "flush failed:", err.Error())
cclog.ComponentError(s.name, "Flush timer: flush failed:", err)
}
})
}
// Lock access to batch slice
s.batchMutex.Lock()
// batch slice full, dropping oldest metric(s)
// e.g. when previous flushes failed and batch slice was not cleared
if len(s.batch) == s.config.BatchSize {
newSize := s.config.BatchSize - s.config.DropRate
for i := 0; i < newSize; i++ {
s.batch[i] = s.batch[i+s.config.DropRate]
}
for i := newSize; i < s.config.BatchSize; i++ {
s.batch[i] = nil
}
s.batch = s.batch[:newSize]
cclog.ComponentError(s.name, "Batch slice full, dropping", s.config.DropRate, "oldest metric(s)")
}
// Append metric to batch slice
p := m.ToPoint(s.meta_as_tags)
s.lock.Lock()
s.batch = append(s.batch, p)
s.lock.Unlock()
// Flush synchronously if "flush_delay" is zero
if s.flushDelay == 0 {
return s.Flush()
}
// or
// Flush if batch size is reached
if len(s.batch) == s.config.BatchSize {
if s.flushDelay == 0 ||
len(s.batch) == s.config.BatchSize {
// Unlock access to batch slice
s.batchMutex.Unlock()
return s.Flush()
}
// Unlock access to batch slice
s.batchMutex.Unlock()
return nil
}
// Flush sends all metrics buffered in batch slice to InfluxDB server
func (s *InfluxSink) Flush() error {
cclog.ComponentDebug(s.name, "Flushing")
// Lock access to batch slice
s.lock.Lock()
defer s.lock.Unlock()
s.batchMutex.Lock()
defer s.batchMutex.Unlock()
// Nothing to do, batch slice is empty
if len(s.batch) == 0 {
@@ -143,7 +171,7 @@ func (s *InfluxSink) Flush() error {
// Send metrics from batch slice
err := s.writeApi.WritePoint(context.Background(), s.batch...)
if err != nil {
cclog.ComponentError(s.name, "flush failed:", err.Error())
cclog.ComponentError(s.name, "Flush(): Flush of", len(s.batch), "metrics failed:", err)
return err
}
@@ -160,6 +188,9 @@ func (s *InfluxSink) Close() {
cclog.ComponentDebug(s.name, "Closing InfluxDB connection")
s.flushTimer.Stop()
s.Flush()
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "Close(): Flush failed:", err)
}
s.client.Close()
}
@@ -169,31 +200,32 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
s.name = fmt.Sprintf("InfluxSink(%s)", name)
// Set config default values
s.config.BatchSize = 100
s.config.BatchSize = 1000
s.config.FlushInterval = "1s"
s.config.DropRate = 100
// Read config
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return nil, err
return s, err
}
}
if len(s.config.Host) == 0 {
return nil, errors.New("Missing host configuration required by InfluxSink")
return s, errors.New("Missing host configuration required by InfluxSink")
}
if len(s.config.Port) == 0 {
return nil, errors.New("Missing port configuration required by InfluxSink")
return s, errors.New("Missing port configuration required by InfluxSink")
}
if len(s.config.Database) == 0 {
return nil, errors.New("Missing database configuration required by InfluxSink")
return s, errors.New("Missing database configuration required by InfluxSink")
}
if len(s.config.Organization) == 0 {
return nil, errors.New("Missing organization configuration required by InfluxSink")
return s, errors.New("Missing organization configuration required by InfluxSink")
}
if len(s.config.Password) == 0 {
return nil, errors.New("Missing password configuration required by InfluxSink")
return s, errors.New("Missing password configuration required by InfluxSink")
}
// Create lookup map to use meta infos as tags in the output metric
@@ -210,12 +242,24 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
}
}
if !(s.config.BatchSize > 0) {
return s, fmt.Errorf("batch_size=%d in InfluxDB config must be > 0", s.config.BatchSize)
}
if !(s.config.DropRate > 0) {
return s, fmt.Errorf("drop_rate=%d in InfluxDB config must be > 0", s.config.DropRate)
}
if !(s.config.BatchSize > s.config.DropRate) {
return s, fmt.Errorf(
"batch_size=%d must be greater then drop_rate=%d in InfluxDB config",
s.config.BatchSize, s.config.DropRate)
}
// allocate batch slice
s.batch = make([]*write.Point, 0, s.config.BatchSize)
// Connect to InfluxDB server
if err := s.connect(); err != nil {
return nil, fmt.Errorf("unable to connect: %v", err)
return s, fmt.Errorf("unable to connect: %v", err)
}
return s, nil
}

View File

@@ -76,11 +76,17 @@ func (sm *sinkManager) Init(wg *sync.WaitGroup, sinkConfigFile string) error {
for name, raw := range rawConfigs {
err = sm.AddOutput(name, raw)
if err != nil {
cclog.ComponentError("SinkManager", err.Error())
cclog.ComponentError("SinkManager", err)
continue
}
}
// Check that at least one sink is running
if !(len(sm.sinks) > 0) {
cclog.ComponentError("SinkManager", "Found no usable sinks")
return fmt.Errorf("Found no usable sinks")
}
return nil
}