Merge pull request #2743 from influxdb/tsdb-benchmarks

add shard & index benchmarks
pull/2769/head
Todd Persen 2015-06-05 13:15:38 -07:00
commit 0ee71b9755
3 changed files with 349 additions and 5 deletions

View File

@ -137,6 +137,155 @@ func benchmarkMarshalTags(b *testing.B, keyN int) {
}
}
func BenchmarkCreateSeriesIndex_1K(b *testing.B) {
benchmarkCreateSeriesIndex(b, genTestSeries(38, 3, 3))
}
func BenchmarkCreateSeriesIndex_100K(b *testing.B) {
benchmarkCreateSeriesIndex(b, genTestSeries(32, 5, 5))
}
// BenchmarkCreateSeriesIndex_1M indexes 1M series
// (320 measurements × 5^5 tag sets = 1,000,000).
// Was 330, which yields 1,031,250 series and disagrees with the 1M
// shard benchmarks that use 320.
func BenchmarkCreateSeriesIndex_1M(b *testing.B) {
	benchmarkCreateSeriesIndex(b, genTestSeries(320, 5, 5))
}
// benchmarkCreateSeriesIndex measures indexing every series in `series`
// into a fresh DatabaseIndex, once per benchmark iteration.
func benchmarkCreateSeriesIndex(b *testing.B, series []*TestSeries) {
	// Build one empty index per iteration up front so index allocation
	// stays outside the timed region.
	indexes := make([]*DatabaseIndex, b.N)
	for i := range indexes {
		indexes[i] = NewDatabaseIndex()
	}

	b.ResetTimer()
	for _, idx := range indexes {
		for _, s := range series {
			idx.createSeriesIndexIfNotExists(s.Measurement, s.Series)
		}
	}
}
// TestSeries pairs a generated measurement name with its Series
// (key + tag set) for use by the benchmark helpers.
type TestSeries struct {
	Measurement string
	Series *Series
}
// genTestSeries generates mCnt * (vCnt ^ tCnt) unique test series: every
// generated measurement crossed with every generated tag set.
func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries {
	measurements := genStrList("measurement", mCnt)
	tagSets := NewTagSetGenerator(tCnt, vCnt).AllSets()

	series := make([]*TestSeries, 0, len(measurements)*len(tagSets))
	for _, m := range measurements {
		for _, ts := range tagSets {
			// Series key format: "<measurement>:<marshaled tags>".
			key := fmt.Sprintf("%s:%s", m, string(marshalTags(ts)))
			series = append(series, &TestSeries{
				Measurement: m,
				Series: &Series{
					Key:  key,
					Tags: ts,
				},
			})
		}
	}
	return series
}
// TagValGenerator iterates the generated values ("tagValue0" ...) for a
// single tag key. idx is the current position within Vals.
type TagValGenerator struct {
	Key string
	Vals []string
	idx int
}
// NewTagValGenerator returns a generator for tag key tagKey with nVals
// values named "tagValue0" .. "tagValue<nVals-1>".
func NewTagValGenerator(tagKey string, nVals int) *TagValGenerator {
	tvg := &TagValGenerator{
		Key: tagKey,
		// Pre-size: the final length is known, so avoid repeated
		// append-growth reallocations.
		Vals: make([]string, 0, nVals),
	}
	for i := 0; i < nVals; i++ {
		tvg.Vals = append(tvg.Vals, fmt.Sprintf("tagValue%d", i))
	}
	return tvg
}
// First rewinds the generator and returns its initial tag value.
func (tvg *TagValGenerator) First() string {
	tvg.idx = 0
	return tvg.Vals[tvg.idx]
}
// Curr returns the tag value at the generator's current position.
func (tvg *TagValGenerator) Curr() string {
	return tvg.Vals[tvg.idx]
}
// Next advances to the following tag value and returns it. When the
// values are exhausted it returns "" and leaves the position on the
// last value (so a subsequent First() restarts cleanly).
func (tvg *TagValGenerator) Next() string {
	if tvg.idx+1 >= len(tvg.Vals) {
		return ""
	}
	tvg.idx++
	return tvg.Vals[tvg.idx]
}
// TagSet is one combination of tag key → tag value for a generated series.
type TagSet map[string]string
// TagSetGenerator enumerates the cross product of its per-key value
// generators, odometer style (see Next).
type TagSetGenerator struct {
	TagVals []*TagValGenerator
}
// NewTagSetGenerator creates nSets tag-value generators with keys
// "tagKey0" .. "tagKey<nSets-1>". nTagVals[i] sets the value count for
// key i; keys past the end of nTagVals fall back to nTagVals[0].
// NOTE(review): assumes at least one nTagVals entry when nSets > 0
// (nTagVals[0] would panic otherwise) — confirm callers always pass one.
func NewTagSetGenerator(nSets int, nTagVals ...int) *TagSetGenerator {
	tsg := &TagSetGenerator{}
	for i := 0; i < nSets; i++ {
		valIdx := 0
		if i < len(nTagVals) {
			valIdx = i
		}
		gen := NewTagValGenerator(fmt.Sprintf("tagKey%d", i), nTagVals[valIdx])
		tsg.TagVals = append(tsg.TagVals, gen)
	}
	return tsg
}
// First rewinds every value generator and returns the initial tag set.
func (tsg *TagSetGenerator) First() TagSet {
	for _, gen := range tsg.TagVals {
		gen.First()
	}
	return tsg.Curr()
}
// Curr builds the tag set from each generator's current value.
func (tsg *TagSetGenerator) Curr() TagSet {
	ts := make(TagSet, len(tsg.TagVals))
	for _, gen := range tsg.TagVals {
		ts[gen.Key] = gen.Curr()
	}
	return ts
}
// Next returns the next tag set in the cross product, or nil when all
// combinations have been produced. It works like an odometer: advance
// the first generator that still has values, rewinding each exhausted
// generator along the way.
func (tsg *TagSetGenerator) Next() TagSet {
	for _, gen := range tsg.TagVals {
		if gen.Next() != "" {
			return tsg.Curr()
		}
		// This position wrapped around; carry into the next generator.
		gen.First()
	}
	// Every generator wrapped: the sequence is exhausted.
	return nil
}
// AllSets materializes every tag set the generator can produce.
func (tsg *TagSetGenerator) AllSets() []TagSet {
	allSets := []TagSet{}
	ts := tsg.First()
	for ts != nil {
		allSets = append(allSets, ts)
		ts = tsg.Next()
	}
	return allSets
}
// genStrList returns n strings "<prefix>0" .. "<prefix><n-1>".
func genStrList(prefix string, n int) []string {
	lst := make([]string, n)
	for i := range lst {
		lst[i] = fmt.Sprintf("%s%d", prefix, i)
	}
	return lst
}
// MustParseExpr parses an expression string and returns its AST representation.
func MustParseExpr(s string) influxql.Expr {
expr, err := influxql.ParseExpr(s)

View File

@ -3,18 +3,19 @@ package tsdb
import (
"io/ioutil"
"os"
"path"
"reflect"
"testing"
"time"
)
func TestShardWriteAndIndex(t *testing.T) {
path, _ := ioutil.TempDir("", "")
defer os.RemoveAll(path)
path += "/shard"
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
tmpShard := path.Join(tmpDir, "shard")
index := NewDatabaseIndex()
sh := NewShard(index, path)
sh := NewShard(index, tmpShard)
if err := sh.Open(); err != nil {
t.Fatalf("error openeing shard: %s", err.Error())
}
@ -59,7 +60,7 @@ func TestShardWriteAndIndex(t *testing.T) {
sh.Close()
index = NewDatabaseIndex()
sh = NewShard(index, path)
sh = NewShard(index, tmpShard)
if err := sh.Open(); err != nil {
t.Fatalf("error openeing shard: %s", err.Error())
}
@ -73,3 +74,128 @@ func TestShardWriteAndIndex(t *testing.T) {
t.Fatalf(err.Error())
}
}
// BenchmarkWritePoints_NewSeries_1K writes ~1K new series
// (38 measurements × 3^3 tag sets), one point each.
func BenchmarkWritePoints_NewSeries_1K(b *testing.B) {
	benchmarkWritePoints(b, 38, 3, 3, 1)
}
// BenchmarkWritePoints_NewSeries_100K writes 100K new series
// (32 measurements × 5^5 tag sets), one point each.
func BenchmarkWritePoints_NewSeries_100K(b *testing.B) {
	benchmarkWritePoints(b, 32, 5, 5, 1)
}
// BenchmarkWritePoints_NewSeries_250K writes 250K new series
// (80 measurements × 5^5 tag sets), one point each.
func BenchmarkWritePoints_NewSeries_250K(b *testing.B) {
	benchmarkWritePoints(b, 80, 5, 5, 1)
}
// BenchmarkWritePoints_NewSeries_500K writes 500K new series
// (160 measurements × 5^5 tag sets), one point each.
func BenchmarkWritePoints_NewSeries_500K(b *testing.B) {
	benchmarkWritePoints(b, 160, 5, 5, 1)
}
// BenchmarkWritePoints_NewSeries_1M writes 1M new series
// (320 measurements × 5^5 tag sets), one point each.
func BenchmarkWritePoints_NewSeries_1M(b *testing.B) {
	benchmarkWritePoints(b, 320, 5, 5, 1)
}
// BenchmarkWritePoints_ExistingSeries_1K re-writes ~1K existing series
// (38 measurements × 3^3 tag sets).
func BenchmarkWritePoints_ExistingSeries_1K(b *testing.B) {
	benchmarkWritePointsExistingSeries(b, 38, 3, 3, 1)
}
// BenchmarkWritePoints_ExistingSeries_100K re-writes 100K existing series
// (32 measurements × 5^5 tag sets).
func BenchmarkWritePoints_ExistingSeries_100K(b *testing.B) {
	benchmarkWritePointsExistingSeries(b, 32, 5, 5, 1)
}
// BenchmarkWritePoints_ExistingSeries_250K re-writes 250K existing series
// (80 measurements × 5^5 tag sets).
func BenchmarkWritePoints_ExistingSeries_250K(b *testing.B) {
	benchmarkWritePointsExistingSeries(b, 80, 5, 5, 1)
}
// BenchmarkWritePoints_ExistingSeries_500K re-writes 500K existing series
// (160 measurements × 5^5 tag sets).
func BenchmarkWritePoints_ExistingSeries_500K(b *testing.B) {
	benchmarkWritePointsExistingSeries(b, 160, 5, 5, 1)
}
// BenchmarkWritePoints_ExistingSeries_1M re-writes 1M existing series
// (320 measurements × 5^5 tag sets).
func BenchmarkWritePoints_ExistingSeries_1M(b *testing.B) {
	benchmarkWritePointsExistingSeries(b, 320, 5, 5, 1)
}
// benchmarkWritePoints benchmarks writing new series to a shard.
// mCnt - measurement count
// tkCnt - tag key count
// tvCnt - tag value count (values per tag)
// pntCnt - points per series. # of series = mCnt * (tvCnt ^ tkCnt)
//
// NOTE(review): the index is shared across iterations, so only the first
// iteration indexes truly "new" series — confirm this is intended.
func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) {
	// Generate test series (measurements + unique tag sets).
	series := genTestSeries(mCnt, tkCnt, tvCnt)

	// Create index for the shard to use.
	index := NewDatabaseIndex()

	// Generate point data to write to the shard.
	points := []Point{}
	for _, s := range series {
		for val := 0.0; val < float64(pntCnt); val++ {
			p := NewPoint(s.Measurement, s.Series.Tags, map[string]interface{}{"value": val}, time.Now())
			points = append(points, p)
		}
	}

	// Stop & reset timers and mem-stats before the main benchmark loop.
	b.StopTimer()
	b.ResetTimer()

	// Run the benchmark loop. Each iteration writes into a fresh shard;
	// only the chunkedWrite call is timed.
	for n := 0; n < b.N; n++ {
		tmpDir, err := ioutil.TempDir("", "shard_test")
		if err != nil {
			b.Fatalf("benchmarkWritePoints: %s", err)
		}
		tmpShard := path.Join(tmpDir, "shard")
		shard := NewShard(index, tmpShard)
		// Previously the Open error was ignored; a failed open would make
		// the timed writes meaningless.
		if err := shard.Open(); err != nil {
			b.Fatalf("benchmarkWritePoints: %s", err)
		}

		b.StartTimer()
		// Call the function being benchmarked.
		chunkedWrite(shard, points)
		b.StopTimer()

		shard.Close()
		os.RemoveAll(tmpDir)
	}
}
// benchmarkWritePointsExistingSeries benchmarks writing to existing series in a shard.
// mCnt - measurement count
// tkCnt - tag key count
// tvCnt - tag value count (values per tag)
// pntCnt - points per series. # of series = mCnt * (tvCnt ^ tkCnt)
func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) {
	// Generate test series (measurements + unique tag sets).
	series := genTestSeries(mCnt, tkCnt, tvCnt)

	// Create index for the shard to use.
	index := NewDatabaseIndex()

	// Generate point data to write to the shard.
	points := []Point{}
	for _, s := range series {
		for val := 0.0; val < float64(pntCnt); val++ {
			p := NewPoint(s.Measurement, s.Series.Tags, map[string]interface{}{"value": val}, time.Now())
			points = append(points, p)
		}
	}

	tmpDir, err := ioutil.TempDir("", "")
	if err != nil {
		b.Fatalf("benchmarkWritePointsExistingSeries: %s", err)
	}
	defer os.RemoveAll(tmpDir)
	tmpShard := path.Join(tmpDir, "shard")
	shard := NewShard(index, tmpShard)
	// Previously the Open error was ignored; fail fast instead.
	if err := shard.Open(); err != nil {
		b.Fatalf("benchmarkWritePointsExistingSeries: %s", err)
	}
	defer shard.Close()

	// Seed the shard so the timed writes hit existing series.
	chunkedWrite(shard, points)

	// Reset timers and mem-stats before the main benchmark loop.
	b.ResetTimer()

	// Run the benchmark loop.
	for n := 0; n < b.N; n++ {
		// Advance timestamps outside the timed region so each pass
		// writes new points to the same (existing) series.
		b.StopTimer()
		for _, p := range points {
			p.SetTime(p.Time().Add(time.Second))
		}
		b.StartTimer()

		// Call the function being benchmarked.
		chunkedWrite(shard, points)
	}
}
// chunkedWrite writes points to the shard in batches of 10,000 to keep
// individual WritePoints calls bounded.
func chunkedWrite(shard *Shard, points []Point) {
	const chunkSz = 10000
	for start := 0; start < len(points); start += chunkSz {
		end := start + chunkSz
		if end > len(points) {
			end = len(points)
		}
		shard.WritePoints(points[start:end])
	}
}

View File

@ -5,6 +5,7 @@ import (
"os"
"path/filepath"
"testing"
"time"
)
func TestStoreOpen(t *testing.T) {
@ -206,3 +207,71 @@ func TestStoreOpenShardBadShardPath(t *testing.T) {
}
}
// BenchmarkStoreOpen_200KSeries_100Shards re-opens a store holding 100
// shards of 200K series each (64 measurements × 5^5 tag sets).
func BenchmarkStoreOpen_200KSeries_100Shards(b *testing.B) {
	benchmarkStoreOpen(b, 64, 5, 5, 1, 100)
}
// benchmarkStoreOpen benchmarks re-opening a store that already contains
// shardCnt shards, each populated with mCnt * (tvCnt ^ tkCnt) series and
// pntCnt points per series. Only store.Open is timed; Close is excluded.
func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) {
	// Generate test series (measurements + unique tag sets).
	series := genTestSeries(mCnt, tkCnt, tvCnt)

	// Generate point data to write to the shards.
	points := []Point{}
	for _, s := range series {
		for val := 0.0; val < float64(pntCnt); val++ {
			p := NewPoint(s.Measurement, s.Series.Tags, map[string]interface{}{"value": val}, time.Now())
			points = append(points, p)
		}
	}

	// Create a temporary directory for the test data.
	dir, err := ioutil.TempDir("", "store_test")
	if err != nil {
		b.Fatalf("benchmarkStoreOpen: %s", err)
	}
	// Previously this directory was never removed and leaked after the
	// benchmark finished.
	defer os.RemoveAll(dir)

	// Create the store.
	store := NewStore(dir)

	// Open the store.
	if err := store.Open(); err != nil {
		b.Fatalf("benchmarkStoreOpen: %s", err)
	}

	// Create requested number of shards in the store & write points.
	for shardID := 0; shardID < shardCnt; shardID++ {
		if err := store.CreateShard("mydb", "myrp", uint64(shardID)); err != nil {
			b.Fatalf("benchmarkStoreOpen: %s", err)
		}

		// Write points to the shard.
		chunkedWriteStoreShard(store, shardID, points)
	}

	// Close the store so the benchmark loop starts from a closed state.
	if err := store.Close(); err != nil {
		b.Fatalf("benchmarkStoreOpen: %s", err)
	}

	// Run the benchmark loop.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		store := NewStore(dir)
		if err := store.Open(); err != nil {
			b.Fatalf("benchmarkStoreOpen: %s", err)
		}
		// Exclude Close from the timed region.
		b.StopTimer()
		store.Close()
		b.StartTimer()
	}
}
// chunkedWriteStoreShard writes points to the given shard of the store
// in batches of 10,000.
func chunkedWriteStoreShard(store *Store, shardID int, points []Point) {
	const chunkSz = 10000
	for start := 0; start < len(points); start += chunkSz {
		end := start + chunkSz
		if end > len(points) {
			end = len(points)
		}
		store.WriteToShard(uint64(shardID), points[start:end])
	}
}