diff --git a/tsdb/engine.go b/tsdb/engine.go index 79fcfa5258..f18ce5cc51 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -1,10 +1,12 @@ package tsdb import ( + "bytes" "errors" "fmt" "io" "os" + "sort" "time" "github.com/boltdb/bolt" @@ -127,3 +129,29 @@ type Cursor interface { Seek(seek []byte) (key, value []byte) Next() (key, value []byte) } + +// DedupeEntries returns slices with unique keys (the first 8 bytes). +func DedupeEntries(a [][]byte) [][]byte { + // Convert to a map where the last slice is used. + m := make(map[string][]byte) + for _, b := range a { + m[string(b[0:8])] = b + } + + // Convert map back to a slice of byte slices. + other := make([][]byte, 0, len(m)) + for _, v := range m { + other = append(other, v) + } + + // Sort entries. + sort.Sort(ByteSlices(other)) + + return other +} + +type ByteSlices [][]byte + +func (a ByteSlices) Len() int { return len(a) } +func (a ByteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a ByteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 } diff --git a/tsdb/engine/b1/b1.go b/tsdb/engine/b1/b1.go index 7e9e0f7369..aaba7411d5 100644 --- a/tsdb/engine/b1/b1.go +++ b/tsdb/engine/b1/b1.go @@ -194,7 +194,7 @@ func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields meta = tx.Bucket([]byte("series")) c = meta.Cursor() for k, v := c.First(); k != nil; k, v = c.Next() { - series := &tsdb.Series{} + series := tsdb.NewSeries("", nil) if err := series.UnmarshalBinary(v); err != nil { return err } diff --git a/tsdb/engine/b1/b1_test.go b/tsdb/engine/b1/b1_test.go index ee1009dbd1..cd2d984457 100644 --- a/tsdb/engine/b1/b1_test.go +++ b/tsdb/engine/b1/b1_test.go @@ -22,7 +22,7 @@ func TestEngine_WritePoints(t *testing.T) { mf := &tsdb.MeasurementFields{Fields: make(map[string]*tsdb.Field)} mf.CreateFieldIfNotExists("value", influxql.Float) seriesToCreate := []*tsdb.SeriesCreate{ - {Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("temperature"), nil))}}, + {Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("temperature"), nil)), nil)}, } // Parse point. diff --git a/tsdb/engine/bz1/bz1.go b/tsdb/engine/bz1/bz1.go index 98fc910258..5221ff3302 100644 --- a/tsdb/engine/bz1/bz1.go +++ b/tsdb/engine/bz1/bz1.go @@ -170,7 +170,7 @@ func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields meta = tx.Bucket([]byte("series")) c = meta.Cursor() for k, v := c.First(); k != nil; k, v = c.Next() { - series := &tsdb.Series{} + series := tsdb.NewSeries("", nil) if err := series.UnmarshalBinary(v); err != nil { return err } @@ -284,8 +284,13 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error { c := bkt.Cursor() // Ensure the slice is sorted before retrieving the time range. - a = DedupeEntries(a) - sort.Sort(byteSlices(a)) + a = tsdb.DedupeEntries(a) + + // Convert the raw time and byte slices to entries with lengths + for i, p := range a { + timestamp := int64(btou64(p[0:8])) + a[i] = MarshalEntry(timestamp, p[8:]) + } // Determine time range of new data. tmin, tmax := int64(btou64(a[0][0:8])), int64(btou64(a[len(a)-1][0:8])) @@ -340,7 +345,7 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error { // Merge entries before rewriting. a = append(existing, a...) - sort.Sort(byteSlices(a)) + sort.Sort(tsdb.ByteSlices(a)) // Rewrite points to new blocks. if err := e.writeBlocks(bkt, a); err != nil { @@ -367,7 +372,7 @@ func (e *Engine) writeBlocks(bkt *bolt.Bucket, a [][]byte) error { } // Append point to the end of the block. - block = append(block, MarshalEntry(timestamp, p[8:])...) + block = append(block, p...) // If the block is larger than the target block size or this is the // last point then flush the block to the bucket. @@ -616,26 +621,6 @@ func SplitEntries(b []byte) [][]byte { } } -// DedupeEntries returns slices with unique keys (the first 8 bytes). -func DedupeEntries(a [][]byte) [][]byte { - // Convert to a map where the last slice is used. - m := make(map[string][]byte) - for _, b := range a { - m[string(b[0:8])] = b - } - - // Convert map back to a slice of byte slices. - other := make([][]byte, 0, len(m)) - for _, v := range m { - other = append(other, v) - } - - // Sort entries. - sort.Sort(byteSlices(other)) - - return other -} - // entryHeaderSize is the number of bytes required for the header. const entryHeaderSize = 8 + 4 @@ -651,9 +636,3 @@ func u64tob(v uint64) []byte { // btou64 converts an 8-byte slice into an uint64. func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) } - -type byteSlices [][]byte - -func (a byteSlices) Len() int { return len(a) } -func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 } diff --git a/tsdb/engine/bz1/bz1_test.go b/tsdb/engine/bz1/bz1_test.go index 1662462d61..4e5d45d2c3 100644 --- a/tsdb/engine/bz1/bz1_test.go +++ b/tsdb/engine/bz1/bz1_test.go @@ -28,9 +28,9 @@ func TestEngine_LoadMetadataIndex_Series(t *testing.T) { // Write series metadata. if err := e.WritePoints(nil, nil, []*tsdb.SeriesCreate{ - {Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server0"})), Tags: map[string]string{"host": "server0"}}}, - {Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server1"})), Tags: map[string]string{"host": "server1"}}}, - {Series: &tsdb.Series{Key: "series with spaces"}}, + {Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server0"})), map[string]string{"host": "server0"})}, + {Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server1"})), map[string]string{"host": "server1"})}, + {Series: tsdb.NewSeries("series with spaces", nil)}, }); err != nil { t.Fatal(err) } @@ -144,11 +144,11 @@ func TestEngine_WriteIndex_Append(t *testing.T) { // Append points to index. if err := e.WriteIndex(map[string][][]byte{ "cpu": [][]byte{ - bz1.MarshalEntry(1, []byte{0x10}), - bz1.MarshalEntry(2, []byte{0x20}), + append(u64tob(1), 0x10), + append(u64tob(2), 0x20), }, "mem": [][]byte{ - bz1.MarshalEntry(0, []byte{0x30}), + append(u64tob(0), 0x30), }, }); err != nil { t.Fatal(err) @@ -185,9 +185,9 @@ func TestEngine_WriteIndex_Insert(t *testing.T) { // Write initial points to index. if err := e.WriteIndex(map[string][][]byte{ "cpu": [][]byte{ - bz1.MarshalEntry(10, []byte{0x10}), - bz1.MarshalEntry(20, []byte{0x20}), - bz1.MarshalEntry(30, []byte{0x30}), + append(u64tob(10), 0x10), + append(u64tob(20), 0x20), + append(u64tob(30), 0x30), }, }); err != nil { t.Fatal(err) @@ -196,10 +196,10 @@ func TestEngine_WriteIndex_Insert(t *testing.T) { // Write overlapping points to index. if err := e.WriteIndex(map[string][][]byte{ "cpu": [][]byte{ - bz1.MarshalEntry(9, []byte{0x09}), - bz1.MarshalEntry(10, []byte{0xFF}), - bz1.MarshalEntry(25, []byte{0x25}), - bz1.MarshalEntry(31, []byte{0x31}), + append(u64tob(9), 0x09), + append(u64tob(10), 0xFF), + append(u64tob(25), 0x25), + append(u64tob(31), 0x31), }, }); err != nil { t.Fatal(err) @@ -208,7 +208,7 @@ func TestEngine_WriteIndex_Insert(t *testing.T) { // Write overlapping points to index again. if err := e.WriteIndex(map[string][][]byte{ "cpu": [][]byte{ - bz1.MarshalEntry(31, []byte{0xFF}), + append(u64tob(31), 0xFF), }, }); err != nil { t.Fatal(err) @@ -291,15 +291,8 @@ func TestEngine_WriteIndex_Quick(t *testing.T) { got = append(got, append(copyBytes(k), v...)) } - // Generate expected values. - // We need to remove the data length from the slice. - var exp [][]byte - for _, b := range points[key] { - exp = append(exp, append(copyBytes(b[0:8]), b[12:]...)) // remove data len - } - - if !reflect.DeepEqual(got, exp) { - t.Fatalf("points: block size=%d, key=%s:\n\ngot=%x\n\nexp=%x\n\n", e.BlockSize, key, got, exp) + if !reflect.DeepEqual(got, points[key]) { + t.Fatalf("points: block size=%d, key=%s:\n\ngot=%x\n\nexp=%x\n\n", e.BlockSize, key, got, points[key]) } } @@ -425,7 +418,7 @@ func MergePoints(a []Points) Points { // Dedupe points. for key, values := range m { - m[key] = bz1.DedupeEntries(values) + m[key] = tsdb.DedupeEntries(values) } return m diff --git a/tsdb/engine/wal/wal.go b/tsdb/engine/wal/wal.go index 4eeef4c5b7..83be10baf8 100644 --- a/tsdb/engine/wal/wal.go +++ b/tsdb/engine/wal/wal.go @@ -662,7 +662,7 @@ func (p *Partition) seriesToFlush(readySeriesSize int) (map[string][][]byte, int // always hand the index data that is sorted if p.cacheDirtySort[k] { - sort.Sort(byteSlices(seriesToFlush[k])) + sort.Sort(tsdb.ByteSlices(seriesToFlush[k])) delete(p.cacheDirtySort, k) } } @@ -881,9 +881,10 @@ func (p *Partition) addToCache(key, data []byte, timestamp int64) { // Generate in-memory cache entry of . v := MarshalEntry(timestamp, data) p.memorySize += uint64(len(v)) + // Determine if we'll need to sort the values for this key later a := p.cache[string(key)] - needSort := !(len(a) == 0 || bytes.Compare(a[len(a)-1], v) == -1) + needSort := !(len(a) == 0 || bytes.Compare(a[len(a)-1][0:8], v[0:8]) == -1) p.cacheDirtySort[string(key)] = needSort // Append to cache list. @@ -908,13 +909,14 @@ func (p *Partition) cursor(key string) *cursor { c := make([][]byte, len(fc), len(fc)+len(cache)) copy(c, fc) c = append(c, cache...) - sort.Sort(byteSlices(c)) + tsdb.DedupeEntries(c) return &cursor{cache: c} } } if p.cacheDirtySort[key] { - sort.Sort(byteSlices(cache)) + cache = tsdb.DedupeEntries(cache) + p.cache[key] = cache delete(p.cacheDirtySort, key) } @@ -1161,10 +1163,3 @@ func u64tob(v uint64) []byte { func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) } - -// byteSlices represents a sortable slice of byte slices. -type byteSlices [][]byte - -func (a byteSlices) Len() int { return len(a) } -func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 } -func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] } diff --git a/tsdb/mapper_test.go b/tsdb/mapper_test.go index 5fff0e8c50..9dbc15c767 100644 --- a/tsdb/mapper_test.go +++ b/tsdb/mapper_test.go @@ -494,7 +494,7 @@ func TestShardMapper_LocalMapperTagSets(t *testing.T) { func mustCreateShard(dir string) *tsdb.Shard { tmpShard := path.Join(dir, "shard") index := tsdb.NewDatabaseIndex() - sh := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions()) + sh := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions()) if err := sh.Open(); err != nil { panic(fmt.Sprintf("error opening shard: %s", err.Error())) } diff --git a/tsdb/meta.go b/tsdb/meta.go index 3d27dbb995..f2dd1b707d 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -980,6 +980,16 @@ type Series struct { id uint64 measurement *Measurement + shardIDs map[uint64]bool // shards that have this series defined +} + +// NewSeries returns an initialized series struct +func NewSeries(key string, tags map[string]string) *Series { + return &Series{ + Key: key, + Tags: tags, + shardIDs: make(map[uint64]bool), + } } // MarshalBinary encodes the object to a binary format. diff --git a/tsdb/meta_test.go b/tsdb/meta_test.go index dac931bcbe..9964130b8d 100644 --- a/tsdb/meta_test.go +++ b/tsdb/meta_test.go @@ -182,10 +182,7 @@ func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries { for _, ts := range tagSets { series = append(series, &TestSeries{ Measurement: m, - Series: &tsdb.Series{ - Key: fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))), - Tags: ts, - }, + Series: tsdb.NewSeries(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))), ts), }) } } diff --git a/tsdb/shard.go b/tsdb/shard.go index b37500f140..1747517a2f 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -40,6 +40,7 @@ type Shard struct { db *bolt.DB // underlying data store index *DatabaseIndex path string + id uint64 engine Engine options EngineOptions @@ -52,10 +53,11 @@ type Shard struct { } // NewShard returns a new initialized Shard -func NewShard(index *DatabaseIndex, path string, options EngineOptions) *Shard { +func NewShard(id uint64, index *DatabaseIndex, path string, options EngineOptions) *Shard { return &Shard{ index: index, path: path, + id: id, options: options, measurementFields: make(map[string]*MeasurementFields), @@ -327,8 +329,12 @@ func (s *Shard) validateSeriesAndFields(points []Point) ([]*SeriesCreate, []*Fie for _, p := range points { // see if the series should be added to the index if ss := s.index.series[string(p.Key())]; ss == nil { - series := &Series{Key: string(p.Key()), Tags: p.Tags()} + series := NewSeries(string(p.Key()), p.Tags()) seriesToCreate = append(seriesToCreate, &SeriesCreate{p.Name(), series}) + } else if !ss.shardIDs[s.id] { + // this is the first time this series is being written into this shard, persist it + ss.shardIDs[s.id] = true + seriesToCreate = append(seriesToCreate, &SeriesCreate{p.Name(), ss}) } // see if the field definitions need to be saved to the shard diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 0b7f273b2b..d6c1829741 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -20,7 +20,7 @@ func TestShardWriteAndIndex(t *testing.T) { tmpShard := path.Join(tmpDir, "shard") index := tsdb.NewDatabaseIndex() - sh := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions()) + sh := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions()) if err := sh.Open(); err != nil { t.Fatalf("error openeing shard: %s", err.Error()) } @@ -66,7 +66,7 @@ func TestShardWriteAndIndex(t *testing.T) { sh.Close() index = tsdb.NewDatabaseIndex() - sh = tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions()) + sh = tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions()) if err := sh.Open(); err != nil { t.Fatalf("error openeing shard: %s", err.Error()) } @@ -87,7 +87,7 @@ func TestShardWriteAddNewField(t *testing.T) { tmpShard := path.Join(tmpDir, "shard") index := tsdb.NewDatabaseIndex() - sh := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions()) + sh := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions()) if err := sh.Open(); err != nil { t.Fatalf("error openeing shard: %s", err.Error()) } @@ -143,7 +143,7 @@ func TestShard_Autoflush(t *testing.T) { defer os.RemoveAll(path) // Open shard with a really low size threshold, high flush interval. - sh := tsdb.NewShard(tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{ + sh := tsdb.NewShard(1, tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{ EngineVersion: b1.Format, MaxWALSize: 1024, // 1KB WALFlushInterval: 1 * time.Hour, @@ -183,7 +183,7 @@ func TestShard_Autoflush_FlushInterval(t *testing.T) { defer os.RemoveAll(path) // Open shard with a high size threshold, small time threshold. - sh := tsdb.NewShard(tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{ + sh := tsdb.NewShard(1, tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{ EngineVersion: b1.Format, MaxWALSize: 10 * 1024 * 1024, // 10MB WALFlushInterval: 100 * time.Millisecond, @@ -266,7 +266,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) { for n := 0; n < b.N; n++ { tmpDir, _ := ioutil.TempDir("", "shard_test") tmpShard := path.Join(tmpDir, "shard") - shard := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions()) + shard := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions()) shard.Open() b.StartTimer() @@ -301,7 +301,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt tmpDir, _ := ioutil.TempDir("", "") defer os.RemoveAll(tmpDir) tmpShard := path.Join(tmpDir, "shard") - shard := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions()) + shard := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions()) shard.Open() defer shard.Close() chunkedWrite(shard, points) diff --git a/tsdb/store.go b/tsdb/store.go index e028437bcf..2573f7ecd7 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -82,7 +82,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er } shardPath := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10)) - shard := NewShard(db, shardPath, s.EngineOptions) + shard := NewShard(shardID, db, shardPath, s.EngineOptions) if err := shard.Open(); err != nil { return err } @@ -236,7 +236,7 @@ func (s *Store) loadShards() error { continue } - shard := NewShard(s.databaseIndexes[db], path, s.EngineOptions) + shard := NewShard(shardID, s.databaseIndexes[db], path, s.EngineOptions) err = shard.Open() if err != nil { return fmt.Errorf("failed to open shard %d: %s", shardID, err)