Fix bug with new shards not getting series data persisted.

pull/3717/head
Paul Dix 2015-08-16 15:45:09 -04:00
parent abc71aee53
commit 3348dab4e0
12 changed files with 92 additions and 84 deletions
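In short: a Series now remembers which shards it has been persisted to (the new shardIDs set), a Shard now knows its own id, and validateSeriesAndFields re-persists an already-indexed series the first time points for it arrive in a new shard. A condensed sketch of that check, paraphrased from the shard.go hunk further down (not a verbatim excerpt):

if ss := s.index.series[string(p.Key())]; ss == nil {
	// Brand-new series: create it and queue it for persistence.
	series := NewSeries(string(p.Key()), p.Tags())
	seriesToCreate = append(seriesToCreate, &SeriesCreate{p.Name(), series})
} else if !ss.shardIDs[s.id] {
	// Known series, but its first write into this shard: persist it here too.
	ss.shardIDs[s.id] = true
	seriesToCreate = append(seriesToCreate, &SeriesCreate{p.Name(), ss})
}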

View File

@@ -1,10 +1,12 @@
package tsdb
import (
"bytes"
"errors"
"fmt"
"io"
"os"
"sort"
"time"
"github.com/boltdb/bolt"
@@ -127,3 +129,29 @@ type Cursor interface {
Seek(seek []byte) (key, value []byte)
Next() (key, value []byte)
}
// DedupeEntries returns slices with unique keys (the first 8 bytes).
func DedupeEntries(a [][]byte) [][]byte {
// Convert to a map where the last slice is used.
m := make(map[string][]byte)
for _, b := range a {
m[string(b[0:8])] = b
}
// Convert map back to a slice of byte slices.
other := make([][]byte, 0, len(m))
for _, v := range m {
other = append(other, v)
}
// Sort entries.
sort.Sort(ByteSlices(other))
return other
}
type ByteSlices [][]byte
func (a ByteSlices) Len() int { return len(a) }
func (a ByteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
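A quick illustration of the two exported helpers above. This is a self-contained sketch, assuming the repository's import path of this era and redefining the engines' u64tob helper locally; DedupeEntries keys on the first 8 bytes of each entry and keeps the last entry seen for a key:

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/influxdb/influxdb/tsdb" // assumed import path for this era of the code
)

// u64tob mirrors the engines' helper: a uint64 as an 8-byte big-endian slice.
func u64tob(v uint64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, v)
	return b
}

func main() {
	a := [][]byte{
		append(u64tob(2), 0x20),
		append(u64tob(1), 0x10),
		append(u64tob(1), 0xFF), // duplicate 8-byte key: the later entry wins
	}
	for _, e := range tsdb.DedupeEntries(a) {
		fmt.Printf("ts=%d data=%x\n", binary.BigEndian.Uint64(e[0:8]), e[8:])
	}
	// Output, sorted by key with the duplicate collapsed:
	// ts=1 data=ff
	// ts=2 data=20
}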

View File

@@ -194,7 +194,7 @@ func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields
meta = tx.Bucket([]byte("series"))
c = meta.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
series := &tsdb.Series{}
series := tsdb.NewSeries("", nil)
if err := series.UnmarshalBinary(v); err != nil {
return err
}

View File

@@ -22,7 +22,7 @@ func TestEngine_WritePoints(t *testing.T) {
mf := &tsdb.MeasurementFields{Fields: make(map[string]*tsdb.Field)}
mf.CreateFieldIfNotExists("value", influxql.Float)
seriesToCreate := []*tsdb.SeriesCreate{
{Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("temperature"), nil))}},
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("temperature"), nil)), nil)},
}
// Parse point.

View File

@@ -170,7 +170,7 @@ func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields
meta = tx.Bucket([]byte("series"))
c = meta.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
series := &tsdb.Series{}
series := tsdb.NewSeries("", nil)
if err := series.UnmarshalBinary(v); err != nil {
return err
}
@@ -284,8 +284,13 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
c := bkt.Cursor()
// Ensure the slice is sorted before retrieving the time range.
a = DedupeEntries(a)
sort.Sort(byteSlices(a))
a = tsdb.DedupeEntries(a)
// Convert the raw time and byte slices to entries with lengths
for i, p := range a {
timestamp := int64(btou64(p[0:8]))
a[i] = MarshalEntry(timestamp, p[8:])
}
// Determine time range of new data.
tmin, tmax := int64(btou64(a[0][0:8])), int64(btou64(a[len(a)-1][0:8]))
@@ -340,7 +345,7 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
// Merge entries before rewriting.
a = append(existing, a...)
sort.Sort(byteSlices(a))
sort.Sort(tsdb.ByteSlices(a))
// Rewrite points to new blocks.
if err := e.writeBlocks(bkt, a); err != nil {
@@ -367,7 +372,7 @@ func (e *Engine) writeBlocks(bkt *bolt.Bucket, a [][]byte) error {
}
// Append point to the end of the block.
block = append(block, MarshalEntry(timestamp, p[8:])...)
block = append(block, p...)
// If the block is larger than the target block size or this is the
// last point then flush the block to the bucket.
@@ -616,26 +621,6 @@ func SplitEntries(b []byte) [][]byte {
}
}
// DedupeEntries returns slices with unique keys (the first 8 bytes).
func DedupeEntries(a [][]byte) [][]byte {
// Convert to a map where the last slice is used.
m := make(map[string][]byte)
for _, b := range a {
m[string(b[0:8])] = b
}
// Convert map back to a slice of byte slices.
other := make([][]byte, 0, len(m))
for _, v := range m {
other = append(other, v)
}
// Sort entries.
sort.Sort(byteSlices(other))
return other
}
// entryHeaderSize is the number of bytes required for the header.
const entryHeaderSize = 8 + 4
@@ -651,9 +636,3 @@ func u64tob(v uint64) []byte {
// btou64 converts an 8-byte slice into an uint64.
func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) }
type byteSlices [][]byte
func (a byteSlices) Len() int { return len(a) }
func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
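For reference, writeIndex above now accepts raw <8-byte timestamp><data> slices and converts them to stored entries itself. The entry layout is implied by entryHeaderSize = 8 + 4; the following is a sketch of a MarshalEntry-style encoder under that assumption, not a verbatim copy of the bz1 function:

package sketch

import "encoding/binary"

// marshalEntry sketches the layout implied by entryHeaderSize = 8 + 4:
// an 8-byte big-endian timestamp, then a 4-byte big-endian data length,
// then the data bytes themselves.
func marshalEntry(timestamp int64, data []byte) []byte {
	v := make([]byte, 12+len(data))
	binary.BigEndian.PutUint64(v[0:8], uint64(timestamp))
	binary.BigEndian.PutUint32(v[8:12], uint32(len(data)))
	copy(v[12:], data)
	return v
}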

View File

@@ -28,9 +28,9 @@ func TestEngine_LoadMetadataIndex_Series(t *testing.T) {
// Write series metadata.
if err := e.WritePoints(nil, nil, []*tsdb.SeriesCreate{
{Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server0"})), Tags: map[string]string{"host": "server0"}}},
{Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server1"})), Tags: map[string]string{"host": "server1"}}},
{Series: &tsdb.Series{Key: "series with spaces"}},
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server0"})), map[string]string{"host": "server0"})},
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server1"})), map[string]string{"host": "server1"})},
{Series: tsdb.NewSeries("series with spaces", nil)},
}); err != nil {
t.Fatal(err)
}
@@ -144,11 +144,11 @@ func TestEngine_WriteIndex_Append(t *testing.T) {
// Append points to index.
if err := e.WriteIndex(map[string][][]byte{
"cpu": [][]byte{
bz1.MarshalEntry(1, []byte{0x10}),
bz1.MarshalEntry(2, []byte{0x20}),
append(u64tob(1), 0x10),
append(u64tob(2), 0x20),
},
"mem": [][]byte{
bz1.MarshalEntry(0, []byte{0x30}),
append(u64tob(0), 0x30),
},
}); err != nil {
t.Fatal(err)
@@ -185,9 +185,9 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
// Write initial points to index.
if err := e.WriteIndex(map[string][][]byte{
"cpu": [][]byte{
bz1.MarshalEntry(10, []byte{0x10}),
bz1.MarshalEntry(20, []byte{0x20}),
bz1.MarshalEntry(30, []byte{0x30}),
append(u64tob(10), 0x10),
append(u64tob(20), 0x20),
append(u64tob(30), 0x30),
},
}); err != nil {
t.Fatal(err)
@@ -196,10 +196,10 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
// Write overlapping points to index.
if err := e.WriteIndex(map[string][][]byte{
"cpu": [][]byte{
bz1.MarshalEntry(9, []byte{0x09}),
bz1.MarshalEntry(10, []byte{0xFF}),
bz1.MarshalEntry(25, []byte{0x25}),
bz1.MarshalEntry(31, []byte{0x31}),
append(u64tob(9), 0x09),
append(u64tob(10), 0xFF),
append(u64tob(25), 0x25),
append(u64tob(31), 0x31),
},
}); err != nil {
t.Fatal(err)
@@ -208,7 +208,7 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
// Write overlapping points to index again.
if err := e.WriteIndex(map[string][][]byte{
"cpu": [][]byte{
bz1.MarshalEntry(31, []byte{0xFF}),
append(u64tob(31), 0xFF),
},
}); err != nil {
t.Fatal(err)
@@ -291,15 +291,8 @@ func TestEngine_WriteIndex_Quick(t *testing.T) {
got = append(got, append(copyBytes(k), v...))
}
// Generate expected values.
// We need to remove the data length from the slice.
var exp [][]byte
for _, b := range points[key] {
exp = append(exp, append(copyBytes(b[0:8]), b[12:]...)) // remove data len
}
if !reflect.DeepEqual(got, exp) {
t.Fatalf("points: block size=%d, key=%s:\n\ngot=%x\n\nexp=%x\n\n", e.BlockSize, key, got, exp)
if !reflect.DeepEqual(got, points[key]) {
t.Fatalf("points: block size=%d, key=%s:\n\ngot=%x\n\nexp=%x\n\n", e.BlockSize, key, got, points[key])
}
}
@@ -425,7 +418,7 @@ func MergePoints(a []Points) Points {
// Dedupe points.
for key, values := range m {
m[key] = bz1.DedupeEntries(values)
m[key] = tsdb.DedupeEntries(values)
}
return m

View File

@@ -662,7 +662,7 @@ func (p *Partition) seriesToFlush(readySeriesSize int) (map[string][][]byte, int
// always hand the index data that is sorted
if p.cacheDirtySort[k] {
sort.Sort(byteSlices(seriesToFlush[k]))
sort.Sort(tsdb.ByteSlices(seriesToFlush[k]))
delete(p.cacheDirtySort, k)
}
}
@@ -881,9 +881,10 @@ func (p *Partition) addToCache(key, data []byte, timestamp int64) {
// Generate in-memory cache entry of <timestamp,data>.
v := MarshalEntry(timestamp, data)
p.memorySize += uint64(len(v))
// Determine if we'll need to sort the values for this key later
a := p.cache[string(key)]
needSort := !(len(a) == 0 || bytes.Compare(a[len(a)-1], v) == -1)
needSort := !(len(a) == 0 || bytes.Compare(a[len(a)-1][0:8], v[0:8]) == -1)
p.cacheDirtySort[string(key)] = needSort
// Append to cache list.
@@ -908,13 +909,14 @@ func (p *Partition) cursor(key string) *cursor {
c := make([][]byte, len(fc), len(fc)+len(cache))
copy(c, fc)
c = append(c, cache...)
sort.Sort(byteSlices(c))
c = tsdb.DedupeEntries(c)
return &cursor{cache: c}
}
}
if p.cacheDirtySort[key] {
sort.Sort(byteSlices(cache))
cache = tsdb.DedupeEntries(cache)
p.cache[key] = cache
delete(p.cacheDirtySort, key)
}
@@ -1161,10 +1163,3 @@ func u64tob(v uint64) []byte {
func btou64(b []byte) uint64 {
return binary.BigEndian.Uint64(b)
}
// byteSlices represents a sortable slice of byte slices.
type byteSlices [][]byte
func (a byteSlices) Len() int { return len(a) }
func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

View File

@@ -494,7 +494,7 @@ func TestShardMapper_LocalMapperTagSets(t *testing.T) {
func mustCreateShard(dir string) *tsdb.Shard {
tmpShard := path.Join(dir, "shard")
index := tsdb.NewDatabaseIndex()
sh := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
sh := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
if err := sh.Open(); err != nil {
panic(fmt.Sprintf("error opening shard: %s", err.Error()))
}

View File

@@ -980,6 +980,16 @@ type Series struct {
id uint64
measurement *Measurement
shardIDs map[uint64]bool // shards that have this series defined
}
// NewSeries returns an initialized series struct
func NewSeries(key string, tags map[string]string) *Series {
return &Series{
Key: key,
Tags: tags,
shardIDs: make(map[uint64]bool),
}
}
// MarshalBinary encodes the object to a binary format.
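The constructor matters beyond convenience: the shard-tracking fix writes to ss.shardIDs, and a Series built as a zero value (or unmarshaled into one, as LoadMetadataIndex previously did) would leave that map nil. A minimal reminder of the Go failure mode being avoided:

package main

func main() {
	var m map[uint64]bool // nil, like shardIDs on a zero-value Series
	_ = m[1]              // reading a nil map is safe: it yields the zero value, false
	m[1] = true           // panics: assignment to entry in nil map
}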

View File

@@ -182,10 +182,7 @@ func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries {
for _, ts := range tagSets {
series = append(series, &TestSeries{
Measurement: m,
Series: &tsdb.Series{
Key: fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))),
Tags: ts,
},
Series: tsdb.NewSeries(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))), ts),
})
}
}

View File

@@ -40,6 +40,7 @@ type Shard struct {
db *bolt.DB // underlying data store
index *DatabaseIndex
path string
id uint64
engine Engine
options EngineOptions
@@ -52,10 +53,11 @@
}
// NewShard returns a new initialized Shard
func NewShard(index *DatabaseIndex, path string, options EngineOptions) *Shard {
func NewShard(id uint64, index *DatabaseIndex, path string, options EngineOptions) *Shard {
return &Shard{
index: index,
path: path,
id: id,
options: options,
measurementFields: make(map[string]*MeasurementFields),
@@ -327,8 +329,12 @@ func (s *Shard) validateSeriesAndFields(points []Point) ([]*SeriesCreate, []*Fie
for _, p := range points {
// see if the series should be added to the index
if ss := s.index.series[string(p.Key())]; ss == nil {
series := &Series{Key: string(p.Key()), Tags: p.Tags()}
series := NewSeries(string(p.Key()), p.Tags())
seriesToCreate = append(seriesToCreate, &SeriesCreate{p.Name(), series})
} else if !ss.shardIDs[s.id] {
// this is the first time this series is being written into this shard, persist it
ss.shardIDs[s.id] = true
seriesToCreate = append(seriesToCreate, &SeriesCreate{p.Name(), ss})
}
// see if the field definitions need to be saved to the shard

View File

@@ -20,7 +20,7 @@ func TestShardWriteAndIndex(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
index := tsdb.NewDatabaseIndex()
sh := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
sh := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
if err := sh.Open(); err != nil {
t.Fatalf("error openeing shard: %s", err.Error())
}
@@ -66,7 +66,7 @@ func TestShardWriteAndIndex(t *testing.T) {
sh.Close()
index = tsdb.NewDatabaseIndex()
sh = tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
sh = tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
if err := sh.Open(); err != nil {
t.Fatalf("error openeing shard: %s", err.Error())
}
@@ -87,7 +87,7 @@ func TestShardWriteAddNewField(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
index := tsdb.NewDatabaseIndex()
sh := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
sh := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
if err := sh.Open(); err != nil {
t.Fatalf("error openeing shard: %s", err.Error())
}
@@ -143,7 +143,7 @@ func TestShard_Autoflush(t *testing.T) {
defer os.RemoveAll(path)
// Open shard with a really low size threshold, high flush interval.
sh := tsdb.NewShard(tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{
sh := tsdb.NewShard(1, tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{
EngineVersion: b1.Format,
MaxWALSize: 1024, // 1KB
WALFlushInterval: 1 * time.Hour,
@@ -183,7 +183,7 @@ func TestShard_Autoflush_FlushInterval(t *testing.T) {
defer os.RemoveAll(path)
// Open shard with a high size threshold, small time threshold.
sh := tsdb.NewShard(tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{
sh := tsdb.NewShard(1, tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{
EngineVersion: b1.Format,
MaxWALSize: 10 * 1024 * 1024, // 10MB
WALFlushInterval: 100 * time.Millisecond,
@@ -266,7 +266,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) {
for n := 0; n < b.N; n++ {
tmpDir, _ := ioutil.TempDir("", "shard_test")
tmpShard := path.Join(tmpDir, "shard")
shard := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
shard := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
shard.Open()
b.StartTimer()
@@ -301,7 +301,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt
tmpDir, _ := ioutil.TempDir("", "")
defer os.RemoveAll(tmpDir)
tmpShard := path.Join(tmpDir, "shard")
shard := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
shard := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
shard.Open()
defer shard.Close()
chunkedWrite(shard, points)

View File

@@ -82,7 +82,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
}
shardPath := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
shard := NewShard(db, shardPath, s.EngineOptions)
shard := NewShard(shardID, db, shardPath, s.EngineOptions)
if err := shard.Open(); err != nil {
return err
}
@@ -236,7 +236,7 @@ func (s *Store) loadShards() error {
continue
}
shard := NewShard(s.databaseIndexes[db], path, s.EngineOptions)
shard := NewShard(shardID, s.databaseIndexes[db], path, s.EngineOptions)
err = shard.Open()
if err != nil {
return fmt.Errorf("failed to open shard %d: %s", shardID, err)