diff --git a/collectors/infinibandMetric.go b/collectors/infinibandMetric.go
index 5be095d..274e669 100644
--- a/collectors/infinibandMetric.go
+++ b/collectors/infinibandMetric.go
@@ -18,13 +18,18 @@ import (
 
 const IB_BASEPATH = "/sys/class/infiniband/"
 
+type InfinibandCollectorMetric struct {
+    path string
+    unit string
+}
+
 type InfinibandCollectorInfo struct {
-    LID              string            // IB local Identifier (LID)
-    device           string            // IB device
-    port             string            // IB device port
-    portCounterFiles map[string]string // mapping counter name -> sysfs file
-    tagSet           map[string]string // corresponding tag list
-    lastState        map[string]int64  // State from last measurement
+    LID              string                               // IB local Identifier (LID)
+    device           string                               // IB device
+    port             string                               // IB device port
+    portCounterFiles map[string]InfinibandCollectorMetric // mapping counter name -> InfinibandCollectorMetric
+    tagSet           map[string]string                    // corresponding tag list
+    lastState        map[string]int64                     // State from last measurement
 }
 
 type InfinibandCollector struct {
@@ -106,16 +111,16 @@ func (m *InfinibandCollector) Init(config json.RawMessage) error {
 
         // Check access to counter files
         countersDir := filepath.Join(path, "counters")
-        portCounterFiles := map[string]string{
-            "ib_recv":      filepath.Join(countersDir, "port_rcv_data"),
-            "ib_xmit":      filepath.Join(countersDir, "port_xmit_data"),
-            "ib_recv_pkts": filepath.Join(countersDir, "port_rcv_packets"),
-            "ib_xmit_pkts": filepath.Join(countersDir, "port_xmit_packets"),
+        portCounterFiles := map[string]InfinibandCollectorMetric{
+            "ib_recv":      {path: filepath.Join(countersDir, "port_rcv_data"), unit: "bytes"},
+            "ib_xmit":      {path: filepath.Join(countersDir, "port_xmit_data"), unit: "bytes"},
+            "ib_recv_pkts": {path: filepath.Join(countersDir, "port_rcv_packets"), unit: "packets"},
+            "ib_xmit_pkts": {path: filepath.Join(countersDir, "port_xmit_packets"), unit: "packets"},
         }
-        for _, counterFile := range portCounterFiles {
-            err := unix.Access(counterFile, unix.R_OK)
+        for _, counter := range portCounterFiles {
+            err := unix.Access(counter.path, unix.R_OK)
             if err != nil {
-                return fmt.Errorf("unable to access %s: %v", counterFile, err)
+                return fmt.Errorf("unable to access %s: %v", counter.path, err)
             }
         }
 
@@ -165,14 +170,14 @@ func (m *InfinibandCollector) Read(interval time.Duration, output chan lp.CCMetr
     m.lastTimestamp = now
 
     for _, info := range m.info {
-        for counterName, counterFile := range info.portCounterFiles {
+        for counterName, counterDef := range info.portCounterFiles {
 
             // Read counter file
-            line, err := ioutil.ReadFile(counterFile)
+            line, err := ioutil.ReadFile(counterDef.path)
             if err != nil {
                 cclog.ComponentError(
                     m.name,
-                    fmt.Sprintf("Read(): Failed to read from file '%s': %v", counterFile, err))
+                    fmt.Sprintf("Read(): Failed to read from file '%s': %v", counterDef.path, err))
                 continue
             }
             data := strings.TrimSpace(string(line))
@@ -189,6 +194,7 @@
             // Send absolut values
             if m.config.SendAbsoluteValues {
                 if y, err := lp.New(counterName, info.tagSet, m.meta, map[string]interface{}{"value": v}, now); err == nil {
+                    y.AddMeta("unit", counterDef.unit)
                     output <- y
                 }
             }
@@ -198,6 +204,7 @@
                 if info.lastState[counterName] >= 0 {
                     rate := float64((v - info.lastState[counterName])) / timeDiff
                     if y, err := lp.New(counterName+"_bw", info.tagSet, m.meta, map[string]interface{}{"value": rate}, now); err == nil {
+                        y.AddMeta("unit", counterDef.unit+"/sec")
                        output <- y
                     }
                 }
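The `_bw` metrics above are derived rates: each interval, the collector subtracts the previous counter value, divides by the elapsed time, and tags the result with the counter's unit plus a `/sec` suffix. A minimal, self-contained sketch of that calculation follows; the `Reading` type and `DeriveRate` helper are illustrative names, not part of the patch:

```go
// Sketch (not part of the patch): deriving a per-second rate with a derived
// unit from two absolute counter readings, mirroring the "_bw" metrics above.
package main

import (
	"fmt"
	"time"
)

// Reading is a hypothetical snapshot of an absolute counter.
type Reading struct {
	Value     int64
	Timestamp time.Time
}

// DeriveRate returns the per-second rate between two readings and the
// derived unit, e.g. "bytes" -> "bytes/sec".
func DeriveRate(prev, cur Reading, unit string) (float64, string) {
	timeDiff := cur.Timestamp.Sub(prev.Timestamp).Seconds()
	rate := float64(cur.Value-prev.Value) / timeDiff
	return rate, unit + "/sec"
}

func main() {
	now := time.Now()
	prev := Reading{Value: 1000, Timestamp: now.Add(-10 * time.Second)}
	cur := Reading{Value: 6000, Timestamp: now}
	rate, unit := DeriveRate(prev, cur, "bytes")
	fmt.Printf("%.1f %s\n", rate, unit) // 500.0 bytes/sec
}
```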
diff --git a/collectors/likwidMetric.go b/collectors/likwidMetric.go
index 3bbc1b5..d808bad 100644
--- a/collectors/likwidMetric.go
+++ b/collectors/likwidMetric.go
@@ -16,6 +16,7 @@ import (
     "math"
     "os"
     "os/signal"
+    "sort"
     "strconv"
     "strings"
     "sync"
@@ -54,6 +55,7 @@ type LikwidEventsetConfig struct {
     gid     C.int
     eorder  []*C.char
     estr    *C.char
+    go_estr string
     results map[int]map[string]interface{}
     metrics map[int]map[string]float64
 }
@@ -101,8 +103,14 @@ func eventsToEventStr(events map[string]string) string {
 
 func genLikwidEventSet(input LikwidCollectorEventsetConfig) LikwidEventsetConfig {
     tmplist := make([]string, 0)
+    clist := make([]string, 0)
+    for k := range input.Events {
+        clist = append(clist, k)
+    }
+    sort.Strings(clist)
     elist := make([]*C.char, 0)
-    for k, v := range input.Events {
+    for _, k := range clist {
+        v := input.Events[k]
         tmplist = append(tmplist, fmt.Sprintf("%s:%s", v, k))
         c_counter := C.CString(k)
         elist = append(elist, c_counter)
@@ -124,6 +132,7 @@
         gid:     -1,
         eorder:  elist,
         estr:    C.CString(estr),
+        go_estr: estr,
         results: res,
         metrics: met,
     }
@@ -193,7 +202,7 @@ func (m *LikwidCollector) Init(config json.RawMessage) error {
     }
     m.setup()
 
-    m.meta = map[string]string{"source": m.name, "group": "PerfCounter"}
+    m.meta = map[string]string{"group": "PerfCounter"}
     cclog.ComponentDebug(m.name, "Get cpulist and init maps and lists")
     cpulist := topo.CpuList()
     m.cpulist = make([]C.int, len(cpulist))
@@ -425,6 +434,9 @@ func (m *LikwidCollector) calcGlobalMetrics(interval time.Duration, output chan
 
 func (m *LikwidCollector) LateInit() error {
     var ret C.int
+    if m.initialized {
+        return nil
+    }
     switch m.config.AccessMode {
     case "direct":
         C.HPMmode(0)
@@ -475,7 +487,17 @@ func (m *LikwidCollector) LateInit() error {
     for i, evset := range m.config.Eventsets {
         var gid C.int
         if len(evset.Events) > 0 {
+            skip := false
             likwidGroup := genLikwidEventSet(evset)
+            for _, g := range m.likwidGroups {
+                if likwidGroup.go_estr == g.go_estr {
+                    skip = true
+                    break
+                }
+            }
+            if skip {
+                continue
+            }
             // Now we add the list of events to likwid
             gid = C.perfmon_addEventSet(likwidGroup.estr)
             if gid >= 0 {
@@ -520,9 +542,14 @@
     }
 
     if !m.initialized {
-        if m.LateInit() != nil {
+        m.lock.Lock()
+        err = m.LateInit()
+        if err != nil {
+            m.lock.Unlock()
             return
         }
+        m.initialized = true
+        m.lock.Unlock()
     }
 
     if m.initialized && !skip {
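The new `go_estr` field and the `sort.Strings` call above exist so that duplicate event sets can be detected reliably: Go randomizes map iteration order, so the same set of events could otherwise serialize to a different string on every run. A minimal sketch of that canonicalization idea; the `canonicalEventStr` helper and its joining format are illustrative assumptions:

```go
// Sketch (not part of the patch): building a canonical event string from a
// map by sorting its keys, so two event sets with identical content compare
// equal regardless of Go's randomized map iteration order.
package main

import (
	"fmt"
	"sort"
	"strings"
)

func canonicalEventStr(events map[string]string) string {
	keys := make([]string, 0, len(events))
	for k := range events {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order is the whole point
	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, fmt.Sprintf("%s:%s", events[k], k))
	}
	return strings.Join(parts, ",")
}

func main() {
	a := map[string]string{"PMC0": "INSTR_RETIRED_ANY", "FIXC1": "CPU_CLK_UNHALTED_CORE"}
	b := map[string]string{"FIXC1": "CPU_CLK_UNHALTED_CORE", "PMC0": "INSTR_RETIRED_ANY"}
	// Identical content yields identical strings, so the duplicate can be skipped.
	fmt.Println(canonicalEventStr(a) == canonicalEventStr(b)) // true
}
```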
diff --git a/internal/metricRouter/metricRouter.go b/internal/metricRouter/metricRouter.go
index f9b3faa..8875d0e 100644
--- a/internal/metricRouter/metricRouter.go
+++ b/internal/metricRouter/metricRouter.go
@@ -48,7 +48,6 @@ type metricRouter struct {
     done      chan bool           // channel to finish / stop metric router
     wg        *sync.WaitGroup     // wait group for all goroutines in cc-metric-collector
     timestamp time.Time           // timestamp periodically updated by ticker each interval
-    timerdone chan bool           // channel to finish / stop timestamp updater
     ticker    mct.MultiChanTicker // periodically ticking once each interval
     config    metricRouterConfig  // json encoded config for metric router
     cache     MetricCache         // pointer to MetricCache
@@ -124,29 +123,6 @@ func (r *metricRouter) Init(ticker mct.MultiChanTicker, wg *sync.WaitGroup, rout
     return nil
 }
 
-// StartTimer starts a timer which updates timestamp periodically
-func (r *metricRouter) StartTimer() {
-    m := make(chan time.Time)
-    r.ticker.AddChannel(m)
-    r.timerdone = make(chan bool)
-
-    r.wg.Add(1)
-    go func() {
-        defer r.wg.Done()
-        for {
-            select {
-            case <-r.timerdone:
-                close(r.timerdone)
-                cclog.ComponentDebug("MetricRouter", "TIMER DONE")
-                return
-            case t := <-m:
-                r.timestamp = t
-            }
-        }
-    }()
-    cclog.ComponentDebug("MetricRouter", "TIMER START")
-}
-
 func getParamMap(point lp.CCMetric) map[string]interface{} {
     params := make(map[string]interface{})
     params["metric"] = point
@@ -235,8 +211,9 @@ func (r *metricRouter) dropMetric(point lp.CCMetric) bool {
 func (r *metricRouter) Start() {
     // start timer if configured
     r.timestamp = time.Now()
+    timeChan := make(chan time.Time)
     if r.config.IntervalStamp {
-        r.StartTimer()
+        r.ticker.AddChannel(timeChan)
     }
 
     // Router manager is done
@@ -316,6 +293,10 @@ func (r *metricRouter) Start() {
             done()
             return
 
+        case timestamp := <-timeChan:
+            r.timestamp = timestamp
+            cclog.ComponentDebug("MetricRouter", "Update timestamp", r.timestamp.UnixNano())
+
         case p := <-r.coll_input:
             coll_forward(p)
             for i := 0; len(r.coll_input) > 0 && i < (r.maxForward-1); i++ {
@@ -361,14 +342,6 @@ func (r *metricRouter) Close() {
     // wait for close of channel r.done
     <-r.done
 
-    // stop timer
-    if r.config.IntervalStamp {
-        cclog.ComponentDebug("MetricRouter", "TIMER CLOSE")
-        r.timerdone <- true
-        // wait for close of channel r.timerdone
-        <-r.timerdone
-    }
-
     // stop metric cache
     if r.config.NumCacheIntervals > 0 {
         cclog.ComponentDebug("MetricRouter", "CACHE CLOSE")
diff --git a/sinks/httpSink.go b/sinks/httpSink.go
index 398eaf3..7713638 100644
--- a/sinks/httpSink.go
+++ b/sinks/httpSink.go
@@ -42,13 +42,13 @@ func (s *HttpSink) Write(m lp.CCMetric) error {
     if s.buffer.Len() == 0 && s.flushDelay != 0 {
         // This is the first write since the last flush, start the flushTimer!
         if s.flushTimer != nil && s.flushTimer.Stop() {
-            cclog.ComponentDebug("HttpSink", "unexpected: the flushTimer was already running?")
+            cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?")
         }
 
         // Run a batched flush for all lines that have arrived in the last second
         s.flushTimer = time.AfterFunc(s.flushDelay, func() {
             if err := s.Flush(); err != nil {
-                cclog.ComponentError("HttpSink", "flush failed:", err.Error())
+                cclog.ComponentError(s.name, "flush failed:", err.Error())
             }
         })
     }
@@ -60,6 +60,7 @@ func (s *HttpSink) Write(m lp.CCMetric) error {
     s.lock.Unlock() // defer does not work here as Flush() takes the lock as well
 
     if err != nil {
+        cclog.ComponentError(s.name, "encoding failed:", err.Error())
         return err
     }
 
@@ -84,6 +85,7 @@ func (s *HttpSink) Flush() error {
     // Create new request to send buffer
     req, err := http.NewRequest(http.MethodPost, s.config.URL, s.buffer)
     if err != nil {
+        cclog.ComponentError(s.name, "failed to create request:", err.Error())
         return err
     }
 
@@ -100,12 +102,15 @@ func (s *HttpSink) Flush() error {
 
     // Handle transport/tcp errors
     if err != nil {
+        cclog.ComponentError(s.name, "transport/tcp error:", err.Error())
        return err
     }
 
     // Handle application errors
     if res.StatusCode != http.StatusOK {
-        return errors.New(res.Status)
+        err = errors.New(res.Status)
+        cclog.ComponentError(s.name, "application error:", err.Error())
+        return err
     }
 
     return nil
@@ -114,7 +119,7 @@ func (s *HttpSink) Flush() error {
 func (s *HttpSink) Close() {
     s.flushTimer.Stop()
     if err := s.Flush(); err != nil {
-        cclog.ComponentError("HttpSink", "flush failed:", err.Error())
+        cclog.ComponentError(s.name, "flush failed:", err.Error())
     }
     s.client.CloseIdleConnections()
 }
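`HttpSink.Write` above relies on a one-shot `time.AfterFunc` timer: the first write after a flush arms the timer, and everything that arrives within `flushDelay` goes out as a single batch. A minimal sketch of that pattern with a simplified, hypothetical `batcher` type standing in for the sink:

```go
// Sketch (not part of the patch): the deferred-flush pattern used by
// HttpSink.Write. The first write after a flush arms a one-shot timer;
// all lines arriving within flushDelay are sent in one batch.
package main

import (
	"bytes"
	"fmt"
	"sync"
	"time"
)

type batcher struct {
	lock       sync.Mutex
	buffer     bytes.Buffer
	flushTimer *time.Timer
	flushDelay time.Duration
}

func (b *batcher) Write(line string) {
	b.lock.Lock()
	if b.buffer.Len() == 0 && b.flushDelay != 0 {
		// First write since the last flush: arm the one-shot flush timer.
		b.flushTimer = time.AfterFunc(b.flushDelay, func() { b.Flush() })
	}
	b.buffer.WriteString(line + "\n")
	b.lock.Unlock()
}

func (b *batcher) Flush() {
	b.lock.Lock()
	defer b.lock.Unlock()
	fmt.Printf("flushing %d bytes\n", b.buffer.Len())
	b.buffer.Reset()
}

func main() {
	b := &batcher{flushDelay: time.Second}
	b.Write("metric1 value=1")
	b.Write("metric2 value=2")
	time.Sleep(2 * time.Second) // both lines go out in a single flush
}
```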
diff --git a/sinks/influxAsyncSink.go b/sinks/influxAsyncSink.go
index 213f2d6..c2956fc 100644
--- a/sinks/influxAsyncSink.go
+++ b/sinks/influxAsyncSink.go
@@ -28,10 +28,11 @@ type InfluxAsyncSinkConfig struct {
     BatchSize uint `json:"batch_size,omitempty"`
     // Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms
     FlushInterval uint `json:"flush_interval,omitempty"`
-    InfluxRetryInterval string `json:"retry_interval"`
-    InfluxExponentialBase uint `json:"retry_exponential_base"`
-    InfluxMaxRetries uint `json:"max_retries"`
-    InfluxMaxRetryTime string `json:"max_retry_time"`
+    InfluxRetryInterval string `json:"retry_interval,omitempty"`
+    InfluxExponentialBase uint `json:"retry_exponential_base,omitempty"`
+    InfluxMaxRetries uint `json:"max_retries,omitempty"`
+    InfluxMaxRetryTime string `json:"max_retry_time,omitempty"`
+    CustomFlushInterval string `json:"custom_flush_interval,omitempty"`
 }
 
 type InfluxAsyncSink struct {
@@ -42,6 +43,8 @@ type InfluxAsyncSink struct {
     config InfluxAsyncSinkConfig
     influxRetryInterval uint
     influxMaxRetryTime uint
+    customFlushInterval time.Duration
+    flushTimer *time.Timer
 }
 
 func (s *InfluxAsyncSink) connect() error {
@@ -60,20 +63,34 @@ func (s *InfluxAsyncSink) connect() error {
     cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database)
     clientOptions := influxdb2.DefaultOptions()
     if s.config.BatchSize != 0 {
+        cclog.ComponentDebug(s.name, "Batch size", s.config.BatchSize)
         clientOptions.SetBatchSize(s.config.BatchSize)
     }
     if s.config.FlushInterval != 0 {
+        cclog.ComponentDebug(s.name, "Flush interval", s.config.FlushInterval)
         clientOptions.SetFlushInterval(s.config.FlushInterval)
     }
+    if s.influxRetryInterval != 0 {
+        cclog.ComponentDebug(s.name, "MaxRetryInterval", s.influxRetryInterval)
+        clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
+    }
+    if s.influxMaxRetryTime != 0 {
+        cclog.ComponentDebug(s.name, "MaxRetryTime", s.influxMaxRetryTime)
+        clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
+    }
+    if s.config.InfluxExponentialBase != 0 {
+        cclog.ComponentDebug(s.name, "Exponential Base", s.config.InfluxExponentialBase)
+        clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
+    }
+    if s.config.InfluxMaxRetries != 0 {
+        cclog.ComponentDebug(s.name, "Max Retries", s.config.InfluxMaxRetries)
+        clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
+    }
     clientOptions.SetTLSConfig(
         &tls.Config{
             InsecureSkipVerify: true,
         },
-    )
-    clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
-    clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
-    clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
-    clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
+    ).SetPrecision(time.Second)
 
     s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
     s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database)
@@ -88,6 +105,15 @@ func (s *InfluxAsyncSink) connect() error {
 }
 
 func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
+    if s.customFlushInterval != 0 && s.flushTimer == nil {
+        // Run a batched flush for all lines that have arrived in the defined interval
+        s.flushTimer = time.AfterFunc(s.customFlushInterval, func() {
+            if err := s.Flush(); err != nil {
+                cclog.ComponentError(s.name, "flush failed:", err.Error())
+            }
+
+        })
+    }
     s.writeApi.WritePoint(
         m.ToPoint(s.meta_as_tags),
     )
@@ -95,7 +121,11 @@ func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
 }
 
 func (s *InfluxAsyncSink) Flush() error {
+    cclog.ComponentDebug(s.name, "Flushing")
     s.writeApi.Flush()
+    if s.customFlushInterval != 0 && s.flushTimer != nil {
+        s.flushTimer = nil
+    }
     return nil
 }
 
@@ -110,13 +140,16 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
     s.name = fmt.Sprintf("InfluxSink(%s)", name)
 
     // Set default for maximum number of points sent to server in single request.
-    s.config.BatchSize = 100
-    s.influxRetryInterval = uint(time.Duration(1) * time.Second)
-    s.config.InfluxRetryInterval = "1s"
-    s.influxMaxRetryTime = uint(7 * time.Duration(24) * time.Hour)
-    s.config.InfluxMaxRetryTime = "168h"
-    s.config.InfluxMaxRetries = 20
-    s.config.InfluxExponentialBase = 2
+    s.config.BatchSize = 0
+    s.influxRetryInterval = 0
+    //s.config.InfluxRetryInterval = "1s"
+    s.influxMaxRetryTime = 0
+    //s.config.InfluxMaxRetryTime = "168h"
+    s.config.InfluxMaxRetries = 0
+    s.config.InfluxExponentialBase = 0
+    s.config.FlushInterval = 0
+    s.config.CustomFlushInterval = ""
+    s.customFlushInterval = time.Duration(0)
 
     // Default retry intervals (in seconds)
     // 1 2
@@ -168,6 +201,15 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
     s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
     s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
 
+    // Use a own timer for calling Flush()
+    if len(s.config.CustomFlushInterval) > 0 {
+        t, err := time.ParseDuration(s.config.CustomFlushInterval)
+        if err != nil {
+            return nil, fmt.Errorf("invalid duration in 'custom_flush_interval': %v", err)
+        }
+        s.customFlushInterval = t
+    }
+
     // Connect to InfluxDB server
     if err := s.connect(); err != nil {
         return nil, fmt.Errorf("unable to connect: %v", err)
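The new `custom_flush_interval` option is a plain Go duration string validated with `time.ParseDuration` at construction time, so a bad value fails fast instead of silently disabling the timer. A minimal sketch of that parsing step; the `sinkConfig` type and `parseFlushInterval` helper are illustrative, not the sink's actual API:

```go
// Sketch (not part of the patch): parsing a duration option such as
// "custom_flush_interval" from JSON config and rejecting invalid values
// at construction time, as NewInfluxAsyncSink does above.
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type sinkConfig struct {
	CustomFlushInterval string `json:"custom_flush_interval,omitempty"`
}

func parseFlushInterval(raw json.RawMessage) (time.Duration, error) {
	var cfg sinkConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return 0, err
	}
	if len(cfg.CustomFlushInterval) == 0 {
		return 0, nil // option unset: keep the zero default (timer disabled)
	}
	d, err := time.ParseDuration(cfg.CustomFlushInterval)
	if err != nil {
		return 0, fmt.Errorf("invalid duration in 'custom_flush_interval': %v", err)
	}
	return d, nil
}

func main() {
	d, err := parseFlushInterval(json.RawMessage(`{"custom_flush_interval": "5s"}`))
	fmt.Println(d, err) // 5s <nil>
}
```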
diff --git a/sinks/influxSink.go b/sinks/influxSink.go
index 1987342..e8b16d8 100644
--- a/sinks/influxSink.go
+++ b/sinks/influxSink.go
@@ -6,28 +6,32 @@ import (
     "encoding/json"
     "errors"
     "fmt"
+    "sync"
     "time"
 
     cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
     lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
     influxdb2 "github.com/influxdata/influxdb-client-go/v2"
     influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
+    "github.com/influxdata/influxdb-client-go/v2/api/write"
 )
 
 type InfluxSinkConfig struct {
     defaultSinkConfig
-    Host                  string `json:"host,omitempty"`
-    Port                  string `json:"port,omitempty"`
-    Database              string `json:"database,omitempty"`
-    User                  string `json:"user,omitempty"`
-    Password              string `json:"password,omitempty"`
-    Organization          string `json:"organization,omitempty"`
-    SSL                   bool   `json:"ssl,omitempty"`
-    RetentionPol          string `json:"retention_policy,omitempty"`
-    InfluxRetryInterval   string `json:"retry_interval"`
-    InfluxExponentialBase uint   `json:"retry_exponential_base"`
-    InfluxMaxRetries      uint   `json:"max_retries"`
-    InfluxMaxRetryTime    string `json:"max_retry_time"`
+    Host         string `json:"host,omitempty"`
+    Port         string `json:"port,omitempty"`
+    Database     string `json:"database,omitempty"`
+    User         string `json:"user,omitempty"`
+    Password     string `json:"password,omitempty"`
+    Organization string `json:"organization,omitempty"`
+    SSL          bool   `json:"ssl,omitempty"`
+    FlushDelay   string `json:"flush_delay,omitempty"`
+    BatchSize    int    `json:"batch_size,omitempty"`
+    RetentionPol string `json:"retention_policy,omitempty"`
+    // InfluxRetryInterval string `json:"retry_interval"`
+    // InfluxExponentialBase uint `json:"retry_exponential_base"`
+    // InfluxMaxRetries uint `json:"max_retries"`
+    // InfluxMaxRetryTime string `json:"max_retry_time"`
     //InfluxMaxRetryDelay string `json:"max_retry_delay"` // It is mentioned in the docs but there is no way to set it
 }
 
@@ -38,6 +42,10 @@ type InfluxSink struct {
     config InfluxSinkConfig
     influxRetryInterval uint
     influxMaxRetryTime uint
+    batch      []*write.Point
+    flushTimer *time.Timer
+    flushDelay time.Duration
+    lock       sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer
     //influxMaxRetryDelay uint
 }
 
@@ -56,16 +64,31 @@ func (s *InfluxSink) connect() error {
     }
     cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database)
     clientOptions := influxdb2.DefaultOptions()
+
+    // if s.influxRetryInterval != 0 {
+    //     cclog.ComponentDebug(s.name, "MaxRetryInterval", s.influxRetryInterval)
+    //     clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
+    // }
+    // if s.influxMaxRetryTime != 0 {
+    //     cclog.ComponentDebug(s.name, "MaxRetryTime", s.influxMaxRetryTime)
+    //     clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
+    // }
+    // if s.config.InfluxExponentialBase != 0 {
+    //     cclog.ComponentDebug(s.name, "Exponential Base", s.config.InfluxExponentialBase)
+    //     clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
+    // }
+    // if s.config.InfluxMaxRetries != 0 {
+    //     cclog.ComponentDebug(s.name, "Max Retries", s.config.InfluxMaxRetries)
+    //     clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
+    // }
+
     clientOptions.SetTLSConfig(
         &tls.Config{
             InsecureSkipVerify: true,
         },
     )
-    clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
-    clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
-    clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
-    clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
+    clientOptions.SetPrecision(time.Second)
 
     s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
     s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database)
@@ -80,38 +103,76 @@ func (s *InfluxSink) connect() error {
 }
 
 func (s *InfluxSink) Write(m lp.CCMetric) error {
-    err :=
-        s.writeApi.WritePoint(
-            context.Background(),
-            m.ToPoint(s.meta_as_tags),
-        )
-    return err
+    // err :=
+    //     s.writeApi.WritePoint(
+    //         context.Background(),
+    //         m.ToPoint(s.meta_as_tags),
+    //     )
+    if len(s.batch) == 0 && s.flushDelay != 0 {
+        // This is the first write since the last flush, start the flushTimer!
+        if s.flushTimer != nil && s.flushTimer.Stop() {
+            cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?")
+        }
+
+        // Run a batched flush for all lines that have arrived in the last second
+        s.flushTimer = time.AfterFunc(s.flushDelay, func() {
+            if err := s.Flush(); err != nil {
+                cclog.ComponentError(s.name, "flush failed:", err.Error())
+            }
+        })
+    }
+    p := m.ToPoint(s.meta_as_tags)
+    s.lock.Lock()
+    s.batch = append(s.batch, p)
+    s.lock.Unlock()
+
+    // Flush synchronously if "flush_delay" is zero
+    if s.flushDelay == 0 {
+        return s.Flush()
+    }
+
+    return nil
 }
 
 func (s *InfluxSink) Flush() error {
+    s.lock.Lock()
+    defer s.lock.Unlock()
+    if len(s.batch) == 0 {
+        return nil
+    }
+    err := s.writeApi.WritePoint(context.Background(), s.batch...)
+    if err != nil {
+        cclog.ComponentError(s.name, "flush failed:", err.Error())
+        return err
+    }
+    s.batch = s.batch[:0]
     return nil
 }
 
 func (s *InfluxSink) Close() {
     cclog.ComponentDebug(s.name, "Closing InfluxDB connection")
+    s.flushTimer.Stop()
+    s.Flush()
     s.client.Close()
 }
 
 func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
     s := new(InfluxSink)
     s.name = fmt.Sprintf("InfluxSink(%s)", name)
+    s.config.BatchSize = 100
+    s.config.FlushDelay = "1s"
     if len(config) > 0 {
         err := json.Unmarshal(config, &s.config)
         if err != nil {
             return nil, err
         }
     }
-    s.influxRetryInterval = uint(time.Duration(1) * time.Second)
-    s.config.InfluxRetryInterval = "1s"
-    s.influxMaxRetryTime = uint(7 * time.Duration(24) * time.Hour)
-    s.config.InfluxMaxRetryTime = "168h"
-    s.config.InfluxMaxRetries = 20
-    s.config.InfluxExponentialBase = 2
+    s.influxRetryInterval = 0
+    s.influxMaxRetryTime = 0
+    // s.config.InfluxRetryInterval = ""
+    // s.config.InfluxMaxRetryTime = ""
+    // s.config.InfluxMaxRetries = 0
+    // s.config.InfluxExponentialBase = 0
 
     if len(s.config.Host) == 0 ||
         len(s.config.Port) == 0 ||
@@ -126,15 +187,25 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
         s.meta_as_tags[k] = true
     }
 
-    toUint := func(duration string, def uint) uint {
-        t, err := time.ParseDuration(duration)
+    // toUint := func(duration string, def uint) uint {
+    //     if len(duration) > 0 {
+    //         t, err := time.ParseDuration(duration)
+    //         if err == nil {
+    //             return uint(t.Milliseconds())
+    //         }
+    //     }
+    //     return def
+    // }
+    // s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
+    // s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
+
+    if len(s.config.FlushDelay) > 0 {
+        t, err := time.ParseDuration(s.config.FlushDelay)
         if err == nil {
-            return uint(t.Milliseconds())
+            s.flushDelay = t
         }
-        return def
     }
-    s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
-    s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
+
+    s.batch = make([]*write.Point, 0, s.config.BatchSize)
 
     // Connect to InfluxDB server
     if err := s.connect(); err != nil {
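`InfluxSink.Flush` above holds the mutex for the whole write and only truncates the batch after a successful write: re-slicing to `s.batch[:0]` keeps the backing array allocated for the next interval, and a failed write leaves the points in place for a later retry. A minimal sketch of that pattern with a hypothetical `pointBatch` type and a stand-in `send` callback in place of the blocking write API:

```go
// Sketch (not part of the patch): the locked batch-and-truncate pattern
// from InfluxSink.Flush above.
package main

import (
	"fmt"
	"sync"
)

type pointBatch struct {
	lock  sync.Mutex
	batch []string
}

func (p *pointBatch) Add(pt string) {
	p.lock.Lock()
	p.batch = append(p.batch, pt)
	p.lock.Unlock()
}

func (p *pointBatch) Flush(send func([]string) error) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	if len(p.batch) == 0 {
		return nil // nothing buffered; avoid an empty write request
	}
	if err := send(p.batch); err != nil {
		return err // keep the batch so a later flush can retry it
	}
	p.batch = p.batch[:0] // empty the slice but reuse the backing array
	return nil
}

func main() {
	p := &pointBatch{batch: make([]string, 0, 100)}
	p.Add("cpu_load,host=n01 value=1.5")
	_ = p.Flush(func(pts []string) error {
		fmt.Println("writing", len(pts), "points")
		return nil
	})
}
```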
diff --git a/sinks/influxSink.md b/sinks/influxSink.md
index a099895..8f9ce83 100644
--- a/sinks/influxSink.md
+++ b/sinks/influxSink.md
@@ -17,10 +17,8 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de
     "password" : "examplepw",
     "organization": "myorg",
     "ssl": true,
-    "retry_interval" : "1s",
-    "retry_exponential_base" : 2,
-    "max_retries": 20,
-    "max_retry_time" : "168h"
+    "flush_delay" : "1s",
+    "batch_size" : 100
   }
 }
 ```
@@ -34,9 +32,6 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de
 - `password`: Password for basic authentification
 - `organization`: Organization in the InfluxDB
 - `ssl`: Use SSL connection
-- `retry_interval`: Base retry interval for failed write requests, default 1s
-- `retry_exponential_base`: The retry interval is exponentially increased with this base, default 2
-- `max_retries`: Maximal number of retry attempts
-- `max_retry_time`: Maximal time to retry failed writes, default 168h (one week)
+- `flush_delay`: Group metrics coming in to a single batch
+- `batch_size`: Maximal batch size
 
-For information about the calculation of the retry interval settings, see [offical influxdb-client-go documentation](https://github.com/influxdata/influxdb-client-go#handling-of-failed-async-writes)
\ No newline at end of file