Mirror of https://github.com/ClusterCockpit/cc-backend, synced 2025-10-31 07:55:06 +01:00
	Fix to avro reader
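This commit switches the sample checkpoint config from "file-format": "json" to "avro", removes two leftover debug fmt.Printf calls (and the fmt imports they required), and fixes the Avro reader path: loadAvroFile now computes how many samples a checkpoint should contain for its time window and, when the file falls short, pads both the in-memory series and the on-disk OCF file with blank records through a new UpdateAvroFile helper.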
@@ -78,7 +78,7 @@
   ],
   "metric-store": {
     "checkpoints": {
-      "file-format": "json",
+      "file-format": "avro",
       "interval": "1h",
       "directory": "./var/checkpoints",
       "restore": "48h"
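The interval value is consumed with time.ParseDuration in the loadAvroFile change further down, and restore appears to use the same duration notation. A trivial, illustrative check that such strings parse; the values mirror the hunk above, and nothing else here is from the repo:

package main

import (
	"fmt"
	"time"
)

func main() {
	// "1h" and "48h" mirror the interval/restore values in the config hunk.
	for _, s := range []string{"1h", "48h"} {
		d, err := time.ParseDuration(s)
		if err != nil {
			fmt.Printf("bad duration %q: %v\n", s, err)
			continue
		}
		fmt.Printf("%s parses to %v\n", s, d)
	}
}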
@@ -28,6 +28,52 @@ var NumAvroWorkers int = 4
 
 var ErrNoNewData error = errors.New("no data in the pool")
 
+func UpdateAvroFile(f *os.File, insertCount int64) error {
+	filePath := f.Name()
+	f.Close() // close the original handle immediately
+
+	// Reopen fresh for reading
+	readFile, err := os.Open(filePath)
+	if err != nil {
+		return fmt.Errorf("failed to reopen file for reading: %v", err)
+	}
+	defer readFile.Close()
+
+	reader, err := goavro.NewOCFReader(readFile)
+	if err != nil {
+		return fmt.Errorf("failed to create OCF reader: %v", err)
+	}
+
+	codec := reader.Codec()
+
+	// Now reopen again for appending
+	appendFile, err := os.OpenFile(filePath, os.O_RDWR|os.O_APPEND, 0o644)
+	if err != nil {
+		return fmt.Errorf("failed to reopen file for appending: %v", err)
+	}
+	defer appendFile.Close()
+
+	recordList := make([]map[string]any, insertCount)
+	for i := range recordList {
+		recordList[i] = make(map[string]any)
+	}
+
+	writer, err := goavro.NewOCFWriter(goavro.OCFConfig{
+		W:               appendFile,
+		Codec:           codec,
+		CompressionName: goavro.CompressionDeflateLabel,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to create OCF writer: %v", err)
+	}
+
+	if err := writer.Append(recordList); err != nil {
+		return fmt.Errorf("failed to append record: %v", err)
+	}
+
+	return nil
+}
+
 func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) {
 	levels := make([]*AvroLevel, 0)
 	selectors := make([][]string, 0)
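UpdateAvroFile follows a common goavro pattern: recover the codec from the file's own OCF header, then reopen the file with O_APPEND so that NewOCFWriter detects the existing container and appends a fresh data block rather than writing a new header. Below is a minimal, self-contained sketch of that pattern, assuming github.com/linkedin/goavro/v2 (the library the goavro identifiers above come from); the schema, file path, and values are hypothetical:

package main

import (
	"fmt"
	"os"

	"github.com/linkedin/goavro/v2"
)

func main() {
	const path = "demo.avro" // hypothetical scratch file

	// 1) Create an OCF file with a single record.
	codec, err := goavro.NewCodec(`{"type":"record","name":"r","fields":[{"name":"v","type":"double","default":0}]}`)
	if err != nil {
		panic(err)
	}
	f, err := os.Create(path)
	if err != nil {
		panic(err)
	}
	w, err := goavro.NewOCFWriter(goavro.OCFConfig{W: f, Codec: codec})
	if err != nil {
		panic(err)
	}
	if err := w.Append([]map[string]any{{"v": 1.0}}); err != nil {
		panic(err)
	}
	f.Close()

	// 2) Recover the codec from the file itself, as UpdateAvroFile does.
	rf, err := os.Open(path)
	if err != nil {
		panic(err)
	}
	r, err := goavro.NewOCFReader(rf)
	if err != nil {
		panic(err)
	}
	recovered := r.Codec()
	rf.Close()

	// 3) Reopen for appending; NewOCFWriter sees the existing OCF header
	// and appends a new data block instead of writing a fresh header.
	af, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND, 0o644)
	if err != nil {
		panic(err)
	}
	aw, err := goavro.NewOCFWriter(goavro.OCFConfig{W: af, Codec: recovered})
	if err != nil {
		panic(err)
	}
	if err := aw.Append([]map[string]any{{}}); err != nil { // empty map: "v" falls back to its default
		panic(err)
	}
	af.Close()
	fmt.Println("appended one blank record to", path)
}

One caveat worth noting: Append encodes every datum against the schema, so the empty maps UpdateAvroFile builds encode cleanly only if each record field in the recovered schema carries a default value.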
@@ -7,7 +7,6 @@ package memorystore
 
 import (
 	"errors"
-	"fmt"
 	"sync"
 
 	"github.com/ClusterCockpit/cc-lib/schema"
@@ -188,8 +187,6 @@ func (b *buffer) read(from, to int64, data []schema.Float) ([]schema.Float, int6
 		i++
 	}
 
-	fmt.Printf("Given From : %d, To: %d\n", from, to)
-
 	return data[:i], from, t, nil
 }
 
@@ -492,6 +492,32 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error {
 		return nil
 	}
 
+	interval, err := time.ParseDuration(Keys.Checkpoints.Interval)
+	if err != nil {
+		fmt.Printf("error while parsing interval: %#v", err)
+	}
+
+	now := time.Now().Unix()
+	cutOff := time.Unix(fromTimestamp, 0).Add(interval).Unix()
+
+	newCount := (min(now, cutOff) - fromTimestamp) / resolution
+
+	if recordCounter < newCount {
+		// fmt.Printf("Record Count: %d, Required Count: %d\n", recordCounter, newCount)
+
+		insertCount := newCount - recordCounter
+		for range insertCount {
+			for key := range metricsData {
+				metricsData[key] = append(metricsData[key], schema.ConvertToFloat(0.0))
+			}
+		}
+
+		err := UpdateAvroFile(f, insertCount)
+		if err != nil {
+			fmt.Printf("error while inserting blanks into avro: %s\n", err)
+		}
+	}
+
 	for key, floatArray := range metricsData {
 		metricName := ReplaceKey(key)
 
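The arithmetic above determines how many samples the checkpoint should hold: the file covers at most one checkpoint interval starting at fromTimestamp, sampled every resolution seconds, and never extends past the current time, giving newCount = (min(now, cutOff) - fromTimestamp) / resolution expected samples. Any shortfall against recordCounter is filled with blanks. A standalone sketch with hypothetical numbers:

package main

import (
	"fmt"
	"time"
)

func main() {
	const resolution int64 = 60 // one sample per minute (assumed)

	fromTimestamp := time.Now().Add(-30 * time.Minute).Unix() // checkpoint started 30 min ago
	interval, _ := time.ParseDuration("1h")                   // checkpoint interval from the config

	now := time.Now().Unix()
	cutOff := time.Unix(fromTimestamp, 0).Add(interval).Unix()

	// Expected samples between the file's start and min(now, end of window).
	newCount := (min(now, cutOff) - fromTimestamp) / resolution // about 30 here

	recordCounter := int64(20) // samples actually found in the file (assumed)
	if recordCounter < newCount {
		insertCount := newCount - recordCounter
		series := make([]float64, recordCounter)
		for range insertCount {
			series = append(series, 0) // blank sample, like schema.ConvertToFloat(0.0)
		}
		fmt.Printf("expected %d samples, found %d, padded %d blanks (len=%d)\n",
			newCount, recordCounter, insertCount, len(series))
	}
}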
@@ -10,7 +10,6 @@ import (
 	"context"
 	"encoding/json"
 	"errors"
-	"fmt"
 	"os"
 	"os/signal"
 	"runtime"
@@ -387,8 +386,6 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, reso
 
 	n, data := 0, make([]schema.Float, (to-from)/minfo.Frequency+1)
 
-	fmt.Printf("Requested From : %d, To: %d\n", from, to)
-
 	err := m.root.findBuffers(selector, minfo.offset, func(b *buffer) error {
 		cdata, cfrom, cto, err := b.read(from, to, data)
 		if err != nil {