diff --git a/go.mod b/go.mod index 41b6df0..e95b389 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ClusterCockpit/cc-metric-store -go 1.16 +go 1.18 require ( github.com/golang-jwt/jwt/v4 v4.0.0 diff --git a/internal/checkpoints/checkpoints.go b/internal/checkpoints/checkpoints.go new file mode 100644 index 0000000..4830baf --- /dev/null +++ b/internal/checkpoints/checkpoints.go @@ -0,0 +1,134 @@ +package checkpoints + +import ( + "encoding/binary" + "io" + "reflect" + "unsafe" + + "github.com/ClusterCockpit/cc-metric-store/internal/types" +) + +const writeBufferSize int = 8192 + +// A single checkpoint file is defined by this structure. +type Checkpoint struct { + Reserved []byte // Reserved for future use/metadata. + From, To uint64 // Time covered by this checkpoint. + Root *CheckpointLevel // Root of the internal tree structure. +} + +// A single level in the tree of the in-memory store is defined by this structure. +type CheckpointLevel struct { + Reserved []byte // Reserved for future use/metadata. + Metrics map[string]*Metrics // Payload... + Sublevels map[string]*CheckpointLevel // Child in the internal tree structure. +} + +// Payload/Data +type Metrics struct { + Reserved []byte // Reserved for future use/metadata. + Frequency uint64 // Frequency of measurements for this metric. + From uint64 // Timestamp of the first measurement of this metric. + Data []types.Float // Payload... +} + +func (c *Checkpoint) Serialize(w io.Writer) error { + // Write magic value (8 byte): + if _, err := w.Write([]byte("CCMSv1.0")); err != nil { + return err + } + + // The header: + buf := make([]byte, 0, writeBufferSize*2) + buf = encodeBytes(buf, c.Reserved) + buf = encodeUint64(buf, c.From) + buf = encodeUint64(buf, c.To) + + // The rest: + buf, err := c.Root.serialize(w, buf) + if err != nil { + return err + } + + if _, err := w.Write(buf); err != nil { + return err + } + + return nil +} + +func (c *CheckpointLevel) serialize(w io.Writer, buf []byte) ([]byte, error) { + // The reserved data: + buf = encodeBytes(buf, c.Reserved) + + var err error + // metrics: + buf = encodeUint32(buf, uint32(len(c.Metrics))) + for key, metric := range c.Metrics { + buf = encodeString(buf, key) + buf, err = metric.serialize(w, buf) + if err != nil { + return nil, err + } + + if len(buf) >= writeBufferSize { + if _, err = w.Write(buf); err != nil { + return nil, err + } + + buf = buf[0:] + } + } + + // Sublevels: + buf = encodeUint32(buf, uint32(len(c.Sublevels))) + for key, sublevel := range c.Sublevels { + buf = encodeString(buf, key) + buf, err = sublevel.serialize(w, buf) + if err != nil { + return nil, err + } + } + + return buf, nil +} + +func (m *Metrics) serialize(w io.Writer, buf []byte) ([]byte, error) { + buf = encodeBytes(m.Reserved, buf) // Reserved + buf = encodeUint64(buf, m.Frequency) // Frequency + buf = encodeUint64(buf, m.From) // First measurmenet timestamp + buf = encodeUint32(buf, uint32(len(m.Data))) // Number of elements + + var x types.Float + elmsize := unsafe.Sizeof(x) + sh := (*reflect.SliceHeader)(unsafe.Pointer(&m.Data)) + bytes := unsafe.Slice((*byte)(unsafe.Pointer(sh.Data)), sh.Len*int(elmsize)) + + buf = append(buf, bytes...) + return buf, nil +} + +func encodeBytes(buf []byte, bytes []byte) []byte { + buf = encodeUint32(buf, uint32(len(bytes))) + buf = append(buf, bytes...) + return buf +} + +func encodeString(buf []byte, str string) []byte { + buf = encodeUint32(buf, uint32(len(str))) + buf = append(buf, str...) + return buf +} + +func encodeUint64(buf []byte, i uint64) []byte { + buf = append(buf, 0, 0, 0, 0, 0, 0, 0, 0) // 8 bytes + binary.LittleEndian.PutUint64(buf, i) + return buf[8:] +} + +func encodeUint32(buf []byte, i uint32) []byte { + buf = append(buf, 0, 0, 0, 0) // 4 bytes + binary.LittleEndian.PutUint32(buf, i) + return buf[4:] +} diff --git a/internal/types/float.go b/internal/types/float.go new file mode 100644 index 0000000..a8c52ad --- /dev/null +++ b/internal/types/float.go @@ -0,0 +1,63 @@ +package types + +import ( + "math" + "strconv" +) + +// Go's JSON encoder for floats does not support NaN (https://github.com/golang/go/issues/3480). +// This program uses NaN as a signal for missing data. +// For the HTTP JSON API to be able to handle NaN values, +// we have to use our own type which implements encoding/json.Marshaler itself. +type Float float64 + +var NaN Float = Float(math.NaN()) +var nullAsBytes []byte = []byte("null") + +func (f Float) IsNaN() bool { + return math.IsNaN(float64(f)) +} + +func (f Float) MarshalJSON() ([]byte, error) { + if math.IsNaN(float64(f)) { + return nullAsBytes, nil + } + + return strconv.AppendFloat(make([]byte, 0, 10), float64(f), 'f', 3, 64), nil +} + +func (f *Float) UnmarshalJSON(input []byte) error { + if string(input) == "null" { + *f = NaN + return nil + } + + val, err := strconv.ParseFloat(string(input), 64) + if err != nil { + return err + } + *f = Float(val) + return nil +} + +// Same as `[]Float`, but can be marshaled to JSON with less allocations. +type FloatArray []Float + +func (fa FloatArray) MarshalJSON() ([]byte, error) { + buf := make([]byte, 0, 2+len(fa)*8) + buf = append(buf, '[') + for i := 0; i < len(fa); i++ { + if i != 0 { + buf = append(buf, ',') + } + + if fa[i].IsNaN() { + buf = append(buf, `null`...) + } else { + buf = strconv.AppendFloat(buf, float64(fa[i]), 'f', 3, 64) + + } + } + buf = append(buf, ']') + return buf, nil +}