fix integration tests
parent
420b8ed362
commit
56cb2fae5d
|
@ -2805,6 +2805,7 @@ func TestServer_Query_Wildcards(t *testing.T) {
|
||||||
t.Logf("SKIP:: %s", query.name)
|
t.Logf("SKIP:: %s", query.name)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := query.Execute(s); err != nil {
|
if err := query.Execute(s); err != nil {
|
||||||
t.Error(query.Error(err))
|
t.Error(query.Error(err))
|
||||||
} else if !query.success() {
|
} else if !query.success() {
|
||||||
|
@ -3181,6 +3182,7 @@ func TestServer_Query_Where_Fields(t *testing.T) {
|
||||||
t.Logf("SKIP:: %s", query.name)
|
t.Logf("SKIP:: %s", query.name)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := query.Execute(s); err != nil {
|
if err := query.Execute(s); err != nil {
|
||||||
t.Error(query.Error(err))
|
t.Error(query.Error(err))
|
||||||
} else if !query.success() {
|
} else if !query.success() {
|
||||||
|
|
|
@ -38,7 +38,7 @@ type multiCursor struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek moves the cursor to a given key.
|
// Seek moves the cursor to a given key.
|
||||||
func (mc *multiCursor) Seek(seek int64) (key int64, value interface{}) {
|
func (mc *multiCursor) Seek(seek int64) (int64, interface{}) {
|
||||||
// Initialize heap.
|
// Initialize heap.
|
||||||
h := make(cursorHeap, 0, len(mc.cursors))
|
h := make(cursorHeap, 0, len(mc.cursors))
|
||||||
for i, c := range mc.cursors {
|
for i, c := range mc.cursors {
|
||||||
|
@ -73,7 +73,7 @@ func (mc *multiCursor) Ascending() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Next returns the next key/value from the cursor.
|
// Next returns the next key/value from the cursor.
|
||||||
func (mc *multiCursor) Next() (key int64, value interface{}) { return mc.pop() }
|
func (mc *multiCursor) Next() (int64, interface{}) { return mc.pop() }
|
||||||
|
|
||||||
// pop returns the next item from the heap.
|
// pop returns the next item from the heap.
|
||||||
// Reads the next key/value from item's cursor and puts it back on the heap.
|
// Reads the next key/value from item's cursor and puts it back on the heap.
|
||||||
|
@ -153,8 +153,8 @@ type TagSetCursor struct {
|
||||||
cursors []*TagsCursor // Underlying tags cursors.
|
cursors []*TagsCursor // Underlying tags cursors.
|
||||||
currentTags map[string]string // the current tags for the underlying series cursor in play
|
currentTags map[string]string // the current tags for the underlying series cursor in play
|
||||||
|
|
||||||
SelectFields []string
|
SelectFields []string // fields to be selected
|
||||||
WhereFields []string
|
Fields []string // fields to be selected or filtered
|
||||||
|
|
||||||
// Min-heap of cursors ordered by timestamp.
|
// Min-heap of cursors ordered by timestamp.
|
||||||
heap *pointHeap
|
heap *pointHeap
|
||||||
|
@ -336,7 +336,7 @@ func NewTagsCursor(c Cursor, filter influxql.Expr, tags map[string]string) *Tags
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek positions returning the key and value at that key.
|
// Seek positions returning the key and value at that key.
|
||||||
func (c *TagsCursor) Seek(seek int64) (key int64, value interface{}) {
|
func (c *TagsCursor) Seek(seek int64) (int64, interface{}) {
|
||||||
// We've seeked on this cursor. This seek is after that previous cached seek
|
// We've seeked on this cursor. This seek is after that previous cached seek
|
||||||
// and the result it gave was after the key for this seek.
|
// and the result it gave was after the key for this seek.
|
||||||
//
|
//
|
||||||
|
@ -347,16 +347,16 @@ func (c *TagsCursor) Seek(seek int64) (key int64, value interface{}) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seek to key/value in underlying cursor.
|
// Seek to key/value in underlying cursor.
|
||||||
key, value = c.cursor.Seek(seek)
|
key, value := c.cursor.Seek(seek)
|
||||||
|
|
||||||
// Save the seek to the buffer.
|
// Save the seek to the buffer.
|
||||||
c.seek = seek
|
c.seek = seek
|
||||||
c.buf.key, c.buf.value = key, value
|
c.buf.key, c.buf.value = key, value
|
||||||
return
|
return key, value
|
||||||
}
|
}
|
||||||
|
|
||||||
// Next returns the next timestamp and value from the cursor.
|
// Next returns the next timestamp and value from the cursor.
|
||||||
func (c *TagsCursor) Next() (key int64, value interface{}) {
|
func (c *TagsCursor) Next() (int64, interface{}) {
|
||||||
// Invalidate the seek.
|
// Invalidate the seek.
|
||||||
c.seek = -1
|
c.seek = -1
|
||||||
c.buf.key, c.buf.value = 0, nil
|
c.buf.key, c.buf.value = 0, nil
|
||||||
|
|
|
@ -215,6 +215,7 @@ func (m *RawMapper) openMeasurement(mm *Measurement) error {
|
||||||
// Validate the fields and tags asked for exist and keep track of which are in the select vs the where
|
// Validate the fields and tags asked for exist and keep track of which are in the select vs the where
|
||||||
selectFields := mm.SelectFields(m.stmt)
|
selectFields := mm.SelectFields(m.stmt)
|
||||||
selectTags := mm.SelectTags(m.stmt)
|
selectTags := mm.SelectTags(m.stmt)
|
||||||
|
fields := uniqueStrings(m.selectFields, m.whereFields)
|
||||||
|
|
||||||
// If we only have tags in our select clause we just return
|
// If we only have tags in our select clause we just return
|
||||||
if len(selectFields) == 0 && len(selectTags) > 0 {
|
if len(selectFields) == 0 && len(selectTags) > 0 {
|
||||||
|
@ -234,7 +235,7 @@ func (m *RawMapper) openMeasurement(mm *Measurement) error {
|
||||||
cursors := []*TagsCursor{}
|
cursors := []*TagsCursor{}
|
||||||
|
|
||||||
for i, key := range t.SeriesKeys {
|
for i, key := range t.SeriesKeys {
|
||||||
c := m.tx.Cursor(key, selectFields, m.shard.FieldCodec(mm.Name), ascending)
|
c := m.tx.Cursor(key, fields, m.shard.FieldCodec(mm.Name), ascending)
|
||||||
if c == nil {
|
if c == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -246,7 +247,7 @@ func (m *RawMapper) openMeasurement(mm *Measurement) error {
|
||||||
|
|
||||||
tsc := NewTagSetCursor(mm.Name, t.Tags, cursors)
|
tsc := NewTagSetCursor(mm.Name, t.Tags, cursors)
|
||||||
tsc.SelectFields = m.selectFields
|
tsc.SelectFields = m.selectFields
|
||||||
tsc.WhereFields = m.whereFields
|
tsc.Fields = fields
|
||||||
if ascending {
|
if ascending {
|
||||||
tsc.Init(m.qmin)
|
tsc.Init(m.qmin)
|
||||||
} else {
|
} else {
|
||||||
|
@ -600,7 +601,7 @@ func (m *AggregateMapper) NextChunk() (interface{}, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tsc.SelectFields = []string{m.fieldNames[i]}
|
tsc.SelectFields = []string{m.fieldNames[i]}
|
||||||
tsc.WhereFields = m.whereFields
|
tsc.Fields = uniqueStrings([]string{m.fieldNames[i]}, m.whereFields)
|
||||||
|
|
||||||
// Execute the map function which walks the entire interval, and aggregates the result.
|
// Execute the map function which walks the entire interval, and aggregates the result.
|
||||||
mapValue := m.mapFuncs[i](&AggregateTagSetCursor{
|
mapValue := m.mapFuncs[i](&AggregateTagSetCursor{
|
||||||
|
@ -660,3 +661,21 @@ func (a *AggregateTagSetCursor) TMin() int64 {
|
||||||
}
|
}
|
||||||
return -1
|
return -1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// uniqueStrings returns a slice of unique strings from all lists in a.
|
||||||
|
func uniqueStrings(a ...[]string) []string {
|
||||||
|
// Calculate unique set of strings.
|
||||||
|
m := make(map[string]struct{})
|
||||||
|
for _, strs := range a {
|
||||||
|
for _, str := range strs {
|
||||||
|
m[str] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert back to slice.
|
||||||
|
result := make([]string, 0, len(m))
|
||||||
|
for k := range m {
|
||||||
|
result = append(result, k)
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue