From 67464238ed6bc376261a789200777ab078e90baa Mon Sep 17 00:00:00 2001 From: David Norton Date: Mon, 1 Jun 2015 17:59:43 -0400 Subject: [PATCH 1/4] add benchmarks for building in-mem series index --- tsdb/meta_test.go | 149 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) diff --git a/tsdb/meta_test.go b/tsdb/meta_test.go index 95f8044c94..ff4aa643e9 100644 --- a/tsdb/meta_test.go +++ b/tsdb/meta_test.go @@ -137,6 +137,155 @@ func benchmarkMarshalTags(b *testing.B, keyN int) { } } +func BenchmarkCreateSeriesIndex_1K(b *testing.B) { + benchmarkCreateSeriesIndex(b, genTestSeries(38, 3, 3)) +} + +func BenchmarkCreateSeriesIndex_100K(b *testing.B) { + benchmarkCreateSeriesIndex(b, genTestSeries(32, 5, 5)) +} + +func BenchmarkCreateSeriesIndex_1M(b *testing.B) { + benchmarkCreateSeriesIndex(b, genTestSeries(330, 5, 5)) +} + +func benchmarkCreateSeriesIndex(b *testing.B, series []*TestSeries) { + idxs := make([]*DatabaseIndex, 0, b.N) + for i := 0; i < b.N; i++ { + idxs = append(idxs, NewDatabaseIndex()) + } + + b.ResetTimer() + for n := 0; n < b.N; n++ { + idx := idxs[n] + for _, s := range series { + idx.createSeriesIndexIfNotExists(s.Measurement, s.Series) + } + } +} + +type TestSeries struct { + Measurement string + Series *Series +} + +func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries { + measurements := genStrList("measurement", mCnt) + tagSets := NewTagSetGenerator(tCnt, vCnt).AllSets() + series := []*TestSeries{} + for _, m := range measurements { + for _, ts := range tagSets { + series = append(series, &TestSeries{ + Measurement: m, + Series: &Series{ + Key: fmt.Sprintf("%s:%s", m, string(marshalTags(ts))), + Tags: ts, + }, + }) + } + } + return series +} + +type TagValGenerator struct { + Key string + Vals []string + idx int +} + +func NewTagValGenerator(tagKey string, nVals int) *TagValGenerator { + tvg := &TagValGenerator{Key: tagKey} + for i := 0; i < nVals; i++ { + tvg.Vals = append(tvg.Vals, fmt.Sprintf("tagValue%d", 
i)) + } + return tvg +} + +func (tvg *TagValGenerator) First() string { + tvg.idx = 0 + return tvg.Curr() +} + +func (tvg *TagValGenerator) Curr() string { + return tvg.Vals[tvg.idx] +} + +func (tvg *TagValGenerator) Next() string { + tvg.idx++ + if tvg.idx >= len(tvg.Vals) { + tvg.idx-- + return "" + } + return tvg.Curr() +} + +type TagSet map[string]string + +type TagSetGenerator struct { + TagVals []*TagValGenerator +} + +func NewTagSetGenerator(nSets int, nTagVals ...int) *TagSetGenerator { + tsg := &TagSetGenerator{} + for i := 0; i < nSets; i++ { + nVals := nTagVals[0] + if i < len(nTagVals) { + nVals = nTagVals[i] + } + tagKey := fmt.Sprintf("tagKey%d", i) + tsg.TagVals = append(tsg.TagVals, NewTagValGenerator(tagKey, nVals)) + } + return tsg +} + +func (tsg *TagSetGenerator) First() TagSet { + for _, tsv := range tsg.TagVals { + tsv.First() + } + return tsg.Curr() +} + +func (tsg *TagSetGenerator) Curr() TagSet { + ts := TagSet{} + for _, tvg := range tsg.TagVals { + ts[tvg.Key] = tvg.Curr() + } + return ts +} + +func (tsg *TagSetGenerator) Next() TagSet { + val := "" + for _, tsv := range tsg.TagVals { + if val = tsv.Next(); val != "" { + break + } else { + tsv.First() + } + } + + if val == "" { + return nil + } + + return tsg.Curr() +} + +func (tsg *TagSetGenerator) AllSets() []TagSet { + allSets := []TagSet{} + for ts := tsg.First(); ts != nil; ts = tsg.Next() { + allSets = append(allSets, ts) + } + return allSets +} + +func genStrList(prefix string, n int) []string { + lst := make([]string, 0, n) + for i := 0; i < n; i++ { + lst = append(lst, fmt.Sprintf("%s%d", prefix, i)) + } + return lst +} + // MustParseExpr parses an expression string and returns its AST representation. 
func MustParseExpr(s string) influxql.Expr { expr, err := influxql.ParseExpr(s) From 97c84a6d4f30ab52b1d6767aeaa6754658c59608 Mon Sep 17 00:00:00 2001 From: David Norton Date: Tue, 2 Jun 2015 16:57:39 -0400 Subject: [PATCH 2/4] add benchmark tests for shard WritePoints --- tsdb/shard_test.go | 132 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 127 insertions(+), 5 deletions(-) diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index f4283e377d..9faf3aadb3 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -3,18 +3,19 @@ package tsdb import ( "io/ioutil" "os" + "path" "reflect" "testing" "time" ) func TestShardWriteAndIndex(t *testing.T) { - path, _ := ioutil.TempDir("", "") - defer os.RemoveAll(path) - path += "/shard" + tmpDir, _ := ioutil.TempDir("", "") + defer os.RemoveAll(tmpDir) + tmpShard := path.Join(tmpDir, "shard") index := NewDatabaseIndex() - sh := NewShard(index, path) + sh := NewShard(index, tmpShard) if err := sh.Open(); err != nil { t.Fatalf("error openeing shard: %s", err.Error()) } @@ -59,7 +60,7 @@ func TestShardWriteAndIndex(t *testing.T) { sh.Close() index = NewDatabaseIndex() - sh = NewShard(index, path) + sh = NewShard(index, tmpShard) if err := sh.Open(); err != nil { t.Fatalf("error openeing shard: %s", err.Error()) } @@ -73,3 +74,124 @@ func TestShardWriteAndIndex(t *testing.T) { t.Fatalf(err.Error()) } } + +func BenchmarkWritePoints_NewSeries_1K(b *testing.B) { benchmarkWritePoints(b, 38, 3, 3, 1) } +func BenchmarkWritePoints_NewSeries_100K(b *testing.B) { benchmarkWritePoints(b, 32, 5, 5, 1) } +func BenchmarkWritePoints_NewSeries_250K(b *testing.B) { benchmarkWritePoints(b, 80, 5, 5, 1) } +func BenchmarkWritePoints_NewSeries_500K(b *testing.B) { benchmarkWritePoints(b, 160, 5, 5, 1) } +func BenchmarkWritePoints_NewSeries_1M(b *testing.B) { benchmarkWritePoints(b, 320, 5, 5, 1) } + +func BenchmarkWritePoints_ExistingSeries_1K(b *testing.B) { + benchmarkWritePointsExistingSeries(b, 38, 3, 3, 1) +} +func 
BenchmarkWritePoints_ExistingSeries_100K(b *testing.B) { + benchmarkWritePointsExistingSeries(b, 32, 5, 5, 1) +} +func BenchmarkWritePoints_ExistingSeries_250K(b *testing.B) { + benchmarkWritePointsExistingSeries(b, 80, 5, 5, 1) +} +func BenchmarkWritePoints_ExistingSeries_500K(b *testing.B) { + benchmarkWritePointsExistingSeries(b, 160, 5, 5, 1) +} +func BenchmarkWritePoints_ExistingSeries_1M(b *testing.B) { + benchmarkWritePointsExistingSeries(b, 320, 5, 5, 1) +} + +func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) { + // Generate test series (measurements + unique tag sets). + series := genTestSeries(mCnt, tkCnt, tvCnt) + // Load the in-memory series index. + index := NewDatabaseIndex() + for _, s := range series { + index.createSeriesIndexIfNotExists(s.Measurement, s.Series) + } + // Generate point data to write to the shard. + points := []Point{} + for _, s := range series { + for val := 0.0; val < float64(pntCnt); val++ { + p := NewPoint(s.Measurement, s.Series.Tags, map[string]interface{}{"value": val}, time.Now()) + points = append(points, p) + } + } + + // Stop & reset timers and mem-stats before the main benchmark loop. + b.StopTimer() + b.ResetTimer() + + // Run the benchmark loop. + for n := 0; n < b.N; n++ { + tmpDir, _ := ioutil.TempDir("", "") + tmpShard := path.Join(tmpDir, "shard") + shard := NewShard(index, tmpShard) + shard.Open() + + b.StartTimer() + // Call the function being benchmarked. + chunkedWrite(shard, points) + + b.StopTimer() + shard.Close() + os.RemoveAll(tmpDir) + } +} + +func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) { + // Generate test series (measurements + unique tag sets). + series := genTestSeries(mCnt, tkCnt, tvCnt) + // Load the in-memory series index. + index := NewDatabaseIndex() + for _, s := range series { + index.createSeriesIndexIfNotExists(s.Measurement, s.Series) + } + // Generate point data to write to the shard. 
+ points := []Point{} + for _, s := range series { + for val := 0.0; val < float64(pntCnt); val++ { + p := NewPoint(s.Measurement, s.Series.Tags, map[string]interface{}{"value": val}, time.Now()) + points = append(points, p) + } + } + + tmpDir, _ := ioutil.TempDir("", "") + defer os.RemoveAll(tmpDir) + tmpShard := path.Join(tmpDir, "shard") + shard := NewShard(index, tmpShard) + shard.Open() + defer shard.Close() + chunkedWrite(shard, points) + + // Reset timers and mem-stats before the main benchmark loop. + b.ResetTimer() + + // Run the benchmark loop. + for n := 0; n < b.N; n++ { + b.StopTimer() + for _, p := range points { + p.SetTime(p.Time().Add(time.Second)) + } + + b.StartTimer() + // Call the function being benchmarked. + chunkedWrite(shard, points) + } +} + +func chunkedWrite(shard *Shard, points []Point) { + nPts := len(points) + chunkSz := 10000 + start := 0 + end := chunkSz + + for { + if end > nPts { + end = nPts + } + if end-start == 0 { + break + } + + shard.WritePoints(points[start:end]) + start = end + end += chunkSz + } +} From 31bb8e70a9d687d6dc5cf81fcb671bd3cc020570 Mon Sep 17 00:00:00 2001 From: David Norton Date: Tue, 2 Jun 2015 17:17:31 -0400 Subject: [PATCH 3/4] don't build index before benchmarking WritePoints --- tsdb/shard_test.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 9faf3aadb3..332839bdd2 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -100,11 +100,8 @@ func BenchmarkWritePoints_ExistingSeries_1M(b *testing.B) { func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) { // Generate test series (measurements + unique tag sets). series := genTestSeries(mCnt, tkCnt, tvCnt) - // Load the in-memory series index. + // Create index for the shard to use. index := NewDatabaseIndex() - for _, s := range series { - index.createSeriesIndexIfNotExists(s.Measurement, s.Series) - } // Generate point data to write to the shard. 
points := []Point{} for _, s := range series { @@ -138,11 +135,8 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) { func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) { // Generate test series (measurements + unique tag sets). series := genTestSeries(mCnt, tkCnt, tvCnt) - // Load the in-memory series index. + // Create index for the shard to use. index := NewDatabaseIndex() - for _, s := range series { - index.createSeriesIndexIfNotExists(s.Measurement, s.Series) - } // Generate point data to write to the shard. points := []Point{} for _, s := range series { From 938ad2ef85d056feb862506b93d1ac3e80ea5095 Mon Sep 17 00:00:00 2001 From: David Norton Date: Wed, 3 Jun 2015 10:09:50 -0400 Subject: [PATCH 4/4] add Store Open benchmark test --- tsdb/shard_test.go | 14 ++++++++-- tsdb/store_test.go | 69 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 2 deletions(-) diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 332839bdd2..829652b8b0 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -10,7 +10,7 @@ import ( ) func TestShardWriteAndIndex(t *testing.T) { - tmpDir, _ := ioutil.TempDir("", "") + tmpDir, _ := ioutil.TempDir("", "shard_test") defer os.RemoveAll(tmpDir) tmpShard := path.Join(tmpDir, "shard") @@ -97,6 +97,11 @@ func BenchmarkWritePoints_ExistingSeries_1M(b *testing.B) { benchmarkWritePointsExistingSeries(b, 320, 5, 5, 1) } +// benchmarkWritePoints benchmarks writing new series to a shard. +// mCnt - measurement count +// tkCnt - tag key count +// tvCnt - tag value count (values per tag) +// pntCnt - points per series. # of series = mCnt * (tvCnt ^ tkCnt) func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) { // Generate test series (measurements + unique tag sets). series := genTestSeries(mCnt, tkCnt, tvCnt) @@ -117,7 +122,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) { // Run the benchmark loop. 
for n := 0; n < b.N; n++ { - tmpDir, _ := ioutil.TempDir("", "") + tmpDir, _ := ioutil.TempDir("", "shard_test") tmpShard := path.Join(tmpDir, "shard") shard := NewShard(index, tmpShard) shard.Open() @@ -132,6 +137,11 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) { } } +// benchmarkWritePointsExistingSeries benchmarks writing to existing series in a shard. +// mCnt - measurement count +// tkCnt - tag key count +// tvCnt - tag value count (values per tag) +// pntCnt - points per series. # of series = mCnt * (tvCnt ^ tkCnt) func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) { // Generate test series (measurements + unique tag sets). series := genTestSeries(mCnt, tkCnt, tvCnt) diff --git a/tsdb/store_test.go b/tsdb/store_test.go index 0f4efd1172..ae4d322efe 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -5,6 +5,7 @@ import ( "os" "path/filepath" "testing" + "time" ) func TestStoreOpen(t *testing.T) { @@ -160,3 +161,71 @@ func TestStoreOpenShardBadShardPath(t *testing.T) { } } + +func BenchmarkStoreOpen_200KSeries_100Shards(b *testing.B) { benchmarkStoreOpen(b, 64, 5, 5, 1, 100) } + +func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) { + // Generate test series (measurements + unique tag sets). + series := genTestSeries(mCnt, tkCnt, tvCnt) + // Generate point data to write to the shards. + points := []Point{} + for _, s := range series { + for val := 0.0; val < float64(pntCnt); val++ { + p := NewPoint(s.Measurement, s.Series.Tags, map[string]interface{}{"value": val}, time.Now()) + points = append(points, p) + } + } + // Create a temporary directory for the test data. + dir, _ := ioutil.TempDir("", "store_test") + // Create the store. + store := NewStore(dir) + // Open the store. + if err := store.Open(); err != nil { + b.Fatalf("benchmarkStoreOpen: %s", err) + } + // Create requested number of shards in the store & write points. 
+ for shardID := 0; shardID < shardCnt; shardID++ { + if err := store.CreateShard("mydb", "myrp", uint64(shardID)); err != nil { + b.Fatalf("benchmarkStoreOpen: %s", err) + } + // Write points to the shard. + chunkedWriteStoreShard(store, shardID, points) + } + // Close the store. + if err := store.Close(); err != nil { + b.Fatalf("benchmarkStoreOpen: %s", err) + } + + // Run the benchmark loop. + b.ResetTimer() + for n := 0; n < b.N; n++ { + store := NewStore(dir) + if err := store.Open(); err != nil { + b.Fatalf("benchmarkStoreOpen: %s", err) + } + + b.StopTimer() + store.Close() + b.StartTimer() + } +} + +func chunkedWriteStoreShard(store *Store, shardID int, points []Point) { + nPts := len(points) + chunkSz := 10000 + start := 0 + end := chunkSz + + for { + if end > nPts { + end = nPts + } + if end-start == 0 { + break + } + + store.WriteToShard(uint64(shardID), points[start:end]) + start = end + end += chunkSz + } +}