Merge pull request #8405 from influxdata/sgc-move-meta
move Measurement and Series to inmem packagepull/8431/head
commit
a6c5430397
|
@ -46,9 +46,9 @@ type Index struct {
|
|||
mu sync.RWMutex
|
||||
|
||||
// In-memory metadata index, built on load and updated when new series come in
|
||||
measurements map[string]*tsdb.Measurement // measurement name to object and index
|
||||
series map[string]*tsdb.Series // map series key to the Series object
|
||||
lastID uint64 // last used series ID. They're in memory only for this shard
|
||||
measurements map[string]*Measurement // measurement name to object and index
|
||||
series map[string]*Series // map series key to the Series object
|
||||
lastID uint64 // last used series ID. They're in memory only for this shard
|
||||
|
||||
seriesSketch, seriesTSSketch *hll.Plus
|
||||
measurementsSketch, measurementsTSSketch *hll.Plus
|
||||
|
@ -57,8 +57,8 @@ type Index struct {
|
|||
// NewIndex returns a new initialized Index.
|
||||
func NewIndex() *Index {
|
||||
index := &Index{
|
||||
measurements: make(map[string]*tsdb.Measurement),
|
||||
series: make(map[string]*tsdb.Series),
|
||||
measurements: make(map[string]*Measurement),
|
||||
series: make(map[string]*Series),
|
||||
}
|
||||
|
||||
index.seriesSketch = hll.NewDefaultPlus()
|
||||
|
@ -74,7 +74,7 @@ func (i *Index) Open() (err error) { return nil }
|
|||
func (i *Index) Close() error { return nil }
|
||||
|
||||
// Series returns a series by key.
|
||||
func (i *Index) Series(key []byte) (*tsdb.Series, error) {
|
||||
func (i *Index) Series(key []byte) (*Series, error) {
|
||||
i.mu.RLock()
|
||||
s := i.series[string(key)]
|
||||
i.mu.RUnlock()
|
||||
|
@ -99,7 +99,7 @@ func (i *Index) SeriesN() int64 {
|
|||
}
|
||||
|
||||
// Measurement returns the measurement object from the index by the name
|
||||
func (i *Index) Measurement(name []byte) (*tsdb.Measurement, error) {
|
||||
func (i *Index) Measurement(name []byte) (*Measurement, error) {
|
||||
i.mu.RLock()
|
||||
defer i.mu.RUnlock()
|
||||
return i.measurements[string(name)], nil
|
||||
|
@ -120,11 +120,11 @@ func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, erro
|
|||
}
|
||||
|
||||
// MeasurementsByName returns a list of measurements.
|
||||
func (i *Index) MeasurementsByName(names [][]byte) ([]*tsdb.Measurement, error) {
|
||||
func (i *Index) MeasurementsByName(names [][]byte) ([]*Measurement, error) {
|
||||
i.mu.RLock()
|
||||
defer i.mu.RUnlock()
|
||||
|
||||
a := make([]*tsdb.Measurement, 0, len(names))
|
||||
a := make([]*Measurement, 0, len(names))
|
||||
for _, name := range names {
|
||||
if m := i.measurements[string(name)]; m != nil {
|
||||
a = append(a, m)
|
||||
|
@ -168,7 +168,7 @@ func (i *Index) CreateSeriesIfNotExists(shardID uint64, key, name []byte, tags m
|
|||
|
||||
// set the in memory ID for query processing on this shard
|
||||
// The series key and tags are clone to prevent a memory leak
|
||||
series := tsdb.NewSeries([]byte(string(key)), tags.Clone())
|
||||
series := NewSeries([]byte(string(key)), tags.Clone())
|
||||
series.ID = i.lastID + 1
|
||||
i.lastID++
|
||||
|
||||
|
@ -187,7 +187,7 @@ func (i *Index) CreateSeriesIfNotExists(shardID uint64, key, name []byte, tags m
|
|||
|
||||
// CreateMeasurementIndexIfNotExists creates or retrieves an in memory index
|
||||
// object for the measurement
|
||||
func (i *Index) CreateMeasurementIndexIfNotExists(name []byte) *tsdb.Measurement {
|
||||
func (i *Index) CreateMeasurementIndexIfNotExists(name []byte) *Measurement {
|
||||
name = escape.Unescape(name)
|
||||
|
||||
// See if the measurement exists using a read-lock
|
||||
|
@ -207,7 +207,7 @@ func (i *Index) CreateMeasurementIndexIfNotExists(name []byte) *tsdb.Measurement
|
|||
// and acquire the write lock
|
||||
m = i.measurements[string(name)]
|
||||
if m == nil {
|
||||
m = tsdb.NewMeasurement(string(name))
|
||||
m = NewMeasurement(string(name))
|
||||
i.measurements[string(name)] = m
|
||||
|
||||
// Add the measurement to the measurements sketch.
|
||||
|
@ -343,7 +343,7 @@ func (i *Index) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error) {
|
|||
return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String())
|
||||
}
|
||||
|
||||
tf := &tsdb.TagFilter{
|
||||
tf := &TagFilter{
|
||||
Op: e.Op,
|
||||
Key: tag.Val,
|
||||
}
|
||||
|
@ -420,7 +420,7 @@ func (i *Index) measurementNamesByNameFilter(op influxql.Token, val string, rege
|
|||
}
|
||||
|
||||
// measurementNamesByTagFilters returns the sorted measurements matching the filters on tag values.
|
||||
func (i *Index) measurementNamesByTagFilters(filter *tsdb.TagFilter) [][]byte {
|
||||
func (i *Index) measurementNamesByTagFilters(filter *TagFilter) [][]byte {
|
||||
// Build a list of measurements matching the filters.
|
||||
var names [][]byte
|
||||
var tagMatch bool
|
||||
|
@ -599,7 +599,7 @@ func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error {
|
|||
i.mu.RLock()
|
||||
defer i.mu.RUnlock()
|
||||
|
||||
mms := make(tsdb.Measurements, 0, len(i.measurements))
|
||||
mms := make(Measurements, 0, len(i.measurements))
|
||||
for _, m := range i.measurements {
|
||||
mms = append(mms, m)
|
||||
}
|
||||
|
@ -649,7 +649,7 @@ func (i *Index) MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr
|
|||
// SeriesPointIterator returns an influxql iterator over all series.
|
||||
func (i *Index) SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
// Read and sort all measurements.
|
||||
mms := make(tsdb.Measurements, 0, len(i.measurements))
|
||||
mms := make(Measurements, 0, len(i.measurements))
|
||||
for _, mm := range i.measurements {
|
||||
mms = append(mms, mm)
|
||||
}
|
||||
|
@ -838,7 +838,7 @@ func NewShardIndex(id uint64, path string, opt tsdb.EngineOptions) tsdb.Index {
|
|||
|
||||
// seriesPointIterator emits series as influxql points.
|
||||
type seriesPointIterator struct {
|
||||
mms tsdb.Measurements
|
||||
mms Measurements
|
||||
keys struct {
|
||||
buf []string
|
||||
i int
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,255 @@
|
|||
package inmem_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/tsdb/index/inmem"
|
||||
)
|
||||
|
||||
// Test comparing SeriesIDs for equality.
|
||||
func TestSeriesIDs_Equals(t *testing.T) {
|
||||
ids1 := inmem.SeriesIDs([]uint64{1, 2, 3})
|
||||
ids2 := inmem.SeriesIDs([]uint64{1, 2, 3})
|
||||
ids3 := inmem.SeriesIDs([]uint64{4, 5, 6})
|
||||
|
||||
if !ids1.Equals(ids2) {
|
||||
t.Fatal("expected ids1 == ids2")
|
||||
} else if ids1.Equals(ids3) {
|
||||
t.Fatal("expected ids1 != ids3")
|
||||
}
|
||||
}
|
||||
|
||||
// Test intersecting sets of SeriesIDs.
|
||||
func TestSeriesIDs_Intersect(t *testing.T) {
|
||||
// Test swaping l & r, all branches of if-else, and exit loop when 'j < len(r)'
|
||||
ids1 := inmem.SeriesIDs([]uint64{1, 3, 4, 5, 6})
|
||||
ids2 := inmem.SeriesIDs([]uint64{1, 2, 3, 7})
|
||||
exp := inmem.SeriesIDs([]uint64{1, 3})
|
||||
got := ids1.Intersect(ids2)
|
||||
|
||||
if !exp.Equals(got) {
|
||||
t.Fatalf("exp=%v, got=%v", exp, got)
|
||||
}
|
||||
|
||||
// Test exit for loop when 'i < len(l)'
|
||||
ids1 = inmem.SeriesIDs([]uint64{1})
|
||||
ids2 = inmem.SeriesIDs([]uint64{1, 2})
|
||||
exp = inmem.SeriesIDs([]uint64{1})
|
||||
got = ids1.Intersect(ids2)
|
||||
|
||||
if !exp.Equals(got) {
|
||||
t.Fatalf("exp=%v, got=%v", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
// Test union sets of SeriesIDs.
|
||||
func TestSeriesIDs_Union(t *testing.T) {
|
||||
// Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left.
|
||||
ids1 := inmem.SeriesIDs([]uint64{1, 2, 3, 7})
|
||||
ids2 := inmem.SeriesIDs([]uint64{1, 3, 4, 5, 6})
|
||||
exp := inmem.SeriesIDs([]uint64{1, 2, 3, 4, 5, 6, 7})
|
||||
got := ids1.Union(ids2)
|
||||
|
||||
if !exp.Equals(got) {
|
||||
t.Fatalf("exp=%v, got=%v", exp, got)
|
||||
}
|
||||
|
||||
// Test exit because of 'i < len(l)' and append remainder from right.
|
||||
ids1 = inmem.SeriesIDs([]uint64{1})
|
||||
ids2 = inmem.SeriesIDs([]uint64{1, 2})
|
||||
exp = inmem.SeriesIDs([]uint64{1, 2})
|
||||
got = ids1.Union(ids2)
|
||||
|
||||
if !exp.Equals(got) {
|
||||
t.Fatalf("exp=%v, got=%v", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
// Test removing one set of SeriesIDs from another.
|
||||
func TestSeriesIDs_Reject(t *testing.T) {
|
||||
// Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left.
|
||||
ids1 := inmem.SeriesIDs([]uint64{1, 2, 3, 7})
|
||||
ids2 := inmem.SeriesIDs([]uint64{1, 3, 4, 5, 6})
|
||||
exp := inmem.SeriesIDs([]uint64{2, 7})
|
||||
got := ids1.Reject(ids2)
|
||||
|
||||
if !exp.Equals(got) {
|
||||
t.Fatalf("exp=%v, got=%v", exp, got)
|
||||
}
|
||||
|
||||
// Test exit because of 'i < len(l)'.
|
||||
ids1 = inmem.SeriesIDs([]uint64{1})
|
||||
ids2 = inmem.SeriesIDs([]uint64{1, 2})
|
||||
exp = inmem.SeriesIDs{}
|
||||
got = ids1.Reject(ids2)
|
||||
|
||||
if !exp.Equals(got) {
|
||||
t.Fatalf("exp=%v, got=%v", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMeasurement_AppendSeriesKeysByID_Missing(t *testing.T) {
|
||||
m := inmem.NewMeasurement("cpu")
|
||||
var dst []string
|
||||
dst = m.AppendSeriesKeysByID(dst, []uint64{1})
|
||||
if exp, got := 0, len(dst); exp != got {
|
||||
t.Fatalf("series len mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMeasurement_AppendSeriesKeysByID_Exists(t *testing.T) {
|
||||
m := inmem.NewMeasurement("cpu")
|
||||
s := inmem.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))})
|
||||
s.ID = 1
|
||||
m.AddSeries(s)
|
||||
|
||||
var dst []string
|
||||
dst = m.AppendSeriesKeysByID(dst, []uint64{1})
|
||||
if exp, got := 1, len(dst); exp != got {
|
||||
t.Fatalf("series len mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
if exp, got := "cpu,host=foo", dst[0]; exp != got {
|
||||
t.Fatalf("series mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMeasurement_TagsSet_Deadlock(t *testing.T) {
|
||||
m := inmem.NewMeasurement("cpu")
|
||||
s1 := inmem.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))})
|
||||
s1.ID = 1
|
||||
m.AddSeries(s1)
|
||||
|
||||
s2 := inmem.NewSeries([]byte("cpu,host=bar"), models.Tags{models.NewTag([]byte("host"), []byte("bar"))})
|
||||
s2.ID = 2
|
||||
m.AddSeries(s2)
|
||||
|
||||
m.DropSeries(s1)
|
||||
|
||||
// This was deadlocking
|
||||
m.TagSets(1, influxql.IteratorOptions{})
|
||||
if got, exp := len(m.SeriesIDs()), 1; got != exp {
|
||||
t.Fatalf("series count mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMeasurement_ForEachSeriesByExpr_Deadlock(t *testing.T) {
|
||||
m := inmem.NewMeasurement("cpu")
|
||||
s1 := inmem.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))})
|
||||
s1.ID = 1
|
||||
m.AddSeries(s1)
|
||||
|
||||
s2 := inmem.NewSeries([]byte("cpu,host=bar"), models.Tags{models.NewTag([]byte("host"), []byte("bar"))})
|
||||
s2.ID = 2
|
||||
m.AddSeries(s2)
|
||||
|
||||
m.DropSeries(s1)
|
||||
|
||||
// This was deadlocking
|
||||
m.ForEachSeriesByExpr(nil, func(tags models.Tags) error {
|
||||
return nil
|
||||
})
|
||||
if got, exp := len(m.SeriesIDs()), 1; got != exp {
|
||||
t.Fatalf("series count mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMeasurement_SeriesIDForExp_EQRegex(b *testing.B) {
|
||||
m := inmem.NewMeasurement("cpu")
|
||||
for i := 0; i < 100000; i++ {
|
||||
s := inmem.NewSeries([]byte("cpu"), models.Tags{models.NewTag(
|
||||
[]byte("host"),
|
||||
[]byte(fmt.Sprintf("host%d", i)))})
|
||||
s.ID = uint64(i)
|
||||
m.AddSeries(s)
|
||||
}
|
||||
|
||||
if exp, got := 100000, len(m.SeriesKeys()); exp != got {
|
||||
b.Fatalf("series count mismatch: exp %v got %v", exp, got)
|
||||
}
|
||||
|
||||
stmt, err := influxql.NewParser(strings.NewReader(`SELECT * FROM cpu WHERE host =~ /host\d+/`)).ParseStatement()
|
||||
if err != nil {
|
||||
b.Fatalf("invalid statement: %s", err)
|
||||
}
|
||||
|
||||
selectStmt := stmt.(*influxql.SelectStatement)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
ids := m.IDsForExpr(selectStmt.Condition.(*influxql.BinaryExpr))
|
||||
if exp, got := 100000, len(ids); exp != got {
|
||||
b.Fatalf("series count mismatch: exp %v got %v", exp, got)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMeasurement_SeriesIDForExp_NERegex(b *testing.B) {
|
||||
m := inmem.NewMeasurement("cpu")
|
||||
for i := 0; i < 100000; i++ {
|
||||
s := inmem.NewSeries([]byte("cpu"), models.Tags{models.Tag{
|
||||
Key: []byte("host"),
|
||||
Value: []byte(fmt.Sprintf("host%d", i))}})
|
||||
s.ID = uint64(i)
|
||||
m.AddSeries(s)
|
||||
}
|
||||
|
||||
if exp, got := 100000, len(m.SeriesKeys()); exp != got {
|
||||
b.Fatalf("series count mismatch: exp %v got %v", exp, got)
|
||||
}
|
||||
|
||||
stmt, err := influxql.NewParser(strings.NewReader(`SELECT * FROM cpu WHERE host !~ /foo\d+/`)).ParseStatement()
|
||||
if err != nil {
|
||||
b.Fatalf("invalid statement: %s", err)
|
||||
}
|
||||
|
||||
selectStmt := stmt.(*influxql.SelectStatement)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
ids := m.IDsForExpr(selectStmt.Condition.(*influxql.BinaryExpr))
|
||||
if exp, got := 100000, len(ids); exp != got {
|
||||
b.Fatalf("series count mismatch: exp %v got %v", exp, got)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
func BenchmarkCreateSeriesIndex_1K(b *testing.B) {
|
||||
benchmarkCreateSeriesIndex(b, genTestSeries(38, 3, 3))
|
||||
}
|
||||
|
||||
func BenchmarkCreateSeriesIndex_100K(b *testing.B) {
|
||||
benchmarkCreateSeriesIndex(b, genTestSeries(32, 5, 5))
|
||||
}
|
||||
|
||||
func BenchmarkCreateSeriesIndex_1M(b *testing.B) {
|
||||
benchmarkCreateSeriesIndex(b, genTestSeries(330, 5, 5))
|
||||
}
|
||||
|
||||
func benchmarkCreateSeriesIndex(b *testing.B, series []*TestSeries) {
|
||||
idxs := make([]*inmem.DatabaseIndex, 0, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
index, err := inmem.NewDatabaseIndex(fmt.Sprintf("db%d", i))
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
idxs = append(idxs, index)
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for n := 0; n < b.N; n++ {
|
||||
idx := idxs[n]
|
||||
for _, s := range series {
|
||||
idx.CreateSeriesIndexIfNotExists(s.Measurement, s.Series, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
1607
tsdb/meta.go
1607
tsdb/meta.go
File diff suppressed because it is too large
Load Diff
|
@ -3,225 +3,13 @@ package tsdb_test
|
|||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
"github.com/influxdata/influxdb/tsdb/index/inmem"
|
||||
)
|
||||
|
||||
// Test comparing SeriesIDs for equality.
|
||||
func TestSeriesIDs_Equals(t *testing.T) {
|
||||
ids1 := tsdb.SeriesIDs([]uint64{1, 2, 3})
|
||||
ids2 := tsdb.SeriesIDs([]uint64{1, 2, 3})
|
||||
ids3 := tsdb.SeriesIDs([]uint64{4, 5, 6})
|
||||
|
||||
if !ids1.Equals(ids2) {
|
||||
t.Fatal("expected ids1 == ids2")
|
||||
} else if ids1.Equals(ids3) {
|
||||
t.Fatal("expected ids1 != ids3")
|
||||
}
|
||||
}
|
||||
|
||||
// Test intersecting sets of SeriesIDs.
|
||||
func TestSeriesIDs_Intersect(t *testing.T) {
|
||||
// Test swaping l & r, all branches of if-else, and exit loop when 'j < len(r)'
|
||||
ids1 := tsdb.SeriesIDs([]uint64{1, 3, 4, 5, 6})
|
||||
ids2 := tsdb.SeriesIDs([]uint64{1, 2, 3, 7})
|
||||
exp := tsdb.SeriesIDs([]uint64{1, 3})
|
||||
got := ids1.Intersect(ids2)
|
||||
|
||||
if !exp.Equals(got) {
|
||||
t.Fatalf("exp=%v, got=%v", exp, got)
|
||||
}
|
||||
|
||||
// Test exit for loop when 'i < len(l)'
|
||||
ids1 = tsdb.SeriesIDs([]uint64{1})
|
||||
ids2 = tsdb.SeriesIDs([]uint64{1, 2})
|
||||
exp = tsdb.SeriesIDs([]uint64{1})
|
||||
got = ids1.Intersect(ids2)
|
||||
|
||||
if !exp.Equals(got) {
|
||||
t.Fatalf("exp=%v, got=%v", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
// Test union sets of SeriesIDs.
|
||||
func TestSeriesIDs_Union(t *testing.T) {
|
||||
// Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left.
|
||||
ids1 := tsdb.SeriesIDs([]uint64{1, 2, 3, 7})
|
||||
ids2 := tsdb.SeriesIDs([]uint64{1, 3, 4, 5, 6})
|
||||
exp := tsdb.SeriesIDs([]uint64{1, 2, 3, 4, 5, 6, 7})
|
||||
got := ids1.Union(ids2)
|
||||
|
||||
if !exp.Equals(got) {
|
||||
t.Fatalf("exp=%v, got=%v", exp, got)
|
||||
}
|
||||
|
||||
// Test exit because of 'i < len(l)' and append remainder from right.
|
||||
ids1 = tsdb.SeriesIDs([]uint64{1})
|
||||
ids2 = tsdb.SeriesIDs([]uint64{1, 2})
|
||||
exp = tsdb.SeriesIDs([]uint64{1, 2})
|
||||
got = ids1.Union(ids2)
|
||||
|
||||
if !exp.Equals(got) {
|
||||
t.Fatalf("exp=%v, got=%v", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
// Test removing one set of SeriesIDs from another.
|
||||
func TestSeriesIDs_Reject(t *testing.T) {
|
||||
// Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left.
|
||||
ids1 := tsdb.SeriesIDs([]uint64{1, 2, 3, 7})
|
||||
ids2 := tsdb.SeriesIDs([]uint64{1, 3, 4, 5, 6})
|
||||
exp := tsdb.SeriesIDs([]uint64{2, 7})
|
||||
got := ids1.Reject(ids2)
|
||||
|
||||
if !exp.Equals(got) {
|
||||
t.Fatalf("exp=%v, got=%v", exp, got)
|
||||
}
|
||||
|
||||
// Test exit because of 'i < len(l)'.
|
||||
ids1 = tsdb.SeriesIDs([]uint64{1})
|
||||
ids2 = tsdb.SeriesIDs([]uint64{1, 2})
|
||||
exp = tsdb.SeriesIDs{}
|
||||
got = ids1.Reject(ids2)
|
||||
|
||||
if !exp.Equals(got) {
|
||||
t.Fatalf("exp=%v, got=%v", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMeasurement_AppendSeriesKeysByID_Missing(t *testing.T) {
|
||||
m := tsdb.NewMeasurement("cpu")
|
||||
var dst []string
|
||||
dst = m.AppendSeriesKeysByID(dst, []uint64{1})
|
||||
if exp, got := 0, len(dst); exp != got {
|
||||
t.Fatalf("series len mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMeasurement_AppendSeriesKeysByID_Exists(t *testing.T) {
|
||||
m := tsdb.NewMeasurement("cpu")
|
||||
s := tsdb.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))})
|
||||
s.ID = 1
|
||||
m.AddSeries(s)
|
||||
|
||||
var dst []string
|
||||
dst = m.AppendSeriesKeysByID(dst, []uint64{1})
|
||||
if exp, got := 1, len(dst); exp != got {
|
||||
t.Fatalf("series len mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
if exp, got := "cpu,host=foo", dst[0]; exp != got {
|
||||
t.Fatalf("series mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMeasurement_TagsSet_Deadlock(t *testing.T) {
|
||||
m := tsdb.NewMeasurement("cpu")
|
||||
s1 := tsdb.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))})
|
||||
s1.ID = 1
|
||||
m.AddSeries(s1)
|
||||
|
||||
s2 := tsdb.NewSeries([]byte("cpu,host=bar"), models.Tags{models.NewTag([]byte("host"), []byte("bar"))})
|
||||
s2.ID = 2
|
||||
m.AddSeries(s2)
|
||||
|
||||
m.DropSeries(s1)
|
||||
|
||||
// This was deadlocking
|
||||
m.TagSets(1, influxql.IteratorOptions{})
|
||||
if got, exp := len(m.SeriesIDs()), 1; got != exp {
|
||||
t.Fatalf("series count mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMeasurement_ForEachSeriesByExpr_Deadlock(t *testing.T) {
|
||||
m := tsdb.NewMeasurement("cpu")
|
||||
s1 := tsdb.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))})
|
||||
s1.ID = 1
|
||||
m.AddSeries(s1)
|
||||
|
||||
s2 := tsdb.NewSeries([]byte("cpu,host=bar"), models.Tags{models.NewTag([]byte("host"), []byte("bar"))})
|
||||
s2.ID = 2
|
||||
m.AddSeries(s2)
|
||||
|
||||
m.DropSeries(s1)
|
||||
|
||||
// This was deadlocking
|
||||
m.ForEachSeriesByExpr(nil, func(tags models.Tags) error {
|
||||
return nil
|
||||
})
|
||||
if got, exp := len(m.SeriesIDs()), 1; got != exp {
|
||||
t.Fatalf("series count mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMeasurement_SeriesIDForExp_EQRegex(b *testing.B) {
|
||||
m := tsdb.NewMeasurement("cpu")
|
||||
for i := 0; i < 100000; i++ {
|
||||
s := tsdb.NewSeries([]byte("cpu"), models.Tags{models.NewTag(
|
||||
[]byte("host"),
|
||||
[]byte(fmt.Sprintf("host%d", i)))})
|
||||
s.ID = uint64(i)
|
||||
m.AddSeries(s)
|
||||
}
|
||||
|
||||
if exp, got := 100000, len(m.SeriesKeys()); exp != got {
|
||||
b.Fatalf("series count mismatch: exp %v got %v", exp, got)
|
||||
}
|
||||
|
||||
stmt, err := influxql.NewParser(strings.NewReader(`SELECT * FROM cpu WHERE host =~ /host\d+/`)).ParseStatement()
|
||||
if err != nil {
|
||||
b.Fatalf("invalid statement: %s", err)
|
||||
}
|
||||
|
||||
selectStmt := stmt.(*influxql.SelectStatement)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
ids := m.IDsForExpr(selectStmt.Condition.(*influxql.BinaryExpr))
|
||||
if exp, got := 100000, len(ids); exp != got {
|
||||
b.Fatalf("series count mismatch: exp %v got %v", exp, got)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMeasurement_SeriesIDForExp_NERegex(b *testing.B) {
|
||||
m := tsdb.NewMeasurement("cpu")
|
||||
for i := 0; i < 100000; i++ {
|
||||
s := tsdb.NewSeries([]byte("cpu"), models.Tags{models.Tag{
|
||||
Key: []byte("host"),
|
||||
Value: []byte(fmt.Sprintf("host%d", i))}})
|
||||
s.ID = uint64(i)
|
||||
m.AddSeries(s)
|
||||
}
|
||||
|
||||
if exp, got := 100000, len(m.SeriesKeys()); exp != got {
|
||||
b.Fatalf("series count mismatch: exp %v got %v", exp, got)
|
||||
}
|
||||
|
||||
stmt, err := influxql.NewParser(strings.NewReader(`SELECT * FROM cpu WHERE host !~ /foo\d+/`)).ParseStatement()
|
||||
if err != nil {
|
||||
b.Fatalf("invalid statement: %s", err)
|
||||
}
|
||||
|
||||
selectStmt := stmt.(*influxql.SelectStatement)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
ids := m.IDsForExpr(selectStmt.Condition.(*influxql.BinaryExpr))
|
||||
if exp, got := 100000, len(ids); exp != got {
|
||||
b.Fatalf("series count mismatch: exp %v got %v", exp, got)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Ensure tags can be marshaled into a byte slice.
|
||||
func TestMarshalTags(t *testing.T) {
|
||||
for i, tt := range []struct {
|
||||
|
@ -273,42 +61,9 @@ func benchmarkMarshalTags(b *testing.B, keyN int) {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
func BenchmarkCreateSeriesIndex_1K(b *testing.B) {
|
||||
benchmarkCreateSeriesIndex(b, genTestSeries(38, 3, 3))
|
||||
}
|
||||
|
||||
func BenchmarkCreateSeriesIndex_100K(b *testing.B) {
|
||||
benchmarkCreateSeriesIndex(b, genTestSeries(32, 5, 5))
|
||||
}
|
||||
|
||||
func BenchmarkCreateSeriesIndex_1M(b *testing.B) {
|
||||
benchmarkCreateSeriesIndex(b, genTestSeries(330, 5, 5))
|
||||
}
|
||||
|
||||
func benchmarkCreateSeriesIndex(b *testing.B, series []*TestSeries) {
|
||||
idxs := make([]*tsdb.DatabaseIndex, 0, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
index, err := tsdb.NewDatabaseIndex(fmt.Sprintf("db%d", i))
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
idxs = append(idxs, index)
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for n := 0; n < b.N; n++ {
|
||||
idx := idxs[n]
|
||||
for _, s := range series {
|
||||
idx.CreateSeriesIndexIfNotExists(s.Measurement, s.Series, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
type TestSeries struct {
|
||||
Measurement string
|
||||
Series *tsdb.Series
|
||||
Series *inmem.Series
|
||||
}
|
||||
|
||||
func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries {
|
||||
|
@ -319,7 +74,7 @@ func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries {
|
|||
for _, ts := range tagSets {
|
||||
series = append(series, &TestSeries{
|
||||
Measurement: m,
|
||||
Series: tsdb.NewSeries([]byte(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts)))), models.NewTags(ts)),
|
||||
Series: inmem.NewSeries([]byte(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts)))), models.NewTags(ts)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -429,12 +429,6 @@ type FieldCreate struct {
|
|||
Field *Field
|
||||
}
|
||||
|
||||
// SeriesCreate holds information for a series to create.
|
||||
type SeriesCreate struct {
|
||||
Measurement string
|
||||
Series *Series
|
||||
}
|
||||
|
||||
// WritePoints will write the raw data points and any new metadata to the index in the shard.
|
||||
func (s *Shard) WritePoints(points []models.Point) error {
|
||||
if err := s.ready(); err != nil {
|
||||
|
|
Loading…
Reference in New Issue