Add mutexes around seeker usage

These are not goroutine safe.
pull/4808/head
Jason Wilder 2015-11-16 09:05:27 -07:00
parent ef18f8afb2
commit c2530e93d7
1 changed files with 38 additions and 0 deletions

View File

@ -69,6 +69,7 @@ import (
"io"
"os"
"sort"
"sync"
"time"
)
@ -183,10 +184,15 @@ func NewDirectIndex() TSMIndex {
// directIndex is a simple in-memory index implementation for a TSM file. The full index
// must fit in memory.
type directIndex struct {
mu sync.RWMutex
blocks map[string]indexEntries
}
func (d *directIndex) Add(key string, minTime, maxTime time.Time, offset int64, size uint32) {
d.mu.Lock()
defer d.mu.Unlock()
d.blocks[key] = append(d.blocks[key], &IndexEntry{
MinTime: minTime,
MaxTime: maxTime,
@ -196,10 +202,16 @@ func (d *directIndex) Add(key string, minTime, maxTime time.Time, offset int64,
}
func (d *directIndex) Entries(key string) []*IndexEntry {
d.mu.RLock()
defer d.mu.RUnlock()
return d.blocks[key]
}
func (d *directIndex) Entry(key string, t time.Time) *IndexEntry {
d.mu.RLock()
defer d.mu.RUnlock()
entries := d.Entries(key)
for _, entry := range entries {
if entry.Contains(t) {
@ -218,10 +230,16 @@ func (d *directIndex) ContainsValue(key string, t time.Time) bool {
}
func (d *directIndex) Delete(key string) {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.blocks, key)
}
func (d *directIndex) Keys() []string {
d.mu.RLock()
defer d.mu.RUnlock()
var keys []string
for k := range d.blocks {
keys = append(keys, k)
@ -249,6 +267,9 @@ func (d *directIndex) Write(w io.Writer) error {
}
func (d *directIndex) MarshalBinary() ([]byte, error) {
d.mu.RLock()
defer d.mu.RUnlock()
// Index blocks are writtens sorted by key
var keys []string
for k := range d.blocks {
@ -288,6 +309,9 @@ func (d *directIndex) MarshalBinary() ([]byte, error) {
}
func (d *directIndex) UnmarshalBinary(b []byte) error {
d.mu.Lock()
defer d.mu.Unlock()
var pos int
for pos < len(b) {
n, key, err := d.readKey(b[pos:])
@ -613,6 +637,8 @@ func (t *tsmWriter) Close() error {
}
type tsmReader struct {
mu sync.Mutex
r io.ReadSeeker
indexStart, indexEnd int64
index TSMIndex
@ -692,6 +718,9 @@ func (t *tsmReader) applyTombstones() error {
}
func (t *tsmReader) Path() string {
t.mu.Lock()
defer t.mu.Unlock()
if f, ok := t.r.(*os.File); ok {
return f.Name()
}
@ -703,6 +732,9 @@ func (t *tsmReader) Keys() []string {
}
func (t *tsmReader) Read(key string, timestamp time.Time) ([]Value, error) {
t.mu.Lock()
defer t.mu.Unlock()
block := t.index.Entry(key, timestamp)
if block == nil {
return nil, nil
@ -736,6 +768,9 @@ func (t *tsmReader) Read(key string, timestamp time.Time) ([]Value, error) {
// ReadAll returns all values for a key in all blocks.
func (t *tsmReader) ReadAll(key string) ([]Value, error) {
t.mu.Lock()
defer t.mu.Unlock()
var values []Value
blocks := t.index.Entries(key)
if len(blocks) == 0 {
@ -780,6 +815,9 @@ func (t *tsmReader) ReadAll(key string) ([]Value, error) {
}
func (t *tsmReader) Close() error {
t.mu.Lock()
defer t.mu.Unlock()
if c, ok := t.r.(io.Closer); ok {
return c.Close()
}