diff --git a/tsdb/engine/tsm1/data_file.go b/tsdb/engine/tsm1/data_file.go index 13f7c95388..2ee4474aea 100644 --- a/tsdb/engine/tsm1/data_file.go +++ b/tsdb/engine/tsm1/data_file.go @@ -69,6 +69,7 @@ import ( "io" "os" "sort" + "sync" "time" ) @@ -183,10 +184,15 @@ func NewDirectIndex() TSMIndex { // directIndex is a simple in-memory index implementation for a TSM file. The full index // must fit in memory. type directIndex struct { + mu sync.RWMutex + blocks map[string]indexEntries } func (d *directIndex) Add(key string, minTime, maxTime time.Time, offset int64, size uint32) { + d.mu.Lock() + defer d.mu.Unlock() + d.blocks[key] = append(d.blocks[key], &IndexEntry{ MinTime: minTime, MaxTime: maxTime, @@ -196,10 +202,16 @@ func (d *directIndex) Add(key string, minTime, maxTime time.Time, offset int64, } func (d *directIndex) Entries(key string) []*IndexEntry { + d.mu.RLock() + defer d.mu.RUnlock() + return d.blocks[key] } func (d *directIndex) Entry(key string, t time.Time) *IndexEntry { + d.mu.RLock() + defer d.mu.RUnlock() + entries := d.Entries(key) for _, entry := range entries { if entry.Contains(t) { @@ -218,10 +230,16 @@ func (d *directIndex) ContainsValue(key string, t time.Time) bool { } func (d *directIndex) Delete(key string) { + d.mu.Lock() + defer d.mu.Unlock() + delete(d.blocks, key) } func (d *directIndex) Keys() []string { + d.mu.RLock() + defer d.mu.RUnlock() + var keys []string for k := range d.blocks { keys = append(keys, k) @@ -249,6 +267,9 @@ func (d *directIndex) Write(w io.Writer) error { } func (d *directIndex) MarshalBinary() ([]byte, error) { + d.mu.RLock() + defer d.mu.RUnlock() + // Index blocks are writtens sorted by key var keys []string for k := range d.blocks { @@ -288,6 +309,9 @@ func (d *directIndex) MarshalBinary() ([]byte, error) { } func (d *directIndex) UnmarshalBinary(b []byte) error { + d.mu.Lock() + defer d.mu.Unlock() + var pos int for pos < len(b) { n, key, err := d.readKey(b[pos:]) @@ -613,6 +637,8 @@ func (t *tsmWriter) Close() error { } type tsmReader struct { + mu sync.Mutex + r io.ReadSeeker indexStart, indexEnd int64 index TSMIndex @@ -692,6 +718,9 @@ func (t *tsmReader) applyTombstones() error { } func (t *tsmReader) Path() string { + t.mu.Lock() + defer t.mu.Unlock() + if f, ok := t.r.(*os.File); ok { return f.Name() } @@ -703,6 +732,9 @@ func (t *tsmReader) Keys() []string { } func (t *tsmReader) Read(key string, timestamp time.Time) ([]Value, error) { + t.mu.Lock() + defer t.mu.Unlock() + block := t.index.Entry(key, timestamp) if block == nil { return nil, nil @@ -736,6 +768,9 @@ func (t *tsmReader) Read(key string, timestamp time.Time) ([]Value, error) { // ReadAll returns all values for a key in all blocks. func (t *tsmReader) ReadAll(key string) ([]Value, error) { + t.mu.Lock() + defer t.mu.Unlock() + var values []Value blocks := t.index.Entries(key) if len(blocks) == 0 { @@ -780,6 +815,9 @@ func (t *tsmReader) ReadAll(key string) ([]Value, error) { } func (t *tsmReader) Close() error { + t.mu.Lock() + defer t.mu.Unlock() + if c, ok := t.r.(io.Closer); ok { return c.Close() }