From a03eb315f5d2ac59e6c7bebaa22682e12cc7a813 Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Wed, 14 May 2025 12:24:58 +0200 Subject: [PATCH] AvroWriter advanced cases --- go.mod | 2 + go.sum | 10 + internal/avro/avroCheckpoint.go | 401 +++++++++++++++++++++++++++++ internal/avro/avroHelper.go | 14 +- internal/avro/avroStruct.go | 50 +++- internal/memorystore/checkpoint.go | 21 +- 6 files changed, 482 insertions(+), 16 deletions(-) create mode 100644 internal/avro/avroCheckpoint.go diff --git a/go.mod b/go.mod index e443edd..420c9a8 100644 --- a/go.mod +++ b/go.mod @@ -18,8 +18,10 @@ require ( github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/spec v0.21.0 // indirect github.com/go-openapi/swag v0.23.0 // indirect + github.com/golang/snappy v0.0.1 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/compress v1.17.9 // indirect + github.com/linkedin/goavro/v2 v2.13.1 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect diff --git a/go.sum b/go.sum index ec38174..c029ae9 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,7 @@ github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6Xge github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= @@ -19,6 +20,8 @@ github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+Gr github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -44,6 +47,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/linkedin/goavro/v2 v2.13.1 h1:4qZ5M0QzQFDRqccsroJlgOJznqAS/TpdvXg55h429+I= +github.com/linkedin/goavro/v2 v2.13.1/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU= @@ -59,6 +64,10 @@ github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDN github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/swaggo/files v1.0.1 h1:J1bVJ4XHZNq0I46UU90611i9/YzdrF7x92oX1ig5IdE= @@ -117,6 +126,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/avro/avroCheckpoint.go b/internal/avro/avroCheckpoint.go new file mode 100644 index 0000000..f7d7c5f --- /dev/null +++ b/internal/avro/avroCheckpoint.go @@ -0,0 +1,401 @@ +package avro + +import ( + "encoding/json" + "errors" + "fmt" + "log" + "os" + "path" + "sort" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/ClusterCockpit/cc-metric-store/internal/util" + "github.com/linkedin/goavro/v2" +) + +var NumWorkers int = 4 + +var ErrNoNewData error = errors.New("no data in the pool") + +func (as *AvroStore) ToCheckpoint(dir string) (int, error) { + levels := make([]*AvroLevel, 0) + selectors := make([][]string, 0) + as.root.lock.RLock() + // Cluster + for sel1, l1 := range as.root.children { + l1.lock.RLock() + // Node + for sel2, l2 := range l1.children { + l2.lock.RLock() + // Frequency + for sel3, l3 := range l1.children { + levels = append(levels, l3) + selectors = append(selectors, []string{sel1, sel2, sel3}) + } + l2.lock.RUnlock() + } + l1.lock.RUnlock() + } + as.root.lock.RUnlock() + + type workItem struct { + level *AvroLevel + dir string + selector []string + } + + n, errs := int32(0), int32(0) + + var wg sync.WaitGroup + wg.Add(NumWorkers) + work := make(chan workItem, NumWorkers*2) + for range NumWorkers { + go func() { + defer wg.Done() + + for workItem := range work { + var from int64 = getTimestamp(workItem.dir) + + if err := workItem.level.toCheckpoint(workItem.dir, from); err != nil { + if err == ErrNoNewData { + continue + } + + log.Printf("error while checkpointing %#v: %s", workItem.selector, err.Error()) + atomic.AddInt32(&errs, 1) + } else { + atomic.AddInt32(&n, 1) + } + } + }() + } + + for i := range len(levels) { + dir := path.Join(dir, path.Join(selectors[i]...)) + work <- workItem{ + level: levels[i], + dir: dir, + selector: selectors[i], + } + } + + close(work) + wg.Wait() + + if errs > 0 { + return int(n), fmt.Errorf("%d errors happend while creating avro checkpoints (%d successes)", errs, n) + } + return int(n), nil +} + +// getTimestamp returns the timestamp from the directory name +func getTimestamp(dir string) int64 { + // Extract the timestamp from the directory name + // The existing avro file will be in epoch timestamp format + // iterate over all the files in the directory and find the maximum timestamp + // and return it + files, err := os.ReadDir(dir) + if err != nil { + return 0 + } + var maxTs int64 = 0 + + if len(files) == 0 { + return 0 + } + + for _, file := range files { + if file.IsDir() { + continue + } + name := file.Name() + if len(name) < 5 { + continue + } + ts, err := strconv.ParseInt(name[:len(name)-5], 10, 64) + if err != nil { + continue + } + if ts > maxTs { + maxTs = ts + } + } + + return maxTs +} + +func (l *AvroLevel) toCheckpoint(dir string, from int64) error { + + fmt.Printf("Checkpointing directory: %s\n", dir) + + // find smallest overall timestamp in l.data map and delete it from l.data + var minTs int64 = int64(1<<63 - 1) + for ts := range l.data { + if ts < minTs { + minTs = ts + } + } + + if from == 0 { + from = minTs + } + + var schema string + var codec *goavro.Codec + record_list := make([]map[string]interface{}, 0) + + var f *os.File + + filePath := path.Join(dir, fmt.Sprintf("%d.avro", from)) + + if _, err := os.Stat(filePath); errors.Is(err, os.ErrNotExist) { + err = os.MkdirAll(dir, 0o755) + if err == nil { + f, err = os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil { + return fmt.Errorf("failed to create new avro file: %v", err) + } + } + } else { + f, err = os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open existing avro file: %v", err) + } + reader, err := goavro.NewOCFReader(f) + if err != nil { + return fmt.Errorf("failed to create OCF reader: %v", err) + } + schema = reader.Codec().Schema() + + f.Close() + + f, err = os.OpenFile(filePath, os.O_APPEND|os.O_RDWR, 0o644) + if err != nil { + log.Fatalf("Failed to create file: %v", err) + } + } + defer f.Close() + + time_ref := time.Now().Add(time.Duration(-CheckpointBufferMinutes) * time.Minute).Unix() + + for ts := range l.data { + if ts < time_ref { + schema_gen, err := generateSchema(l.data[ts]) + if err != nil { + return err + } + + flag, schema, err := compareSchema(schema, schema_gen) + if err != nil { + log.Fatalf("Failed to compare read and generated schema: %v", err) + } + if flag { + codec, err = goavro.NewCodec(schema) + if err != nil { + log.Fatalf("Failed to create codec after merged schema: %v", err) + } + + f.Close() + + f, err = os.Open(filePath) + if err != nil { + log.Fatalf("Failed to open Avro file: %v", err) + } + + ocfReader, err := goavro.NewOCFReader(f) + if err != nil { + log.Fatalf("Failed to create OCF reader: %v", err) + } + + for ocfReader.Scan() { + record, err := ocfReader.Read() + if err != nil { + log.Fatalf("Failed to read record: %v", err) + } + + record_list = append(record_list, record.(map[string]interface{})) + } + + f.Close() + + err = os.Remove(filePath) + if err != nil { + log.Fatalf("Failed to delete file: %v", err) + } + + f, err = os.OpenFile(filePath, os.O_CREATE|os.O_RDWR, 0o644) + if err != nil { + log.Fatalf("Failed to create file after deleting : %v", err) + } + } + + record_list = append(record_list, generateRecord(l.data[ts])) + delete(l.data, minTs) + } + } + + if len(record_list) == 0 { + return ErrNoNewData + } + + writer, err := goavro.NewOCFWriter(goavro.OCFConfig{ + W: f, + Codec: codec, + Schema: schema, + }) + if err != nil { + log.Fatalf("Failed to create OCF writer: %v", err) + } + + // Append the new record + if err := writer.Append(record_list); err != nil { + log.Fatalf("Failed to append record: %v", err) + } + + return nil +} + +func compareSchema(schemaRead, schemaGen string) (bool, string, error) { + var genSchema, readSchema AvroSchema + + if schemaRead == "" { + return false, schemaGen, nil + } + + // Unmarshal the schema strings into AvroSchema structs + if err := json.Unmarshal([]byte(schemaGen), &genSchema); err != nil { + return false, "", fmt.Errorf("failed to parse generated schema: %v", err) + } + if err := json.Unmarshal([]byte(schemaRead), &readSchema); err != nil { + return false, "", fmt.Errorf("failed to parse read schema: %v", err) + } + + sort.Slice(genSchema.Fields, func(i, j int) bool { + return genSchema.Fields[i].Name < genSchema.Fields[j].Name + }) + + sort.Slice(readSchema.Fields, func(i, j int) bool { + return readSchema.Fields[i].Name < readSchema.Fields[j].Name + }) + + // Check if schemas are identical + schemasEqual := true + if len(genSchema.Fields) <= len(readSchema.Fields) { + + for i := range genSchema.Fields { + if genSchema.Fields[i].Name != readSchema.Fields[i].Name { + schemasEqual = false + break + } + } + + // If schemas are identical, return the read schema + if schemasEqual { + return false, schemaRead, nil + } + } + + // Create a map to hold unique fields from both schemas + fieldMap := make(map[string]AvroField) + + // Add fields from the read schema + for _, field := range readSchema.Fields { + fieldMap[field.Name] = field + } + + // Add or update fields from the generated schema + for _, field := range genSchema.Fields { + fieldMap[field.Name] = field + } + + // Create a union schema by collecting fields from the map + var mergedFields []AvroField + for _, field := range fieldMap { + mergedFields = append(mergedFields, field) + } + + // Sort fields by name for consistency + sort.Slice(mergedFields, func(i, j int) bool { + return mergedFields[i].Name < mergedFields[j].Name + }) + + // Create the merged schema + mergedSchema := AvroSchema{ + Type: "record", + Name: genSchema.Name, + Fields: mergedFields, + } + + // Check if schemas are identical + schemasEqual = len(mergedSchema.Fields) == len(readSchema.Fields) + if schemasEqual { + for i := range mergedSchema.Fields { + if mergedSchema.Fields[i].Name != readSchema.Fields[i].Name { + schemasEqual = false + break + } + } + + if schemasEqual { + return false, schemaRead, nil + } + } + + // Marshal the merged schema back to JSON + mergedSchemaJson, err := json.Marshal(mergedSchema) + if err != nil { + return false, "", fmt.Errorf("failed to marshal merged schema: %v", err) + } + + fmt.Printf("Merged Schema: %s\n", string(mergedSchemaJson)) + fmt.Printf("Read Schema: %s\n", schemaRead) + + return true, string(mergedSchemaJson), nil +} + +func generateSchema(data map[string]util.Float) (string, error) { + + // Define the Avro schema structure + schema := map[string]interface{}{ + "type": "record", + "name": "DataRecord", + "fields": []map[string]interface{}{}, + } + + fieldTracker := make(map[string]struct{}) + + for key := range data { + if _, exists := fieldTracker[key]; !exists { + field := map[string]interface{}{ + "name": key, + "type": "double", // Allows null or float + "default": 0.0, + } + schema["fields"] = append(schema["fields"].([]map[string]interface{}), field) + fieldTracker[key] = struct{}{} + } + } + + schemaString, err := json.Marshal(schema) + if err != nil { + return "", fmt.Errorf("failed to marshal schema: %v", err) + } + + return string(schemaString), nil +} +func generateRecord(data map[string]util.Float) map[string]interface{} { + + record := make(map[string]interface{}) + + // Iterate through each map in data + for key, value := range data { + // Set the value in the record + record[key] = value + } + + return record +} diff --git a/internal/avro/avroHelper.go b/internal/avro/avroHelper.go index 52f696f..1da8125 100644 --- a/internal/avro/avroHelper.go +++ b/internal/avro/avroHelper.go @@ -34,6 +34,18 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { continue } + metricName := "" + + for i, name := range val.Selector { + if i == 0 { + metricName += name + } else { + metricName += "_" + name + } + } + + metricName += val.MetricName + // Create a new selector for the Avro level // The selector is a slice of strings that represents the path to the // Avro level. It is created by appending the cluster, node, and metric @@ -52,7 +64,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) { copy(oldSelector, selector) } - avroLevel.addMetric(val.MetricName, val.Value, val.Timestamp) + avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq)) } } }() diff --git a/internal/avro/avroStruct.go b/internal/avro/avroStruct.go index 27f9876..e016c7d 100644 --- a/internal/avro/avroStruct.go +++ b/internal/avro/avroStruct.go @@ -36,6 +36,18 @@ type AvroLevel struct { lock sync.RWMutex } +type AvroField struct { + Name string `json:"name"` + Type interface{} `json:"type"` + Default interface{} `json:"default,omitempty"` +} + +type AvroSchema struct { + Type string `json:"type"` + Name string `json:"name"` + Fields []AvroField `json:"fields"` +} + func (l *AvroLevel) findAvroLevelOrCreate(selector []string) *AvroLevel { if len(selector) == 0 { return l @@ -82,28 +94,42 @@ func (l *AvroLevel) findAvroLevelOrCreate(selector []string) *AvroLevel { return child.findAvroLevelOrCreate(selector[1:]) } -func (l *AvroLevel) addMetric(metricName string, value util.Float, timestamp int64) { +func (l *AvroLevel) addMetric(metricName string, value util.Float, timestamp int64, Freq int) { l.lock.Lock() defer l.lock.Unlock() - // Create a key value for the first time - if len(l.data) == 0 { - l.data[timestamp] = make(map[string]util.Float, 0) - l.data[timestamp][metricName] = value - fmt.Printf("Creating new timestamp because no data exists\n") + KeyCounter := int(CheckpointBufferMinutes * 60 / Freq) + + // Create keys in advance for the given amount of time + if len(l.data) != KeyCounter { + if len(l.data) == 0 { + for i := range KeyCounter { + l.data[timestamp+int64(i*Freq)] = make(map[string]util.Float, 0) + } + } else { + //Get the last timestamp + var lastTs int64 + for ts := range l.data { + if ts > lastTs { + lastTs = ts + } + } + // Create keys for the next KeyCounter timestamps + l.data[lastTs+int64(Freq)] = make(map[string]util.Float, 0) + } + fmt.Printf("Creating timestamp keys to store key-value\n") } // Iterate over timestamps and choose the one which is within range. // Since its epoch time, we check if the difference is less than 60 seconds. for ts := range l.data { - if (ts - timestamp) < 60 { + if (ts - timestamp) < int64(Freq) { l.data[ts][metricName] = value return } } - - // Create a new timestamp if none is found - l.data[timestamp] = make(map[string]util.Float, 0) - l.data[timestamp][metricName] = value - +} + +func GetAvroStore() *AvroStore { + return &avroStore } diff --git a/internal/memorystore/checkpoint.go b/internal/memorystore/checkpoint.go index afd6cf3..2b161e7 100644 --- a/internal/memorystore/checkpoint.go +++ b/internal/memorystore/checkpoint.go @@ -41,8 +41,9 @@ type CheckpointFile struct { var lastCheckpoint time.Time func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { + lastCheckpoint = time.Now() + if config.Keys.Checkpoints.FileFormat == "json" { - lastCheckpoint = time.Now() ms := GetMemoryStore() go func() { @@ -82,11 +83,13 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { } else { go func() { defer wg.Done() - d, err := time.ParseDuration("1m") + d, _ := time.ParseDuration("1m") + + d_cp, err := time.ParseDuration(config.Keys.Checkpoints.Interval) if err != nil { log.Fatal(err) } - if d <= 0 { + if d_cp <= 0 { return } @@ -95,6 +98,7 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { return case <-time.After(time.Duration(avro.CheckpointBufferMinutes) * time.Minute): // This is the first tick untill we collect the data for given minutes. + avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir) } ticks := func() <-chan time.Time { @@ -103,12 +107,23 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { } return time.NewTicker(d).C }() + + ticks_cp := func() <-chan time.Time { + if d_cp <= 0 { + return nil + } + return time.NewTicker(d_cp).C + }() + for { select { case <-ctx.Done(): return + case <-ticks_cp: + lastCheckpoint = time.Now() case <-ticks: // Regular ticks of 1 minute to write data. + avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir) } } }()