Fix bug with new shards not getting series data persisted.
parent
abc71aee53
commit
3348dab4e0
|
@ -1,10 +1,12 @@
|
||||||
package tsdb
|
package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/boltdb/bolt"
|
"github.com/boltdb/bolt"
|
||||||
|
@ -127,3 +129,29 @@ type Cursor interface {
|
||||||
Seek(seek []byte) (key, value []byte)
|
Seek(seek []byte) (key, value []byte)
|
||||||
Next() (key, value []byte)
|
Next() (key, value []byte)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DedupeEntries returns slices with unique keys (the first 8 bytes).
|
||||||
|
func DedupeEntries(a [][]byte) [][]byte {
|
||||||
|
// Convert to a map where the last slice is used.
|
||||||
|
m := make(map[string][]byte)
|
||||||
|
for _, b := range a {
|
||||||
|
m[string(b[0:8])] = b
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert map back to a slice of byte slices.
|
||||||
|
other := make([][]byte, 0, len(m))
|
||||||
|
for _, v := range m {
|
||||||
|
other = append(other, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort entries.
|
||||||
|
sort.Sort(ByteSlices(other))
|
||||||
|
|
||||||
|
return other
|
||||||
|
}
|
||||||
|
|
||||||
|
type ByteSlices [][]byte
|
||||||
|
|
||||||
|
func (a ByteSlices) Len() int { return len(a) }
|
||||||
|
func (a ByteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||||
|
func (a ByteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
|
||||||
|
|
|
@ -194,7 +194,7 @@ func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields
|
||||||
meta = tx.Bucket([]byte("series"))
|
meta = tx.Bucket([]byte("series"))
|
||||||
c = meta.Cursor()
|
c = meta.Cursor()
|
||||||
for k, v := c.First(); k != nil; k, v = c.Next() {
|
for k, v := c.First(); k != nil; k, v = c.Next() {
|
||||||
series := &tsdb.Series{}
|
series := tsdb.NewSeries("", nil)
|
||||||
if err := series.UnmarshalBinary(v); err != nil {
|
if err := series.UnmarshalBinary(v); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ func TestEngine_WritePoints(t *testing.T) {
|
||||||
mf := &tsdb.MeasurementFields{Fields: make(map[string]*tsdb.Field)}
|
mf := &tsdb.MeasurementFields{Fields: make(map[string]*tsdb.Field)}
|
||||||
mf.CreateFieldIfNotExists("value", influxql.Float)
|
mf.CreateFieldIfNotExists("value", influxql.Float)
|
||||||
seriesToCreate := []*tsdb.SeriesCreate{
|
seriesToCreate := []*tsdb.SeriesCreate{
|
||||||
{Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("temperature"), nil))}},
|
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("temperature"), nil)), nil)},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse point.
|
// Parse point.
|
||||||
|
|
|
@ -170,7 +170,7 @@ func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields
|
||||||
meta = tx.Bucket([]byte("series"))
|
meta = tx.Bucket([]byte("series"))
|
||||||
c = meta.Cursor()
|
c = meta.Cursor()
|
||||||
for k, v := c.First(); k != nil; k, v = c.Next() {
|
for k, v := c.First(); k != nil; k, v = c.Next() {
|
||||||
series := &tsdb.Series{}
|
series := tsdb.NewSeries("", nil)
|
||||||
if err := series.UnmarshalBinary(v); err != nil {
|
if err := series.UnmarshalBinary(v); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -284,8 +284,13 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
|
||||||
c := bkt.Cursor()
|
c := bkt.Cursor()
|
||||||
|
|
||||||
// Ensure the slice is sorted before retrieving the time range.
|
// Ensure the slice is sorted before retrieving the time range.
|
||||||
a = DedupeEntries(a)
|
a = tsdb.DedupeEntries(a)
|
||||||
sort.Sort(byteSlices(a))
|
|
||||||
|
// Convert the raw time and byte slices to entries with lengths
|
||||||
|
for i, p := range a {
|
||||||
|
timestamp := int64(btou64(p[0:8]))
|
||||||
|
a[i] = MarshalEntry(timestamp, p[8:])
|
||||||
|
}
|
||||||
|
|
||||||
// Determine time range of new data.
|
// Determine time range of new data.
|
||||||
tmin, tmax := int64(btou64(a[0][0:8])), int64(btou64(a[len(a)-1][0:8]))
|
tmin, tmax := int64(btou64(a[0][0:8])), int64(btou64(a[len(a)-1][0:8]))
|
||||||
|
@ -340,7 +345,7 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
|
||||||
|
|
||||||
// Merge entries before rewriting.
|
// Merge entries before rewriting.
|
||||||
a = append(existing, a...)
|
a = append(existing, a...)
|
||||||
sort.Sort(byteSlices(a))
|
sort.Sort(tsdb.ByteSlices(a))
|
||||||
|
|
||||||
// Rewrite points to new blocks.
|
// Rewrite points to new blocks.
|
||||||
if err := e.writeBlocks(bkt, a); err != nil {
|
if err := e.writeBlocks(bkt, a); err != nil {
|
||||||
|
@ -367,7 +372,7 @@ func (e *Engine) writeBlocks(bkt *bolt.Bucket, a [][]byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Append point to the end of the block.
|
// Append point to the end of the block.
|
||||||
block = append(block, MarshalEntry(timestamp, p[8:])...)
|
block = append(block, p...)
|
||||||
|
|
||||||
// If the block is larger than the target block size or this is the
|
// If the block is larger than the target block size or this is the
|
||||||
// last point then flush the block to the bucket.
|
// last point then flush the block to the bucket.
|
||||||
|
@ -616,26 +621,6 @@ func SplitEntries(b []byte) [][]byte {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// DedupeEntries returns slices with unique keys (the first 8 bytes).
|
|
||||||
func DedupeEntries(a [][]byte) [][]byte {
|
|
||||||
// Convert to a map where the last slice is used.
|
|
||||||
m := make(map[string][]byte)
|
|
||||||
for _, b := range a {
|
|
||||||
m[string(b[0:8])] = b
|
|
||||||
}
|
|
||||||
|
|
||||||
// Convert map back to a slice of byte slices.
|
|
||||||
other := make([][]byte, 0, len(m))
|
|
||||||
for _, v := range m {
|
|
||||||
other = append(other, v)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sort entries.
|
|
||||||
sort.Sort(byteSlices(other))
|
|
||||||
|
|
||||||
return other
|
|
||||||
}
|
|
||||||
|
|
||||||
// entryHeaderSize is the number of bytes required for the header.
|
// entryHeaderSize is the number of bytes required for the header.
|
||||||
const entryHeaderSize = 8 + 4
|
const entryHeaderSize = 8 + 4
|
||||||
|
|
||||||
|
@ -651,9 +636,3 @@ func u64tob(v uint64) []byte {
|
||||||
|
|
||||||
// btou64 converts an 8-byte slice into an uint64.
|
// btou64 converts an 8-byte slice into an uint64.
|
||||||
func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) }
|
func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) }
|
||||||
|
|
||||||
type byteSlices [][]byte
|
|
||||||
|
|
||||||
func (a byteSlices) Len() int { return len(a) }
|
|
||||||
func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
||||||
func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
|
|
||||||
|
|
|
@ -28,9 +28,9 @@ func TestEngine_LoadMetadataIndex_Series(t *testing.T) {
|
||||||
|
|
||||||
// Write series metadata.
|
// Write series metadata.
|
||||||
if err := e.WritePoints(nil, nil, []*tsdb.SeriesCreate{
|
if err := e.WritePoints(nil, nil, []*tsdb.SeriesCreate{
|
||||||
{Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server0"})), Tags: map[string]string{"host": "server0"}}},
|
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server0"})), map[string]string{"host": "server0"})},
|
||||||
{Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server1"})), Tags: map[string]string{"host": "server1"}}},
|
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("cpu"), map[string]string{"host": "server1"})), map[string]string{"host": "server1"})},
|
||||||
{Series: &tsdb.Series{Key: "series with spaces"}},
|
{Series: tsdb.NewSeries("series with spaces", nil)},
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -144,11 +144,11 @@ func TestEngine_WriteIndex_Append(t *testing.T) {
|
||||||
// Append points to index.
|
// Append points to index.
|
||||||
if err := e.WriteIndex(map[string][][]byte{
|
if err := e.WriteIndex(map[string][][]byte{
|
||||||
"cpu": [][]byte{
|
"cpu": [][]byte{
|
||||||
bz1.MarshalEntry(1, []byte{0x10}),
|
append(u64tob(1), 0x10),
|
||||||
bz1.MarshalEntry(2, []byte{0x20}),
|
append(u64tob(2), 0x20),
|
||||||
},
|
},
|
||||||
"mem": [][]byte{
|
"mem": [][]byte{
|
||||||
bz1.MarshalEntry(0, []byte{0x30}),
|
append(u64tob(0), 0x30),
|
||||||
},
|
},
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -185,9 +185,9 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
|
||||||
// Write initial points to index.
|
// Write initial points to index.
|
||||||
if err := e.WriteIndex(map[string][][]byte{
|
if err := e.WriteIndex(map[string][][]byte{
|
||||||
"cpu": [][]byte{
|
"cpu": [][]byte{
|
||||||
bz1.MarshalEntry(10, []byte{0x10}),
|
append(u64tob(10), 0x10),
|
||||||
bz1.MarshalEntry(20, []byte{0x20}),
|
append(u64tob(20), 0x20),
|
||||||
bz1.MarshalEntry(30, []byte{0x30}),
|
append(u64tob(30), 0x30),
|
||||||
},
|
},
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -196,10 +196,10 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
|
||||||
// Write overlapping points to index.
|
// Write overlapping points to index.
|
||||||
if err := e.WriteIndex(map[string][][]byte{
|
if err := e.WriteIndex(map[string][][]byte{
|
||||||
"cpu": [][]byte{
|
"cpu": [][]byte{
|
||||||
bz1.MarshalEntry(9, []byte{0x09}),
|
append(u64tob(9), 0x09),
|
||||||
bz1.MarshalEntry(10, []byte{0xFF}),
|
append(u64tob(10), 0xFF),
|
||||||
bz1.MarshalEntry(25, []byte{0x25}),
|
append(u64tob(25), 0x25),
|
||||||
bz1.MarshalEntry(31, []byte{0x31}),
|
append(u64tob(31), 0x31),
|
||||||
},
|
},
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -208,7 +208,7 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
|
||||||
// Write overlapping points to index again.
|
// Write overlapping points to index again.
|
||||||
if err := e.WriteIndex(map[string][][]byte{
|
if err := e.WriteIndex(map[string][][]byte{
|
||||||
"cpu": [][]byte{
|
"cpu": [][]byte{
|
||||||
bz1.MarshalEntry(31, []byte{0xFF}),
|
append(u64tob(31), 0xFF),
|
||||||
},
|
},
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -291,15 +291,8 @@ func TestEngine_WriteIndex_Quick(t *testing.T) {
|
||||||
got = append(got, append(copyBytes(k), v...))
|
got = append(got, append(copyBytes(k), v...))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate expected values.
|
if !reflect.DeepEqual(got, points[key]) {
|
||||||
// We need to remove the data length from the slice.
|
t.Fatalf("points: block size=%d, key=%s:\n\ngot=%x\n\nexp=%x\n\n", e.BlockSize, key, got, points[key])
|
||||||
var exp [][]byte
|
|
||||||
for _, b := range points[key] {
|
|
||||||
exp = append(exp, append(copyBytes(b[0:8]), b[12:]...)) // remove data len
|
|
||||||
}
|
|
||||||
|
|
||||||
if !reflect.DeepEqual(got, exp) {
|
|
||||||
t.Fatalf("points: block size=%d, key=%s:\n\ngot=%x\n\nexp=%x\n\n", e.BlockSize, key, got, exp)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -425,7 +418,7 @@ func MergePoints(a []Points) Points {
|
||||||
|
|
||||||
// Dedupe points.
|
// Dedupe points.
|
||||||
for key, values := range m {
|
for key, values := range m {
|
||||||
m[key] = bz1.DedupeEntries(values)
|
m[key] = tsdb.DedupeEntries(values)
|
||||||
}
|
}
|
||||||
|
|
||||||
return m
|
return m
|
||||||
|
|
|
@ -662,7 +662,7 @@ func (p *Partition) seriesToFlush(readySeriesSize int) (map[string][][]byte, int
|
||||||
|
|
||||||
// always hand the index data that is sorted
|
// always hand the index data that is sorted
|
||||||
if p.cacheDirtySort[k] {
|
if p.cacheDirtySort[k] {
|
||||||
sort.Sort(byteSlices(seriesToFlush[k]))
|
sort.Sort(tsdb.ByteSlices(seriesToFlush[k]))
|
||||||
delete(p.cacheDirtySort, k)
|
delete(p.cacheDirtySort, k)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -881,9 +881,10 @@ func (p *Partition) addToCache(key, data []byte, timestamp int64) {
|
||||||
// Generate in-memory cache entry of <timestamp,data>.
|
// Generate in-memory cache entry of <timestamp,data>.
|
||||||
v := MarshalEntry(timestamp, data)
|
v := MarshalEntry(timestamp, data)
|
||||||
p.memorySize += uint64(len(v))
|
p.memorySize += uint64(len(v))
|
||||||
|
|
||||||
// Determine if we'll need to sort the values for this key later
|
// Determine if we'll need to sort the values for this key later
|
||||||
a := p.cache[string(key)]
|
a := p.cache[string(key)]
|
||||||
needSort := !(len(a) == 0 || bytes.Compare(a[len(a)-1], v) == -1)
|
needSort := !(len(a) == 0 || bytes.Compare(a[len(a)-1][0:8], v[0:8]) == -1)
|
||||||
p.cacheDirtySort[string(key)] = needSort
|
p.cacheDirtySort[string(key)] = needSort
|
||||||
|
|
||||||
// Append to cache list.
|
// Append to cache list.
|
||||||
|
@ -908,13 +909,14 @@ func (p *Partition) cursor(key string) *cursor {
|
||||||
c := make([][]byte, len(fc), len(fc)+len(cache))
|
c := make([][]byte, len(fc), len(fc)+len(cache))
|
||||||
copy(c, fc)
|
copy(c, fc)
|
||||||
c = append(c, cache...)
|
c = append(c, cache...)
|
||||||
sort.Sort(byteSlices(c))
|
tsdb.DedupeEntries(c)
|
||||||
return &cursor{cache: c}
|
return &cursor{cache: c}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.cacheDirtySort[key] {
|
if p.cacheDirtySort[key] {
|
||||||
sort.Sort(byteSlices(cache))
|
cache = tsdb.DedupeEntries(cache)
|
||||||
|
p.cache[key] = cache
|
||||||
delete(p.cacheDirtySort, key)
|
delete(p.cacheDirtySort, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1161,10 +1163,3 @@ func u64tob(v uint64) []byte {
|
||||||
func btou64(b []byte) uint64 {
|
func btou64(b []byte) uint64 {
|
||||||
return binary.BigEndian.Uint64(b)
|
return binary.BigEndian.Uint64(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
// byteSlices represents a sortable slice of byte slices.
|
|
||||||
type byteSlices [][]byte
|
|
||||||
|
|
||||||
func (a byteSlices) Len() int { return len(a) }
|
|
||||||
func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
|
|
||||||
func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
||||||
|
|
|
@ -494,7 +494,7 @@ func TestShardMapper_LocalMapperTagSets(t *testing.T) {
|
||||||
func mustCreateShard(dir string) *tsdb.Shard {
|
func mustCreateShard(dir string) *tsdb.Shard {
|
||||||
tmpShard := path.Join(dir, "shard")
|
tmpShard := path.Join(dir, "shard")
|
||||||
index := tsdb.NewDatabaseIndex()
|
index := tsdb.NewDatabaseIndex()
|
||||||
sh := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
|
sh := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
|
||||||
if err := sh.Open(); err != nil {
|
if err := sh.Open(); err != nil {
|
||||||
panic(fmt.Sprintf("error opening shard: %s", err.Error()))
|
panic(fmt.Sprintf("error opening shard: %s", err.Error()))
|
||||||
}
|
}
|
||||||
|
|
10
tsdb/meta.go
10
tsdb/meta.go
|
@ -980,6 +980,16 @@ type Series struct {
|
||||||
|
|
||||||
id uint64
|
id uint64
|
||||||
measurement *Measurement
|
measurement *Measurement
|
||||||
|
shardIDs map[uint64]bool // shards that have this series defined
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewSeries returns an initialized series struct
|
||||||
|
func NewSeries(key string, tags map[string]string) *Series {
|
||||||
|
return &Series{
|
||||||
|
Key: key,
|
||||||
|
Tags: tags,
|
||||||
|
shardIDs: make(map[uint64]bool),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// MarshalBinary encodes the object to a binary format.
|
// MarshalBinary encodes the object to a binary format.
|
||||||
|
|
|
@ -182,10 +182,7 @@ func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries {
|
||||||
for _, ts := range tagSets {
|
for _, ts := range tagSets {
|
||||||
series = append(series, &TestSeries{
|
series = append(series, &TestSeries{
|
||||||
Measurement: m,
|
Measurement: m,
|
||||||
Series: &tsdb.Series{
|
Series: tsdb.NewSeries(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))), ts),
|
||||||
Key: fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))),
|
|
||||||
Tags: ts,
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,6 +40,7 @@ type Shard struct {
|
||||||
db *bolt.DB // underlying data store
|
db *bolt.DB // underlying data store
|
||||||
index *DatabaseIndex
|
index *DatabaseIndex
|
||||||
path string
|
path string
|
||||||
|
id uint64
|
||||||
|
|
||||||
engine Engine
|
engine Engine
|
||||||
options EngineOptions
|
options EngineOptions
|
||||||
|
@ -52,10 +53,11 @@ type Shard struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewShard returns a new initialized Shard
|
// NewShard returns a new initialized Shard
|
||||||
func NewShard(index *DatabaseIndex, path string, options EngineOptions) *Shard {
|
func NewShard(id uint64, index *DatabaseIndex, path string, options EngineOptions) *Shard {
|
||||||
return &Shard{
|
return &Shard{
|
||||||
index: index,
|
index: index,
|
||||||
path: path,
|
path: path,
|
||||||
|
id: id,
|
||||||
options: options,
|
options: options,
|
||||||
measurementFields: make(map[string]*MeasurementFields),
|
measurementFields: make(map[string]*MeasurementFields),
|
||||||
|
|
||||||
|
@ -327,8 +329,12 @@ func (s *Shard) validateSeriesAndFields(points []Point) ([]*SeriesCreate, []*Fie
|
||||||
for _, p := range points {
|
for _, p := range points {
|
||||||
// see if the series should be added to the index
|
// see if the series should be added to the index
|
||||||
if ss := s.index.series[string(p.Key())]; ss == nil {
|
if ss := s.index.series[string(p.Key())]; ss == nil {
|
||||||
series := &Series{Key: string(p.Key()), Tags: p.Tags()}
|
series := NewSeries(string(p.Key()), p.Tags())
|
||||||
seriesToCreate = append(seriesToCreate, &SeriesCreate{p.Name(), series})
|
seriesToCreate = append(seriesToCreate, &SeriesCreate{p.Name(), series})
|
||||||
|
} else if !ss.shardIDs[s.id] {
|
||||||
|
// this is the first time this series is being written into this shard, persist it
|
||||||
|
ss.shardIDs[s.id] = true
|
||||||
|
seriesToCreate = append(seriesToCreate, &SeriesCreate{p.Name(), ss})
|
||||||
}
|
}
|
||||||
|
|
||||||
// see if the field definitions need to be saved to the shard
|
// see if the field definitions need to be saved to the shard
|
||||||
|
|
|
@ -20,7 +20,7 @@ func TestShardWriteAndIndex(t *testing.T) {
|
||||||
tmpShard := path.Join(tmpDir, "shard")
|
tmpShard := path.Join(tmpDir, "shard")
|
||||||
|
|
||||||
index := tsdb.NewDatabaseIndex()
|
index := tsdb.NewDatabaseIndex()
|
||||||
sh := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
|
sh := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
|
||||||
if err := sh.Open(); err != nil {
|
if err := sh.Open(); err != nil {
|
||||||
t.Fatalf("error openeing shard: %s", err.Error())
|
t.Fatalf("error openeing shard: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
@ -66,7 +66,7 @@ func TestShardWriteAndIndex(t *testing.T) {
|
||||||
sh.Close()
|
sh.Close()
|
||||||
|
|
||||||
index = tsdb.NewDatabaseIndex()
|
index = tsdb.NewDatabaseIndex()
|
||||||
sh = tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
|
sh = tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
|
||||||
if err := sh.Open(); err != nil {
|
if err := sh.Open(); err != nil {
|
||||||
t.Fatalf("error openeing shard: %s", err.Error())
|
t.Fatalf("error openeing shard: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
@ -87,7 +87,7 @@ func TestShardWriteAddNewField(t *testing.T) {
|
||||||
tmpShard := path.Join(tmpDir, "shard")
|
tmpShard := path.Join(tmpDir, "shard")
|
||||||
|
|
||||||
index := tsdb.NewDatabaseIndex()
|
index := tsdb.NewDatabaseIndex()
|
||||||
sh := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
|
sh := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
|
||||||
if err := sh.Open(); err != nil {
|
if err := sh.Open(); err != nil {
|
||||||
t.Fatalf("error openeing shard: %s", err.Error())
|
t.Fatalf("error openeing shard: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
@ -143,7 +143,7 @@ func TestShard_Autoflush(t *testing.T) {
|
||||||
defer os.RemoveAll(path)
|
defer os.RemoveAll(path)
|
||||||
|
|
||||||
// Open shard with a really low size threshold, high flush interval.
|
// Open shard with a really low size threshold, high flush interval.
|
||||||
sh := tsdb.NewShard(tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{
|
sh := tsdb.NewShard(1, tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{
|
||||||
EngineVersion: b1.Format,
|
EngineVersion: b1.Format,
|
||||||
MaxWALSize: 1024, // 1KB
|
MaxWALSize: 1024, // 1KB
|
||||||
WALFlushInterval: 1 * time.Hour,
|
WALFlushInterval: 1 * time.Hour,
|
||||||
|
@ -183,7 +183,7 @@ func TestShard_Autoflush_FlushInterval(t *testing.T) {
|
||||||
defer os.RemoveAll(path)
|
defer os.RemoveAll(path)
|
||||||
|
|
||||||
// Open shard with a high size threshold, small time threshold.
|
// Open shard with a high size threshold, small time threshold.
|
||||||
sh := tsdb.NewShard(tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{
|
sh := tsdb.NewShard(1, tsdb.NewDatabaseIndex(), filepath.Join(path, "shard"), tsdb.EngineOptions{
|
||||||
EngineVersion: b1.Format,
|
EngineVersion: b1.Format,
|
||||||
MaxWALSize: 10 * 1024 * 1024, // 10MB
|
MaxWALSize: 10 * 1024 * 1024, // 10MB
|
||||||
WALFlushInterval: 100 * time.Millisecond,
|
WALFlushInterval: 100 * time.Millisecond,
|
||||||
|
@ -266,7 +266,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) {
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
tmpDir, _ := ioutil.TempDir("", "shard_test")
|
tmpDir, _ := ioutil.TempDir("", "shard_test")
|
||||||
tmpShard := path.Join(tmpDir, "shard")
|
tmpShard := path.Join(tmpDir, "shard")
|
||||||
shard := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
|
shard := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
|
||||||
shard.Open()
|
shard.Open()
|
||||||
|
|
||||||
b.StartTimer()
|
b.StartTimer()
|
||||||
|
@ -301,7 +301,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt
|
||||||
tmpDir, _ := ioutil.TempDir("", "")
|
tmpDir, _ := ioutil.TempDir("", "")
|
||||||
defer os.RemoveAll(tmpDir)
|
defer os.RemoveAll(tmpDir)
|
||||||
tmpShard := path.Join(tmpDir, "shard")
|
tmpShard := path.Join(tmpDir, "shard")
|
||||||
shard := tsdb.NewShard(index, tmpShard, tsdb.NewEngineOptions())
|
shard := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
|
||||||
shard.Open()
|
shard.Open()
|
||||||
defer shard.Close()
|
defer shard.Close()
|
||||||
chunkedWrite(shard, points)
|
chunkedWrite(shard, points)
|
||||||
|
|
|
@ -82,7 +82,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
|
||||||
}
|
}
|
||||||
|
|
||||||
shardPath := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
|
shardPath := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
|
||||||
shard := NewShard(db, shardPath, s.EngineOptions)
|
shard := NewShard(shardID, db, shardPath, s.EngineOptions)
|
||||||
if err := shard.Open(); err != nil {
|
if err := shard.Open(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -236,7 +236,7 @@ func (s *Store) loadShards() error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
shard := NewShard(s.databaseIndexes[db], path, s.EngineOptions)
|
shard := NewShard(shardID, s.databaseIndexes[db], path, s.EngineOptions)
|
||||||
err = shard.Open()
|
err = shard.Open()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to open shard %d: %s", shardID, err)
|
return fmt.Errorf("failed to open shard %d: %s", shardID, err)
|
||||||
|
|
Loading…
Reference in New Issue