Fix bug with new shards not getting series data persisted.
parent
abc71aee53
commit
3348dab4e0
|
@ -1,10 +1,12 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
|
@ -127,3 +129,29 @@ type Cursor interface {
|
|||
Seek(seek []byte) (key, value []byte)
|
||||
Next() (key, value []byte)
|
||||
}
|
||||
|
||||
// DedupeEntries returns slices with unique keys (the first 8 bytes).
|
||||
func DedupeEntries(a [][]byte) [][]byte {
|
||||
// Convert to a map where the last slice is used.
|
||||
m := make(map[string][]byte)
|
||||
for _, b := range a {
|
||||
m[string(b[0:8])] = b
|
||||
}
|
||||
|
||||
// Convert map back to a slice of byte slices.
|
||||
other := make([][]byte, 0, len(m))
|
||||
for _, v := range m {
|
||||
other = append(other, v)
|
||||
}
|
||||
|
||||
// Sort entries.
|
||||
sort.Sort(ByteSlices(other))
|
||||
|
||||
return other
|
||||
}
|
||||
|
||||
type ByteSlices [][]byte
|
||||
|
||||
func (a ByteSlices) Len() int { return len(a) }
|
||||
func (a ByteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a ByteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
|
||||
|
|
|
@ -194,7 +194,7 @@ func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields
|
|||
meta = tx.Bucket([]byte("series"))
|
||||
c = meta.Cursor()
|
||||
for k, v := c.First(); k != nil; k, v = c.Next() {
|
||||
series := &tsdb.Series{}
|
||||
series := tsdb.NewSeries("", nil)
|
||||
if err := series.UnmarshalBinary(v); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ func TestEngine_WritePoints(t *testing.T) {
|
|||
mf := &tsdb.MeasurementFields{Fields: make(map[string]*tsdb.Field)}
|
||||
mf.CreateFieldIfNotExists("value", influxql.Float)
|
||||
seriesToCreate := []*tsdb.SeriesCreate{
|
||||
{Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("temperature"), nil))}},
|
||||
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("temperature"), nil)), nil)},
|
||||
}
|
||||
|
||||
// Parse point.
|
||||
|
|
|
@ -170,7 +170,7 @@ func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields
|
|||
meta = tx.Bucket([]byte("series"))
|
||||
c = meta.Cursor()
|
||||
for k, v := c.First(); k != nil; k, v = c.Next() {
|
||||
series := &tsdb.Series{}
|
||||
series := tsdb.NewSeries("", nil)
|
||||
if err := series.UnmarshalBinary(v); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -284,8 +284,13 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
|
|||
c := bkt.Cursor()
|
||||
|
||||
// Ensure the slice is sorted before retrieving the time range.
|
||||
a = DedupeEntries(a)
|
||||
sort.Sort(byteSlices(a))
|
||||
a = tsdb.DedupeEntries(a)
|
||||
|
||||
// Convert the raw time and byte slices to entries with lengths
|
||||
for i, p := range a {
|
||||
timestamp := int64(btou64(p[0:8]))
|
||||
a[i] = MarshalEntry(timestamp, p[8:])
|
||||
}
|
||||
|
||||
// Determine time range of new data.
|
||||
tmin, tmax := int64(btou64(a[0][0:8])), int64(btou64(a[len(a)-1][0:8]))
|
||||
|
@ -340,7 +345,7 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
|
|||
|
||||
// Merge entries before rewriting.
|
||||
a = append(existing, a...)
|
||||
sort.Sort(byteSlices(a))
|
||||
sort.Sort(tsdb.ByteSlices(a))
|
||||
|
||||
// Rewrite points to new blocks.
|
||||
if err := e.writeBlocks(bkt, a); err != nil {
|
||||
|
@ -367,7 +372,7 @@ func (e *Engine) writeBlocks(bkt *bolt.Bucket, a [][]byte) error {
|
|||
}
|
||||
|
||||
// Append point to the end of the block.
|
||||
block = append(block, MarshalEntry(timestamp, p[8:])...)
|
||||
block = append(block, p...)
|
||||
|
||||
// If the block is larger than the target block size or this is the
|
||||
// last point then flush the block to the bucket.
|
||||
|
@ -616,26 +621,6 @@ func SplitEntries(b []byte) [][]byte {
|
|||
}
|
||||
}
|
||||
|
||||
// DedupeEntries returns slices with unique keys (the first 8 bytes).
|
||||
func DedupeEntries(a [][]byte) [][]byte {
|
||||
// Convert to a map where the last slice is used.
|
||||
m := make(map[string][]byte)
|
||||
for _, b := range a {
|
||||
m[string(b[0:8])] = b
|
||||
}
|
||||
|
||||
// Convert map back to a slice of byte slices.
|
||||
other := make([][]byte, 0, len(m))
|
||||
for _, v := range m {
|
||||
other = append(other, v)
|
||||
}
|
||||
|
||||
// Sort entries.
|
||||
sort.Sort(byteSlices(other))
|
||||
|
||||
return other
|
||||
}
|
||||
|
||||
// entryHeaderSize is the number of bytes required for the header.
|
||||
const entryHeaderSize = 8 + 4
|
||||
|
||||
|
@ -651,9 +636,3 @@ func u64tob(v uint64) []byte {
|
|||
|
||||
// btou64 converts an 8-byte slice into an uint64.
|
||||
func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) }
|
||||
|
||||
type byteSlices [][]byte
|
||||
|
||||
func (a byteSlices) Len() int { return len(a) }
|
||||
func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
|
||||
|
|
|
@ -28,9 +28,9 @@ func TestEngine_LoadMetadataIndex_Series(t *testing.T) {
|
|||
|
||||
// Write series metadata.
|
||||
if err := e.WritePoints(nil, nil, []*tsdb.SeriesCreate{
|
||||
{Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server0"})), Tags: map[string]string{"host": "server0"}}},
|
||||
{Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server1"})), Tags: map[string]string{"host": "server1"}}},
|
||||
{Series: &tsdb.Series{Key: "series with spaces"}},
|
||||
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server0"})), map[string]string{"host": "server0"})},
|
||||
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server1"})), map[string]string{"host": "server1"})},
|
||||
{Series: tsdb.NewSeries("series with spaces", nil)},
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -144,11 +144,11 @@ func TestEngine_WriteIndex_Append(t *testing.T) {
|
|||
// Append points to index.
|
||||
if err := e.WriteIndex(map[string][][]byte{
|
||||
"cpu": [][]byte{
|
||||
bz1.MarshalEntry(1, []byte{0x10}),
|
||||
bz1.MarshalEntry(2, []byte{0x20}),
|
||||
append(u64tob(1), 0x10),
|
||||
append(u64tob(2), 0x20),
|
||||
},
|
||||
"mem": [][]byte{
|
||||
bz1.MarshalEntry(0, []byte{0x30}),
|
||||
append(u64tob(0), 0x30),
|
||||
},
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -185,9 +185,9 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
|
|||
// Write initial points to index.
|
||||
if err := e.WriteIndex(map[string][][]byte{
|
||||
"cpu": [][]byte{
|
||||
bz1.MarshalEntry(10, []byte{0x10}),
|
||||
bz1.MarshalEntry(20, []byte{0x20}),
|
||||
bz1.MarshalEntry(30, []byte{0x30}),
|
||||
append(u64tob(10), 0x10),
|
||||
append(u64tob(20), 0x20),
|
||||
append(u64tob(30), 0x30),
|
||||
},
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -196,10 +196,10 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
|
|||
// Write overlapping points to index.
|
||||
if err := e.WriteIndex(map[string][][]byte{
|
||||
"cpu": [][]byte{
|
||||
bz1.MarshalEntry(9, []byte{0x09}),
|
||||
bz1.MarshalEntry(10, []byte{0xFF}),
|
||||
bz1.MarshalEntry(25, []byte{0x25}),
|
||||
bz1.MarshalEntry(31, []byte{0x31}),
|
||||
append(u64tob(9), 0x09),
|
||||
append(u64tob(10), 0xFF),
|
||||
append(u64tob(25), 0x25),
|
||||
append(u64tob(31), 0x31),
|
||||
},
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -208,7 +208,7 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
|
|||
// Write overlapping points to index again.
|
||||
if err := e.WriteIndex(map[string][][]byte{
|
||||
"cpu": [][]byte{
|
||||
bz1.MarshalEntry(31, []byte{0xFF}),
|
||||
append(u64tob(31), 0xFF),
|
||||
},
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -291,15 +291,8 @@ func TestEngine_WriteIndex_Quick(t *testing.T) {
|
|||
got = append(got, append(copyBytes(k), v...))
|
||||
}
|
||||
|
||||
// Generate expected values.
|
||||
// We need to remove the data length from the slice.
|
||||
var exp [][]byte
|
||||
for _, b := range points[key] {
|
||||
exp = append(exp, append(copyBytes(b[0:8]), b[12:]...)) // remove data len
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("points: block size=%d, key=%s:\n\ngot=%x\n\nexp=%x\n\n", e.BlockSize, key, got, exp)
|
||||
if !reflect.DeepEqual(got, points[key]) {
|
||||
t.Fatalf("points: block size=%d, key=%s:\n\ngot=%x\n\nexp=%x\n\n", e.BlockSize, key, got, points[key])
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -425,7 +418,7 @@ func MergePoints(a []Points) Points {
|
|||
|
||||
// Dedupe points.
|
||||
for key, values := range m {
|
||||
m[key] = bz1.DedupeEntries(values)
|
||||
m[key] = tsdb.DedupeEntries(values)
|
||||
}
|
||||
|
||||
return m
|
||||
|
|
|
@ -662,7 +662,7 @@ func (p *Partition) seriesToFlush(readySeriesSize int) (map[string][][]byte, int
|
|||
|
||||
// always hand the index data that is sorted
|
||||
if p.cacheDirtySort[k] {
|
||||
sort.Sort(byteSlices(seriesToFlush[k]))
|
||||
sort.Sort(tsdb.ByteSlices(seriesToFlush[k]))
|
||||
delete(p.cacheDirtySort, k)
|
||||
}
|
||||
}
|
||||
|
@ -881,9 +881,10 @@ func (p *Partition) addToCache(key, data []byte, timestamp int64) {
|
|||
// Generate in-memory cache entry of <timestamp,data>.
|
||||
v := MarshalEntry(timestamp, data)
|
||||
p.memorySize += uint64(len(v))
|
||||
|
||||
// Determine if we'll need to sort the values for this key later
|
||||
a := p.cache[string(key)]
|
||||
needSort := !(len(a) == 0 || bytes.Compare(a[len(a)-1], v) == -1)
|
||||
needSort := !(len(a) == 0 || bytes.Compare(a[len(a)-1][0:8], v[0:8]) == -1)
|
||||
p.cacheDirtySort[string(key)] = needSort
|
||||
|
||||
// Append to cache list.
|
||||
|
@ -908,13 +909,14 @@ func (p *Partition) cursor(key string) *cursor {
|
|||
c := make([][]byte, len(fc), len(fc)+len(cache))
|
||||
copy(c, fc)
|
||||
c = append(c, cache...)
|
||||
sort.Sort(byteSlices(c))
|
||||
tsdb.DedupeEntries(c)
|
||||
return &cursor{cache: c}
|
||||
}
|
||||
}
|
||||
|
||||
if p.cacheDirtySort[key] {
|
||||
sort.Sort(byteSlices(cache))
|
||||
cache = tsdb.DedupeEntries(cache)
|
||||
p.cache[key] = cache
|
||||
delete(p.cacheDirtySort, key)
|
||||
}
|
||||
|
||||
|
@ -1161,10 +1163,3 @@ func u64tob(v uint64) []byte {
|
|||
func btou64(b []byte) uint64 {
|
||||
return binary.BigEndian.Uint64(b)
|
||||
}
|
||||
|
||||
// byteSlices represents a sortable slice of byte slices.
|
||||
type byteSlices [][]byte
|
||||
|
||||
func (a byteSlices) Len() int { return len(a) }
|
||||
func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
|
||||
func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
|
|
@ -494,7 +494,7 @@ func TestShardMapper_LocalMapperTagSets(t *testing.T) {
|
|||
func mustCreateShard(dir string) *tsdb.Shard {
|
||||
tmpShard := path.Join(dir, "shard")
|
||||
index := tsdb.NewDatabaseIndex()
|
||||
sh := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
|
||||
sh := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
|
||||
if err := sh.Open(); err != nil {
|
||||
panic(fmt.Sprintf("error opening shard: %s", err.Error()))
|
||||
}
|
||||
|
|
10
tsdb/meta.go
10
tsdb/meta.go
|
@ -980,6 +980,16 @@ type Series struct {
|
|||
|
||||
id uint64
|
||||
measurement *Measurement
|
||||
shardIDs map[uint64]bool // shards that have this series defined
|
||||
}
|
||||
|
||||
// NewSeries returns an initialized series struct
|
||||
func NewSeries(key string, tags map[string]string) *Series {
|
||||
return &Series{
|
||||
Key: key,
|
||||
Tags: tags,
|
||||
shardIDs: make(map[uint64]bool),
|
||||
}
|
||||
}
|
||||
|
||||
// MarshalBinary encodes the object to a binary format.
|
||||
|
|
|
@ -182,10 +182,7 @@ func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries {
|
|||
for _, ts := range tagSets {
|
||||
series = append(series, &TestSeries{
|
||||
Measurement: m,
|
||||
Series: &tsdb.Series{
|
||||
Key: fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))),
|
||||
Tags: ts,
|
||||
},
|
||||
Series: tsdb.NewSeries(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))), ts),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ type Shard struct {
|
|||
db *bolt.DB // underlying data store
|
||||
index *DatabaseIndex
|
||||
path string
|
||||
id uint64
|
||||
|
||||
engine Engine
|
||||
options EngineOptions
|
||||
|
@ -52,10 +53,11 @@ type Shard struct {
|
|||
}
|
||||
|
||||
// NewShard returns a new initialized Shard
|
||||
func NewShard(index *DatabaseIndex, path string, options EngineOptions) *Shard {
|
||||
func NewShard(id uint64, index *DatabaseIndex, path string, options EngineOptions) *Shard {
|
||||
return &Shard{
|
||||
index: index,
|
||||
path: path,
|
||||
id: id,
|
||||
options: options,
|
||||
measurementFields: make(map[string]*MeasurementFields),
|
||||
|
||||
|
@ -327,8 +329,12 @@ func (s *Shard) validateSeriesAndFields(points []Point) ([]*SeriesCreate, []*Fie
|
|||
for _, p := range points {
|
||||
// see if the series should be added to the index
|
||||
if ss := s.index.series[string(p.Key())]; ss == nil {
|
||||
series := &Series{Key: string(p.Key()), Tags: p.Tags()}
|
||||
series := NewSeries(string(p.Key()), p.Tags())
|
||||
seriesToCreate = append(seriesToCreate, &SeriesCreate{p.Name(), series})
|
||||
} else if !ss.shardIDs[s.id] {
|
||||
// this is the first time this series is being written into this shard, persist it
|
||||
ss.shardIDs[s.id] = true
|
||||
seriesToCreate = append(seriesToCreate, &SeriesCreate{p.Name(), ss})
|
||||
}
|
||||
|
||||
// see if the field definitions need to be saved to the shard
|
||||
|
|
|
@ -20,7 +20,7 @@ func TestShardWriteAndIndex(t *testing.T) {
|
|||
tmpShard := path.Join(tmpDir, "shard")
|
||||
|
||||
index := tsdb.NewDatabaseIndex()
|
||||
sh := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
|
||||
sh := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
|
||||
if err := sh.Open(); err != nil {
|
||||
t.Fatalf("error openeing shard: %s", err.Error())
|
||||
}
|
||||
|
@ -66,7 +66,7 @@ func TestShardWriteAndIndex(t *testing.T) {
|
|||
sh.Close()
|
||||
|
||||
index = tsdb.NewDatabaseIndex()
|
||||
sh = tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
|
||||
sh = tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
|
||||
if err := sh.Open(); err != nil {
|
||||
t.Fatalf("error openeing shard: %s", err.Error())
|
||||
}
|
||||
|
@ -87,7 +87,7 @@ func TestShardWriteAddNewField(t *testing.T) {
|
|||
tmpShard := path.Join(tmpDir, "shard")
|
||||
|
||||
index := tsdb.NewDatabaseIndex()
|
||||
sh := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
|
||||
sh := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
|
||||
if err := sh.Open(); err != nil {
|
||||
t.Fatalf("error openeing shard: %s", err.Error())
|
||||
}
|
||||
|
@ -143,7 +143,7 @@ func TestShard_Autoflush(t *testing.T) {
|
|||
defer os.RemoveAll(path)
|
||||
|
||||
// Open shard with a really low size threshold, high flush interval.
|
||||
sh := tsdb.NewShard(tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{
|
||||
sh := tsdb.NewShard(1, tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{
|
||||
EngineVersion: b1.Format,
|
||||
MaxWALSize: 1024, // 1KB
|
||||
WALFlushInterval: 1 * time.Hour,
|
||||
|
@ -183,7 +183,7 @@ func TestShard_Autoflush_FlushInterval(t *testing.T) {
|
|||
defer os.RemoveAll(path)
|
||||
|
||||
// Open shard with a high size threshold, small time threshold.
|
||||
sh := tsdb.NewShard(tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{
|
||||
sh := tsdb.NewShard(1, tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{
|
||||
EngineVersion: b1.Format,
|
||||
MaxWALSize: 10 * 1024 * 1024, // 10MB
|
||||
WALFlushInterval: 100 * time.Millisecond,
|
||||
|
@ -266,7 +266,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) {
|
|||
for n := 0; n < b.N; n++ {
|
||||
tmpDir, _ := ioutil.TempDir("", "shard_test")
|
||||
tmpShard := path.Join(tmpDir, "shard")
|
||||
shard := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
|
||||
shard := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
|
||||
shard.Open()
|
||||
|
||||
b.StartTimer()
|
||||
|
@ -301,7 +301,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt
|
|||
tmpDir, _ := ioutil.TempDir("", "")
|
||||
defer os.RemoveAll(tmpDir)
|
||||
tmpShard := path.Join(tmpDir, "shard")
|
||||
shard := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
|
||||
shard := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
|
||||
shard.Open()
|
||||
defer shard.Close()
|
||||
chunkedWrite(shard, points)
|
||||
|
|
|
@ -82,7 +82,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
|
|||
}
|
||||
|
||||
shardPath := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
|
||||
shard := NewShard(db, shardPath, s.EngineOptions)
|
||||
shard := NewShard(shardID, db, shardPath, s.EngineOptions)
|
||||
if err := shard.Open(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -236,7 +236,7 @@ func (s *Store) loadShards() error {
|
|||
continue
|
||||
}
|
||||
|
||||
shard := NewShard(s.databaseIndexes[db], path, s.EngineOptions)
|
||||
shard := NewShard(shardID, s.databaseIndexes[db], path, s.EngineOptions)
|
||||
err = shard.Open()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open shard %d: %s", shardID, err)
|
||||
|
|
Loading…
Reference in New Issue