Update tsdb package from OSS

pull/10616/head
Edd Robinson 2018-09-28 11:30:19 +01:00
parent 074f263e08
commit 3385f389f7
25 changed files with 5116 additions and 46 deletions

3
go.mod
View File

@ -5,7 +5,7 @@ require (
github.com/BurntSushi/toml v0.3.0 // indirect
github.com/Masterminds/semver v1.4.2 // indirect
github.com/NYTimes/gziphandler v1.0.1
github.com/RoaringBitmap/roaring v0.4.15
github.com/RoaringBitmap/roaring v0.4.15 // indirect
github.com/alecthomas/kingpin v2.2.6+incompatible // indirect
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
@ -52,6 +52,7 @@ require (
github.com/influxdata/influxdb v0.0.0-20180904211643-ab81104697f6
github.com/influxdata/influxql v0.0.0-20180823200743-a7267bff5327
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e
github.com/influxdata/roaring v0.4.12
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
github.com/jessevdk/go-flags v1.4.0
github.com/jsternberg/zap-logfmt v1.1.0 // indirect

2
go.sum
View File

@ -131,6 +131,8 @@ github.com/influxdata/influxql v0.0.0-20180823200743-a7267bff5327/go.mod h1:KpVI
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e h1:/o3vQtpWJhvnIbXley4/jwzzqNeigJK9z+LZcJZ9zfM=
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e/go.mod h1:4kt73NQhadE3daL3WhR5EJ/J2ocX0PZzwxQ0gXJ7oFE=
github.com/influxdata/platform v0.0.0-20180912163125-1786402d48c7/go.mod h1:o8AOzOaMzxS6kSO2oCsbj61/kxrwECx+kFNgE8fqFug=
github.com/influxdata/roaring v0.4.12 h1:3DzTjKHcXFs4P3D7xRLpCqVrfK6eFRQT0c8BG99M3Ms=
github.com/influxdata/roaring v0.4.12/go.mod h1:bSgUQ7q5ZLSO+bKBGqJiCBGAl+9DxyW63zLTujjUlOE=
github.com/influxdata/tdigest v0.0.0-20180711151920-a7d76c6f093a h1:vMqgISSVkIqWxCIZs8m1L4096temR7IbYyNdMiBxSPA=
github.com/influxdata/tdigest v0.0.0-20180711151920-a7d76c6f093a/go.mod h1:9GkyshztGufsdPQWjH+ifgnIr3xNUL5syI70g2dzU1o=
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368 h1:+TUUmaFa4YD1Q+7bH9o5NCHQGPMqZCYJiNW6lIIS9z4=

View File

@ -34,7 +34,7 @@ func (a *FloatArray) Len() int {
return len(a.Timestamps)
}
// Exclude removes the subset of values not in [min, max]. The values must
// Exclude removes the subset of values in [min, max]. The values must
// be deduplicated and sorted before calling Exclude or the results are undefined.
func (a *FloatArray) Exclude(min, max int64) {
rmin, rmax := a.FindRange(min, max)
@ -58,11 +58,12 @@ func (a *FloatArray) Exclude(min, max int64) {
vs := a.Values[:rmin+rest]
copy(vs[rmin:], a.Values[rmax:])
a.Values = vs
return
}
} else {
a.Timestamps = a.Timestamps[:rmin]
a.Values = a.Values[:rmin]
}
a.Timestamps = a.Timestamps[:rmin]
a.Values = a.Values[:rmin]
}
// Include returns the subset values between min and max inclusive. The values must
@ -234,7 +235,7 @@ func (a *IntegerArray) Len() int {
return len(a.Timestamps)
}
// Exclude removes the subset of values not in [min, max]. The values must
// Exclude removes the subset of values in [min, max]. The values must
// be deduplicated and sorted before calling Exclude or the results are undefined.
func (a *IntegerArray) Exclude(min, max int64) {
rmin, rmax := a.FindRange(min, max)
@ -258,11 +259,12 @@ func (a *IntegerArray) Exclude(min, max int64) {
vs := a.Values[:rmin+rest]
copy(vs[rmin:], a.Values[rmax:])
a.Values = vs
return
}
} else {
a.Timestamps = a.Timestamps[:rmin]
a.Values = a.Values[:rmin]
}
a.Timestamps = a.Timestamps[:rmin]
a.Values = a.Values[:rmin]
}
// Include returns the subset values between min and max inclusive. The values must
@ -434,7 +436,7 @@ func (a *UnsignedArray) Len() int {
return len(a.Timestamps)
}
// Exclude removes the subset of values not in [min, max]. The values must
// Exclude removes the subset of values in [min, max]. The values must
// be deduplicated and sorted before calling Exclude or the results are undefined.
func (a *UnsignedArray) Exclude(min, max int64) {
rmin, rmax := a.FindRange(min, max)
@ -458,11 +460,12 @@ func (a *UnsignedArray) Exclude(min, max int64) {
vs := a.Values[:rmin+rest]
copy(vs[rmin:], a.Values[rmax:])
a.Values = vs
return
}
} else {
a.Timestamps = a.Timestamps[:rmin]
a.Values = a.Values[:rmin]
}
a.Timestamps = a.Timestamps[:rmin]
a.Values = a.Values[:rmin]
}
// Include returns the subset values between min and max inclusive. The values must
@ -634,7 +637,7 @@ func (a *StringArray) Len() int {
return len(a.Timestamps)
}
// Exclude removes the subset of values not in [min, max]. The values must
// Exclude removes the subset of values in [min, max]. The values must
// be deduplicated and sorted before calling Exclude or the results are undefined.
func (a *StringArray) Exclude(min, max int64) {
rmin, rmax := a.FindRange(min, max)
@ -658,11 +661,12 @@ func (a *StringArray) Exclude(min, max int64) {
vs := a.Values[:rmin+rest]
copy(vs[rmin:], a.Values[rmax:])
a.Values = vs
return
}
} else {
a.Timestamps = a.Timestamps[:rmin]
a.Values = a.Values[:rmin]
}
a.Timestamps = a.Timestamps[:rmin]
a.Values = a.Values[:rmin]
}
// Include returns the subset values between min and max inclusive. The values must
@ -834,7 +838,7 @@ func (a *BooleanArray) Len() int {
return len(a.Timestamps)
}
// Exclude removes the subset of values not in [min, max]. The values must
// Exclude removes the subset of values in [min, max]. The values must
// be deduplicated and sorted before calling Exclude or the results are undefined.
func (a *BooleanArray) Exclude(min, max int64) {
rmin, rmax := a.FindRange(min, max)
@ -858,11 +862,12 @@ func (a *BooleanArray) Exclude(min, max int64) {
vs := a.Values[:rmin+rest]
copy(vs[rmin:], a.Values[rmax:])
a.Values = vs
return
}
} else {
a.Timestamps = a.Timestamps[:rmin]
a.Values = a.Values[:rmin]
}
a.Timestamps = a.Timestamps[:rmin]
a.Values = a.Values[:rmin]
}
// Include returns the subset values between min and max inclusive. The values must

View File

@ -32,7 +32,7 @@ func (a *{{ $typename}}) Len() int {
return len(a.Timestamps)
}
// Exclude removes the subset of values not in [min, max]. The values must
// Exclude removes the subset of values in [min, max]. The values must
// be deduplicated and sorted before calling Exclude or the results are undefined.
func (a *{{ $typename }}) Exclude(min, max int64) {
rmin, rmax := a.FindRange(min, max)
@ -56,11 +56,12 @@ func (a *{{ $typename }}) Exclude(min, max int64) {
vs := a.Values[:rmin+rest]
copy(vs[rmin:], a.Values[rmax:])
a.Values = vs
}
} else {
a.Timestamps = a.Timestamps[:rmin]
a.Values = a.Values[:rmin]
return
}
}
a.Timestamps = a.Timestamps[:rmin]
a.Values = a.Values[:rmin]
}
// Include returns the subset values between min and max inclusive. The values must

View File

@ -0,0 +1,210 @@
package tsdb
import (
"fmt"
"testing"
"github.com/google/go-cmp/cmp"
)
// makeIntegerArray returns an IntegerArray of length count whose timestamps
// begin at min and advance by (max-min)/count. Values are left zeroed.
func makeIntegerArray(count int, min, max int64) *IntegerArray {
	arr := NewIntegerArrayLen(count)
	step := (max - min) / int64(count)
	for i, ts := 0, min; i < count; i, ts = i+1, ts+step {
		arr.Timestamps[i] = ts
	}
	return arr
}
// makeIntegerArrayFromSlice returns an IntegerArray whose timestamps are a
// copy of t. Values are left zeroed.
func makeIntegerArrayFromSlice(t []int64) *IntegerArray {
	arr := NewIntegerArrayLen(len(t))
	copy(arr.Timestamps, t)
	return arr
}
// TestIntegerArray_FindRangeNoValues verifies FindRange reports (-1, -1)
// when the array is empty.
func TestIntegerArray_FindRangeNoValues(t *testing.T) {
	var arr IntegerArray
	l, r := arr.FindRange(0, 100)
	if want := -1; l != want {
		t.Errorf("invalid l; exp=%d, got=%d", want, l)
	}
	if want := -1; r != want {
		t.Errorf("invalid r; exp=%d, got=%d", want, r)
	}
}
// TestIntegerArray_FindRange exercises FindRange over a fixed timestamp set
// with a table of [min, max] queries covering hits, misses, and boundaries.
func TestIntegerArray_FindRange(t *testing.T) {
	arr := makeIntegerArrayFromSlice([]int64{10, 11, 13, 15, 17, 20, 21})
	for _, tc := range []struct {
		min, max int64
		l, r     int
	}{
		{12, 20, 2, 5},
		{22, 40, -1, -1},
		{1, 9, -1, -1},
		{1, 10, 0, 0},
		{1, 11, 0, 1},
		{15, 15, 3, 3},
	} {
		t.Run(fmt.Sprintf("%d→%d", tc.min, tc.max), func(t *testing.T) {
			gotL, gotR := arr.FindRange(tc.min, tc.max)
			if gotL != tc.l {
				t.Errorf("left: got %d, exp %d", gotL, tc.l)
			}
			if gotR != tc.r {
				t.Errorf("right: got %d, exp %d", gotR, tc.r)
			}
		})
	}
}
// TestIntegerArray_Exclude verifies Exclude removes exactly the timestamps
// that fall inside each [min, max] range, including degenerate ranges.
func TestIntegerArray_Exclude(t *testing.T) {
	for _, tc := range []struct {
		n        string
		min, max int64
		exp      []int64
	}{
		{"excl bad range", 18, 11, []int64{10, 12, 14, 16, 18}},
		{"excl none-lo", 0, 9, []int64{10, 12, 14, 16, 18}},
		{"excl none-hi", 19, 30, []int64{10, 12, 14, 16, 18}},
		{"excl first", 0, 10, []int64{12, 14, 16, 18}},
		{"excl last", 18, 20, []int64{10, 12, 14, 16}},
		{"excl all but first and last", 12, 16, []int64{10, 18}},
		{"excl none in middle", 13, 13, []int64{10, 12, 14, 16, 18}},
		{"excl middle", 14, 14, []int64{10, 12, 16, 18}},
		{"excl suffix", 14, 18, []int64{10, 12}},
	} {
		t.Run(fmt.Sprintf("%s[%d,%d]", tc.n, tc.min, tc.max), func(t *testing.T) {
			arr := makeIntegerArray(5, 10, 20)
			arr.Exclude(tc.min, tc.max)
			if got := arr.Timestamps; !cmp.Equal(got, tc.exp) {
				t.Errorf("unexpected values -got/+exp\n%s", cmp.Diff(got, tc.exp))
			}
		})
	}
}
// TestIntegerArray_Include verifies Include keeps exactly the timestamps that
// fall inside each [min, max] range.
func TestIntegerArray_Include(t *testing.T) {
	for _, tc := range []struct {
		n        string
		min, max int64
		exp      []int64
	}{
		{"incl none-lo", 0, 9, []int64{}},
		{"incl none-hi", 19, 30, []int64{}},
		{"incl first", 0, 10, []int64{10}},
		{"incl last", 18, 20, []int64{18}},
		{"incl all but first and last", 12, 16, []int64{12, 14, 16}},
		{"incl none in middle", 13, 13, []int64{}},
		{"incl middle", 14, 14, []int64{14}},
	} {
		t.Run(fmt.Sprintf("%s[%d,%d]", tc.n, tc.min, tc.max), func(t *testing.T) {
			arr := makeIntegerArray(5, 10, 20)
			arr.Include(tc.min, tc.max)
			if got := arr.Timestamps; !cmp.Equal(got, tc.exp) {
				t.Errorf("unexpected values -got/+exp\n%s", cmp.Diff(got, tc.exp))
			}
		})
	}
}
// benchExclude measures IntegerArray.Exclude for the given [min, max] range.
//
// NOTE(review): vals is mutated by the first iteration and is never restored,
// so subsequent iterations run Exclude against the already-filtered array —
// confirm this steady-state measurement is intended (benchInclude, by
// contrast, restores its input every iteration).
func benchExclude(b *testing.B, vals *IntegerArray, min, max int64) {
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		vals.Exclude(min, max)
	}
}
// The following benchmarks measure Exclude over 1,000- and 10,000-element
// arrays for ranges that hit nothing, the middle half, the first element(s),
// and the last element(s).

func BenchmarkIntegerArray_ExcludeNone_1000(b *testing.B) {
	benchExclude(b, makeIntegerArray(1000, 1000, 2000), 0, 500)
}

func BenchmarkIntegerArray_ExcludeMiddleHalf_1000(b *testing.B) {
	benchExclude(b, makeIntegerArray(1000, 1000, 2000), 1250, 1750)
}

func BenchmarkIntegerArray_ExcludeFirst_1000(b *testing.B) {
	benchExclude(b, makeIntegerArray(1000, 1000, 2000), 0, 1000)
}

func BenchmarkIntegerArray_ExcludeLast_1000(b *testing.B) {
	benchExclude(b, makeIntegerArray(1000, 1000, 2000), 1999, 2000)
}

func BenchmarkIntegerArray_ExcludeNone_10000(b *testing.B) {
	benchExclude(b, makeIntegerArray(10000, 10000, 20000), 00, 5000)
}

func BenchmarkIntegerArray_ExcludeMiddleHalf_10000(b *testing.B) {
	benchExclude(b, makeIntegerArray(10000, 10000, 20000), 12500, 17500)
}

func BenchmarkIntegerArray_ExcludeFirst_10000(b *testing.B) {
	benchExclude(b, makeIntegerArray(10000, 10000, 20000), 0, 10000)
}

func BenchmarkIntegerArray_ExcludeLast_10000(b *testing.B) {
	benchExclude(b, makeIntegerArray(10000, 10000, 20000), 19999, 20000)
}
// benchInclude measures IntegerArray.Include for the given [min, max] range.
// Include mutates the receiver, so each iteration restores vals: assigning
// src restores the original slice headers, and the two copies restore the
// backing data that Include may have shifted in place.
//
// NOTE(review): the restore work runs inside the timed loop, so it is included
// in the reported ns/op — confirm that is acceptable for comparisons.
func benchInclude(b *testing.B, vals *IntegerArray, min, max int64) {
	src := *vals
	tmp := NewIntegerArrayLen(vals.Len())
	copy(tmp.Timestamps, vals.Timestamps)
	copy(tmp.Values, vals.Values)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		vals.Include(min, max)
		*vals = src
		copy(vals.Timestamps, tmp.Timestamps)
		copy(vals.Values, tmp.Values)
	}
}
// The following benchmarks mirror the Exclude benchmarks above for Include:
// 1,000- and 10,000-element arrays with no-hit, middle-half, first, and last
// ranges.

func BenchmarkIntegerArray_IncludeNone_1000(b *testing.B) {
	benchInclude(b, makeIntegerArray(1000, 1000, 2000), 0, 500)
}

func BenchmarkIntegerArray_IncludeMiddleHalf_1000(b *testing.B) {
	benchInclude(b, makeIntegerArray(1000, 1000, 2000), 1250, 1750)
}

func BenchmarkIntegerArray_IncludeFirst_1000(b *testing.B) {
	benchInclude(b, makeIntegerArray(1000, 1000, 2000), 0, 1000)
}

func BenchmarkIntegerArray_IncludeLast_1000(b *testing.B) {
	benchInclude(b, makeIntegerArray(1000, 1000, 2000), 1999, 2000)
}

func BenchmarkIntegerArray_IncludeNone_10000(b *testing.B) {
	benchInclude(b, makeIntegerArray(10000, 10000, 20000), 00, 5000)
}

func BenchmarkIntegerArray_IncludeMiddleHalf_10000(b *testing.B) {
	benchInclude(b, makeIntegerArray(10000, 10000, 20000), 12500, 17500)
}

func BenchmarkIntegerArray_IncludeFirst_10000(b *testing.B) {
	benchInclude(b, makeIntegerArray(10000, 10000, 20000), 0, 10000)
}

func BenchmarkIntegerArray_IncludeLast_10000(b *testing.B) {
	benchInclude(b, makeIntegerArray(10000, 10000, 20000), 19999, 20000)
}

View File

@ -9,10 +9,10 @@ import (
"sync"
"unsafe"
"github.com/influxdata/influxdb/pkg/mmap"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/pkg/estimator"
"github.com/influxdata/platform/pkg/estimator/hll"
"github.com/influxdata/platform/pkg/mmap"
"github.com/influxdata/platform/tsdb"
)

View File

@ -15,10 +15,10 @@ import (
"unsafe"
"github.com/influxdata/influxdb/pkg/bloom"
"github.com/influxdata/influxdb/pkg/mmap"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/pkg/estimator"
"github.com/influxdata/platform/pkg/estimator/hll"
"github.com/influxdata/platform/pkg/mmap"
"github.com/influxdata/platform/tsdb"
)

View File

@ -8,9 +8,9 @@ import (
"sort"
"unsafe"
"github.com/influxdata/influxdb/pkg/rhh"
"github.com/influxdata/platform/pkg/estimator"
"github.com/influxdata/platform/pkg/estimator/hll"
"github.com/influxdata/platform/pkg/rhh"
"github.com/influxdata/platform/tsdb"
)

View File

@ -7,7 +7,7 @@ import (
"fmt"
"io"
"github.com/influxdata/influxdb/pkg/rhh"
"github.com/influxdata/platform/pkg/rhh"
"github.com/influxdata/platform/tsdb"
)

664
tsdb/index_test.go Normal file
View File

@ -0,0 +1,664 @@
package tsdb_test
import (
"compress/gzip"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"sync"
"testing"
"github.com/influxdata/influxdb/internal"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/pkg/slices"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/tsdb"
"github.com/influxdata/platform/tsdb/index/inmem"
"github.com/influxdata/platform/tsdb/index/tsi1"
)
// toSeriesIDs converts raw uint64 identifiers into tsdb.SeriesID values.
func toSeriesIDs(ids []uint64) []tsdb.SeriesID {
	out := make([]tsdb.SeriesID, len(ids))
	for i, id := range ids {
		out[i] = tsdb.NewSeriesID(id)
	}
	return out
}
// Ensure iterator can merge multiple iterators together, deduplicating IDs
// and yielding them in order, terminating with a zero SeriesID.
func TestMergeSeriesIDIterators(t *testing.T) {
	itr := tsdb.MergeSeriesIDIterators(
		tsdb.NewSeriesIDSliceIterator(toSeriesIDs([]uint64{1, 2, 3})),
		tsdb.NewSeriesIDSliceIterator(nil),
		tsdb.NewSeriesIDSliceIterator(toSeriesIDs([]uint64{1, 2, 3, 4})),
	)

	// The merged stream should be the deduplicated union 1..4.
	for i, want := range []uint64{1, 2, 3, 4} {
		e, err := itr.Next()
		if err != nil {
			t.Fatal(err)
		}
		if !reflect.DeepEqual(e, tsdb.SeriesIDElem{SeriesID: tsdb.NewSeriesID(want)}) {
			t.Fatalf("unexpected elem(%d): %#v", i, e)
		}
	}

	// Exhausted iterator yields a zero SeriesID.
	e, err := itr.Next()
	if err != nil {
		t.Fatal(err)
	}
	if !e.SeriesID.IsZero() {
		t.Fatalf("expected nil elem: %#v", e)
	}
}
// TestIndexSet_MeasurementNamesByExpr verifies MeasurementNamesByExpr for
// every registered index type, both without an authorizer and with one that
// hides any series carrying a "secret" tag.
func TestIndexSet_MeasurementNamesByExpr(t *testing.T) {
	// Setup indexes — one per registered index type, each seeded with the same
	// six series (three of which carry the "secret" tag).
	indexes := map[string]*Index{}
	for _, name := range tsdb.RegisteredIndexes() {
		idx := MustOpenNewIndex(name)
		idx.AddSeries("cpu", map[string]string{"region": "east"}, models.Integer)
		idx.AddSeries("cpu", map[string]string{"region": "west", "secret": "foo"}, models.Integer)
		idx.AddSeries("disk", map[string]string{"secret": "foo"}, models.Integer)
		idx.AddSeries("mem", map[string]string{"region": "west"}, models.Integer)
		idx.AddSeries("gpu", map[string]string{"region": "east"}, models.Integer)
		idx.AddSeries("pci", map[string]string{"region": "east", "secret": "foo"}, models.Integer)
		indexes[name] = idx
		defer idx.Close()
	}

	// authorizer rejects any series that carries a non-empty "secret" tag.
	authorizer := &internal.AuthorizerMock{
		AuthorizeSeriesReadFn: func(database string, measurement []byte, tags models.Tags) bool {
			if tags.GetString("secret") != "" {
				t.Logf("Rejecting series db=%s, m=%s, tags=%v", database, measurement, tags)
				return false
			}
			return true
		},
	}

	type example struct {
		name     string
		expr     influxql.Expr
		expected [][]byte
	}

	// These examples should be run without any auth.
	examples := []example{
		{name: "all", expected: slices.StringsToBytes("cpu", "disk", "gpu", "mem", "pci")},
		{name: "EQ", expr: influxql.MustParseExpr(`region = 'west'`), expected: slices.StringsToBytes("cpu", "mem")},
		{name: "NEQ", expr: influxql.MustParseExpr(`region != 'west'`), expected: slices.StringsToBytes("gpu", "pci")},
		{name: "EQREGEX", expr: influxql.MustParseExpr(`region =~ /.*st/`), expected: slices.StringsToBytes("cpu", "gpu", "mem", "pci")},
		{name: "NEQREGEX", expr: influxql.MustParseExpr(`region !~ /.*est/`), expected: slices.StringsToBytes("gpu", "pci")},
	}

	// These examples should be run with the authorizer; measurements whose only
	// matching series are "secret" disappear from the results.
	authExamples := []example{
		{name: "all", expected: slices.StringsToBytes("cpu", "gpu", "mem")},
		{name: "EQ", expr: influxql.MustParseExpr(`region = 'west'`), expected: slices.StringsToBytes("mem")},
		{name: "NEQ", expr: influxql.MustParseExpr(`region != 'west'`), expected: slices.StringsToBytes("gpu")},
		{name: "EQREGEX", expr: influxql.MustParseExpr(`region =~ /.*st/`), expected: slices.StringsToBytes("cpu", "gpu", "mem")},
		{name: "NEQREGEX", expr: influxql.MustParseExpr(`region !~ /.*est/`), expected: slices.StringsToBytes("gpu")},
	}

	for _, idx := range tsdb.RegisteredIndexes() {
		t.Run(idx, func(t *testing.T) {
			t.Run("no authorization", func(t *testing.T) {
				for _, example := range examples {
					t.Run(example.name, func(t *testing.T) {
						names, err := indexes[idx].IndexSet().MeasurementNamesByExpr(nil, example.expr)
						if err != nil {
							t.Fatal(err)
						} else if !reflect.DeepEqual(names, example.expected) {
							t.Fatalf("got names: %v, expected %v", slices.BytesToStrings(names), slices.BytesToStrings(example.expected))
						}
					})
				}
			})

			t.Run("with authorization", func(t *testing.T) {
				for _, example := range authExamples {
					t.Run(example.name, func(t *testing.T) {
						names, err := indexes[idx].IndexSet().MeasurementNamesByExpr(authorizer, example.expr)
						if err != nil {
							t.Fatal(err)
						} else if !reflect.DeepEqual(names, example.expected) {
							t.Fatalf("got names: %v, expected %v", slices.BytesToStrings(names), slices.BytesToStrings(example.expected))
						}
					})
				}
			})
		})
	}
}
// TestIndexSet_DedupeInmemIndexes verifies DedupeInmemIndexes collapses
// multiple ShardIndexes that proxy the same underlying inmem index into a
// single entry, while leaving TSI indexes untouched.
func TestIndexSet_DedupeInmemIndexes(t *testing.T) {
	testCases := []struct {
		tsiN    int // Quantity of TSI indexes
		inmem1N int // Quantity of ShardIndexes proxying the first inmem Index
		inmem2N int // Quantity of ShardIndexes proxying the second inmem Index
		uniqueN int // Quantity of total, deduplicated indexes
	}{
		{tsiN: 1, inmem1N: 0, uniqueN: 1},
		{tsiN: 2, inmem1N: 0, uniqueN: 2},
		{tsiN: 0, inmem1N: 1, uniqueN: 1},
		{tsiN: 0, inmem1N: 2, uniqueN: 1},
		{tsiN: 0, inmem1N: 1, inmem2N: 1, uniqueN: 2},
		{tsiN: 0, inmem1N: 2, inmem2N: 2, uniqueN: 2},
		{tsiN: 2, inmem1N: 2, inmem2N: 2, uniqueN: 4},
	}

	for _, tc := range testCases {
		label := fmt.Sprintf("%d/%d/%d -> %d", tc.tsiN, tc.inmem1N, tc.inmem2N, tc.uniqueN)
		t.Run(label, func(t *testing.T) {
			var indexes []tsdb.Index
			for i := 0; i < tc.tsiN; i++ {
				indexes = append(indexes, MustOpenNewIndex(tsi1.IndexName))
			}

			// addInmemShards appends n ShardIndexes that all proxy a single
			// freshly-created inmem index (and its own series file).
			addInmemShards := func(n int) {
				if n == 0 {
					return
				}
				sfile := MustOpenSeriesFile()
				opts := tsdb.NewEngineOptions()
				opts.IndexVersion = inmem.IndexName
				opts.InmemIndex = inmem.NewIndex("db", sfile.SeriesFile)
				for i := 0; i < n; i++ {
					indexes = append(indexes, inmem.NewShardIndex(uint64(i), tsdb.NewSeriesIDSet(), opts))
				}
			}
			addInmemShards(tc.inmem1N)
			addInmemShards(tc.inmem2N)

			is := tsdb.IndexSet{Indexes: indexes}.DedupeInmemIndexes()
			if len(is.Indexes) != tc.uniqueN {
				t.Errorf("expected %d indexes, got %d", tc.uniqueN, len(is.Indexes))
			}
		})
	}
}
// TestIndex_Sketches verifies the series and measurement cardinality sketches
// (and their tombstone counterparts) after adding series, dropping
// measurements, and — for the TSI index only — re-opening the index.
func TestIndex_Sketches(t *testing.T) {
	// checkCardinalities asserts the sketch counts against expected values.
	// Series counts are compared approximately (~10%); measurement counts are
	// compared exactly (see note below).
	checkCardinalities := func(t *testing.T, index *Index, state string, series, tseries, measurements, tmeasurements int) {
		t.Helper()

		// Get sketches and check cardinality...
		sketch, tsketch, err := index.SeriesSketches()
		if err != nil {
			t.Fatal(err)
		}

		// delta calculates a rough 10% delta. If i is small then a minimum value
		// of 2 is used.
		delta := func(i int) int {
			v := i / 10
			if v == 0 {
				v = 2
			}
			return v
		}

		// series cardinality should be well within 10%.
		if got, exp := int(sketch.Count()), series; got-exp < -delta(series) || got-exp > delta(series) {
			t.Errorf("[%s] got series cardinality %d, expected ~%d", state, got, exp)
		}

		// check series tombstones
		if got, exp := int(tsketch.Count()), tseries; got-exp < -delta(tseries) || got-exp > delta(tseries) {
			t.Errorf("[%s] got series tombstone cardinality %d, expected ~%d", state, got, exp)
		}

		// Check measurement cardinality.
		if sketch, tsketch, err = index.MeasurementsSketches(); err != nil {
			t.Fatal(err)
		}

		// NOTE(review): unlike the series checks above, the measurement checks
		// are exact (the delta comparison is commented out) even though the
		// failure message still says "~" — confirm exact equality is intended.
		if got, exp := int(sketch.Count()), measurements; got != exp { //got-exp < -delta(measurements) || got-exp > delta(measurements) {
			t.Errorf("[%s] got measurement cardinality %d, expected ~%d", state, got, exp)
		}

		if got, exp := int(tsketch.Count()), tmeasurements; got != exp { //got-exp < -delta(tmeasurements) || got-exp > delta(tmeasurements) {
			t.Errorf("[%s] got measurement tombstone cardinality %d, expected ~%d", state, got, exp)
		}
	}

	// test runs the full add/drop/reopen scenario for one index type.
	test := func(t *testing.T, index string) error {
		idx := MustNewIndex(index)
		if index, ok := idx.Index.(*tsi1.Index); ok {
			// Override the log file max size to force a log file compaction sooner.
			// This way, we will test the sketches are correct when they have been
			// compacted into IndexFiles, and also when they're loaded from
			// IndexFiles after a re-open.
			tsi1.WithMaximumLogFileSize(1 << 10)(index)
		}

		// Open the index
		idx.MustOpen()
		defer idx.Close()

		series := genTestSeries(10, 5, 3)
		// Add series to index.
		for _, serie := range series {
			if err := idx.AddSeries(serie.Measurement, serie.Tags.Map(), serie.Type); err != nil {
				t.Fatal(err)
			}
		}

		// Check cardinalities after adding series.
		checkCardinalities(t, idx, "initial", 2430, 0, 10, 0)

		// Re-open step only applies to the TSI index.
		if _, ok := idx.Index.(*tsi1.Index); ok {
			// Re-open the index.
			if err := idx.Reopen(); err != nil {
				panic(err)
			}

			// Check cardinalities after the reopen
			checkCardinalities(t, idx, "initial|reopen", 2430, 0, 10, 0)
		}

		// Drop some series
		if err := idx.DropMeasurement([]byte("measurement2")); err != nil {
			return err
		} else if err := idx.DropMeasurement([]byte("measurement5")); err != nil {
			return err
		}

		// Check cardinalities after the delete.
		// NOTE(review): expected series estimates differ per index type —
		// presumably TSI folds dropped series into the base sketch (no series
		// tombstones) while inmem tracks them separately; confirm.
		switch idx.Index.(type) {
		case *tsi1.Index:
			checkCardinalities(t, idx, "initial|reopen|delete", 2923, 0, 10, 2)
		case *inmem.ShardIndex:
			checkCardinalities(t, idx, "initial|reopen|delete", 2430, 486, 10, 2)
		default:
			panic("unreachable")
		}

		// Re-open step only applies to the TSI index.
		if _, ok := idx.Index.(*tsi1.Index); ok {
			// Re-open the index.
			if err := idx.Reopen(); err != nil {
				panic(err)
			}

			// Check cardinalities after the reopen
			checkCardinalities(t, idx, "initial|reopen|delete|reopen", 2923, 0, 10, 2)
		}
		return nil
	}

	for _, index := range tsdb.RegisteredIndexes() {
		t.Run(index, func(t *testing.T) {
			if err := test(t, index); err != nil {
				t.Fatal(err)
			}
		})
	}
}
// Index wraps a series file and index.
type Index struct {
	tsdb.Index                        // underlying index implementation (inmem or tsi1)
	rootPath   string                 // temporary root directory; everything lives beneath it
	indexType  string                 // index name, used to rebuild the index on Reopen
	sfile      *tsdb.SeriesFile       // series file backing the index
}
// MustNewIndex will initialize a new index using the provided type. It creates
// everything under the same root directory so it can be cleanly removed on Close.
//
// The index will not be opened. Panics on any setup failure.
func MustNewIndex(index string) *Index {
	opts := tsdb.NewEngineOptions()
	opts.IndexVersion = index

	// One temp root holds both the series file and the index directory, so a
	// single RemoveAll can clean everything up.
	rootPath, err := ioutil.TempDir("", "influxdb-tsdb")
	if err != nil {
		panic(err)
	}

	seriesPath, err := ioutil.TempDir(rootPath, tsdb.SeriesFileDirectory)
	if err != nil {
		panic(err)
	}

	sfile := tsdb.NewSeriesFile(seriesPath)
	if err := sfile.Open(); err != nil {
		panic(err)
	}

	// The inmem implementation needs a shared in-memory index up front.
	if index == inmem.IndexName {
		opts.InmemIndex = inmem.NewIndex("db0", sfile)
	}

	i, err := tsdb.NewIndex(0, "db0", filepath.Join(rootPath, "index"), tsdb.NewSeriesIDSet(), sfile, opts)
	if err != nil {
		panic(err)
	}

	// Log to stderr when running tests verbosely.
	if testing.Verbose() {
		i.WithLogger(logger.New(os.Stderr))
	}

	idx := &Index{
		Index:     i,
		indexType: index,
		rootPath:  rootPath,
		sfile:     sfile,
	}
	return idx
}
// MustOpenNewIndex will initialize a new index using the provided type and
// opens it, panicking on failure.
func MustOpenNewIndex(index string) *Index {
	i := MustNewIndex(index)
	i.MustOpen()
	return i
}
// MustOpen opens the underlying index or panics.
func (idx *Index) MustOpen() {
	err := idx.Index.Open()
	if err != nil {
		panic(err)
	}
}
// IndexSet returns an IndexSet containing just the wrapped index and its
// series file.
func (idx *Index) IndexSet() *tsdb.IndexSet {
	set := tsdb.IndexSet{
		Indexes:    []tsdb.Index{idx.Index},
		SeriesFile: idx.sfile,
	}
	return &set
}
// AddSeries creates the series identified by name and tags in the index,
// deriving the series key from the measurement name and tag hash key.
func (idx *Index) AddSeries(name string, tags map[string]string, typ models.FieldType) error {
	tagSet := models.NewTags(tags)
	seriesKey := fmt.Sprintf("%s,%s", name, tagSet.HashKey())
	return idx.CreateSeriesIfNotExists([]byte(seriesKey), []byte(name), tagSet, typ)
}
// Reopen closes and re-opens the underlying index, without removing any data.
func (idx *Index) Reopen() error {
	if err := idx.Index.Close(); err != nil {
		return err
	}
	if err := idx.sfile.Close(); err != nil {
		return err
	}

	// Recreate the series file over the same path before rebuilding the index.
	idx.sfile = tsdb.NewSeriesFile(idx.sfile.Path())
	if err := idx.sfile.Open(); err != nil {
		return err
	}

	opts := tsdb.NewEngineOptions()
	opts.IndexVersion = idx.indexType
	if idx.indexType == inmem.IndexName {
		opts.InmemIndex = inmem.NewIndex("db0", idx.sfile)
	}

	reopened, err := tsdb.NewIndex(0, "db0", filepath.Join(idx.rootPath, "index"), tsdb.NewSeriesIDSet(), idx.sfile, opts)
	if err != nil {
		return err
	}
	idx.Index = reopened
	return idx.Index.Open()
}
// Close closes the index cleanly and removes all on-disk data.
func (i *Index) Close() error {
	if err := i.Index.Close(); err != nil {
		return err
	}
	if err := i.sfile.Close(); err != nil {
		return err
	}
	// Remove the temporary root directory created by MustNewIndex so repeated
	// test runs do not leak disk space. This was previously commented out,
	// which contradicted the documented contract and leaked one temp dir per
	// Index per run.
	return os.RemoveAll(i.rootPath)
}
// This benchmark compares the TagSets implementation across index types.
//
// In the case of the TSI index, TagSets has to merge results across all several
// index partitions.
//
// Typical results on an i7 laptop.
//
// BenchmarkIndexSet_TagSets/1M_series/inmem-8 100 10430732 ns/op 3556728 B/op 51 allocs/op
// BenchmarkIndexSet_TagSets/1M_series/tsi1-8 100 18995530 ns/op 5221180 B/op 20379 allocs/op
func BenchmarkIndexSet_TagSets(b *testing.B) {
	// Read line-protocol and coerce into tsdb format.
	// 1M series generated with:
	// $inch -b 10000 -c 1 -t 10,10,10,10,10,10 -f 1 -m 5 -p 1
	fd, err := os.Open("testdata/line-protocol-1M.txt.gz")
	if err != nil {
		b.Fatal(err)
	}

	gzr, err := gzip.NewReader(fd)
	if err != nil {
		fd.Close()
		b.Fatal(err)
	}

	data, err := ioutil.ReadAll(gzr)
	if err != nil {
		b.Fatal(err)
	}

	if err := fd.Close(); err != nil {
		b.Fatal(err)
	}

	points, err := models.ParsePoints(data)
	if err != nil {
		b.Fatal(err)
	}

	// setup writes all of the above points to the index.
	setup := func(idx *Index) {
		batchSize := 10000
		for j := 0; j < 1; j++ {
			for i := 0; i < len(points); i += batchSize {
				collection := tsdb.NewSeriesCollection(points[i : i+batchSize])
				if err := idx.CreateSeriesListIfNotExists(collection); err != nil {
					b.Fatal(err)
				}
			}
		}
	}

	// TODO(edd): refactor how we call into tag sets in the tsdb package.
	type indexTagSets interface {
		TagSets(name []byte, options query.IteratorOptions) ([]*query.TagSet, error)
	}

	var errResult error

	// This benchmark will merge eight bitsets each containing ~10,000 series IDs.
	b.Run("1M series", func(b *testing.B) {
		b.ReportAllocs()
		for _, indexType := range tsdb.RegisteredIndexes() {
			idx := MustOpenNewIndex(indexType)
			setup(idx)

			name := []byte("m4")
			opt := query.IteratorOptions{Condition: influxql.MustParseExpr(`"tag5"::tag = 'value0'`)}
			indexSet := tsdb.IndexSet{
				SeriesFile: idx.sfile,
				Indexes:    []tsdb.Index{idx.Index},
			} // For TSI implementation

			var ts func() ([]*query.TagSet, error)
			// TODO(edd): this is somewhat awkward. We should unify this difference somewhere higher
			// up than the engine. I don't want to open an engine do a benchmark on
			// different index implementations.
			if indexType == tsdb.InmemIndexName {
				ts = func() ([]*query.TagSet, error) {
					return idx.Index.(indexTagSets).TagSets(name, opt)
				}
			} else {
				ts = func() ([]*query.TagSet, error) {
					return indexSet.TagSets(idx.sfile, name, opt)
				}
			}

			b.Run(indexType, func(b *testing.B) {
				for i := 0; i < b.N; i++ {
					// Will call TagSets on the appropriate implementation.
					_, errResult = ts()
					if errResult != nil {
						// BUG FIX: previously reported the stale outer `err`
						// (already nil-checked above), masking the actual
						// TagSets failure. Report errResult instead.
						b.Fatal(errResult)
					}
				}
			})

			if err := idx.Close(); err != nil {
				b.Fatal(err)
			}
		}
	})
}
// This benchmark concurrently writes series to the index and fetches cached bitsets.
// The idea is to emphasize the performance difference when bitset caching is on and off.
//
// Typical results for an i7 laptop
//
// BenchmarkIndex_ConcurrentWriteQuery/inmem/queries_100000/cache-8 1 5963346204 ns/op 2499655768 B/op 23964183 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/inmem/queries_100000/no_cache-8 1 5314841090 ns/op 2499495280 B/op 23963322 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/tsi1/queries_100000/cache-8 1 1645048376 ns/op 2215402840 B/op 23048978 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/tsi1/queries_100000/no_cache-8 1 22242155616 ns/op 28277544136 B/op 79620463 allocs/op
func BenchmarkIndex_ConcurrentWriteQuery(b *testing.B) {
	// Read line-protocol and coerce into tsdb format.
	// 1M series generated with:
	// $inch -b 10000 -c 1 -t 10,10,10,10,10,10 -f 1 -m 5 -p 1
	fd, err := os.Open("testdata/line-protocol-1M.txt.gz")
	if err != nil {
		b.Fatal(err)
	}

	gzr, err := gzip.NewReader(fd)
	if err != nil {
		fd.Close()
		b.Fatal(err)
	}

	data, err := ioutil.ReadAll(gzr)
	if err != nil {
		b.Fatal(err)
	}

	if err := fd.Close(); err != nil {
		b.Fatal(err)
	}

	points, err := models.ParsePoints(data)
	if err != nil {
		b.Fatal(err)
	}

	runBenchmark := func(b *testing.B, index string, queryN int) {
		idx := MustOpenNewIndex(index)
		var wg sync.WaitGroup
		begin := make(chan struct{})

		// Run concurrent iterator...
		runIter := func() {
			keys := [][]string{
				{"m0", "tag2", "value4"},
				{"m1", "tag3", "value5"},
				{"m2", "tag4", "value6"},
				{"m3", "tag0", "value8"},
				{"m4", "tag5", "value0"},
			}

			<-begin // Wait for writes to land
			for i := 0; i < queryN/5; i++ {
				for _, key := range keys {
					itr, err := idx.TagValueSeriesIDIterator([]byte(key[0]), []byte(key[1]), []byte(key[2]))
					if err != nil {
						// BUG FIX: b.Fatal (FailNow) must not be called from a
						// goroutine other than the one running the benchmark.
						// Mark the failure and stop iterating instead.
						b.Error(err)
						return
					}

					if itr == nil {
						panic("should not happen")
					}

					if err := itr.Close(); err != nil {
						b.Error(err)
						return
					}
				}
			}
		}

		batchSize := 10000
		wg.Add(1)
		go func() { defer wg.Done(); runIter() }()
		var once sync.Once
		for j := 0; j < b.N; j++ {
			for i := 0; i < len(points); i += batchSize {
				collection := tsdb.NewSeriesCollection(points[i : i+batchSize])
				if err := idx.CreateSeriesListIfNotExists(collection); err != nil {
					b.Fatal(err)
				}
				once.Do(func() { close(begin) })
			}

			// Wait for queries to finish
			wg.Wait()

			// Reset the index...
			b.StopTimer()
			if err := idx.Close(); err != nil {
				b.Fatal(err)
			}

			// Re-open everything — but only when another iteration will run.
			// BUG FIX: previously this block ran unconditionally, so the final
			// iteration leaked a goroutine blocked forever on `begin` and left
			// the freshly-reopened index unclosed.
			if j < b.N-1 {
				idx = MustOpenNewIndex(index)
				wg.Add(1)
				begin = make(chan struct{})
				once = sync.Once{}
				go func() { defer wg.Done(); runIter() }()
			}
			b.StartTimer()
		}
	}

	queries := []int{1e5}
	for _, indexType := range tsdb.RegisteredIndexes() {
		b.Run(indexType, func(b *testing.B) {
			for _, queryN := range queries {
				b.Run(fmt.Sprintf("queries %d", queryN), func(b *testing.B) {
					b.Run("cache", func(b *testing.B) {
						tsi1.EnableBitsetCache = true
						runBenchmark(b, indexType, queryN)
					})

					b.Run("no cache", func(b *testing.B) {
						tsi1.EnableBitsetCache = false
						runBenchmark(b, indexType, queryN)
					})
				})
			}
		})
	}
}

263
tsdb/meta_test.go Normal file
View File

@ -0,0 +1,263 @@
package tsdb_test
import (
"bytes"
"fmt"
"testing"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/tsdb"
)
// Ensure tags can be marshaled into a byte slice, independent of map insertion
// order, with keys sorted and key/value groups pipe-delimited.
func TestMarshalTags(t *testing.T) {
	cases := []struct {
		tags   map[string]string
		result []byte
	}{
		{tags: nil, result: nil},
		{tags: map[string]string{"foo": "bar"}, result: []byte(`foo|bar`)},
		{tags: map[string]string{"foo": "bar", "baz": "battttt"}, result: []byte(`baz|foo|battttt|bar`)},
		{tags: map[string]string{"baz": "battttt", "foo": "bar"}, result: []byte(`baz|foo|battttt|bar`)},
	}
	for i, tc := range cases {
		if got := tsdb.MarshalTags(tc.tags); !bytes.Equal(got, tc.result) {
			t.Fatalf("%d. unexpected result: exp=%s, got=%s", i, tc.result, got)
		}
	}
}
// Benchmarks for MarshalTags across increasing tag-key counts.
func BenchmarkMarshalTags_KeyN1(b *testing.B)  { benchmarkMarshalTags(b, 1) }
func BenchmarkMarshalTags_KeyN3(b *testing.B)  { benchmarkMarshalTags(b, 3) }
func BenchmarkMarshalTags_KeyN5(b *testing.B)  { benchmarkMarshalTags(b, 5) }
func BenchmarkMarshalTags_KeyN10(b *testing.B) { benchmarkMarshalTags(b, 10) }
// benchmarkMarshalTags measures MarshalTags over a map containing keyN
// fixed-width keys and values.
func benchmarkMarshalTags(b *testing.B, keyN int) {
	const keySize, valueSize = 8, 15

	// Build the input map once, outside the timed loop.
	tags := make(map[string]string, keyN)
	for i := 0; i < keyN; i++ {
		key := fmt.Sprintf("%0*d", keySize, i)
		tags[key] = fmt.Sprintf("%0*d", valueSize, i)
	}

	b.ReportAllocs()
	for n := 0; n < b.N; n++ {
		tsdb.MarshalTags(tags)
	}
}
// Ensure a subset of tags, selected by key list, can be marshaled into a
// byte slice in the `key|key|value|value` form.
func TestMakeTagsKey(t *testing.T) {
	for i, tt := range []struct {
		keys   []string
		tags   models.Tags
		result []byte
	}{
		// No keys and no tags produce nil.
		{
			keys:   nil,
			tags:   nil,
			result: nil,
		},
		{
			keys:   []string{"foo"},
			tags:   models.NewTags(map[string]string{"foo": "bar"}),
			result: []byte(`foo|bar`),
		},
		// Requested key absent from tags: empty (non-nil) result.
		{
			keys:   []string{"foo"},
			tags:   models.NewTags(map[string]string{"baz": "battttt"}),
			result: []byte(``),
		},
		// Only the keys present in tags appear in the result.
		{
			keys:   []string{"baz", "foo"},
			tags:   models.NewTags(map[string]string{"baz": "battttt"}),
			result: []byte(`baz|battttt`),
		},
		{
			keys:   []string{"baz", "foo", "zzz"},
			tags:   models.NewTags(map[string]string{"foo": "bar"}),
			result: []byte(`foo|bar`),
		},
		{
			keys:   []string{"baz", "foo"},
			tags:   models.NewTags(map[string]string{"foo": "bar", "baz": "battttt"}),
			result: []byte(`baz|foo|battttt|bar`),
		},
		// Tags not named in keys are ignored.
		{
			keys:   []string{"baz"},
			tags:   models.NewTags(map[string]string{"baz": "battttt", "foo": "bar"}),
			result: []byte(`baz|battttt`),
		},
	} {
		result := tsdb.MakeTagsKey(tt.keys, tt.tags)
		if !bytes.Equal(result, tt.result) {
			t.Fatalf("%d. unexpected result: exp=%s, got=%s", i, tt.result, result)
		}
	}
}
// Benchmarks for MakeTagsKey across increasing tag-key counts.
func BenchmarkMakeTagsKey_KeyN1(b *testing.B)  { benchmarkMakeTagsKey(b, 1) }
func BenchmarkMakeTagsKey_KeyN3(b *testing.B)  { benchmarkMakeTagsKey(b, 3) }
func BenchmarkMakeTagsKey_KeyN5(b *testing.B)  { benchmarkMakeTagsKey(b, 5) }
func BenchmarkMakeTagsKey_KeyN10(b *testing.B) { benchmarkMakeTagsKey(b, 10) }
// makeTagsAndKeys returns keyN zero-padded tag keys along with a
// models.Tags holding a fixed-width value for each key.
func makeTagsAndKeys(keyN int) ([]string, models.Tags) {
	const keySize, valueSize = 8, 15

	keys := make([]string, 0, keyN)
	m := make(map[string]string, keyN)
	for i := 0; i < keyN; i++ {
		k := fmt.Sprintf("%0*d", keySize, i)
		keys = append(keys, k)
		m[k] = fmt.Sprintf("%0*d", valueSize, i)
	}

	return keys, models.NewTags(m)
}
// benchmarkMakeTagsKey measures MakeTagsKey with keyN keys, all of which
// are present in the tag set.
func benchmarkMakeTagsKey(b *testing.B, keyN int) {
	keys, tags := makeTagsAndKeys(keyN)

	// Marshal the key list + tags into a byte slice repeatedly.
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		tsdb.MakeTagsKey(keys, tags)
	}
}
// TestSeries is a fixture describing a single series used by test helpers.
type TestSeries struct {
	Measurement string           // measurement name
	Key         string           // "measurement:marshaled-tags" identity string
	Tags        models.Tags      // tag set for the series
	Type        models.FieldType // field type associated with the series
}
// genTestSeries generates the cross product of mCnt measurements with every
// tag set produced by a generator of tCnt tag keys with vCnt values each.
// All series are given the Integer field type.
func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries {
	measurements := genStrList("measurement", mCnt)
	tagSets := NewTagSetGenerator(tCnt, vCnt).AllSets()
	series := make([]*TestSeries, 0, mCnt*len(tagSets))
	for _, m := range measurements {
		for _, ts := range tagSets {
			series = append(series, &TestSeries{
				Measurement: m,
				// Key mirrors the measurement plus the marshaled tag set.
				Key:  fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))),
				Tags: models.NewTags(ts),
				Type: models.Integer,
			})
		}
	}
	return series
}
// TagValGenerator iterates over a generated list of values
// ("tagValue0" ... "tagValueN-1") for a single tag key.
type TagValGenerator struct {
	Key  string
	Vals []string
	idx  int
}

// NewTagValGenerator returns a generator for tagKey with nVals generated values.
func NewTagValGenerator(tagKey string, nVals int) *TagValGenerator {
	g := &TagValGenerator{Key: tagKey, Vals: make([]string, nVals)}
	for i := range g.Vals {
		g.Vals[i] = fmt.Sprintf("tagValue%d", i)
	}
	return g
}

// First rewinds the generator and returns the first value.
func (tvg *TagValGenerator) First() string {
	tvg.idx = 0
	return tvg.Curr()
}

// Curr returns the value at the current cursor position.
func (tvg *TagValGenerator) Curr() string {
	return tvg.Vals[tvg.idx]
}

// Next advances to the next value. When exhausted it returns "" and leaves
// the cursor parked on the final value.
func (tvg *TagValGenerator) Next() string {
	if tvg.idx+1 >= len(tvg.Vals) {
		return ""
	}
	tvg.idx++
	return tvg.Curr()
}
// TagSet maps tag keys to single tag values.
type TagSet map[string]string

// TagSetGenerator enumerates the cross product of several TagValGenerators.
type TagSetGenerator struct {
	TagVals []*TagValGenerator
}

// NewTagSetGenerator returns a generator over nSets tag keys. nTagVals[i]
// sets the value count for key i; keys beyond len(nTagVals) fall back to
// nTagVals[0] (so at least one count must be supplied).
func NewTagSetGenerator(nSets int, nTagVals ...int) *TagSetGenerator {
	tsg := &TagSetGenerator{TagVals: make([]*TagValGenerator, 0, nSets)}
	for i := 0; i < nSets; i++ {
		nVals := nTagVals[0]
		if i < len(nTagVals) {
			nVals = nTagVals[i]
		}
		tsg.TagVals = append(tsg.TagVals, NewTagValGenerator(fmt.Sprintf("tagKey%d", i), nVals))
	}
	return tsg
}

// First rewinds every value generator and returns the initial combination.
func (tsg *TagSetGenerator) First() TagSet {
	for _, tvg := range tsg.TagVals {
		tvg.First()
	}
	return tsg.Curr()
}

// Curr returns the tag set formed from each generator's current value.
func (tsg *TagSetGenerator) Curr() TagSet {
	set := make(TagSet, len(tsg.TagVals))
	for _, tvg := range tsg.TagVals {
		set[tvg.Key] = tvg.Curr()
	}
	return set
}

// Next advances odometer-style: the first generator that can advance does so;
// generators before it wrap around to their first value. It returns nil once
// every combination has been produced.
func (tsg *TagSetGenerator) Next() TagSet {
	advanced := false
	for _, tvg := range tsg.TagVals {
		if tvg.Next() != "" {
			advanced = true
			break
		}
		tvg.First()
	}
	if !advanced {
		return nil
	}
	return tsg.Curr()
}

// AllSets materializes every combination produced by the generator.
func (tsg *TagSetGenerator) AllSets() []TagSet {
	allSets := []TagSet{}
	for ts := tsg.First(); ts != nil; ts = tsg.Next() {
		allSets = append(allSets, ts)
	}
	return allSets
}
// genStrList returns n strings of the form prefix0, prefix1, ..., prefixN-1.
func genStrList(prefix string, n int) []string {
	out := make([]string, n)
	for i := range out {
		out[i] = fmt.Sprintf("%s%d", prefix, i)
	}
	return out
}

View File

@ -316,6 +316,7 @@ func (i *SeriesCollectionIterator) Next() bool {
// Helpers that return the current state of the iterator.
func (i SeriesCollectionIterator) Index() int { return i.index }
func (i SeriesCollectionIterator) Length() int { return i.length }
func (i SeriesCollectionIterator) Point() models.Point { return i.s.Points[i.index] }
func (i SeriesCollectionIterator) Key() []byte { return i.s.Keys[i.index] }
func (i SeriesCollectionIterator) SeriesKey() []byte { return i.s.SeriesKeys[i.index] }

View File

@ -0,0 +1,149 @@
package tsdb
import (
"reflect"
"testing"
"time"
"github.com/influxdata/platform/models"
)
// TestSeriesCollection exercises the basic SeriesCollection operations:
// construction from points, Copy, Swap, Truncate, Advance, and the two
// invalidation paths (immediate and staged-concurrent).
func TestSeriesCollection(t *testing.T) {
	// some helper functions. short names because local scope and frequently used.
	var (
		equal = reflect.DeepEqual

		// b builds a []byte from a string; bs builds a [][]byte.
		b  = func(s string) []byte { return []byte(s) }
		bs = func(s ...string) [][]byte {
			out := make([][]byte, len(s))
			for i := range s {
				out[i] = b(s[i])
			}
			return out
		}

		// assertEqual fails immediately when got and wanted are not deeply equal.
		assertEqual = func(t *testing.T, name string, got, wanted interface{}) {
			t.Helper()
			if !equal(got, wanted) {
				t.Fatalf("bad %s: got: %v but wanted: %v", name, got, wanted)
			}
		}
	)

	t.Run("New", func(t *testing.T) {
		// One point per measurement, each with a single field of a distinct type.
		points := []models.Point{
			models.MustNewPoint("a", models.Tags{}, models.Fields{"f": 1.0}, time.Now()),
			models.MustNewPoint("b", models.Tags{}, models.Fields{"b": true}, time.Now()),
			models.MustNewPoint("c", models.Tags{}, models.Fields{"i": int64(1)}, time.Now()),
		}
		collection := NewSeriesCollection(points)

		assertEqual(t, "length", collection.Length(), 3)
		for iter := collection.Iterator(); iter.Next(); {
			ipt, spt := iter.Point(), points[iter.Index()]
			fi := spt.FieldIterator()
			fi.Next() // advance to the point's first (only) field

			// The iterator view must match the source point field-for-field.
			assertEqual(t, "point", ipt, spt)
			assertEqual(t, "key", iter.Key(), spt.Key())
			assertEqual(t, "name", iter.Name(), spt.Name())
			assertEqual(t, "tags", iter.Tags(), spt.Tags())
			assertEqual(t, "type", iter.Type(), fi.Type())
		}
	})

	t.Run("Copy", func(t *testing.T) {
		collection := &SeriesCollection{
			Keys:  bs("ka", "kb", "kc"),
			Names: bs("na", "nb", "nc"),
		}

		// Copy overwrites index 0 with index 2 across every column.
		collection.Copy(0, 2)
		assertEqual(t, "keys", collection.Keys, bs("kc", "kb", "kc"))
		assertEqual(t, "names", collection.Names, bs("nc", "nb", "nc"))

		// An out-of-bounds source index is a no-op rather than a panic.
		collection.Copy(0, 4)
		assertEqual(t, "keys", collection.Keys, bs("kc", "kb", "kc"))
		assertEqual(t, "names", collection.Names, bs("nc", "nb", "nc"))
	})

	t.Run("Swap", func(t *testing.T) {
		collection := &SeriesCollection{
			Keys:  bs("ka", "kb", "kc"),
			Names: bs("na", "nb", "nc"),
		}

		collection.Swap(0, 2)
		assertEqual(t, "keys", collection.Keys, bs("kc", "kb", "ka"))
		assertEqual(t, "names", collection.Names, bs("nc", "nb", "na"))

		// An out-of-bounds index is a no-op rather than a panic.
		collection.Swap(0, 4)
		assertEqual(t, "keys", collection.Keys, bs("kc", "kb", "ka"))
		assertEqual(t, "names", collection.Names, bs("nc", "nb", "na"))
	})

	t.Run("Truncate", func(t *testing.T) {
		collection := &SeriesCollection{
			Keys:  bs("ka", "kb", "kc"),
			Names: bs("na", "nb", "nc"),
		}

		// Truncate keeps the first n entries of every column.
		collection.Truncate(1)
		assertEqual(t, "keys", collection.Keys, bs("ka"))
		assertEqual(t, "names", collection.Names, bs("na"))

		collection.Truncate(0)
		assertEqual(t, "keys", collection.Keys, bs())
		assertEqual(t, "names", collection.Names, bs())
	})

	t.Run("Advance", func(t *testing.T) {
		collection := &SeriesCollection{
			Keys:  bs("ka", "kb", "kc"),
			Names: bs("na", "nb", "nc"),
		}

		// Advance drops the first n entries of every column.
		collection.Advance(1)
		assertEqual(t, "keys", collection.Keys, bs("kb", "kc"))
		assertEqual(t, "names", collection.Names, bs("nb", "nc"))

		collection.Advance(1)
		assertEqual(t, "keys", collection.Keys, bs("kc"))
		assertEqual(t, "names", collection.Names, bs("nc"))
	})

	t.Run("InvalidateAll", func(t *testing.T) {
		collection := &SeriesCollection{Keys: bs("ka", "kb", "kc")}

		// InvalidateAll drops every entry immediately and records the reason.
		collection.InvalidateAll("test reason")
		assertEqual(t, "length", collection.Length(), 0)
		assertEqual(t, "error", collection.PartialWriteError(), PartialWriteError{
			Reason:      "test reason",
			Dropped:     3,
			DroppedKeys: bs("ka", "kb", "kc"),
		})
	})

	t.Run("Invalid", func(t *testing.T) {
		collection := &SeriesCollection{Keys: bs("ka", "kb", "kc")}

		// invalidate half the entries
		for iter := collection.Iterator(); iter.Next(); {
			if iter.Index()%2 == 0 {
				iter.Invalid("test reason")
			}
		}

		// nothing happens yet: all values are staged
		assertEqual(t, "length", collection.Length(), 3)

		// apply all of the invalid calls
		collection.ApplyConcurrentDrops()
		assertEqual(t, "length", collection.Length(), 1)
		assertEqual(t, "error", collection.PartialWriteError(), PartialWriteError{
			Reason:      "test reason",
			Dropped:     2,
			DroppedKeys: bs("ka", "kc"),
		})
	})
}

View File

@ -382,6 +382,8 @@ func parseSeriesKey(data []byte, dst models.Tags) ([]byte, models.Tags) {
dst = dst[:cap(dst)] // Grow dst to use full capacity
if got, want := len(dst), tagN; got < want {
dst = append(dst, make(models.Tags, want-got)...)
} else if got > want {
dst = dst[:want]
}
dst = dst[:tagN]

208
tsdb/series_file_test.go Normal file
View File

@ -0,0 +1,208 @@
package tsdb_test
import (
"bytes"
"fmt"
"io/ioutil"
"os"
"testing"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/tsdb"
)
// TestParseSeriesKeyInto verifies ParseSeriesKeyInto correctly reuses or
// grows the destination tag slice across three capacities: zero, larger
// than needed, and too small.
func TestParseSeriesKeyInto(t *testing.T) {
	name := []byte("cpu")
	tags := models.NewTags(map[string]string{"region": "east", "server": "a"})
	key := tsdb.AppendSeriesKey(nil, name, tags)

	// dst with zero capacity: both tags must still come back.
	dst := make(models.Tags, 0)
	gotName, gotTags := tsdb.ParseSeriesKeyInto(key, dst)

	if !bytes.Equal(gotName, name) {
		t.Fatalf("got %q, expected %q", gotName, name)
	}

	if got, exp := len(gotTags), 2; got != exp {
		t.Fatalf("got tags length %d, expected %d", got, exp)
	} else if got, exp := gotTags, tags; !got.Equal(exp) {
		t.Fatalf("got tags %v, expected %v", got, exp)
	}

	// dst with excess capacity: the capacity (5) must be preserved.
	dst = make(models.Tags, 0, 5)
	_, gotTags = tsdb.ParseSeriesKeyInto(key, dst)
	if got, exp := len(gotTags), 2; got != exp {
		t.Fatalf("got tags length %d, expected %d", got, exp)
	} else if got, exp := cap(gotTags), 5; got != exp {
		t.Fatalf("got tags capacity %d, expected %d", got, exp)
	} else if got, exp := gotTags, tags; !got.Equal(exp) {
		t.Fatalf("got tags %v, expected %v", got, exp)
	}

	// dst too small: it must be grown to hold both tags.
	dst = make(models.Tags, 1)
	_, gotTags = tsdb.ParseSeriesKeyInto(key, dst)
	if got, exp := len(gotTags), 2; got != exp {
		t.Fatalf("got tags length %d, expected %d", got, exp)
	} else if got, exp := gotTags, tags; !got.Equal(exp) {
		t.Fatalf("got tags %v, expected %v", got, exp)
	}
}
// Ensure series file contains the correct set of series.
func TestSeriesFile_Series(t *testing.T) {
	sfile := MustOpenSeriesFile()
	defer sfile.Close()

	series := []Series{
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"}), Type: models.Integer},
		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"}), Type: models.Integer},
		{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"}), Type: models.Integer},
	}

	// Insert each series individually via a single-entry collection.
	for _, s := range series {
		collection := &tsdb.SeriesCollection{
			Names: [][]byte{[]byte(s.Name)},
			Tags:  []models.Tags{s.Tags},
			Types: []models.FieldType{s.Type},
		}
		if err := sfile.CreateSeriesListIfNotExists(collection); err != nil {
			t.Fatal(err)
		}
	}

	// Verify total number of series is correct.
	if n := sfile.SeriesCount(); n != 3 {
		t.Fatalf("unexpected series count: %d", n)
	}

	// Verify all series exist.
	for i, s := range series {
		if seriesID := sfile.SeriesID(s.Name, s.Tags, nil); seriesID.IsZero() {
			t.Fatalf("series does not exist: i=%d", i)
		}
	}

	// Verify non-existent series doesn't exist.
	if sfile.HasSeries([]byte("foo"), models.NewTags(map[string]string{"region": "north"}), nil) {
		t.Fatal("series should not exist")
	}
}
// Ensure series file can be compacted.
func TestSeriesFileCompactor(t *testing.T) {
	sfile := MustOpenSeriesFile()
	defer sfile.Close()

	// Disable automatic compactions so the test drives compaction explicitly.
	for _, p := range sfile.Partitions() {
		p.CompactThreshold = 0
	}

	// Write 10k series in a single batch; all share the same tag set.
	collection := new(tsdb.SeriesCollection)
	for i := 0; i < 10000; i++ {
		collection.Names = append(collection.Names, []byte(fmt.Sprintf("m%d", i)))
		collection.Tags = append(collection.Tags, models.NewTags(map[string]string{"foo": "bar"}))
		collection.Types = append(collection.Types, models.Integer)
	}
	if err := sfile.CreateSeriesListIfNotExists(collection); err != nil {
		t.Fatal(err)
	}
	if err := collection.PartialWriteError(); err != nil {
		t.Fatal(err)
	}

	// Verify total number of series is correct.
	if n := sfile.SeriesCount(); n != uint64(len(collection.Names)) {
		t.Fatalf("unexpected series count: %d", n)
	}

	// Compact in-place for each partition.
	for _, p := range sfile.Partitions() {
		compactor := tsdb.NewSeriesPartitionCompactor()
		if err := compactor.Compact(p); err != nil {
			t.Fatal(err)
		}
	}

	// Verify all series exist after compaction.
	for iter := collection.Iterator(); iter.Next(); {
		if seriesID := sfile.SeriesID(iter.Name(), iter.Tags(), nil); seriesID.IsZero() {
			t.Fatalf("series does not exist: %s,%s", iter.Name(), iter.Tags().String())
		}
	}
}
// Ensures that types are tracked and checked by the series file.
func TestSeriesFile_Type(t *testing.T) {
	sfile := MustOpenSeriesFile()
	defer sfile.Close()

	// Add the series with some types.
	collection := &tsdb.SeriesCollection{
		Names: [][]byte{[]byte("a"), []byte("b"), []byte("c")},
		Tags:  []models.Tags{{}, {}, {}},
		Types: []models.FieldType{models.Integer, models.Float, models.Boolean},
	}
	if err := sfile.CreateSeriesListIfNotExists(collection); err != nil {
		t.Fatal(err)
	}

	// Attempt to add the series again but with different types. Only "d" is
	// genuinely new; a/b/c conflict with the types registered above.
	collection = &tsdb.SeriesCollection{
		Names: [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d")},
		Tags:  []models.Tags{{}, {}, {}, {}},
		Types: []models.FieldType{models.String, models.String, models.String, models.String},
	}
	if err := sfile.CreateSeriesListIfNotExists(collection); err != nil {
		t.Fatal(err)
	}

	// All of the series except d should be dropped.
	if err := collection.PartialWriteError(); err == nil {
		t.Fatal("expected partial write error")
	}
	if collection.Length() != 1 {
		t.Fatal("expected one series to remain in collection")
	}
	if got := string(collection.Names[0]); got != "d" {
		t.Fatal("got invalid name on remaining series:", got)
	}
}
// Series represents name/tagset pairs that are used in testing.
type Series struct {
	Name    []byte           // measurement name
	Tags    models.Tags      // tag set identifying the series
	Type    models.FieldType // field type registered for the series
	Deleted bool             // whether the series has been marked deleted
}
// SeriesFile is a test wrapper for tsdb.SeriesFile that cleans up its
// temporary directory on Close.
type SeriesFile struct {
	*tsdb.SeriesFile
}
// NewSeriesFile returns a new instance of SeriesFile backed by a freshly
// created temporary directory. It panics if the directory cannot be made.
func NewSeriesFile() *SeriesFile {
	path, err := ioutil.TempDir("", "tsdb-series-file-")
	if err != nil {
		panic(err)
	}
	f := &SeriesFile{SeriesFile: tsdb.NewSeriesFile(path)}
	return f
}
// MustOpenSeriesFile returns a new, open instance of SeriesFile. Panic on error.
func MustOpenSeriesFile() *SeriesFile {
	f := NewSeriesFile()
	// Route series-file logging to stdout so failures are visible in test output.
	f.Logger = logger.New(os.Stdout)
	if err := f.Open(); err != nil {
		panic(err)
	}
	return f
}
// Close closes the log file and removes it from disk. The underlying
// temporary directory is removed even if the close itself fails.
func (f *SeriesFile) Close() error {
	defer os.RemoveAll(f.Path())
	return f.SeriesFile.Close()
}

31
tsdb/series_id_test.go Normal file
View File

@ -0,0 +1,31 @@
package tsdb
import (
"math/rand"
"testing"
"github.com/influxdata/platform/models"
)
// TestSeriesID verifies that SeriesID round-trips through WithType: the
// typed id reports the type it was tagged with and yields back the
// original untyped id.
func TestSeriesID(t *testing.T) {
	types := []models.FieldType{
		models.Integer,
		models.Float,
		models.Boolean,
		models.String,
		models.Unsigned,
	}

	// rand.Int31 keeps ids within 31 bits — presumably below the bits used
	// for the type tag; confirm against the SeriesID bit layout.
	for i := 0; i < 1000000; i++ {
		id := NewSeriesID(uint64(rand.Int31()))
		for _, typ := range types {
			typed := id.WithType(typ)
			if got := typed.Type(); got != typ {
				t.Fatalf("wanted: %v got: %v", typ, got)
			}
			if got := typed.SeriesID(); id != got {
				t.Fatalf("wanted: %016x got: %016x", id, got)
			}
		}
	}
}

View File

@ -7,6 +7,7 @@ import (
"io"
"os"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/pkg/mmap"
"github.com/influxdata/platform/pkg/rhh"
)
@ -214,6 +215,27 @@ func (idx *SeriesIndex) FindIDBySeriesKey(segments []*SeriesSegment, key []byte)
}
}
// FindIDByNameTags returns the typed series id for the series key composed
// of name and tags. It returns the zero id when the key is not indexed or
// when the id has been tombstoned. buf is scratch space used to build the
// series key without allocating.
func (idx *SeriesIndex) FindIDByNameTags(segments []*SeriesSegment, name []byte, tags models.Tags, buf []byte) SeriesIDTyped {
	id := idx.FindIDBySeriesKey(segments, AppendSeriesKey(buf[:0], name, tags))
	if _, ok := idx.tombstones[id.SeriesID()]; ok {
		return SeriesIDTyped{}
	}
	return id
}
// FindIDListByNameTags looks up one typed series id per name/tags pair.
// Positions whose series are missing (or tombstoned) are left as the zero
// id; ok reports whether every pair resolved. buf is scratch space reused
// across lookups.
func (idx *SeriesIndex) FindIDListByNameTags(segments []*SeriesSegment, names [][]byte, tagsSlice []models.Tags, buf []byte) (ids []SeriesIDTyped, ok bool) {
	ids = make([]SeriesIDTyped, len(names))
	ok = true
	for i, name := range names {
		id := idx.FindIDByNameTags(segments, name, tagsSlice[i], buf)
		if id.IsZero() {
			ok = false
			continue
		}
		ids[i] = id
	}
	return ids, ok
}
func (idx *SeriesIndex) FindOffsetByID(id SeriesID) int64 {
if offset := idx.idOffsetMap[id]; offset != 0 {
return offset

137
tsdb/series_index_test.go Normal file
View File

@ -0,0 +1,137 @@
package tsdb_test
import (
"bytes"
"path/filepath"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/tsdb"
)
// toTypedSeriesID wraps a raw id as a SeriesIDTyped carrying the Empty
// field type, matching how untyped entries are inserted in these tests.
func toTypedSeriesID(id uint64) tsdb.SeriesIDTyped {
	return tsdb.NewSeriesID(id).WithType(models.Empty)
}
// TestSeriesIndex_Count verifies Count reflects the number of inserted keys.
func TestSeriesIndex_Count(t *testing.T) {
	dir, cleanup := MustTempDir()
	defer cleanup()

	idx := tsdb.NewSeriesIndex(filepath.Join(dir, "index"))
	if err := idx.Open(); err != nil {
		t.Fatal(err)
	}
	defer idx.Close()

	// Insert two series keys at distinct ids/offsets.
	key0 := tsdb.AppendSeriesKey(nil, []byte("m0"), nil)
	idx.Insert(key0, toTypedSeriesID(1), 10)
	key1 := tsdb.AppendSeriesKey(nil, []byte("m1"), nil)
	idx.Insert(key1, toTypedSeriesID(2), 20)

	if n := idx.Count(); n != 2 {
		t.Fatalf("unexpected count: %d", n)
	}
}
// TestSeriesIndex_Delete verifies that deleting one id marks only that id
// as deleted, leaving other ids intact.
func TestSeriesIndex_Delete(t *testing.T) {
	dir, cleanup := MustTempDir()
	defer cleanup()

	idx := tsdb.NewSeriesIndex(filepath.Join(dir, "index"))
	if err := idx.Open(); err != nil {
		t.Fatal(err)
	}
	defer idx.Close()

	key0 := tsdb.AppendSeriesKey(nil, []byte("m0"), nil)
	idx.Insert(key0, toTypedSeriesID(1), 10)
	key1 := tsdb.AppendSeriesKey(nil, []byte("m1"), nil)
	idx.Insert(key1, toTypedSeriesID(2), 20)

	// Delete series id 1 and confirm the deletion is not visible on id 2.
	idx.Delete(tsdb.NewSeriesID(1))

	if !idx.IsDeleted(tsdb.NewSeriesID(1)) {
		t.Fatal("expected deletion")
	} else if idx.IsDeleted(tsdb.NewSeriesID(2)) {
		t.Fatal("expected series to exist")
	}
}
// TestSeriesIndex_FindIDBySeriesKey verifies lookups both by raw series key
// and by name/tags, including the not-found (zero id) case.
func TestSeriesIndex_FindIDBySeriesKey(t *testing.T) {
	dir, cleanup := MustTempDir()
	defer cleanup()

	idx := tsdb.NewSeriesIndex(filepath.Join(dir, "index"))
	if err := idx.Open(); err != nil {
		t.Fatal(err)
	}
	defer idx.Close()

	key0 := tsdb.AppendSeriesKey(nil, []byte("m0"), nil)
	idx.Insert(key0, toTypedSeriesID(1), 10)
	key1 := tsdb.AppendSeriesKey(nil, []byte("m1"), nil)
	idx.Insert(key1, toTypedSeriesID(2), 20)
	badKey := tsdb.AppendSeriesKey(nil, []byte("not_found"), nil)

	// Lookup by pre-built series key.
	if id := idx.FindIDBySeriesKey(nil, key0); id != toTypedSeriesID(1) {
		t.Fatalf("unexpected id(0): %d", id)
	} else if id := idx.FindIDBySeriesKey(nil, key1); id != toTypedSeriesID(2) {
		t.Fatalf("unexpected id(1): %d", id)
	} else if id := idx.FindIDBySeriesKey(nil, badKey); !id.IsZero() {
		t.Fatalf("unexpected id(2): %d", id)
	}

	// Lookup by name/tags must agree with the raw-key results.
	if id := idx.FindIDByNameTags(nil, []byte("m0"), nil, nil); id != toTypedSeriesID(1) {
		t.Fatalf("unexpected id(0): %d", id)
	} else if id := idx.FindIDByNameTags(nil, []byte("m1"), nil, nil); id != toTypedSeriesID(2) {
		t.Fatalf("unexpected id(1): %d", id)
	} else if id := idx.FindIDByNameTags(nil, []byte("not_found"), nil, nil); !id.IsZero() {
		t.Fatalf("unexpected id(2): %d", id)
	}
}
// TestSeriesIndex_FindOffsetByID verifies id→offset lookups, including the
// zero offset returned for an unknown id.
func TestSeriesIndex_FindOffsetByID(t *testing.T) {
	dir, cleanup := MustTempDir()
	defer cleanup()

	idx := tsdb.NewSeriesIndex(filepath.Join(dir, "index"))
	if err := idx.Open(); err != nil {
		t.Fatal(err)
	}
	defer idx.Close()

	idx.Insert(tsdb.AppendSeriesKey(nil, []byte("m0"), nil), toTypedSeriesID(1), 10)
	idx.Insert(tsdb.AppendSeriesKey(nil, []byte("m1"), nil), toTypedSeriesID(2), 20)

	if offset := idx.FindOffsetByID(tsdb.NewSeriesID(1)); offset != 10 {
		t.Fatalf("unexpected offset(0): %d", offset)
	} else if offset := idx.FindOffsetByID(tsdb.NewSeriesID(2)); offset != 20 {
		t.Fatalf("unexpected offset(1): %d", offset)
	} else if offset := idx.FindOffsetByID(tsdb.NewSeriesID(3)); offset != 0 {
		t.Fatalf("unexpected offset(2): %d", offset)
	}
}
// TestSeriesIndexHeader verifies the header's default version and that a
// fully populated header survives a WriteTo/Read round trip unchanged.
func TestSeriesIndexHeader(t *testing.T) {
	// Verify header initializes correctly.
	hdr := tsdb.NewSeriesIndexHeader()
	if hdr.Version != tsdb.SeriesIndexVersion {
		t.Fatalf("unexpected version: %d", hdr.Version)
	}

	// Populate every field with a distinct value so any mis-serialized
	// field shows up in the diff below.
	hdr.MaxSeriesID = tsdb.NewSeriesID(10)
	hdr.MaxOffset = 20
	hdr.Count = 30
	hdr.Capacity = 40
	hdr.KeyIDMap.Offset, hdr.KeyIDMap.Size = 50, 60
	hdr.IDOffsetMap.Offset, hdr.IDOffsetMap.Size = 70, 80

	// Marshal/unmarshal.
	var buf bytes.Buffer
	if _, err := hdr.WriteTo(&buf); err != nil {
		t.Fatal(err)
	} else if other, err := tsdb.ReadSeriesIndexHeader(buf.Bytes()); err != nil {
		t.Fatal(err)
	} else if diff := cmp.Diff(hdr, other); diff != "" {
		t.Fatal(diff)
	}
}

258
tsdb/series_segment_test.go Normal file
View File

@ -0,0 +1,258 @@
package tsdb_test
import (
"bytes"
"os"
"path/filepath"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/platform/tsdb"
)
// TestSeriesSegment verifies writing entries to a segment, the rejection of
// a write that exceeds the remaining segment space, and iteration over the
// written entries.
func TestSeriesSegment(t *testing.T) {
	dir, cleanup := MustTempDir()
	defer cleanup()

	// Create a new initial segment (4mb) and initialize for writing.
	segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000"))
	if err != nil {
		t.Fatal(err)
	} else if err := segment.InitForWrite(); err != nil {
		t.Fatal(err)
	}
	defer segment.Close()

	// Write initial entry. The first entry lands directly after the header.
	key1 := tsdb.AppendSeriesKey(nil, []byte("m0"), nil)
	offset, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, toTypedSeriesID(1), key1))
	if err != nil {
		t.Fatal(err)
	} else if offset != tsdb.SeriesSegmentHeaderSize {
		t.Fatalf("unexpected offset: %d", offset)
	}

	// Write a large entry (3mb). It must land immediately after the first
	// entry. (BUG FIX: the original test discarded this offset and re-checked
	// the stale offset of the first write, which could never fail.)
	key2 := tsdb.AppendSeriesKey(nil, bytes.Repeat([]byte("m"), 3*(1<<20)), nil)
	offset2, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, toTypedSeriesID(2), key2))
	if err != nil {
		t.Fatal(err)
	} else if want := tsdb.SeriesSegmentHeaderSize + int64(tsdb.SeriesEntryHeaderSize+len(key1)); offset2 != want {
		t.Fatalf("unexpected offset: %d", offset2)
	}

	// Write another entry that is too large for the remaining segment space.
	if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, toTypedSeriesID(3), tsdb.AppendSeriesKey(nil, bytes.Repeat([]byte("n"), 3*(1<<20)), nil))); err != tsdb.ErrSeriesSegmentNotWritable {
		t.Fatalf("unexpected error: %s", err)
	}

	// Verify two entries exist.
	var n int
	segment.ForEachEntry(func(flag uint8, id tsdb.SeriesIDTyped, offset int64, key []byte) error {
		switch n {
		case 0:
			if flag != tsdb.SeriesEntryInsertFlag || id != toTypedSeriesID(1) || !bytes.Equal(key1, key) {
				t.Fatalf("unexpected entry(0): %d, %d, %q", flag, id, key)
			}
		case 1:
			if flag != tsdb.SeriesEntryInsertFlag || id != toTypedSeriesID(2) || !bytes.Equal(key2, key) {
				t.Fatalf("unexpected entry(1): %d, %d, %q", flag, id, key)
			}
		default:
			t.Fatalf("too many entries")
		}
		n++
		return nil
	})
	if n != 2 {
		t.Fatalf("unexpected entry count: %d", n)
	}
}
// TestSeriesSegment_AppendSeriesIDs verifies that AppendSeriesIDs appends
// the segment's series ids onto an existing slice without disturbing it.
func TestSeriesSegment_AppendSeriesIDs(t *testing.T) {
	dir, cleanup := MustTempDir()
	defer cleanup()

	segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000"))
	if err != nil {
		t.Fatal(err)
	} else if err := segment.InitForWrite(); err != nil {
		t.Fatal(err)
	}
	defer segment.Close()

	// Write entries.
	if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, toTypedSeriesID(10), tsdb.AppendSeriesKey(nil, []byte("m0"), nil))); err != nil {
		t.Fatal(err)
	} else if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, toTypedSeriesID(11), tsdb.AppendSeriesKey(nil, []byte("m1"), nil))); err != nil {
		t.Fatal(err)
	} else if err := segment.Flush(); err != nil {
		t.Fatal(err)
	}

	// Collect series ids with existing set. toSeriesIDs is a test helper —
	// presumably defined in a sibling test file; it converts []uint64 to
	// []tsdb.SeriesID.
	a := segment.AppendSeriesIDs(toSeriesIDs([]uint64{1, 2}))
	if diff := cmp.Diff(a, toSeriesIDs([]uint64{1, 2, 10, 11})); diff != "" {
		t.Fatal(diff)
	}
}
// TestSeriesSegment_MaxSeriesID verifies MaxSeriesID returns the highest
// id written to the segment.
func TestSeriesSegment_MaxSeriesID(t *testing.T) {
	dir, cleanup := MustTempDir()
	defer cleanup()

	segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000"))
	if err != nil {
		t.Fatal(err)
	} else if err := segment.InitForWrite(); err != nil {
		t.Fatal(err)
	}
	defer segment.Close()

	// Write entries.
	if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, toTypedSeriesID(10), tsdb.AppendSeriesKey(nil, []byte("m0"), nil))); err != nil {
		t.Fatal(err)
	} else if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, toTypedSeriesID(11), tsdb.AppendSeriesKey(nil, []byte("m1"), nil))); err != nil {
		t.Fatal(err)
	} else if err := segment.Flush(); err != nil {
		t.Fatal(err)
	}

	// Verify maximum.
	if max := segment.MaxSeriesID(); max != tsdb.NewSeriesID(11) {
		t.Fatalf("unexpected max: %d", max)
	}
}
// TestSeriesSegmentHeader verifies the header's default version and a
// WriteTo/Read round trip.
func TestSeriesSegmentHeader(t *testing.T) {
	// Verify header initializes correctly.
	hdr := tsdb.NewSeriesSegmentHeader()
	if hdr.Version != tsdb.SeriesSegmentVersion {
		t.Fatalf("unexpected version: %d", hdr.Version)
	}

	// Marshal/unmarshal.
	var buf bytes.Buffer
	if _, err := hdr.WriteTo(&buf); err != nil {
		t.Fatal(err)
	} else if other, err := tsdb.ReadSeriesSegmentHeader(buf.Bytes()); err != nil {
		t.Fatal(err)
	} else if diff := cmp.Diff(hdr, other); diff != "" {
		t.Fatal(diff)
	}
}
// TestSeriesSegment_PartialWrite verifies a segment can be reopened for
// writing after its file has been truncated at every byte position inside
// the final entry — i.e. recovery from a torn/partial write.
func TestSeriesSegment_PartialWrite(t *testing.T) {
	dir, cleanup := MustTempDir()
	defer cleanup()

	// Create a new initial segment (4mb) and initialize for writing.
	segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000"))
	if err != nil {
		t.Fatal(err)
	} else if err := segment.InitForWrite(); err != nil {
		t.Fatal(err)
	}
	defer segment.Close()

	// Write two entries.
	if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, toTypedSeriesID(1), tsdb.AppendSeriesKey(nil, []byte("A"), nil))); err != nil {
		t.Fatal(err)
	} else if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, toTypedSeriesID(2), tsdb.AppendSeriesKey(nil, []byte("B"), nil))); err != nil {
		t.Fatal(err)
	}
	sz := segment.Size()
	// Size of the second entry, rebuilt identically for truncation math.
	entrySize := len(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, toTypedSeriesID(2), tsdb.AppendSeriesKey(nil, []byte("B"), nil)))

	// Close segment.
	if err := segment.Close(); err != nil {
		t.Fatal(err)
	}

	// Truncate at each point within the second entry and reopen; open and
	// init must succeed at every truncation length.
	for i := entrySize; i > 0; i-- {
		if err := os.Truncate(filepath.Join(dir, "0000"), sz-int64(entrySize-i)); err != nil {
			t.Fatal(err)
		}
		segment := tsdb.NewSeriesSegment(0, filepath.Join(dir, "0000"))
		if err := segment.Open(); err != nil {
			t.Fatal(err)
		} else if err := segment.InitForWrite(); err != nil {
			t.Fatal(err)
		} else if err := segment.Close(); err != nil {
			t.Fatal(err)
		}
	}
}
// TestJoinSeriesOffset verifies a segment id and position pack into a
// single 48-bit offset (16-bit segment, 32-bit position).
func TestJoinSeriesOffset(t *testing.T) {
	if offset := tsdb.JoinSeriesOffset(0x1234, 0x56789ABC); offset != 0x123456789ABC {
		t.Fatalf("unexpected offset: %x", offset)
	}
}
// TestSplitSeriesOffset verifies the inverse of JoinSeriesOffset: a packed
// offset splits back into its segment id and position.
func TestSplitSeriesOffset(t *testing.T) {
	if segmentID, pos := tsdb.SplitSeriesOffset(0x123456789ABC); segmentID != 0x1234 || pos != 0x56789ABC {
		t.Fatalf("unexpected segmentID/pos: %x/%x", segmentID, pos)
	}
}
// TestIsValidSeriesSegmentFilename checks segment filename validation:
// "" and a 3-character name are rejected, a 4-hex-digit name is accepted.
func TestIsValidSeriesSegmentFilename(t *testing.T) {
	if tsdb.IsValidSeriesSegmentFilename("") {
		t.Fatal("expected invalid")
	}
	if tsdb.IsValidSeriesSegmentFilename("0ab") {
		t.Fatal("expected invalid")
	}
	if !tsdb.IsValidSeriesSegmentFilename("192a") {
		t.Fatal("expected valid")
	}
}
// TestParseSeriesSegmentFilename verifies hex segment filenames parse to
// their numeric id and that non-hex names return an error.
func TestParseSeriesSegmentFilename(t *testing.T) {
	if v, err := tsdb.ParseSeriesSegmentFilename("a90b"); err != nil {
		t.Fatal(err)
	} else if v != 0xA90B {
		t.Fatalf("unexpected value: %x", v)
	}
	// Leading zeros are accepted.
	if v, err := tsdb.ParseSeriesSegmentFilename("0001"); err != nil {
		t.Fatal(err)
	} else if v != 1 {
		t.Fatalf("unexpected value: %x", v)
	}
	if _, err := tsdb.ParseSeriesSegmentFilename("invalid"); err == nil {
		t.Fatal("expected error")
	}
}
// TestSeriesSegmentSize verifies segment sizes double from 4mb per segment
// index and cap at 256mb.
func TestSeriesSegmentSize(t *testing.T) {
	const mb = (1 << 20)

	if sz := tsdb.SeriesSegmentSize(0); sz != 4*mb {
		t.Fatalf("unexpected size: %d", sz)
	} else if sz := tsdb.SeriesSegmentSize(1); sz != 8*mb {
		t.Fatalf("unexpected size: %d", sz)
	} else if sz := tsdb.SeriesSegmentSize(2); sz != 16*mb {
		t.Fatalf("unexpected size: %d", sz)
	} else if sz := tsdb.SeriesSegmentSize(3); sz != 32*mb {
		t.Fatalf("unexpected size: %d", sz)
	} else if sz := tsdb.SeriesSegmentSize(4); sz != 64*mb {
		t.Fatalf("unexpected size: %d", sz)
	} else if sz := tsdb.SeriesSegmentSize(5); sz != 128*mb {
		t.Fatalf("unexpected size: %d", sz)
	} else if sz := tsdb.SeriesSegmentSize(6); sz != 256*mb {
		t.Fatalf("unexpected size: %d", sz)
	} else if sz := tsdb.SeriesSegmentSize(7); sz != 256*mb { // capped at 256mb
		t.Fatalf("unexpected size: %d", sz)
	}
}
// TestSeriesEntry verifies an appended series entry reads back with the
// same flag, id, key, and total size (header + key bytes).
func TestSeriesEntry(t *testing.T) {
	seriesKey := tsdb.AppendSeriesKey(nil, []byte("m0"), nil)
	buf := tsdb.AppendSeriesEntry(nil, 1, toTypedSeriesID(2), seriesKey)
	if flag, id, key, sz := tsdb.ReadSeriesEntry(buf); flag != 1 {
		t.Fatalf("unexpected flag: %d", flag)
	} else if id != toTypedSeriesID(2) {
		t.Fatalf("unexpected id: %d", id)
	} else if !bytes.Equal(seriesKey, key) {
		t.Fatalf("unexpected key: %q", key)
	} else if sz != int64(tsdb.SeriesEntryHeaderSize+len(key)) {
		t.Fatalf("unexpected size: %d", sz)
	}
}

View File

@ -5,7 +5,7 @@ import (
"sync"
"unsafe"
"github.com/RoaringBitmap/roaring"
"github.com/influxdata/roaring"
)
// SeriesIDSet represents a lockable bitmap of series ids.
@ -15,12 +15,12 @@ type SeriesIDSet struct {
}
// NewSeriesIDSet returns a new instance of SeriesIDSet.
func NewSeriesIDSet(a ...uint64) *SeriesIDSet {
func NewSeriesIDSet(a ...SeriesID) *SeriesIDSet {
ss := &SeriesIDSet{bitmap: roaring.NewBitmap()}
if len(a) > 0 {
a32 := make([]uint32, len(a))
for i := range a {
a32[i] = uint32(a[i])
a32[i] = uint32(a[i].RawID())
}
ss.bitmap.AddMany(a32)
}
@ -50,6 +50,23 @@ func (s *SeriesIDSet) AddNoLock(id SeriesID) {
s.bitmap.Add(uint32(id.RawID()))
}
// AddMany adds multiple ids to the SeriesIDSet. AddMany takes a lock, so may not be
// optimal to call many times with few ids.
func (s *SeriesIDSet) AddMany(ids ...SeriesID) {
	if len(ids) == 0 {
		return
	}

	// Convert to the bitmap's 32-bit id space before taking the lock so the
	// critical section stays short.
	a32 := make([]uint32, len(ids))
	for i := range ids {
		a32[i] = uint32(ids[i].RawID())
	}

	s.Lock()
	defer s.Unlock()
	s.bitmap.AddMany(a32)
}
// Contains returns true if the id exists in the set.
func (s *SeriesIDSet) Contains(id SeriesID) bool {
s.RLock()
@ -58,6 +75,16 @@ func (s *SeriesIDSet) Contains(id SeriesID) bool {
return x
}
// SetCOW sets the copy-on-write status of the underlying bitmap. When SetCOW(true)
// is called, modifications to the bitmap will result in copies of the relevant
// data structures being made, preventing consumers of clones of the bitmap from
// seeing those modifications.
func (s *SeriesIDSet) SetCOW(b bool) {
	// Takes the write lock: toggling COW mutates the bitmap's internal state.
	s.Lock()
	defer s.Unlock()
	s.bitmap.SetCopyOnWrite(b)
}
// ContainsNoLock returns true if the id exists in the set. ContainsNoLock is
// not safe for use from multiple goroutines. The caller must manage synchronization.
func (s *SeriesIDSet) ContainsNoLock(id SeriesID) bool {
@ -108,6 +135,19 @@ func (s *SeriesIDSet) Merge(others ...*SeriesIDSet) {
s.Unlock()
}
// MergeInPlace merges other into s, modifying s in the process.
func (s *SeriesIDSet) MergeInPlace(other *SeriesIDSet) {
	// Merging a set into itself is a no-op, and proceeding would deadlock
	// below (RLock followed by Lock on the same mutex).
	if s == other {
		return
	}

	// Lock order: other (read) is acquired before s (write).
	// NOTE(review): two goroutines calling MergeInPlace with swapped
	// arguments would acquire these locks in opposite order — confirm
	// callers never do that concurrently.
	other.RLock()
	s.Lock()
	s.bitmap.Or(other.bitmap)
	s.Unlock()
	other.RUnlock()
}
// Equals returns true if other and s are the same set of ids.
func (s *SeriesIDSet) Equals(other *SeriesIDSet) bool {
if s == other {
@ -219,6 +259,18 @@ func (s *SeriesIDSet) WriteTo(w io.Writer) (int64, error) {
return s.bitmap.WriteTo(w)
}
// Clear clears the underlying bitmap for re-use. Clear is safe for use by
// multiple goroutines.
func (s *SeriesIDSet) Clear() {
	s.Lock()
	s.ClearNoLock()
	s.Unlock()
}
// ClearNoLock clears the underlying bitmap for re-use without taking a lock.
// The caller must hold s's write lock (or otherwise guarantee exclusive
// access) for the duration of the call.
func (s *SeriesIDSet) ClearNoLock() {
	s.bitmap.Clear()
}
// Slice returns a slice of series ids.
func (s *SeriesIDSet) Slice() []uint64 {
s.RLock()

612
tsdb/series_set_test.go Normal file
View File

@ -0,0 +1,612 @@
package tsdb
import (
"bytes"
"fmt"
"math"
"math/rand"
"testing"
)
func TestSeriesIDSet_AndNot(t *testing.T) {
examples := [][3][]uint64{
[3][]uint64{
{1, 10, 20, 30},
{10, 12, 13, 14, 20},
{1, 30},
},
[3][]uint64{
{},
{10},
{},
},
[3][]uint64{
{1, 10, 20, 30},
{1, 10, 20, 30},
{},
},
[3][]uint64{
{1, 10},
{1, 10, 100},
{},
},
[3][]uint64{
{1, 10},
{},
{1, 10},
},
}
for i, example := range examples {
t.Run(fmt.Sprint(i), func(t *testing.T) {
// Build sets.
a, b := NewSeriesIDSet(), NewSeriesIDSet()
for _, v := range example[0] {
a.Add(NewSeriesID(v))
}
for _, v := range example[1] {
b.Add(NewSeriesID(v))
}
expected := NewSeriesIDSet()
for _, v := range example[2] {
expected.Add(NewSeriesID(v))
}
got := a.AndNot(b)
if got.String() != expected.String() {
t.Fatalf("got %s, expected %s", got.String(), expected.String())
}
})
}
}
var resultBool bool
// Contains should be typically a constant time lookup. Example results on a laptop:
//
// BenchmarkSeriesIDSet_Contains/1-4 20000000 68.5 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Contains/2-4 20000000 70.8 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Contains/10-4 20000000 70.3 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Contains/100-4 20000000 71.3 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Contains/1000-4 20000000 80.5 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Contains/10000-4 20000000 67.3 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Contains/100000-4 20000000 73.1 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Contains/1000000-4 20000000 77.3 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Contains/10000000-4 20000000 75.3 ns/op 0 B/op 0 allocs/op
func BenchmarkSeriesIDSet_Contains(b *testing.B) {
	for _, n := range []uint64{1, 2, 10, 100, 1000, 10000, 100000, 1000000, 10000000} {
		// Populate a set with n sequential ids.
		s := NewSeriesIDSet()
		for v := uint64(0); v < n; v++ {
			s.Add(NewSeriesID(v))
		}

		// Probe a value from the middle of the populated range.
		target := n / 2

		b.Run(fmt.Sprint(n), func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				resultBool = s.Contains(NewSeriesID(target))
			}
		})
	}
}
var set *SeriesIDSet
// Adding to a larger bitset shouldn't be significantly more expensive than adding
// to a smaller one. This benchmark adds a value to different cardinality sets.
//
// Example results from a laptop:
// BenchmarkSeriesIDSet_Add/1-4 1000000 1053 ns/op 48 B/op 2 allocs/op
// BenchmarkSeriesIDSet_Add/2-4 5000000 303 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Add/10-4 5000000 348 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Add/100-4 5000000 373 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Add/1000-4 5000000 342 ns/op 0 B/op 0 allocs/op
//
//
func BenchmarkSeriesIDSet_AddMore(b *testing.B) {
	for _, cardinality := range []uint64{1, 2, 10, 100, 1000, 10000, 100000, 1000000, 10000000} {
		// Pre-populate the package-level set with cardinality-1 ids.
		set = NewSeriesIDSet()
		for v := uint64(0); v < cardinality-1; v++ {
			set.Add(NewSeriesID(v))
		}

		b.Run(fmt.Sprint(cardinality), func(b *testing.B) {
			next := NewSeriesID(cardinality)
			for i := 0; i < b.N; i++ {
				// Timed: add one more value to the set.
				set.Add(next)

				// Untimed: remove it again so every iteration starts from
				// the same cardinality.
				b.StopTimer()
				set.Remove(next)
				b.StartTimer()
			}
		})
	}
}
// Add benchmarks the cost of adding the same element to a set versus the
// cost of checking if it exists before adding it.
//
// Typical benchmarks from a laptop:
//
// BenchmarkSeriesIDSet_Add/cardinality_1000000_add/same-8 20000000 64.8 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Add/cardinality_1000000_add/random-8 2000000 704 ns/op 5 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Add/cardinality_1000000_add/same_no_lock-8 50000000 40.3 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Add/cardinality_1000000_add/random_no_lock-8 2000000 644 ns/op 5 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Add/cardinality_1000000_check_add/same_no_lock-8 50000000 34.0 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Add/cardinality_1000000_check_add/random_no_lock-8 2000000 860 ns/op 14 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Add/cardinality_1000000_check_add/same_global_lock-8 30000000 49.8 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Add/cardinality_1000000_check_add/random_global_lock-8 2000000 914 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Add/cardinality_1000000_check_add/same_multi_lock-8 30000000 39.7 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Add/cardinality_1000000_check_add/random_multi_lock-8 1000000 1002 ns/op 0 B/op 0 allocs/op
//
func BenchmarkSeriesIDSet_Add(b *testing.B) {
	// Setup: a set holding one million sequential ids.
	set = NewSeriesIDSet()
	for i := uint64(0); i < 1000000; i++ {
		set.Add(NewSeriesID(i))
	}
	lookup := NewSeriesID(300032) // an id known to already be in the set

	// Unconditional adds: the same existing id, and random ids, with and
	// without the set's internal locking.
	b.Run(fmt.Sprint("cardinality_1000000_add"), func(b *testing.B) {
		b.Run("same", func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				set.Add(lookup)
			}
		})

		b.Run("random", func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				// The random draw is excluded from the timed region.
				b.StopTimer()
				x := NewSeriesID(uint64(rand.Intn(math.MaxInt32)))
				b.StartTimer()
				set.Add(x)
			}
		})

		b.Run("same no lock", func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				set.AddNoLock(lookup)
			}
		})

		b.Run("random no lock", func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				b.StopTimer()
				x := NewSeriesID(uint64(rand.Intn(math.MaxInt32)))
				b.StartTimer()
				set.AddNoLock(x)
			}
		})
	})

	// Check-then-add variants under different locking strategies.
	b.Run(fmt.Sprint("cardinality_1000000_check_add"), func(b *testing.B) {
		// No locking at all around the check or the add.
		b.Run("same no lock", func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				if !set.ContainsNoLock(lookup) {
					set.AddNoLock(lookup)
				}
			}
		})

		b.Run("random no lock", func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				b.StopTimer()
				x := NewSeriesID(uint64(rand.Intn(math.MaxInt32)))
				b.StartTimer()
				if !set.ContainsNoLock(x) {
					set.AddNoLock(x)
				}
			}
		})

		// One write lock held across both the check and the add.
		b.Run("same global lock", func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				set.Lock()
				if !set.ContainsNoLock(lookup) {
					set.AddNoLock(lookup)
				}
				set.Unlock()
			}
		})

		b.Run("random global lock", func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				b.StopTimer()
				x := NewSeriesID(uint64(rand.Intn(math.MaxInt32)))
				b.StartTimer()
				set.Lock()
				if !set.ContainsNoLock(x) {
					set.AddNoLock(x)
				}
				set.Unlock()
			}
		})

		// Contains and Add each acquire their own lock.
		b.Run("same multi lock", func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				if !set.Contains(lookup) {
					set.Add(lookup)
				}
			}
		})

		b.Run("random multi lock", func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				b.StopTimer()
				x := NewSeriesID(uint64(rand.Intn(math.MaxInt32)))
				b.StartTimer()
				if !set.Contains(x) {
					set.Add(x)
				}
			}
		})
	})
}
var ssResult *SeriesIDSet
// Benchmark various ways of creating a copy of a bitmap. Note, Clone_COW will result
// in a bitmap where future modifications will involve copies.
//
// Typical results from an i7 laptop.
// BenchmarkSeriesIDSet_Clone/cardinality_1000/re-use/Clone-8 30000 44171 ns/op 47200 B/op 1737 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000/re-use/Clone_COW-8 300000 4554 ns/op 17392 B/op 7 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000/re-use/Merge-8 100000 17877 ns/op 39008 B/op 30 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000/re-use/MergeInPlace-8 200000 7367 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000/re-use/Add-8 10000 137460 ns/op 62336 B/op 2596 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000/re-use/WriteTo-8 30000 52896 ns/op 35872 B/op 866 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000/don't_re-use/Clone-8 30000 41940 ns/op 47200 B/op 1737 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000/don't_re-use/Clone_COW-8 300000 4474 ns/op 17392 B/op 7 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000/don't_re-use/Merge-8 100000 17624 ns/op 39008 B/op 30 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000/don't_re-use/MergeInPlace-8 100000 17320 ns/op 38880 B/op 28 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000/don't_re-use/Add-8 10000 167544 ns/op 101216 B/op 2624 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000/don't_re-use/WriteTo-8 20000 66976 ns/op 52897 B/op 869 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_10000/re-use/Clone-8 10000 179933 ns/op 177072 B/op 5895 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_10000/re-use/Clone_COW-8 100000 18578 ns/op 58736 B/op 7 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_10000/re-use/Merge-8 20000 77574 ns/op 210656 B/op 42 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_10000/re-use/MergeInPlace-8 100000 23645 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_10000/re-use/Add-8 2000 689254 ns/op 224161 B/op 9572 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_10000/re-use/WriteTo-8 10000 199052 ns/op 118791 B/op 2945 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_10000/don't_re-use/Clone-8 10000 183137 ns/op 177073 B/op 5895 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_10000/don't_re-use/Clone_COW-8 100000 19341 ns/op 58736 B/op 7 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_10000/don't_re-use/Merge-8 20000 77502 ns/op 210656 B/op 42 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_10000/don't_re-use/MergeInPlace-8 20000 72610 ns/op 210528 B/op 40 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_10000/don't_re-use/Add-8 2000 724789 ns/op 434691 B/op 9612 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_10000/don't_re-use/WriteTo-8 10000 215734 ns/op 177159 B/op 2948 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_100000/re-use/Clone-8 5000 244971 ns/op 377648 B/op 6111 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_100000/re-use/Clone_COW-8 100000 19284 ns/op 58736 B/op 7 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_100000/re-use/Merge-8 20000 90580 ns/op 210656 B/op 42 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_100000/re-use/MergeInPlace-8 50000 24697 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_100000/re-use/Add-8 500 3274456 ns/op 758996 B/op 19853 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_100000/re-use/WriteTo-8 5000 248791 ns/op 122392 B/op 3053 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_100000/don't_re-use/Clone-8 5000 269152 ns/op 377648 B/op 6111 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_100000/don't_re-use/Clone_COW-8 100000 21428 ns/op 58736 B/op 7 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_100000/don't_re-use/Merge-8 20000 85948 ns/op 210657 B/op 42 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_100000/don't_re-use/MergeInPlace-8 20000 78142 ns/op 210528 B/op 40 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_100000/don't_re-use/Add-8 500 3123753 ns/op 969529 B/op 19893 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_100000/don't_re-use/WriteTo-8 10000 230657 ns/op 180684 B/op 3056 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000000/re-use/Clone-8 3000 551781 ns/op 2245424 B/op 6111 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000000/re-use/Clone_COW-8 100000 16162 ns/op 58736 B/op 7 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000000/re-use/Merge-8 20000 92104 ns/op 210656 B/op 42 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000000/re-use/MergeInPlace-8 50000 27408 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000000/re-use/Add-8 100 22573498 ns/op 6420446 B/op 30520 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000000/re-use/WriteTo-8 5000 284901 ns/op 123522 B/op 3053 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000000/don't_re-use/Clone-8 3000 679284 ns/op 2245424 B/op 6111 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000000/don't_re-use/Clone_COW-8 100000 16006 ns/op 58736 B/op 7 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000000/don't_re-use/Merge-8 20000 68965 ns/op 210656 B/op 42 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000000/don't_re-use/MergeInPlace-8 20000 64236 ns/op 210528 B/op 40 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000000/don't_re-use/Add-8 100 21960668 ns/op 6630979 B/op 30560 allocs/op
// BenchmarkSeriesIDSet_Clone/cardinality_1000000/don't_re-use/WriteTo-8 5000 298276 ns/op 181890 B/op 3056 allocs/op
func BenchmarkSeriesIDSet_Clone(b *testing.B) {
	toAddCardinalities := []int{1e3, 1e4, 1e5, 1e6}

	// runBenchmarks exercises every copying strategy against other, calling
	// init to obtain the destination set (either re-used or freshly made).
	runBenchmarks := func(b *testing.B, other *SeriesIDSet, init func() *SeriesIDSet) {
		// Straight Clone of the source set.
		b.Run("Clone", func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				ssResult = other.Clone()
			}
		})

		// Clone with copy-on-write enabled on the source; COW is switched
		// back off afterwards so later sub-benchmarks are unaffected.
		b.Run("Clone_COW", func(b *testing.B) {
			other.SetCOW(true)
			for i := 0; i < b.N; i++ {
				ssResult = other.Clone()
			}
			other.SetCOW(false)
		})

		// Copy by merging into the destination; the destination reset
		// between iterations is untimed.
		b.Run("Merge", func(b *testing.B) {
			ssResult = init()
			for i := 0; i < b.N; i++ {
				ssResult.Merge(other)
				b.StopTimer()
				ssResult = init()
				b.StartTimer()
			}
		})

		b.Run("MergeInPlace", func(b *testing.B) {
			ssResult = init()
			for i := 0; i < b.N; i++ {
				ssResult.MergeInPlace(other)
				b.StopTimer()
				ssResult = init()
				b.StartTimer()
			}
		})

		// Copy by iterating the source and adding each id individually
		// under a single lock.
		b.Run("Add", func(b *testing.B) {
			ssResult = init()
			for i := 0; i < b.N; i++ {
				itr := other.Iterator()
				ssResult.Lock()
				for itr.HasNext() {
					ssResult.AddNoLock(NewSeriesID(uint64(itr.Next())))
				}
				ssResult.Unlock()
				b.StopTimer()
				ssResult = init()
				b.StartTimer()
			}
		})

		// Copy by serializing the source and deserializing into the
		// destination; the buffer is reset (untimed) each iteration.
		b.Run("WriteTo", func(b *testing.B) {
			var buf bytes.Buffer
			ssResult = init()
			for i := 0; i < b.N; i++ {
				other.WriteTo(&buf)
				ssResult.UnmarshalBinaryUnsafe(buf.Bytes())
				b.StopTimer()
				ssResult = init()
				buf.Reset()
				b.StartTimer()
			}
		})
	}

	for _, toAddCardinality := range toAddCardinalities {
		b.Run(fmt.Sprintf("cardinality %d", toAddCardinality), func(b *testing.B) {
			// Build a source set of random ids at this cardinality.
			ids := make([]SeriesID, 0, toAddCardinality)
			for i := 0; i < toAddCardinality; i++ {
				ids = append(ids, NewSeriesID(uint64(rand.Intn(200000000))))
			}
			other := NewSeriesIDSet(ids...)

			// Re-use a single destination set, clearing it between runs.
			b.Run("re-use", func(b *testing.B) {
				base := NewSeriesIDSet()
				runBenchmarks(b, other, func() *SeriesIDSet {
					base.Clear()
					return base
				})
			})

			// Allocate a fresh destination set for every run.
			b.Run("don't re-use", func(b *testing.B) {
				runBenchmarks(b, other, func() *SeriesIDSet {
					return NewSeriesIDSet()
				})
			})
		})
	}
}
// BenchmarkSeriesIDSet_AddMany compares strategies for bulk-adding ids to a
// set of varying existing cardinality: one-at-a-time AddNoLock, the batched
// AddMany, and merging via a temporary set with Merge / MergeInPlace.
func BenchmarkSeriesIDSet_AddMany(b *testing.B) {
	cardinalities := []int{1, 1e3, 1e4, 1e5, 1e6}
	toAddCardinalities := []int{1e3, 1e4, 1e5}

	for _, cardinality := range cardinalities {
		// Base set of random ids at the target cardinality.
		ids := make([]SeriesID, 0, cardinality)
		for i := 0; i < cardinality; i++ {
			ids = append(ids, NewSeriesID(uint64(rand.Intn(200000000))))
		}

		// Setup...
		set = NewSeriesIDSet(ids...)

		// Check if the value exists before adding it under two locks.
		b.Run(fmt.Sprintf("cardinality %d", cardinality), func(b *testing.B) {
			for _, toAddCardinality := range toAddCardinalities {
				// Random batch of ids to add each benchmark iteration
				// (deliberately shadows the outer ids slice).
				ids := make([]SeriesID, 0, toAddCardinality)
				for i := 0; i < toAddCardinality; i++ {
					ids = append(ids, NewSeriesID(uint64(rand.Intn(200000000))))
				}

				b.Run(fmt.Sprintf("adding %d", toAddCardinality), func(b *testing.B) {
					// Add each id individually without locking; the clone
					// reset between iterations is untimed.
					b.Run("AddNoLock", func(b *testing.B) {
						clone := set.Clone()
						for i := 0; i < b.N; i++ {
							for _, id := range ids {
								clone.AddNoLock(id)
							}
							b.StopTimer()
							clone = set.Clone()
							b.StartTimer()
						}
					})

					// Batched add under a single lock.
					b.Run("AddMany", func(b *testing.B) {
						clone := set.Clone()
						for i := 0; i < b.N; i++ {
							clone.AddMany(ids...)
							b.StopTimer()
							clone = set.Clone()
							b.StartTimer()
						}
					})

					// Merge will involve a new bitmap being allocated.
					b.Run("Merge", func(b *testing.B) {
						clone := set.Clone()
						for i := 0; i < b.N; i++ {
							other := NewSeriesIDSet(ids...)
							clone.Merge(other)
							b.StopTimer()
							clone = set.Clone()
							b.StartTimer()
						}
					})

					b.Run("MergeInPlace", func(b *testing.B) {
						clone := set.Clone()
						for i := 0; i < b.N; i++ {
							other := NewSeriesIDSet(ids...)
							clone.MergeInPlace(other)
							b.StopTimer()
							clone = set.Clone()
							b.StartTimer()
						}
					})
				})
			}
		})
	}
}
// Remove benchmarks the cost of removing the same element in a set versus the
// cost of checking if it exists before removing it.
//
// Typical benchmarks from a laptop:
//
// BenchmarkSeriesIDSet_Remove/cardinality_1000000_remove_same-4 20000000 99.1 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Remove/cardinality_1000000_check_remove_global_lock-4 20000000 57.7 ns/op 0 B/op 0 allocs/op
// BenchmarkSeriesIDSet_Remove/cardinality_1000000_check_remove_multi_lock-4 20000000 80.1 ns/op 0 B/op 0 allocs/op
//
func BenchmarkSeriesIDSet_Remove(b *testing.B) {
	// Populate the package-level set with one million sequential ids.
	set = NewSeriesIDSet()
	for v := uint64(0); v < 1000000; v++ {
		set.Add(NewSeriesID(v))
	}
	target := NewSeriesID(300032)

	// Unconditionally remove the same value over and over.
	b.Run(fmt.Sprint("cardinality_1000000_remove_same"), func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			set.Remove(target)
		}
	})

	// Check membership before removing, with a single lock held across both
	// operations. After the first iteration this is purely a contains check.
	b.Run(fmt.Sprint("cardinality_1000000_check_remove_global_lock"), func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			set.Lock()
			if set.ContainsNoLock(target) {
				set.RemoveNoLock(target)
			}
			set.Unlock()
		}
	})

	// Check membership before removing, where Contains and Remove each take
	// their own lock.
	b.Run(fmt.Sprint("cardinality_1000000_check_remove_multi_lock"), func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			if set.Contains(target) {
				set.Remove(target)
			}
		}
	})
}
// Typical benchmarks for a laptop:
//
// BenchmarkSeriesIDSet_Merge_Duplicates/cardinality_1/shards_1-4 200000 8095 ns/op 16656 B/op 11 allocs/op
// BenchmarkSeriesIDSet_Merge_Duplicates/cardinality_1/shards_10-4 200000 11755 ns/op 18032 B/op 47 allocs/op
// BenchmarkSeriesIDSet_Merge_Duplicates/cardinality_1/shards_100-4 50000 41632 ns/op 31794 B/op 407 allocs/op
// BenchmarkSeriesIDSet_Merge_Duplicates/cardinality_10000/shards_1-4 200000 6022 ns/op 8384 B/op 7 allocs/op
// BenchmarkSeriesIDSet_Merge_Duplicates/cardinality_10000/shards_10-4 100000 19674 ns/op 9760 B/op 43 allocs/op
// BenchmarkSeriesIDSet_Merge_Duplicates/cardinality_10000/shards_100-4 10000 152865 ns/op 23522 B/op 403 allocs/op
// BenchmarkSeriesIDSet_Merge_Duplicates/cardinality_1000000/shards_1-4 200000 8252 ns/op 9712 B/op 44 allocs/op
// BenchmarkSeriesIDSet_Merge_Duplicates/cardinality_1000000/shards_10-4 50000 29566 ns/op 15984 B/op 143 allocs/op
// BenchmarkSeriesIDSet_Merge_Duplicates/cardinality_1000000/shards_100-4 10000 237672 ns/op 78710 B/op 1133 allocs/op
// BenchmarkSeriesIDSet_Merge_Duplicates/cardinality_10000000/shards_1-4 100000 21559 ns/op 25968 B/op 330 allocs/op
// BenchmarkSeriesIDSet_Merge_Duplicates/cardinality_10000000/shards_10-4 20000 102326 ns/op 114325 B/op 537 allocs/op
// BenchmarkSeriesIDSet_Merge_Duplicates/cardinality_10000000/shards_100-4 2000 1042697 ns/op 997909 B/op 2608 allocs/op
func BenchmarkSeriesIDSet_Merge_Duplicates(b *testing.B) {
	for _, n := range []int{1, 10000, 1000000, 10000000} {
		// Base set of n sequential ids. Every merge source below is a copy
		// of it, so the merge consists entirely of duplicate ids.
		set = NewSeriesIDSet()
		for id := 0; id < n; id++ {
			set.Add(NewSeriesID(uint64(id)))
		}

		for _, shardCount := range []int{1, 10, 100} {
			others := make([]*SeriesIDSet, 0, shardCount)
			for j := 0; j < shardCount; j++ {
				others = append(others, &SeriesIDSet{bitmap: set.bitmap.Clone()})
			}

			b.Run(fmt.Sprintf("cardinality_%d/shards_%d", n, shardCount), func(b *testing.B) {
				dst := &SeriesIDSet{bitmap: set.bitmap.Clone()}
				for i := 0; i < b.N; i++ {
					dst.Merge(others...)

					// Untimed: restore the destination for the next round.
					b.StopTimer()
					dst.bitmap = set.bitmap.Clone()
					b.StartTimer()
				}
			})
		}
	}
}
// Typical benchmarks for a laptop:
//
// BenchmarkSeriesIDSet_Merge_Unique/cardinality_1/shards_1-4 200000 7841 ns/op 16656 B/op 11 allocs/op
// BenchmarkSeriesIDSet_Merge_Unique/cardinality_1/shards_10-4 200000 13093 ns/op 18048 B/op 47 allocs/op
// BenchmarkSeriesIDSet_Merge_Unique/cardinality_1/shards_100-4 30000 57399 ns/op 31985 B/op 407 allocs/op
// BenchmarkSeriesIDSet_Merge_Unique/cardinality_10000/shards_1-4 200000 7740 ns/op 8384 B/op 7 allocs/op
// BenchmarkSeriesIDSet_Merge_Unique/cardinality_10000/shards_10-4 50000 37116 ns/op 18208 B/op 52 allocs/op
// BenchmarkSeriesIDSet_Merge_Unique/cardinality_10000/shards_100-4 5000 409487 ns/op 210563 B/op 955 allocs/op
// BenchmarkSeriesIDSet_Merge_Unique/cardinality_1000000/shards_1-4 100000 19289 ns/op 19328 B/op 79 allocs/op
// BenchmarkSeriesIDSet_Merge_Unique/cardinality_1000000/shards_10-4 10000 129048 ns/op 159716 B/op 556 allocs/op
// BenchmarkSeriesIDSet_Merge_Unique/cardinality_1000000/shards_100-4 500 3482907 ns/op 5428116 B/op 6174 allocs/op
// BenchmarkSeriesIDSet_Merge_Unique/cardinality_10000000/shards_1-4 30000 43734 ns/op 51872 B/op 641 allocs/op
// BenchmarkSeriesIDSet_Merge_Unique/cardinality_10000000/shards_10-4 3000 514412 ns/op 748678 B/op 3687 allocs/op
// BenchmarkSeriesIDSet_Merge_Unique/cardinality_10000000/shards_100-4 30 61891687 ns/op 69626539 B/op 36038 allocs/op
func BenchmarkSeriesIDSet_Merge_Unique(b *testing.B) {
	for _, n := range []int{1, 10000, 1000000, 10000000} {
		// Base set of n sequential ids.
		set = NewSeriesIDSet()
		for id := 0; id < n; id++ {
			set.Add(NewSeriesID(uint64(id)))
		}

		for _, shardCount := range []int{1, 10, 100} {
			// Each merge source holds a range of ids offset by s*n, so no
			// source overlaps the base set or any other source.
			others := make([]*SeriesIDSet, 0, shardCount)
			for s := 1; s <= shardCount; s++ {
				other := NewSeriesIDSet()
				for id := 0; id < n; id++ {
					other.Add(NewSeriesID(uint64(id + (s * n))))
				}
				others = append(others, other)
			}

			b.Run(fmt.Sprintf("cardinality_%d/shards_%d", n, shardCount), func(b *testing.B) {
				dst := &SeriesIDSet{bitmap: set.bitmap.Clone()}
				for i := 0; i < b.N; i++ {
					dst.Merge(others...)

					// Untimed: restore the destination for the next round.
					b.StopTimer()
					dst.bitmap = set.bitmap.Clone()
					b.StartTimer()
				}
			})
		}
	}
}

268
tsdb/shard_internal_test.go Normal file
View File

@ -0,0 +1,268 @@
package tsdb
import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"regexp"
"sort"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/models"
)
// TestShard_MapType verifies that Shard.mapType reports the correct influxql
// data type for measurement fields, tags, unknown names, and the reserved
// system measurements, for every registered index implementation.
func TestShard_MapType(t *testing.T) {
	var sh *TempShard

	// setup creates a fresh temp shard backed by the given index type and
	// seeds it with points covering float, integer, boolean, string and tag
	// values.
	setup := func(index string) {
		sh = NewTempShard(index)
		if err := sh.Open(); err != nil {
			t.Fatal(err)
		}
		sh.MustWritePointsString(`
cpu,host=serverA,region=uswest value=100 0
cpu,host=serverA,region=uswest value=50,val2=5 10
cpu,host=serverB,region=uswest value=25 0
mem,host=serverA value=25i 0
mem,host=serverB value=50i,val3=t 10
_reserved,region=uswest value="foo" 0
`)
	}

	for _, index := range RegisteredIndexes() {
		setup(index)
		// Table of (measurement, field) lookups and the type each should map to.
		for _, tt := range []struct {
			measurement string
			field       string
			typ         influxql.DataType
		}{
			{
				measurement: "cpu",
				field:       "value",
				typ:         influxql.Float,
			},
			{
				measurement: "cpu",
				field:       "host",
				typ:         influxql.Tag,
			},
			{
				measurement: "cpu",
				field:       "region",
				typ:         influxql.Tag,
			},
			{
				measurement: "cpu",
				field:       "val2",
				typ:         influxql.Float,
			},
			{
				measurement: "cpu",
				field:       "unknown",
				typ:         influxql.Unknown,
			},
			{
				measurement: "mem",
				field:       "value",
				typ:         influxql.Integer,
			},
			{
				measurement: "mem",
				field:       "val3",
				typ:         influxql.Boolean,
			},
			{
				measurement: "mem",
				field:       "host",
				typ:         influxql.Tag,
			},
			{
				measurement: "unknown",
				field:       "unknown",
				typ:         influxql.Unknown,
			},
			{
				measurement: "_fieldKeys",
				field:       "fieldKey",
				typ:         influxql.String,
			},
			{
				measurement: "_fieldKeys",
				field:       "fieldType",
				typ:         influxql.String,
			},
			{
				measurement: "_fieldKeys",
				field:       "unknown",
				typ:         influxql.Unknown,
			},
			{
				measurement: "_series",
				field:       "key",
				typ:         influxql.String,
			},
			{
				measurement: "_series",
				field:       "unknown",
				typ:         influxql.Unknown,
			},
			{
				measurement: "_tagKeys",
				field:       "tagKey",
				typ:         influxql.String,
			},
			{
				measurement: "_tagKeys",
				field:       "unknown",
				typ:         influxql.Unknown,
			},
			{
				measurement: "_reserved",
				field:       "value",
				typ:         influxql.String,
			},
			{
				measurement: "_reserved",
				field:       "region",
				typ:         influxql.Tag,
			},
		} {
			name := fmt.Sprintf("%s_%s_%s", index, tt.measurement, tt.field)
			t.Run(name, func(t *testing.T) {
				typ, err := sh.mapType(tt.measurement, tt.field)
				if err != nil {
					t.Fatal(err)
				}
				if have, want := typ, tt.typ; have != want {
					t.Errorf("unexpected data type: have=%#v want=%#v", have, want)
				}
			})
		}
		sh.Close()
	}
}
func TestShard_MeasurementsByRegex(t *testing.T) {
var sh *TempShard
setup := func(index string) {
sh = NewTempShard(index)
if err := sh.Open(); err != nil {
t.Fatal(err)
}
sh.MustWritePointsString(`
cpu,host=serverA,region=uswest value=100 0
cpu,host=serverA,region=uswest value=50,val2=5 10
cpu,host=serverB,region=uswest value=25 0
mem,host=serverA value=25i 0
mem,host=serverB value=50i,val3=t 10
`)
}
for _, index := range RegisteredIndexes() {
setup(index)
for _, tt := range []struct {
regex string
measurements []string
}{
{regex: `cpu`, measurements: []string{"cpu"}},
{regex: `mem`, measurements: []string{"mem"}},
{regex: `cpu|mem`, measurements: []string{"cpu", "mem"}},
{regex: `gpu`, measurements: []string{}},
{regex: `pu`, measurements: []string{"cpu"}},
{regex: `p|m`, measurements: []string{"cpu", "mem"}},
} {
t.Run(index+"_"+tt.regex, func(t *testing.T) {
re := regexp.MustCompile(tt.regex)
measurements, err := sh.MeasurementNamesByRegex(re)
if err != nil {
t.Fatal(err)
}
mstrings := make([]string, 0, len(measurements))
for _, name := range measurements {
mstrings = append(mstrings, string(name))
}
sort.Strings(mstrings)
if diff := cmp.Diff(tt.measurements, mstrings, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("unexpected measurements:\n%s", diff)
}
})
}
sh.Close()
}
}
// TempShard represents a test wrapper for Shard that uses temporary
// filesystem paths.
type TempShard struct {
	*Shard

	path  string      // root temp directory; removed on Close
	sfile *SeriesFile // series file backing the shard; closed on Close
}
// NewTempShard returns a new instance of TempShard with temp paths.
func NewTempShard(index string) *TempShard {
	// Temporary root directory under which both the data and WAL
	// directories live.
	rootDir, err := ioutil.TempDir("", "influxdb-tsdb-")
	if err != nil {
		panic(err)
	}

	// Open the series file that will back the shard.
	sfile := NewSeriesFile(filepath.Join(rootDir, "db0", SeriesFileDirectory))
	sfile.Logger = logger.New(os.Stdout)
	if err := sfile.Open(); err != nil {
		panic(err)
	}

	// Configure engine options for the requested index type.
	opt := NewEngineOptions()
	opt.IndexVersion = index
	opt.Config.WALDir = filepath.Join(rootDir, "wal")
	if index == InmemIndexName {
		opt.InmemIndex, _ = NewInmemIndex(path.Base(rootDir), sfile)
	}

	shard := NewShard(0,
		filepath.Join(rootDir, "data", "db0", "rp0", "1"),
		filepath.Join(rootDir, "wal", "db0", "rp0", "1"),
		sfile,
		opt,
	)
	return &TempShard{
		Shard: shard,
		sfile: sfile,
		path:  rootDir,
	}
}
// Close closes the shard and the underlying series file, and removes all
// temporary data from disk. It returns the first error encountered while
// closing; previously the series-file close error was silently discarded.
func (sh *TempShard) Close() error {
	defer os.RemoveAll(sh.path)

	// Close the series file first (as before), but keep its error instead
	// of dropping it on the floor.
	serr := sh.sfile.Close()
	if err := sh.Shard.Close(); err != nil {
		return err
	}
	return serr
}
// MustWritePointsString parses s as line protocol (with second precision)
// and writes the resulting points to the shard, panicking on any error.
func (sh *TempShard) MustWritePointsString(s string) {
	points, err := models.ParsePointsWithPrecision([]byte(strings.TrimSpace(s)), time.Time{}, "s")
	if err != nil {
		panic(err)
	}

	if err = sh.WritePoints(points); err != nil {
		panic(err)
	}
}

2193
tsdb/shard_test.go Normal file

File diff suppressed because it is too large Load Diff

BIN
tsdb/testdata/line-protocol-1M.txt.gz vendored Normal file

Binary file not shown.

View File

@ -26,7 +26,6 @@ import (
intar "github.com/influxdata/influxdb/pkg/tar"
"github.com/influxdata/influxdb/pkg/tracing"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/tsdb/index/inmem"
"github.com/influxdata/influxql"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/pkg/bytesutil"
@ -752,9 +751,8 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
// Save reference to index for iterator creation.
e.index = index
// If we have the cached fields index on disk and we're using TSI, we
// can skip scanning all the TSM files.
if e.index.Type() != inmem.IndexName && !e.fieldset.IsEmpty() {
// If we have the cached fields index on disk, we can skip scanning all the TSM files.
if !e.fieldset.IsEmpty() {
return nil
}
@ -1197,15 +1195,8 @@ func (e *Engine) addToIndexFromKey(keys [][]byte, fieldTypes []influxql.DataType
collection.Types = append(collection.Types, fieldTypeFromDataType(fieldTypes[i]))
}
// Build in-memory index, if necessary.
if e.index.Type() == inmem.IndexName {
if err := e.index.InitializeSeries(collection); err != nil {
return err
}
} else {
if err := e.index.CreateSeriesListIfNotExists(collection); err != nil {
return err
}
if err := e.index.CreateSeriesListIfNotExists(collection); err != nil {
return err
}
return nil