diff --git a/archive.go b/archive.go index 2b67a0c..6a18f8d 100644 --- a/archive.go +++ b/archive.go @@ -32,16 +32,16 @@ type ArchiveFile struct { func (m *MemoryStore) ToArchive(archiveRoot string, from, to int64) (int, error) { levels := make([]*level, 0) selectors := make([][]string, 0) - m.root.lock.Lock() + m.root.lock.RLock() for sel1, l1 := range m.root.children { - l1.lock.Lock() + l1.lock.RLock() for sel2, l2 := range l1.children { levels = append(levels, l2) selectors = append(selectors, []string{sel1, sel2}) } - l1.lock.Unlock() + l1.lock.RUnlock() } - m.root.lock.Unlock() + m.root.lock.RUnlock() for i := 0; i < len(levels); i++ { dir := path.Join(archiveRoot, path.Join(selectors[i]...)) @@ -55,8 +55,8 @@ func (m *MemoryStore) ToArchive(archiveRoot string, from, to int64) (int, error) } func (l *level) toArchiveFile(from, to int64) (*ArchiveFile, error) { - l.lock.Lock() - defer l.lock.Unlock() + l.lock.RLock() + defer l.lock.RUnlock() retval := &ArchiveFile{ From: from, diff --git a/memstore.go b/memstore.go index 9e37ea7..ee79492 100644 --- a/memstore.go +++ b/memstore.go @@ -116,31 +116,43 @@ func (b *buffer) read(from, to int64) ([]Float, int64, int64, error) { // Can be both a leaf or a inner node. In this tree structue, inner nodes can // also hold data (in `metrics`). type level struct { - lock sync.Mutex // There is performance to be gained by having different locks for `metrics` and `children` (Spinlock?). + lock sync.RWMutex // metrics map[string]*buffer // Every level can store metrics. - children map[string]*level // Sub-granularities/nodes. Use `sync.Map`? + children map[string]*level // Lower levels. } -// Caution: the lock of the returned level will be LOCKED. // Find the correct level for the given selector, creating it if // it does not exist. Example selector in the context of the // ClusterCockpit could be: []string{ "emmy", "host123", "cpu", "0" } // This function would probably benefit a lot from `level.children` beeing a `sync.Map`? func (l *level) findLevelOrCreate(selector []string) *level { - l.lock.Lock() if len(selector) == 0 { return l } + // Allow concurrent reads: + l.lock.RLock() child, ok := l.children[selector[0]] - if !ok { - child = &level{ - metrics: make(map[string]*buffer), - children: make(map[string]*level), - } - l.children[selector[0]] = child + l.lock.RUnlock() + if ok { + return child.findLevelOrCreate(selector[1:]) } + // The level does not exist, take write lock for unqiue access: + l.lock.Lock() + // While this thread waited for the write lock, another thread + // could have created the child node. + child, ok = l.children[selector[0]] + if ok { + l.lock.Unlock() + return child.findLevelOrCreate(selector[1:]) + } + + child = &level{ + metrics: make(map[string]*buffer), + children: make(map[string]*level), + } + l.children[selector[0]] = child l.lock.Unlock() return child.findLevelOrCreate(selector[1:]) } @@ -238,6 +250,7 @@ func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore { // Look at `findLevelOrCreate` for how selectors work. func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error { l := m.root.findLevelOrCreate(selector) + l.lock.Lock() defer l.lock.Unlock() for _, metric := range metrics { @@ -268,7 +281,8 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error func (m *MemoryStore) Read(selector []string, metric string, from, to int64) ([]Float, int64, int64, error) { l := m.root.findLevelOrCreate(selector) - defer l.lock.Unlock() + l.lock.RLock() + defer l.lock.RUnlock() if from > to { return nil, 0, 0, errors.New("invalid time range")