Mirror of https://github.com/ClusterCockpit/cc-backend (synced 2025-11-01 00:15:05 +01:00).
Revert avro files writing logic
This commit is contained in:
@@ -28,52 +28,6 @@ var NumAvroWorkers int = 4
|
||||
|
||||
var ErrNoNewData error = errors.New("no data in the pool")
|
||||
|
||||
func UpdateAvroFile(f *os.File, insertCount int64) error {
|
||||
filePath := f.Name()
|
||||
f.Close() // close the original handle immediately
|
||||
|
||||
// Reopen fresh for reading
|
||||
readFile, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to reopen file for reading: %v", err)
|
||||
}
|
||||
defer readFile.Close()
|
||||
|
||||
reader, err := goavro.NewOCFReader(readFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create OCF reader: %v", err)
|
||||
}
|
||||
|
||||
codec := reader.Codec()
|
||||
|
||||
// Now reopen again for appending
|
||||
appendFile, err := os.OpenFile(filePath, os.O_RDWR|os.O_APPEND, 0o644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to reopen file for appending: %v", err)
|
||||
}
|
||||
defer appendFile.Close()
|
||||
|
||||
recordList := make([]map[string]any, insertCount)
|
||||
for i := range recordList {
|
||||
recordList[i] = make(map[string]any)
|
||||
}
|
||||
|
||||
writer, err := goavro.NewOCFWriter(goavro.OCFConfig{
|
||||
W: appendFile,
|
||||
Codec: codec,
|
||||
CompressionName: goavro.CompressionDeflateLabel,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create OCF writer: %v", err)
|
||||
}
|
||||
|
||||
if err := writer.Append(recordList); err != nil {
|
||||
return fmt.Errorf("failed to append record: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) {
|
||||
levels := make([]*AvroLevel, 0)
|
||||
selectors := make([][]string, 0)
|
||||
|
||||
@@ -492,32 +492,6 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
interval, err := time.ParseDuration(Keys.Checkpoints.Interval)
|
||||
if err != nil {
|
||||
fmt.Printf("error while parsing interval: %#v", err)
|
||||
}
|
||||
|
||||
now := time.Now().Unix()
|
||||
cutOff := time.Unix(fromTimestamp, 0).Add(interval).Unix()
|
||||
|
||||
newCount := (min(now, cutOff) - fromTimestamp) / resolution
|
||||
|
||||
if recordCounter < newCount {
|
||||
// fmt.Printf("Record Count: %d, Required Count: %d\n", recordCounter, newCount)
|
||||
|
||||
insertCount := newCount - recordCounter
|
||||
for range insertCount {
|
||||
for key := range metricsData {
|
||||
metricsData[key] = append(metricsData[key], schema.ConvertToFloat(0.0))
|
||||
}
|
||||
}
|
||||
|
||||
err := UpdateAvroFile(f, insertCount)
|
||||
if err != nil {
|
||||
fmt.Printf("error while inserting blanks into avro: %s\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
for key, floatArray := range metricsData {
|
||||
metricName := ReplaceKey(key)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user