2022-02-07 16:51:46 +01:00
package sinks
import (
2022-02-25 13:51:52 +01:00
"context"
2022-02-07 16:51:46 +01:00
"crypto/tls"
"encoding/json"
"errors"
"fmt"
2022-04-04 11:48:54 +02:00
"strings"
2022-03-11 13:43:03 +01:00
"time"
2022-02-07 16:51:46 +01:00
2022-10-10 11:53:11 +02:00
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
2022-02-07 16:51:46 +01:00
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
2022-04-04 11:48:54 +02:00
influxdb2ApiHttp "github.com/influxdata/influxdb-client-go/v2/api/http"
2022-02-07 16:51:46 +01:00
)
type InfluxAsyncSinkConfig struct {
defaultSinkConfig
Host string ` json:"host,omitempty" `
Port string ` json:"port,omitempty" `
Database string ` json:"database,omitempty" `
User string ` json:"user,omitempty" `
Password string ` json:"password,omitempty" `
Organization string ` json:"organization,omitempty" `
SSL bool ` json:"ssl,omitempty" `
2022-02-10 09:43:02 +01:00
// Maximum number of points sent to server in single request. Default 5000
BatchSize uint ` json:"batch_size,omitempty" `
// Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms
2022-03-11 13:43:03 +01:00
FlushInterval uint ` json:"flush_interval,omitempty" `
2022-04-01 17:26:56 +02:00
InfluxRetryInterval string ` json:"retry_interval,omitempty" `
InfluxExponentialBase uint ` json:"retry_exponential_base,omitempty" `
InfluxMaxRetries uint ` json:"max_retries,omitempty" `
InfluxMaxRetryTime string ` json:"max_retry_time,omitempty" `
2022-04-04 02:56:23 +02:00
CustomFlushInterval string ` json:"custom_flush_interval,omitempty" `
2022-04-04 11:48:54 +02:00
MaxRetryAttempts uint ` json:"max_retry_attempts,omitempty" `
2022-02-07 16:51:46 +01:00
}
type InfluxAsyncSink struct {
sink
2022-03-11 13:43:03 +01:00
client influxdb2 . Client
writeApi influxdb2Api . WriteAPI
errors <- chan error
config InfluxAsyncSinkConfig
influxRetryInterval uint
influxMaxRetryTime uint
2022-04-04 02:56:23 +02:00
customFlushInterval time . Duration
flushTimer * time . Timer
2022-02-07 16:51:46 +01:00
}
func ( s * InfluxAsyncSink ) connect ( ) error {
var auth string
var uri string
if s . config . SSL {
uri = fmt . Sprintf ( "https://%s:%s" , s . config . Host , s . config . Port )
} else {
uri = fmt . Sprintf ( "http://%s:%s" , s . config . Host , s . config . Port )
}
if len ( s . config . User ) == 0 {
auth = s . config . Password
} else {
auth = fmt . Sprintf ( "%s:%s" , s . config . User , s . config . Password )
}
cclog . ComponentDebug ( s . name , "Using URI" , uri , "Org" , s . config . Organization , "Bucket" , s . config . Database )
2022-02-09 11:08:50 +01:00
clientOptions := influxdb2 . DefaultOptions ( )
2022-02-10 09:43:02 +01:00
if s . config . BatchSize != 0 {
2022-04-01 17:26:56 +02:00
cclog . ComponentDebug ( s . name , "Batch size" , s . config . BatchSize )
2022-02-10 09:43:02 +01:00
clientOptions . SetBatchSize ( s . config . BatchSize )
}
if s . config . FlushInterval != 0 {
2022-04-01 17:26:56 +02:00
cclog . ComponentDebug ( s . name , "Flush interval" , s . config . FlushInterval )
2022-02-10 09:43:02 +01:00
clientOptions . SetFlushInterval ( s . config . FlushInterval )
}
2022-04-01 17:26:56 +02:00
if s . influxRetryInterval != 0 {
cclog . ComponentDebug ( s . name , "MaxRetryInterval" , s . influxRetryInterval )
clientOptions . SetMaxRetryInterval ( s . influxRetryInterval )
}
if s . influxMaxRetryTime != 0 {
cclog . ComponentDebug ( s . name , "MaxRetryTime" , s . influxMaxRetryTime )
clientOptions . SetMaxRetryTime ( s . influxMaxRetryTime )
}
if s . config . InfluxExponentialBase != 0 {
cclog . ComponentDebug ( s . name , "Exponential Base" , s . config . InfluxExponentialBase )
clientOptions . SetExponentialBase ( s . config . InfluxExponentialBase )
}
if s . config . InfluxMaxRetries != 0 {
cclog . ComponentDebug ( s . name , "Max Retries" , s . config . InfluxMaxRetries )
clientOptions . SetMaxRetries ( s . config . InfluxMaxRetries )
}
2022-02-09 11:08:50 +01:00
clientOptions . SetTLSConfig (
& tls . Config {
2022-02-07 16:51:46 +01:00
InsecureSkipVerify : true ,
2022-02-09 11:08:50 +01:00
} ,
2022-04-01 17:26:56 +02:00
) . SetPrecision ( time . Second )
2022-03-11 13:43:03 +01:00
2022-02-09 11:08:50 +01:00
s . client = influxdb2 . NewClientWithOptions ( uri , auth , clientOptions )
2022-02-07 16:51:46 +01:00
s . writeApi = s . client . WriteAPI ( s . config . Organization , s . config . Database )
2022-02-25 13:51:52 +01:00
ok , err := s . client . Ping ( context . Background ( ) )
if err != nil {
return err
}
if ! ok {
return fmt . Errorf ( "connection to %s not healthy" , uri )
}
2022-04-04 11:48:54 +02:00
s . writeApi . SetWriteFailedCallback ( func ( batch string , err influxdb2ApiHttp . Error , retryAttempts uint ) bool {
mlist := strings . Split ( batch , "\n" )
cclog . ComponentError ( s . name , fmt . Sprintf ( "Failed to write batch with %d metrics %d times (max: %d): %s" , len ( mlist ) , retryAttempts , s . config . MaxRetryAttempts , err . Error ( ) ) )
return retryAttempts <= s . config . MaxRetryAttempts
} )
2022-02-07 16:51:46 +01:00
return nil
}
2022-02-23 14:56:29 +01:00
func ( s * InfluxAsyncSink ) Write ( m lp . CCMetric ) error {
2022-04-04 02:56:23 +02:00
if s . customFlushInterval != 0 && s . flushTimer == nil {
// Run a batched flush for all lines that have arrived in the defined interval
s . flushTimer = time . AfterFunc ( s . customFlushInterval , func ( ) {
if err := s . Flush ( ) ; err != nil {
cclog . ComponentError ( s . name , "flush failed:" , err . Error ( ) )
}
} )
}
2022-02-23 14:56:29 +01:00
s . writeApi . WritePoint (
2022-03-15 16:16:26 +01:00
m . ToPoint ( s . meta_as_tags ) ,
2022-02-23 14:56:29 +01:00
)
return nil
}
func ( s * InfluxAsyncSink ) Flush ( ) error {
2022-04-04 02:56:23 +02:00
cclog . ComponentDebug ( s . name , "Flushing" )
2022-02-23 14:56:29 +01:00
s . writeApi . Flush ( )
2022-04-04 02:56:23 +02:00
if s . customFlushInterval != 0 && s . flushTimer != nil {
s . flushTimer = nil
}
2022-02-23 14:56:29 +01:00
return nil
}
// Close flushes the points still buffered by the write API and closes the
// InfluxDB client.
func (s *InfluxAsyncSink) Close() {
	cclog.ComponentDebug(s.name, "Closing InfluxDB connection")

	// Cancel a pending custom flush timer so that it cannot call Flush on the
	// write API after the client has been closed.
	if timer := s.flushTimer; timer != nil {
		timer.Stop()
		s.flushTimer = nil
	}

	s.writeApi.Flush()
	s.client.Close()
}
func NewInfluxAsyncSink ( name string , config json . RawMessage ) ( Sink , error ) {
s := new ( InfluxAsyncSink )
2022-02-22 16:15:25 +01:00
s . name = fmt . Sprintf ( "InfluxSink(%s)" , name )
2022-02-10 09:43:02 +01:00
// Set default for maximum number of points sent to server in single request.
2022-04-01 17:26:56 +02:00
s . config . BatchSize = 0
s . influxRetryInterval = 0
//s.config.InfluxRetryInterval = "1s"
s . influxMaxRetryTime = 0
//s.config.InfluxMaxRetryTime = "168h"
s . config . InfluxMaxRetries = 0
s . config . InfluxExponentialBase = 0
s . config . FlushInterval = 0
2022-04-04 02:56:23 +02:00
s . config . CustomFlushInterval = ""
s . customFlushInterval = time . Duration ( 0 )
2022-04-04 11:48:54 +02:00
s . config . MaxRetryAttempts = 1
2022-03-11 13:43:03 +01:00
// Default retry intervals (in seconds)
// 1 2
// 2 4
// 4 8
// 8 16
// 16 32
// 32 64
// 64 128
// 128 256
// 256 512
// 512 1024
// 1024 2048
// 2048 4096
// 4096 8192
// 8192 16384
// 16384 32768
// 32768 65536
// 65536 131072
// 131072 262144
// 262144 524288
2022-02-10 09:43:02 +01:00
2022-02-07 16:51:46 +01:00
if len ( config ) > 0 {
err := json . Unmarshal ( config , & s . config )
if err != nil {
2022-02-23 14:56:29 +01:00
return nil , err
2022-02-07 16:51:46 +01:00
}
}
2022-05-06 11:44:57 +02:00
if len ( s . config . Port ) == 0 {
2023-07-17 15:20:12 +02:00
return nil , errors . New ( "missing port configuration required by InfluxSink" )
2022-05-06 11:44:57 +02:00
}
if len ( s . config . Database ) == 0 {
2023-07-17 15:20:12 +02:00
return nil , errors . New ( "missing database configuration required by InfluxSink" )
2022-05-06 11:44:57 +02:00
}
if len ( s . config . Organization ) == 0 {
2023-07-17 15:20:12 +02:00
return nil , errors . New ( "missing organization configuration required by InfluxSink" )
2022-05-06 11:44:57 +02:00
}
if len ( s . config . Password ) == 0 {
2023-07-17 15:20:12 +02:00
return nil , errors . New ( "missing password configuration required by InfluxSink" )
2022-02-07 16:51:46 +01:00
}
2022-03-15 16:16:26 +01:00
// Create lookup map to use meta infos as tags in the output metric
s . meta_as_tags = make ( map [ string ] bool )
for _ , k := range s . config . MetaAsTags {
s . meta_as_tags [ k ] = true
}
2022-02-09 11:08:50 +01:00
2022-03-11 13:43:03 +01:00
toUint := func ( duration string , def uint ) uint {
t , err := time . ParseDuration ( duration )
if err == nil {
return uint ( t . Milliseconds ( ) )
}
return def
}
s . influxRetryInterval = toUint ( s . config . InfluxRetryInterval , s . influxRetryInterval )
s . influxMaxRetryTime = toUint ( s . config . InfluxMaxRetryTime , s . influxMaxRetryTime )
2022-04-04 02:56:23 +02:00
// Use a own timer for calling Flush()
if len ( s . config . CustomFlushInterval ) > 0 {
t , err := time . ParseDuration ( s . config . CustomFlushInterval )
if err != nil {
return nil , fmt . Errorf ( "invalid duration in 'custom_flush_interval': %v" , err )
}
s . customFlushInterval = t
}
2022-02-09 11:08:50 +01:00
// Connect to InfluxDB server
2022-02-23 14:56:29 +01:00
if err := s . connect ( ) ; err != nil {
2022-02-25 13:51:52 +01:00
return nil , fmt . Errorf ( "unable to connect: %v" , err )
2022-02-23 14:56:29 +01:00
}
2022-02-09 11:08:50 +01:00
// Start background: Read from error channel
2022-02-07 16:51:46 +01:00
s . errors = s . writeApi . Errors ( )
go func ( ) {
for err := range s . errors {
cclog . ComponentError ( s . name , err . Error ( ) )
}
} ( )
2022-02-22 16:15:25 +01:00
return s , nil
}