diff --git a/internal/memstore/allocator.go b/internal/memstore/allocator.go index 271113c..b202726 100644 --- a/internal/memstore/allocator.go +++ b/internal/memstore/allocator.go @@ -1,7 +1,73 @@ package memstore -func RequestBytes(len int) []byte { - // TODO: Use mmap etc...! +import ( + "os" + "reflect" + "sync" + "unsafe" - return make([]byte, len) + "github.com/ClusterCockpit/cc-metric-store/internal/types" + "golang.org/x/sys/unix" +) + +const bufferSizeInFloats int = 512 +const bufferSizeInBytes int = bufferSizeInFloats * 8 + +// The allocator rarely used, so a single big lock should be fine! +var allocatorLock sync.Mutex +var allocatorPool [][]byte + +func RequestBytes(size int) []byte { + size = (size + bufferSizeInBytes - 1) / bufferSizeInBytes * bufferSizeInBytes + if size == bufferSizeInBytes { + allocatorLock.Lock() + if len(allocatorPool) > 0 { + bytes := allocatorPool[len(allocatorPool)-1] + allocatorPool = allocatorPool[:len(allocatorPool)-1] + allocatorLock.Unlock() + return bytes + } + allocatorLock.Unlock() + } + + pagesize := os.Getpagesize() + if size < pagesize || pagesize%size != 0 { + panic("page size and buffer size do not go with each other") + } + + bytes, err := unix.Mmap(-1, 0, size, unix.PROT_READ|unix.PROT_WRITE, unix.MAP_ANONYMOUS) + if err != nil { + panic(err) + } + + return bytes +} + +func ReleaseBytes(bytes []byte) { + if cap(bytes)%bufferSizeInBytes != 0 { + panic("bytes released that do not match the buffer size constraints") + } + + allocatorLock.Lock() + defer allocatorLock.Unlock() + + n := cap(bytes) / bufferSizeInBytes + for i := 0; i < n; i++ { + chunk := bytes[i*bufferSizeInBytes : i*bufferSizeInBytes+bufferSizeInBytes] + allocatorPool = append(allocatorPool, chunk[0:0:bufferSizeInBytes]) + } +} + +func ReleaseFloatSlice(slice []types.Float) { + var x types.Float + sh := (*reflect.SliceHeader)(unsafe.Pointer(&slice)) + bytes := unsafe.Slice((*byte)(unsafe.Pointer(sh.Data)), sh.Cap*int(unsafe.Sizeof(x))) + ReleaseBytes(bytes) +} + +func RequestFloatSlice(size int) []types.Float { + var x types.Float + bytes := RequestBytes(size * int(unsafe.Sizeof(x))) + sh := (*reflect.SliceHeader)(unsafe.Pointer(&bytes)) + return unsafe.Slice((*types.Float)(unsafe.Pointer(sh.Data)), size) } diff --git a/internal/checkpoints/checkpoints.go b/internal/memstore/checkpoints.go similarity index 98% rename from internal/checkpoints/checkpoints.go rename to internal/memstore/checkpoints.go index 98b04d1..eb79d75 100644 --- a/internal/checkpoints/checkpoints.go +++ b/internal/memstore/checkpoints.go @@ -1,4 +1,4 @@ -package checkpoints +package memstore import ( "bufio" @@ -7,7 +7,6 @@ import ( "reflect" "unsafe" - "github.com/ClusterCockpit/cc-metric-store/internal/memstore" "github.com/ClusterCockpit/cc-metric-store/internal/types" ) @@ -211,7 +210,7 @@ func (m *Metrics) deserialize(buf []byte, r *bufio.Reader) error { var x types.Float elmsize := unsafe.Sizeof(x) - bytes = memstore.RequestBytes(int(n) * int(elmsize)) + bytes = RequestBytes(int(n) * int(elmsize)) if _, err := io.ReadFull(r, bytes); err != nil { return fmt.Errorf("reading payload (n=%d, elmsize=%d): %w", n, elmsize, err) } diff --git a/internal/checkpoints/checkpoints_test.go b/internal/memstore/checkpoints_test.go similarity index 99% rename from internal/checkpoints/checkpoints_test.go rename to internal/memstore/checkpoints_test.go index 8fc6dd9..ff81d14 100644 --- a/internal/checkpoints/checkpoints_test.go +++ b/internal/memstore/checkpoints_test.go @@ -1,4 +1,4 @@ -package checkpoints +package memstore import ( "bufio"