diff --git a/go.mod b/go.mod index 130f5cc..f381168 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( require ( github.com/PaesslerAG/gval v1.1.2 github.com/golang/protobuf v1.5.2 // indirect + github.com/mattn/go-sqlite3 v1.14.11 github.com/nats-io/nats-server/v2 v2.7.0 // indirect google.golang.org/protobuf v1.27.1 // indirect ) diff --git a/go.sum b/go.sum index 44be790..2504160 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,8 @@ github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-sqlite3 v1.14.11 h1:gt+cp9c0XGqe9S/wAHTL3n/7MqY+siPWgWJgqdsFrzQ= +github.com/mattn/go-sqlite3 v1.14.11/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= diff --git a/sinks/sqliteSink.go b/sinks/sqliteSink.go index b050ff7..343f5e3 100644 --- a/sinks/sqliteSink.go +++ b/sinks/sqliteSink.go @@ -2,20 +2,32 @@ package sinks import ( "database/sql" + "errors" "fmt" - _ "github.com/mattn/go-sqlite3" "log" "sort" "strings" - "time" - "errors" - lp "github.com/influxdata/line-protocol" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + _ "github.com/mattn/go-sqlite3" ) +const SQLITE3_TIMESTAMP_NAME = `timestamp` +const SQLITE3_TIMESTAMP_TYPE = `TIMESTAMP NOT NULL` + +type SqliteTable struct { + columns []string + coltypes []string + createQuery string + insertQuery string + primkeys []string +} + type SqliteSink struct { - Sink - db *sql.DB - columns map[string][]string + sink + db *sql.DB + tables map[string]SqliteTable } type StrList []string @@ -35,10 +47,10 @@ func (list StrList) Less(i, j int) bool { return si_lower < sj_lower } -func (s *SqliteSink) Init(config SinkConfig) error { +func (s *SqliteSink) Init(config sinkConfig) error { var err error if len(config.Database) == 0 { - return errors.New("Not all configuration variables set required by SqliteSink") + return errors.New("not all configuration variables set required by SqliteSink") } s.host = config.Host s.port = config.Port @@ -47,100 +59,187 @@ func (s *SqliteSink) Init(config SinkConfig) error { s.user = config.User s.password = config.Password log.Print("Opening Sqlite3 database ", s.database) - s.db, err = sql.Open("sqlite3", fmt.Sprintf("./%s.db", s.database)) + uri := fmt.Sprintf("file:./%s.db", s.database) + if len(s.user) > 0 && len(s.password) > 0 { + uri += fmt.Sprintf("?_auth&_auth_user=%s&_auth_pass=%s", s.user, s.password) + } + s.db, err = sql.Open("sqlite3", uri) if err != nil { log.Fatal(err) s.db = nil return err } + s.tables = make(map[string]SqliteTable) return nil } -func (s *SqliteSink) PrepareLists(measurement string, tags map[string]string, fields map[string]interface{}, t time.Time) ([]string, []string, []string) { +func getkeylist(point lp.CCMetric, include_meta bool) []string { keys := make([]string, 0) - values := make([]string, 0) - keytype := make([]string, 0) - - keys = append(keys, "time") - values = append(values, fmt.Sprintf("%d", t.Unix())) - keytype = append(keytype, fmt.Sprintf("time INT8")) - for k, v := range tags { + for k := range point.Tags() { keys = append(keys, k) - values = append(values, fmt.Sprintf("%q", v)) - keytype = append(keytype, fmt.Sprintf("%s TEXT", k)) } - for k, v := range fields { - keys = append(keys, k) - switch v.(type) { - case float32: - keytype = append(keytype, fmt.Sprintf("%s FLOAT", k)) - values = append(values, fmt.Sprintf("%f", v)) - case float64: - keytype = append(keytype, fmt.Sprintf("%s DOUBLE", k)) - values = append(values, fmt.Sprintf("%f", v)) - case int64: - keytype = append(keytype, fmt.Sprintf("%s INT8", k)) - values = append(values, fmt.Sprintf("%d", v)) - case int: - keytype = append(keytype, fmt.Sprintf("%s INT", k)) - values = append(values, fmt.Sprintf("%d", v)) - case string: - keytype = append(keytype, fmt.Sprintf("%s TEXT", k)) - values = append(values, fmt.Sprintf("%q", v)) + if include_meta { + for k := range point.Meta() { + keys = append(keys, k) } } - sort.Sort(StrList(keytype)) - return keytype, keys, values -} - -func Tags2Map(metric lp.MutableMetric) map[string]string { - tags := make(map[string]string) - for _, t := range metric.TagList() { - tags[t.Key] = t.Value + for k := range point.Fields() { + keys = append(keys, k) } - return tags + keys = append(keys, SQLITE3_TIMESTAMP_NAME) + sort.Sort(StrList(keys)) + return keys } -func Fields2Map(metric lp.MutableMetric) map[string]interface{} { - fields := make(map[string]interface{}) - for _, f := range metric.FieldList() { - fields[f.Key] = f.Value +func getvaluelist(point lp.CCMetric, keys []string) []string { + values := make([]string, 0) + for _, key := range keys { + if key == SQLITE3_TIMESTAMP_NAME { + values = append(values, point.Time().String()) + } else if val, ok := point.GetTag(key); ok { + values = append(values, val) + } else if val, ok := point.GetMeta(key); ok { + values = append(values, val) + } else if ival, ok := point.GetField(key); ok { + values = append(values, fmt.Sprintf("%v", ival)) + } else { + values = append(values, "NULL") + } } - return fields + return values +} +func gettypelist(point lp.CCMetric, keys []string) []string { + types := make([]string, 0) + for _, key := range keys { + if key == SQLITE3_TIMESTAMP_NAME { + types = append(types, SQLITE3_TIMESTAMP_TYPE) + continue + } + if point.HasTag(key) { + types = append(types, "TEXT") + continue + } + if point.HasMeta(key) { + types = append(types, "TEXT") + continue + } + ival, ok := point.GetField(key) + if ok { + switch ival.(type) { + case float64: + types = append(types, "DOUBLE") + case float32: + types = append(types, "FLOAT") + case string: + types = append(types, "TEXT") + case int: + types = append(types, "INT") + case int64: + types = append(types, "INT8") + } + } + } + return types } -func (s *SqliteSink) Write(point lp.MutableMetric) error { - if s.db != nil { - measurement := point.Name() - keytype, keys, values := s.PrepareLists(measurement, Tags2Map(point), Fields2Map(point), point.Time()) - prim_key := []string{"time", "host"} - if measurement == "cpu" { - prim_key = append(prim_key, "cpu") - } else if measurement == "socket" { - prim_key = append(prim_key, "socket") - } - tx, err := s.db.Begin() - if err == nil { - c := fmt.Sprintf("create table if not exists %s (%s, PRIMARY KEY (%s));", measurement, - strings.Join(keytype, ","), - strings.Join(prim_key, ",")) - i := fmt.Sprintf("insert into %s (%s) values(%s);", measurement, strings.Join(keys, ","), strings.Join(values, ",")) - _, err = tx.Exec(c) - if err != nil { - log.Println(err) - } - _, err = tx.Exec(i) - if err != nil { - log.Println(err) - } - err = tx.Commit() - if err != nil { - log.Println(err) - } - } else { - log.Println(err) - } +func getprimkey(keys []string) []string { + primkeys := make([]string, 0) + primkeys = append(primkeys, SQLITE3_TIMESTAMP_NAME) + for _, key := range keys { + switch key { + case "hostname": + primkeys = append(primkeys, "hostname") + case "type": + primkeys = append(primkeys, "type") + case "type-id": + primkeys = append(primkeys, "type-id") + } + } + return primkeys +} + +func newCreateQuery(tablename string, keys []string, types []string, primkeys []string) string { + + keytypelist := make([]string, 0) + for i, key := range keys { + keytypelist = append(keytypelist, fmt.Sprintf("%s %s", key, types[i])) + } + keytypelist = append(keytypelist, fmt.Sprintf("PRIMARY KEY (%s)", strings.Join(primkeys, ","))) + stmt := fmt.Sprintf("create table if not exists %s (%s);", tablename, keytypelist) + return stmt +} + +func newInsertQuery(tablename string, keys []string) string { + v := strings.Repeat("?,", len(keys)) + "?" + stmt := fmt.Sprintf("insert into %s (%s) values(%s);", tablename, strings.Join(keys, ","), v) + return stmt +} + +func (s *SqliteSink) Write(point lp.CCMetric) error { + + if s.db != nil { + measurement := point.Name() + if tab, ok := s.tables[measurement]; !ok { + var tab SqliteTable + tab.columns = getkeylist(point, s.meta_as_tags) + tab.coltypes = gettypelist(point, tab.columns) + tab.primkeys = getprimkey(tab.columns) + tab.createQuery = newCreateQuery(measurement, tab.columns, tab.coltypes, tab.primkeys) + tab.insertQuery = newInsertQuery(measurement, tab.columns) + + tx, err := s.db.Begin() + if err != nil { + cclog.ComponentError("SqliteSink", "Init DB session failed:", err.Error()) + return err + } + _, err = tx.Exec(tab.createQuery) + if err != nil { + cclog.ComponentError("SqliteSink", "Execute CreateQuery failed:", err.Error()) + return err + } + stmt, err := tx.Prepare(tab.insertQuery) + if err != nil { + cclog.ComponentError("SqliteSink", "Prepare InsertQuery failed:", err.Error()) + return err + } + defer stmt.Close() + _, err = stmt.Exec(getvaluelist(point, tab.columns)) + if err != nil { + cclog.ComponentError("SqliteSink", "Execute InsertQuery failed:", err.Error()) + return err + } + tx.Commit() + s.tables[measurement] = tab + + } else { + + keys := getkeylist(point, s.meta_as_tags) + if len(keys) > len(tab.columns) { + cclog.ComponentDebug("SqliteSink", "Metric", measurement, "has different keys as creation keys, ignoring addition keys") + } else if len(keys) < len(tab.columns) { + cclog.ComponentDebug("SqliteSink", "Metric", measurement, "has different keys as creation keys, setting missing values with 'NULL'") + } + values := getvaluelist(point, tab.columns) + tx, err := s.db.Begin() + if err != nil { + cclog.ComponentError("SqliteSink", "Init DB session failed:", err.Error()) + return err + } + stmt, err := tx.Prepare(tab.insertQuery) + if err != nil { + cclog.ComponentError("SqliteSink", "Prepare InsertQuery failed:", err.Error()) + return err + } + defer stmt.Close() + _, err = stmt.Exec(values) + if err != nil { + cclog.ComponentError("SqliteSink", "Execute InsertQuery failed:", err.Error()) + return err + } + tx.Commit() + + } } return nil } @@ -150,8 +249,8 @@ func (s *SqliteSink) Flush() error { } func (s *SqliteSink) Close() { - log.Print("Closing Sqlite3 database ", s.database) - if s.db != nil { - s.db.Close() - } + log.Print("Closing Sqlite3 database ", s.database) + if s.db != nil { + s.db.Close() + } }