Add test for TagValueSeriesIDIterator
parent
81f640e9ae
commit
52b5640a4a
|
@ -133,7 +133,6 @@ func (itr *seriesIteratorAdapter) Next() (SeriesElem, error) {
|
|||
|
||||
name, tags := ParseSeriesKey(key)
|
||||
deleted := itr.sfile.IsDeleted(elem.SeriesID)
|
||||
|
||||
return &seriesElemAdapter{
|
||||
name: name,
|
||||
tags: tags,
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"path/filepath"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
|
@ -20,7 +21,7 @@ const M, K = 4096, 6
|
|||
|
||||
// Ensure index can iterate over all measurement names.
|
||||
func TestIndex_ForEachMeasurementName(t *testing.T) {
|
||||
idx := MustOpenIndex(1)
|
||||
idx := MustOpenDefaultIndex()
|
||||
defer idx.Close()
|
||||
|
||||
// Add series to index.
|
||||
|
@ -73,7 +74,7 @@ func TestIndex_ForEachMeasurementName(t *testing.T) {
|
|||
|
||||
// Ensure index can return whether a measurement exists.
|
||||
func TestIndex_MeasurementExists(t *testing.T) {
|
||||
idx := MustOpenIndex(1)
|
||||
idx := MustOpenDefaultIndex()
|
||||
defer idx.Close()
|
||||
|
||||
// Add series to index.
|
||||
|
@ -135,7 +136,7 @@ func TestIndex_MeasurementExists(t *testing.T) {
|
|||
|
||||
// Ensure index can return a list of matching measurements.
|
||||
func TestIndex_MeasurementNamesByRegex(t *testing.T) {
|
||||
idx := MustOpenIndex(1)
|
||||
idx := MustOpenDefaultIndex()
|
||||
defer idx.Close()
|
||||
|
||||
// Add series to index.
|
||||
|
@ -160,7 +161,7 @@ func TestIndex_MeasurementNamesByRegex(t *testing.T) {
|
|||
|
||||
// Ensure index can delete a measurement and all related keys, values, & series.
|
||||
func TestIndex_DropMeasurement(t *testing.T) {
|
||||
idx := MustOpenIndex(1)
|
||||
idx := MustOpenDefaultIndex()
|
||||
defer idx.Close()
|
||||
|
||||
// Add series to index.
|
||||
|
@ -207,7 +208,7 @@ func TestIndex_DropMeasurement(t *testing.T) {
|
|||
|
||||
func TestIndex_Open(t *testing.T) {
|
||||
// Opening a fresh index should set the MANIFEST version to current version.
|
||||
idx := NewIndex(tsi1.DefaultPartitionN)
|
||||
idx := NewDefaultIndex()
|
||||
t.Run("open new index", func(t *testing.T) {
|
||||
if err := idx.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -236,7 +237,7 @@ func TestIndex_Open(t *testing.T) {
|
|||
incompatibleVersions := []int{-1, 0, 2}
|
||||
for _, v := range incompatibleVersions {
|
||||
t.Run(fmt.Sprintf("incompatible index version: %d", v), func(t *testing.T) {
|
||||
idx = NewIndex(tsi1.DefaultPartitionN)
|
||||
idx = NewDefaultIndex()
|
||||
// Manually create a MANIFEST file for an incompatible index version.
|
||||
// under one of the partitions.
|
||||
partitionPath := filepath.Join(idx.Path(), "2")
|
||||
|
@ -310,6 +311,88 @@ func TestIndex_DiskSizeBytes(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestIndex_TagValueSeriesIDIterator(t *testing.T) {
|
||||
idx := MustOpenDefaultIndex()
|
||||
defer idx.Close()
|
||||
|
||||
// Add some series.
|
||||
data := []struct {
|
||||
Key string
|
||||
Name string
|
||||
Tags map[string]string
|
||||
}{
|
||||
{"cpu,region=west,server=a", "cpu", map[string]string{"region": "west", "server": "a"}},
|
||||
{"cpu,region=west,server=b", "cpu", map[string]string{"region": "west", "server": "b"}},
|
||||
{"cpu,region=east,server=a", "cpu", map[string]string{"region": "east", "server": "a"}},
|
||||
{"cpu,region=north,server=c", "cpu", map[string]string{"region": "north", "server": "c"}},
|
||||
{"cpu,region=south,server=s", "cpu", map[string]string{"region": "south", "server": "s"}},
|
||||
{"mem,region=west,server=a", "mem", map[string]string{"region": "west", "server": "a"}},
|
||||
{"mem,region=west,server=b", "mem", map[string]string{"region": "west", "server": "b"}},
|
||||
{"mem,region=west,server=c", "mem", map[string]string{"region": "west", "server": "c"}},
|
||||
{"disk,region=east,server=a", "disk", map[string]string{"region": "east", "server": "a"}},
|
||||
{"disk,region=east,server=a", "disk", map[string]string{"region": "east", "server": "a"}},
|
||||
{"disk,region=north,server=c", "disk", map[string]string{"region": "north", "server": "c"}},
|
||||
}
|
||||
|
||||
for _, pt := range data {
|
||||
if err := idx.CreateSeriesIfNotExists([]byte(pt.Key), []byte(pt.Name), models.NewTags(pt.Tags)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
testTagValueSeriesIDIterator := func(t *testing.T, name, key, value string, expKeys []string) {
|
||||
sitr, err := idx.TagValueSeriesIDIterator([]byte(name), []byte(key), []byte(value))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sitr == nil {
|
||||
t.Fatal("series id iterater nil")
|
||||
}
|
||||
|
||||
// Convert series ids to series keys.
|
||||
itr := tsdb.NewSeriesIteratorAdapter(idx.SeriesFile.SeriesFile, sitr)
|
||||
if itr == nil {
|
||||
t.Fatal("got nil iterator")
|
||||
}
|
||||
defer itr.Close()
|
||||
|
||||
var keys []string
|
||||
for e, err := itr.Next(); err == nil; e, err = itr.Next() {
|
||||
if e == nil {
|
||||
break
|
||||
}
|
||||
keys = append(keys, string(models.MakeKey(e.Name(), e.Tags())))
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Iterator was in series id order, which may not be series key order.
|
||||
sort.Strings(keys)
|
||||
if got, exp := keys, expKeys; !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("got %v, expected %v", got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
// Test that correct series are initially returned
|
||||
t.Run("initial", func(t *testing.T) {
|
||||
testTagValueSeriesIDIterator(t, "mem", "region", "west", []string{
|
||||
"mem,region=west,server=a",
|
||||
"mem,region=west,server=b",
|
||||
"mem,region=west,server=c",
|
||||
})
|
||||
})
|
||||
|
||||
// The result should now be cached, and the same result should be returned.
|
||||
t.Run("cached", func(t *testing.T) {
|
||||
testTagValueSeriesIDIterator(t, "mem", "region", "west", []string{
|
||||
"mem,region=west,server=a",
|
||||
"mem,region=west,server=b",
|
||||
"mem,region=west,server=c",
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// Index is a test wrapper for tsi1.Index.
|
||||
type Index struct {
|
||||
*tsi1.Index
|
||||
|
@ -324,6 +407,11 @@ func NewIndex(partitionN uint64) *Index {
|
|||
return idx
|
||||
}
|
||||
|
||||
// NewIndex returns a new instance of Index with default number of partitions at a temporary path.
|
||||
func NewDefaultIndex() *Index {
|
||||
return NewIndex(tsi1.DefaultPartitionN)
|
||||
}
|
||||
|
||||
// MustOpenIndex returns a new, open index. Panic on error.
|
||||
func MustOpenIndex(partitionN uint64) *Index {
|
||||
idx := NewIndex(partitionN)
|
||||
|
@ -333,6 +421,11 @@ func MustOpenIndex(partitionN uint64) *Index {
|
|||
return idx
|
||||
}
|
||||
|
||||
// MustOpenIndex returns a new, open index with the default number of partitions.
|
||||
func MustOpenDefaultIndex() *Index {
|
||||
return MustOpenIndex(tsi1.DefaultPartitionN)
|
||||
}
|
||||
|
||||
// Open opens the underlying tsi1.Index and tsdb.SeriesFile
|
||||
func (idx Index) Open() error {
|
||||
if err := idx.SeriesFile.Open(); err != nil {
|
||||
|
|
|
@ -236,7 +236,6 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitio
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Append new key to be added to hash map after flush.
|
||||
ids[i] = id
|
||||
newIDs[string(key)] = id
|
||||
|
|
Loading…
Reference in New Issue