Use RWLock instead of classical Mutex

This commit is contained in:
Lou Knauer 2021-09-01 08:47:57 +02:00
parent 63e45960e1
commit 61bc7df93a
2 changed files with 31 additions and 17 deletions

View File

@ -32,16 +32,16 @@ type ArchiveFile struct {
func (m *MemoryStore) ToArchive(archiveRoot string, from, to int64) (int, error) {
levels := make([]*level, 0)
selectors := make([][]string, 0)
m.root.lock.Lock()
m.root.lock.RLock()
for sel1, l1 := range m.root.children {
l1.lock.Lock()
l1.lock.RLock()
for sel2, l2 := range l1.children {
levels = append(levels, l2)
selectors = append(selectors, []string{sel1, sel2})
}
l1.lock.Unlock()
l1.lock.RUnlock()
}
m.root.lock.Unlock()
m.root.lock.RUnlock()
for i := 0; i < len(levels); i++ {
dir := path.Join(archiveRoot, path.Join(selectors[i]...))
@ -55,8 +55,8 @@ func (m *MemoryStore) ToArchive(archiveRoot string, from, to int64) (int, error)
}
func (l *level) toArchiveFile(from, to int64) (*ArchiveFile, error) {
l.lock.Lock()
defer l.lock.Unlock()
l.lock.RLock()
defer l.lock.RUnlock()
retval := &ArchiveFile{
From: from,

View File

@ -116,31 +116,43 @@ func (b *buffer) read(from, to int64) ([]Float, int64, int64, error) {
// Can be both a leaf or a inner node. In this tree structue, inner nodes can
// also hold data (in `metrics`).
type level struct {
lock sync.Mutex // There is performance to be gained by having different locks for `metrics` and `children` (Spinlock?).
lock sync.RWMutex //
metrics map[string]*buffer // Every level can store metrics.
children map[string]*level // Sub-granularities/nodes. Use `sync.Map`?
children map[string]*level // Lower levels.
}
// Caution: the lock of the returned level will be LOCKED.
// Find the correct level for the given selector, creating it if
// it does not exist. Example selector in the context of the
// ClusterCockpit could be: []string{ "emmy", "host123", "cpu", "0" }
// This function would probably benefit a lot from `level.children` beeing a `sync.Map`?
func (l *level) findLevelOrCreate(selector []string) *level {
l.lock.Lock()
if len(selector) == 0 {
return l
}
// Allow concurrent reads:
l.lock.RLock()
child, ok := l.children[selector[0]]
if !ok {
l.lock.RUnlock()
if ok {
return child.findLevelOrCreate(selector[1:])
}
// The level does not exist, take write lock for unqiue access:
l.lock.Lock()
// While this thread waited for the write lock, another thread
// could have created the child node.
child, ok = l.children[selector[0]]
if ok {
l.lock.Unlock()
return child.findLevelOrCreate(selector[1:])
}
child = &level{
metrics: make(map[string]*buffer),
children: make(map[string]*level),
}
l.children[selector[0]] = child
}
l.lock.Unlock()
return child.findLevelOrCreate(selector[1:])
}
@ -238,6 +250,7 @@ func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore {
// Look at `findLevelOrCreate` for how selectors work.
func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error {
l := m.root.findLevelOrCreate(selector)
l.lock.Lock()
defer l.lock.Unlock()
for _, metric := range metrics {
@ -268,7 +281,8 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error
func (m *MemoryStore) Read(selector []string, metric string, from, to int64) ([]Float, int64, int64, error) {
l := m.root.findLevelOrCreate(selector)
defer l.lock.Unlock()
l.lock.RLock()
defer l.lock.RUnlock()
if from > to {
return nil, 0, 0, errors.New("invalid time range")