Merge pull request #4180 from benbjohnson/refactor-select-mapper

Cursor & SelectMapper Refactor (WIP)
pull/4201/merge
Ben Johnson 2015-09-22 14:07:05 -06:00
commit 522f9eaef4
23 changed files with 1581 additions and 1692 deletions


@ -16,6 +16,7 @@
- [#4165](https://github.com/influxdb/influxdb/pull/4165): Tag all Go runtime stats when writing to internal database.
- [#4118](https://github.com/influxdb/influxdb/issues/4118): Return consistent, correct result for SHOW MEASUREMENTS with multiple AND conditions
- [#4191](https://github.com/influxdb/influxdb/pull/4191): Correctly marshal remote mapper responses. Fixes [#4170](https://github.com/influxdb/influxdb/issues/4170)
- [#4180](https://github.com/influxdb/influxdb/pull/4180): Cursor & SelectMapper Refactor
## v0.9.4 [2015-09-14]


@ -1,6 +1,7 @@
package cluster
import (
"encoding/json"
"fmt"
"math/rand"
"net"
@ -40,11 +41,7 @@ func NewShardMapper(timeout time.Duration) *ShardMapper {
// CreateMapper returns a Mapper for the given shard ID.
func (s *ShardMapper) CreateMapper(sh meta.ShardInfo, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) {
m, err := s.TSDBStore.CreateMapper(sh.ID, stmt, chunkSize)
if err != nil {
return nil, err
}
// Create a remote mapper if the local node doesn't own the shard.
if !sh.OwnedBy(s.MetaStore.NodeID()) || s.ForceRemoteMapping {
// Pick a node in a pseudo-random manner.
conn, err := s.dial(sh.Owners[rand.Intn(len(sh.Owners))].NodeID)
@ -53,7 +50,13 @@ func (s *ShardMapper) CreateMapper(sh meta.ShardInfo, stmt influxql.Statement, c
}
conn.SetDeadline(time.Now().Add(s.timeout))
m.SetRemote(NewRemoteMapper(conn, sh.ID, stmt, chunkSize))
return NewRemoteMapper(conn, sh.ID, stmt, chunkSize), nil
}
// If it is local then return the mapper from the store.
m, err := s.TSDBStore.CreateMapper(sh.ID, stmt, chunkSize)
if err != nil {
return nil, err
}
return m, nil
@ -87,6 +90,8 @@ type RemoteMapper struct {
conn net.Conn
bufferedResponse *MapShardResponse
unmarshallers []tsdb.UnmarshalFunc // Mapping-specific unmarshal functions.
}
// NewRemoteMapper returns a new remote mapper using the given connection.
@ -106,6 +111,7 @@ func (r *RemoteMapper) Open() (err error) {
r.conn.Close()
}
}()
// Build Map request.
var request MapShardRequest
request.SetShardID(r.shardID)
@ -143,11 +149,18 @@ func (r *RemoteMapper) Open() (err error) {
r.tagsets = r.bufferedResponse.TagSets()
r.fields = r.bufferedResponse.Fields()
return nil
}
// Set up each mapping function for this statement.
if stmt, ok := r.stmt.(*influxql.SelectStatement); ok {
for _, c := range stmt.FunctionCalls() {
fn, err := tsdb.InitializeUnmarshaller(c)
if err != nil {
return err
}
r.unmarshallers = append(r.unmarshallers, fn)
}
}
func (r *RemoteMapper) SetRemote(m tsdb.Mapper) error {
return fmt.Errorf("cannot set remote mapper on a remote mapper")
return nil
}
func (r *RemoteMapper) TagSets() []string {
@ -187,7 +200,55 @@ func (r *RemoteMapper) NextChunk() (chunk interface{}, err error) {
return nil, nil
}
return response.Data(), err
moj := &tsdb.MapperOutputJSON{}
if err := json.Unmarshal(response.Data(), moj); err != nil {
return nil, err
}
mvj := []*tsdb.MapperValueJSON{}
if err := json.Unmarshal(moj.Values, &mvj); err != nil {
return nil, err
}
// Prep the non-JSON version of Mapper output.
mo := &tsdb.MapperOutput{
Name: moj.Name,
Tags: moj.Tags,
Fields: moj.Fields,
}
if len(mvj) == 1 && len(mvj[0].AggData) > 0 {
// The MapperValue is carrying aggregate data, so run it through the
// custom unmarshallers for the map functions through which the data
// was mapped.
aggValues := []interface{}{}
for i, b := range mvj[0].AggData {
v, err := r.unmarshallers[i](b)
if err != nil {
return nil, err
}
aggValues = append(aggValues, v)
}
mo.Values = []*tsdb.MapperValue{&tsdb.MapperValue{
Value: aggValues,
Tags: mvj[0].Tags,
}}
} else {
// Must be raw data instead.
for _, v := range mvj {
var rawValue interface{}
if err := json.Unmarshal(v.RawData, &rawValue); err != nil {
return nil, err
}
mo.Values = append(mo.Values, &tsdb.MapperValue{
Time: v.Time,
Value: rawValue,
Tags: v.Tags,
})
}
}
return mo, nil
}
// Close the Mapper
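A minimal sketch of how a consumer might drain a mapper after this change: NextChunk now hands back a decoded *tsdb.MapperOutput instead of raw JSON bytes. The helper below is illustrative only and assumes the tsdb.Mapper interface exposes Open, NextChunk and Close the way RemoteMapper above does.

func drainMapper(m tsdb.Mapper) error {
	if err := m.Open(); err != nil {
		return err
	}
	defer m.Close()

	for {
		chunk, err := m.NextChunk()
		if err != nil {
			return err
		}
		if chunk == nil {
			return nil // mapper is exhausted
		}
		out := chunk.(*tsdb.MapperOutput)
		_ = out.Name // e.g. "cpu"; out.Values carries raw or aggregate values
	}
}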


@ -80,14 +80,10 @@ func TestShardWriter_RemoteMapper_Success(t *testing.T) {
if err != nil {
t.Fatalf("failed to get next chunk from mapper: %s", err.Error())
}
b, ok := chunk.([]byte)
output, ok := chunk.(*tsdb.MapperOutput)
if !ok {
t.Fatal("chunk is not of expected type")
}
output := &tsdb.MapperOutput{}
if err := json.Unmarshal(b, output); err != nil {
t.Fatal(err)
}
if output.Name != "cpu" {
t.Fatalf("received output incorrect, exp: %v, got %v", expOutput, output)
}


@ -2805,6 +2805,7 @@ func TestServer_Query_Wildcards(t *testing.T) {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
@ -3181,6 +3182,7 @@ func TestServer_Query_Where_Fields(t *testing.T) {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {


@ -77,8 +77,7 @@ func main() {
for _, key := range series {
fieldSummary := []string{}
cursor := tx.Cursor(key, tsdb.Forward)
cursor := tx.Cursor(key, m.FieldNames(), shard.FieldCodec(m.Name), true)
// Series doesn't exist in this shard
if cursor == nil {
@ -86,21 +85,16 @@ func main() {
}
// Seek to the beginning
_, value := cursor.Seek([]byte{})
codec := shard.FieldCodec(m.Name)
if codec != nil {
fields, err := codec.DecodeFieldsWithNames(value)
if err != nil {
fmt.Printf("Failed to decode values: %v", err)
}
_, fields := cursor.SeekTo(0)
if fields, ok := fields.(map[string]interface{}); ok {
for field, value := range fields {
fieldSummary = append(fieldSummary, fmt.Sprintf("%s:%T", field, value))
}
sort.Strings(fieldSummary)
fmt.Fprintf(tw, "%d\t%s\t%s\t%d/%d\t%d [%s]\t%d\n", shardID, db, m.Name, len(tags), tagValues,
len(fields), strings.Join(fieldSummary, ","), len(series))
}
fmt.Fprintf(tw, "%d\t%s\t%s\t%d/%d\t%d [%s]\t%d\n", shardID, db, m.Name, len(tags), tagValues,
len(fields), strings.Join(fieldSummary, ","), len(series))
break
}
tx.Rollback()


@ -718,6 +718,18 @@ type SelectStatement struct {
FillValue interface{}
}
// SourceNames returns a list of source names.
func (s *SelectStatement) SourceNames() []string {
a := make([]string, 0, len(s.Sources))
for _, src := range s.Sources {
switch src := src.(type) {
case *Measurement:
a = append(a, src.Name)
}
}
return a
}
// HasDerivative returns true if one of the function calls in the statement is a
// derivative aggregate
func (s *SelectStatement) HasDerivative() bool {
@ -743,6 +755,11 @@ func (s *SelectStatement) IsSimpleDerivative() bool {
return false
}
// TimeAscending returns true if the time field is sorted in chronological order.
func (s *SelectStatement) TimeAscending() bool {
return len(s.SortFields) == 0 || s.SortFields[0].Ascending
}
// Clone returns a deep copy of the statement.
func (s *SelectStatement) Clone() *SelectStatement {
clone := &SelectStatement{
@ -1517,6 +1534,25 @@ func (s *SelectStatement) NamesInDimension() []string {
return a
}
// LimitTagSets returns a tag set list with SLIMIT and SOFFSET applied.
func (s *SelectStatement) LimitTagSets(a []*TagSet) []*TagSet {
// Ignore if no limit or offset is specified.
if s.SLimit == 0 && s.SOffset == 0 {
return a
}
// If offset is beyond the number of tag sets then return nil.
if s.SOffset > len(a) {
return nil
}
// Clamp limit to the max number of tag sets.
if s.SOffset+s.SLimit > len(a) {
s.SLimit = len(a) - s.SOffset
}
return a[s.SOffset : s.SOffset+s.SLimit]
}
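A quick sanity check of the clamping above (a sketch; the empty tag sets stand in for real ones):

func limitTagSetsExample() int {
	sets := []*influxql.TagSet{{}, {}, {}, {}} // four tag sets; contents don't matter here
	stmt := &influxql.SelectStatement{SLimit: 3, SOffset: 2}
	return len(stmt.LimitTagSets(sets)) // 2: SOFFSET skips two, SLIMIT is clamped from 3 to 2
}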
// walkNames will walk the Expr and return the database fields
func walkNames(exp Expr) []string {
switch expr := exp.(type) {
@ -2950,6 +2986,13 @@ func evalBinaryExpr(expr *BinaryExpr, m map[string]interface{}) interface{} {
return nil
}
// EvalBool evaluates expr and returns true if result is a boolean true.
// Otherwise returns false.
func EvalBool(expr Expr, m map[string]interface{}) bool {
v, _ := Eval(expr, m).(bool)
return v
}
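This is the hook the new cursor filtering relies on. A minimal illustration, assuming the usual influxql.ParseExpr entry point:

func evalBoolExample() bool {
	expr, err := influxql.ParseExpr("value > 90")
	if err != nil {
		panic(err)
	}
	fields := map[string]interface{}{"value": float64(95)}
	return influxql.EvalBool(expr, fields) // true
}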
// Reduce evaluates expr using the available values in valuer.
// References that don't exist in valuer are ignored.
func Reduce(expr Expr, valuer Valuer) Expr {


@ -1,35 +1,22 @@
package tsdb
import (
"bytes"
"container/heap"
"encoding/binary"
"sort"
"strings"
"github.com/influxdb/influxdb/influxql"
)
// Direction represents a cursor navigation direction.
type Direction bool
// EOF represents a "not found" key returned by a Cursor.
const EOF = int64(-1)
const (
// Forward indicates that a cursor will move forward over its values.
Forward Direction = true
// Reverse indicates that a cursor will move backwards over its values.
Reverse Direction = false
)
func (d Direction) String() string {
if d.Forward() {
return "forward"
}
return "reverse"
}
// Forward returns true if direction is forward
func (d Direction) Forward() bool {
return d == Forward
}
// Reverse returns true if direction is reverse
func (d Direction) Reverse() bool {
return d == Reverse
// Cursor represents an iterator over a series.
type Cursor interface {
SeekTo(seek int64) (key int64, value interface{})
Next() (key int64, value interface{})
Ascending() bool
}
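With keys now int64 timestamps and exhaustion signalled by the EOF sentinel instead of a nil key, the canonical scan over any Cursor from a client package looks like this (sketch):

func scanFrom(c tsdb.Cursor, start int64) {
	for k, v := c.SeekTo(start); k != tsdb.EOF; k, v = c.Next() {
		_ = v // k is a timestamp in nanoseconds, v a decoded value or field map
	}
}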
// MultiCursor returns a single cursor that combines the results of all cursors in order.
@ -37,26 +24,27 @@ func (d Direction) Reverse() bool {
// If the same key is returned from multiple cursors then the first cursor
specified will take precedence. A key will only be returned once from the
// returned cursor.
func MultiCursor(d Direction, cursors ...Cursor) Cursor {
return &multiCursor{cursors: cursors, direction: d}
func MultiCursor(cursors ...Cursor) Cursor {
return &multiCursor{
cursors: cursors,
}
}
// multiCursor represents a cursor that combines multiple cursors into one.
type multiCursor struct {
cursors []Cursor
heap cursorHeap
prev []byte
direction Direction
cursors []Cursor
heap cursorHeap
prev int64 // previously read key
}
// Seek moves the cursor to a given key.
func (mc *multiCursor) Seek(seek []byte) (key, value []byte) {
func (mc *multiCursor) SeekTo(seek int64) (int64, interface{}) {
// Initialize heap.
h := make(cursorHeap, 0, len(mc.cursors))
for i, c := range mc.cursors {
// Move cursor to position. Skip if it's empty.
k, v := c.Seek(seek)
if k == nil {
k, v := c.SeekTo(seek)
if k == EOF {
continue
}
@ -71,26 +59,32 @@ func (mc *multiCursor) Seek(seek []byte) (key, value []byte) {
heap.Init(&h)
mc.heap = h
mc.prev = nil
mc.prev = EOF
return mc.pop()
}
func (mc *multiCursor) Direction() Direction { return mc.direction }
// Ascending returns the direction of the first cursor.
func (mc *multiCursor) Ascending() bool {
if len(mc.cursors) == 0 {
return true
}
return mc.cursors[0].Ascending()
}
// Next returns the next key/value from the cursor.
func (mc *multiCursor) Next() (key, value []byte) { return mc.pop() }
func (mc *multiCursor) Next() (int64, interface{}) { return mc.pop() }
// pop returns the next item from the heap.
// Reads the next key/value from item's cursor and puts it back on the heap.
func (mc *multiCursor) pop() (key, value []byte) {
func (mc *multiCursor) pop() (key int64, value interface{}) {
// Read items until we have a key that doesn't match the previously read one.
// This is to perform deduplication when there's multiple items with the same key.
// The highest priority cursor will be read first and then remaining keys will be dropped.
for {
// Return nil if there are no more items left.
// Return EOF marker if there are no more items left.
if len(mc.heap) == 0 {
return nil, nil
return EOF, nil
}
// Read the next item from the heap.
@ -100,12 +94,12 @@ func (mc *multiCursor) pop() (key, value []byte) {
key, value = item.key, item.value
// Read the next item from the cursor. Push back to heap if one exists.
if item.key, item.value = item.cursor.Next(); item.key != nil {
if item.key, item.value = item.cursor.Next(); item.key != EOF {
heap.Push(&mc.heap, item)
}
// Skip if this key matches the previously returned one.
if bytes.Equal(mc.prev, key) {
if key == mc.prev {
continue
}
@ -120,17 +114,16 @@ type cursorHeap []*cursorHeapItem
func (h cursorHeap) Len() int { return len(h) }
func (h cursorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h cursorHeap) Less(i, j int) bool {
dir := -1
if !h[i].cursor.Direction() {
dir = 1
}
if cmp := bytes.Compare(h[i].key, h[j].key); cmp == dir {
return true
} else if cmp == 0 {
// Use priority if the keys are the same.
if h[i].key == h[j].key {
return h[i].priority > h[j].priority
}
return false
// Otherwise compare based on cursor direction.
if h[i].cursor.Ascending() {
return h[i].key < h[j].key
}
return h[i].key > h[j].key
}
func (h *cursorHeap) Push(x interface{}) {
@ -147,8 +140,246 @@ func (h *cursorHeap) Pop() interface{} {
// cursorHeapItem is something we manage in a priority queue.
type cursorHeapItem struct {
key []byte
value []byte
key int64
value interface{}
cursor Cursor
priority int
}
// TagSetCursor is virtual cursor that iterates over multiple TagsCursors.
type TagSetCursor struct {
measurement string // Measurement name
tags map[string]string // Tag key-value pairs
cursors []*TagsCursor // Underlying tags cursors.
currentTags map[string]string // the current tags for the underlying series cursor in play
SelectFields []string // fields to be selected
Fields []string // fields to be selected or filtered
// Min-heap of cursors ordered by timestamp.
heap *pointHeap
// Memoize the cursor's tagset-based key.
memokey string
}
// NewTagSetCursor returns a new instance of TagSetCursor.
func NewTagSetCursor(m string, t map[string]string, c []*TagsCursor) *TagSetCursor {
return &TagSetCursor{
measurement: m,
tags: t,
cursors: c,
heap: newPointHeap(),
}
}
func (tsc *TagSetCursor) key() string {
if tsc.memokey == "" {
if len(tsc.tags) == 0 {
tsc.memokey = tsc.measurement
} else {
tsc.memokey = strings.Join([]string{tsc.measurement, string(MarshalTags(tsc.tags))}, "|")
}
}
return tsc.memokey
}
func (tsc *TagSetCursor) Init(seek int64) {
tsc.heap = newPointHeap()
// Prime the buffers.
for i := 0; i < len(tsc.cursors); i++ {
k, v := tsc.cursors[i].SeekTo(seek)
if k == EOF {
k, v = tsc.cursors[i].Next()
}
if k == EOF {
continue
}
heap.Push(tsc.heap, &pointHeapItem{
timestamp: k,
value: v,
cursor: tsc.cursors[i],
})
}
}
// Next returns the next matching timestamp and value for the tag set. Filtering
// is enforced on the values. If there is no matching value, then a nil result is returned.
func (tsc *TagSetCursor) Next(tmin, tmax int64) (int64, interface{}) {
for {
// If we're out of points, we're done.
if tsc.heap.Len() == 0 {
return -1, nil
}
// Grab the next point with the lowest timestamp.
p := heap.Pop(tsc.heap).(*pointHeapItem)
// We're done if the point is outside the query's time range [tmin:tmax).
if p.timestamp != tmin && (p.timestamp < tmin || p.timestamp >= tmax) {
return -1, nil
}
// Save timestamp & value.
timestamp, value := p.timestamp, p.value
// Keep track of the current tags for the series cursor so we can
// respond with them if asked
tsc.currentTags = p.cursor.tags
// Advance the cursor.
if nextKey, nextVal := p.cursor.Next(); nextKey != -1 {
*p = pointHeapItem{
timestamp: nextKey,
value: nextVal,
cursor: p.cursor,
}
heap.Push(tsc.heap, p)
}
// Value didn't match, look for the next one.
if value == nil {
continue
}
// Filter value.
if p.cursor.filter != nil {
// Convert value to a map for filter evaluation.
m, ok := value.(map[string]interface{})
if !ok {
m = map[string]interface{}{tsc.SelectFields[0]: value}
}
// If filter fails then skip to the next value.
if !influxql.EvalBool(p.cursor.filter, m) {
continue
}
}
// Filter out single field, if specified.
if len(tsc.SelectFields) == 1 {
if m, ok := value.(map[string]interface{}); ok {
value = m[tsc.SelectFields[0]]
}
if value == nil {
continue
}
}
return timestamp, value
}
}
// Tags returns the current tags of the current cursor.
// If there is no current cursor, it returns nil.
func (tsc *TagSetCursor) Tags() map[string]string { return tsc.currentTags }
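Putting the pieces together, a mapper might drain one tag set roughly as follows (a sketch; the measurement, tags and field name are illustrative, and the []*TagsCursor is assumed to be built per series elsewhere):

func drainTagSet(cursors []*tsdb.TagsCursor, tmin, tmax int64) {
	tsc := tsdb.NewTagSetCursor("cpu", map[string]string{"host": "server01"}, cursors)
	tsc.SelectFields = []string{"value"} // single selected field: Next returns the bare value
	tsc.Init(tmin)
	for {
		k, v := tsc.Next(tmin, tmax)
		if k == -1 {
			break // no more points inside [tmin, tmax)
		}
		_ = v          // decoded field value at timestamp k
		_ = tsc.Tags() // tags of the series the value came from
	}
}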
type pointHeapItem struct {
timestamp int64
value interface{}
cursor *TagsCursor // cursor whence pointHeapItem came
}
type pointHeap []*pointHeapItem
func newPointHeap() *pointHeap {
q := make(pointHeap, 0)
heap.Init(&q)
return &q
}
func (pq pointHeap) Len() int { return len(pq) }
func (pq pointHeap) Less(i, j int) bool {
// We want a min-heap (points in chronological order), so use less than.
return pq[i].timestamp < pq[j].timestamp
}
func (pq pointHeap) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }
func (pq *pointHeap) Push(x interface{}) {
item := x.(*pointHeapItem)
*pq = append(*pq, item)
}
func (pq *pointHeap) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
*pq = old[0 : n-1]
return item
}
// TagsCursor is a cursor with attached tags and filter.
type TagsCursor struct {
cursor Cursor
filter influxql.Expr
tags map[string]string
seek int64
buf struct {
key int64
value interface{}
}
}
// NewTagsCursor returns a new instance of a series cursor.
func NewTagsCursor(c Cursor, filter influxql.Expr, tags map[string]string) *TagsCursor {
return &TagsCursor{
cursor: c,
filter: filter,
tags: tags,
seek: EOF,
}
}
// SeekTo positions the cursor at the given timestamp and returns the key and value at that position.
func (c *TagsCursor) SeekTo(seek int64) (int64, interface{}) {
// We've seeked on this cursor. This seek is after that previous cached seek
// and the result it gave was after the key for this seek.
//
// In this case, any seek would just return what we got before, so there's
// no point in reseeking.
if c.seek != -1 && c.seek < seek && (c.buf.key == EOF || c.buf.key >= seek) {
return c.buf.key, c.buf.value
}
// Seek to key/value in underlying cursor.
key, value := c.cursor.SeekTo(seek)
// Save the seek to the buffer.
c.seek = seek
c.buf.key, c.buf.value = key, value
return key, value
}
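The effect of the buffered seek, as a sketch (under is any tsdb.Cursor for the series; assume its first point sits at t=250):

func seekTwice(under tsdb.Cursor) {
	tc := tsdb.NewTagsCursor(under, nil, map[string]string{"host": "server01"})
	k1, _ := tc.SeekTo(100) // hits the underlying cursor, lands on 250
	k2, _ := tc.SeekTo(200) // 100 < 200 and 250 >= 200: answered from the buffer, no re-seek
	_, _ = k1, k2           // both 250
}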
// Next returns the next timestamp and value from the cursor.
func (c *TagsCursor) Next() (int64, interface{}) {
// Invalidate the seek.
c.seek = -1
c.buf.key, c.buf.value = 0, nil
// Return next key/value.
return c.cursor.Next()
}
// TagSetCursors represents a sortable slice of TagSetCursors.
type TagSetCursors []*TagSetCursor
func (a TagSetCursors) Len() int { return len(a) }
func (a TagSetCursors) Less(i, j int) bool { return a[i].key() < a[j].key() }
func (a TagSetCursors) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a TagSetCursors) Keys() []string {
keys := []string{}
for i := range a {
keys = append(keys, a[i].key())
}
sort.Strings(keys)
return keys
}
// btou64 converts an 8-byte slice to a uint64.
func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) }


@ -14,191 +14,187 @@ import (
// Ensure the multi-cursor can correctly iterate across a single subcursor.
func TestMultiCursor_Single(t *testing.T) {
mc := tsdb.MultiCursor(tsdb.Forward,
NewCursor(tsdb.Forward, []CursorItem{
{Key: []byte{0x00}, Value: []byte{0x00}},
{Key: []byte{0x01}, Value: []byte{0x10}},
{Key: []byte{0x02}, Value: []byte{0x20}},
}),
)
mc := tsdb.MultiCursor(NewCursor([]CursorItem{
{Key: 0, Value: 0},
{Key: 1, Value: 10},
{Key: 2, Value: 20},
}, true))
if k, v := mc.Seek([]byte{0x00}); !bytes.Equal(k, []byte{0x00}) || !bytes.Equal(v, []byte{0x00}) {
if k, v := mc.SeekTo(0); k != 0 || v.(int) != 0 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x01}) || !bytes.Equal(v, []byte{0x10}) {
} else if k, v = mc.Next(); k != 1 || v.(int) != 10 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x02}) || !bytes.Equal(v, []byte{0x20}) {
} else if k, v = mc.Next(); k != 2 || v.(int) != 20 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); k != nil {
} else if k, v = mc.Next(); k != tsdb.EOF {
t.Fatalf("expected eof, got: %x / %x", k, v)
}
}
// Ensure the multi-cursor can correctly iterate across a single subcursor in reverse order.
func TestMultiCursor_Single_Reverse(t *testing.T) {
mc := tsdb.MultiCursor(tsdb.Reverse,
NewCursor(tsdb.Reverse, []CursorItem{
{Key: []byte{0x00}, Value: []byte{0x00}},
{Key: []byte{0x01}, Value: []byte{0x10}},
{Key: []byte{0x02}, Value: []byte{0x20}},
}),
)
mc := tsdb.MultiCursor(NewCursor([]CursorItem{
{Key: 0, Value: 0},
{Key: 1, Value: 10},
{Key: 2, Value: 20},
}, false))
if k, v := mc.Seek([]byte{0x02}); !bytes.Equal(k, []byte{0x02}) || !bytes.Equal(v, []byte{0x20}) {
if k, v := mc.SeekTo(2); k != 2 || v.(int) != 20 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x01}) || !bytes.Equal(v, []byte{0x10}) {
} else if k, v = mc.Next(); k != 1 || v.(int) != 10 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x00}) || !bytes.Equal(v, []byte{0x00}) {
} else if k, v = mc.Next(); k != 0 || v.(int) != 0 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); k != nil {
} else if k, v = mc.Next(); k != tsdb.EOF {
t.Fatalf("expected eof, got: %x / %x", k, v)
}
}
// Ensure the multi-cursor can correctly iterate across multiple non-overlapping subcursors.
func TestMultiCursor_Multiple_NonOverlapping(t *testing.T) {
mc := tsdb.MultiCursor(tsdb.Forward,
NewCursor(tsdb.Forward, []CursorItem{
{Key: []byte{0x00}, Value: []byte{0x00}},
{Key: []byte{0x03}, Value: []byte{0x30}},
{Key: []byte{0x04}, Value: []byte{0x40}},
}),
NewCursor(tsdb.Forward, []CursorItem{
{Key: []byte{0x01}, Value: []byte{0x10}},
{Key: []byte{0x02}, Value: []byte{0x20}},
}),
mc := tsdb.MultiCursor(
NewCursor([]CursorItem{
{Key: 0, Value: 0},
{Key: 3, Value: 30},
{Key: 4, Value: 40},
}, true),
NewCursor([]CursorItem{
{Key: 1, Value: 10},
{Key: 2, Value: 20},
}, true),
)
if k, v := mc.Seek([]byte{0x00}); !bytes.Equal(k, []byte{0x00}) || !bytes.Equal(v, []byte{0x00}) {
if k, v := mc.SeekTo(0); k != 0 || v.(int) != 0 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x01}) || !bytes.Equal(v, []byte{0x10}) {
} else if k, v = mc.Next(); k != 1 || v.(int) != 10 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x02}) || !bytes.Equal(v, []byte{0x20}) {
} else if k, v = mc.Next(); k != 2 || v.(int) != 20 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x03}) || !bytes.Equal(v, []byte{0x30}) {
} else if k, v = mc.Next(); k != 3 || v.(int) != 30 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x04}) || !bytes.Equal(v, []byte{0x40}) {
} else if k, v = mc.Next(); k != 4 || v.(int) != 40 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); k != nil {
} else if k, v = mc.Next(); k != tsdb.EOF {
t.Fatalf("expected eof, got: %x / %x", k, v)
}
}
// Ensure the multi-cursor can correctly iterate across multiple non-overlapping subcursors.
func TestMultiCursor_Multiple_NonOverlapping_Reverse(t *testing.T) {
mc := tsdb.MultiCursor(tsdb.Reverse,
NewCursor(tsdb.Reverse, []CursorItem{
{Key: []byte{0x00}, Value: []byte{0x00}},
{Key: []byte{0x03}, Value: []byte{0x30}},
{Key: []byte{0x04}, Value: []byte{0x40}},
}),
NewCursor(tsdb.Reverse, []CursorItem{
{Key: []byte{0x01}, Value: []byte{0x10}},
{Key: []byte{0x02}, Value: []byte{0x20}},
}),
mc := tsdb.MultiCursor(
NewCursor([]CursorItem{
{Key: 0, Value: 0},
{Key: 3, Value: 30},
{Key: 4, Value: 40},
}, false),
NewCursor([]CursorItem{
{Key: 1, Value: 10},
{Key: 2, Value: 20},
}, false),
)
if k, v := mc.Seek([]byte{0x04}); !bytes.Equal(k, []byte{0x04}) || !bytes.Equal(v, []byte{0x40}) {
if k, v := mc.SeekTo(4); k != 4 || v.(int) != 40 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x03}) || !bytes.Equal(v, []byte{0x30}) {
} else if k, v = mc.Next(); k != 3 || v.(int) != 30 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x02}) || !bytes.Equal(v, []byte{0x20}) {
} else if k, v = mc.Next(); k != 2 || v.(int) != 20 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x01}) || !bytes.Equal(v, []byte{0x10}) {
} else if k, v = mc.Next(); k != 1 || v.(int) != 10 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x00}) || !bytes.Equal(v, []byte{0x00}) {
} else if k, v = mc.Next(); k != 0 || v.(int) != 0 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); k != nil {
} else if k, v = mc.Next(); k != tsdb.EOF {
t.Fatalf("expected eof, got: %x / %x", k, v)
}
}
// Ensure the multi-cursor can correctly iterate across multiple overlapping subcursors.
func TestMultiCursor_Multiple_Overlapping(t *testing.T) {
mc := tsdb.MultiCursor(tsdb.Forward,
NewCursor(tsdb.Forward, []CursorItem{
{Key: []byte{0x00}, Value: []byte{0x00}},
{Key: []byte{0x03}, Value: []byte{0x03}},
{Key: []byte{0x04}, Value: []byte{0x04}},
}),
NewCursor(tsdb.Forward, []CursorItem{
{Key: []byte{0x00}, Value: []byte{0xF0}},
{Key: []byte{0x02}, Value: []byte{0xF2}},
{Key: []byte{0x04}, Value: []byte{0xF4}},
}),
mc := tsdb.MultiCursor(
NewCursor([]CursorItem{
{Key: 0, Value: 0},
{Key: 3, Value: 3},
{Key: 4, Value: 4},
}, true),
NewCursor([]CursorItem{
{Key: 0, Value: 0xF0},
{Key: 2, Value: 0xF2},
{Key: 4, Value: 0xF4},
}, true),
)
if k, v := mc.Seek([]byte{0x00}); !bytes.Equal(k, []byte{0x00}) || !bytes.Equal(v, []byte{0x00}) {
if k, v := mc.SeekTo(0); k != 0 || v.(int) != 0 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x02}) || !bytes.Equal(v, []byte{0xF2}) {
} else if k, v = mc.Next(); k != 2 || v.(int) != 0xF2 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x03}) || !bytes.Equal(v, []byte{0x03}) {
} else if k, v = mc.Next(); k != 3 || v.(int) != 3 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x04}) || !bytes.Equal(v, []byte{0x04}) {
} else if k, v = mc.Next(); k != 4 || v.(int) != 4 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); k != nil {
} else if k, v = mc.Next(); k != tsdb.EOF {
t.Fatalf("expected eof, got: %x / %x", k, v)
}
}
// Ensure the multi-cursor can correctly iterate across multiple overlapping subcursors.
func TestMultiCursor_Multiple_Overlapping_Reverse(t *testing.T) {
mc := tsdb.MultiCursor(tsdb.Reverse,
NewCursor(tsdb.Reverse, []CursorItem{
{Key: []byte{0x00}, Value: []byte{0x00}},
{Key: []byte{0x03}, Value: []byte{0x03}},
{Key: []byte{0x04}, Value: []byte{0x04}},
}),
NewCursor(tsdb.Reverse, []CursorItem{
{Key: []byte{0x00}, Value: []byte{0xF0}},
{Key: []byte{0x02}, Value: []byte{0xF2}},
{Key: []byte{0x04}, Value: []byte{0xF4}},
}),
mc := tsdb.MultiCursor(
NewCursor([]CursorItem{
{Key: 0, Value: 0},
{Key: 3, Value: 3},
{Key: 4, Value: 4},
}, false),
NewCursor([]CursorItem{
{Key: 0, Value: 0xF0},
{Key: 2, Value: 0xF2},
{Key: 4, Value: 0xF4},
}, false),
)
if k, v := mc.Seek([]byte{0x04}); !bytes.Equal(k, []byte{0x04}) || !bytes.Equal(v, []byte{0x04}) {
if k, v := mc.SeekTo(4); k != 4 || v.(int) != 4 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x03}) || !bytes.Equal(v, []byte{0x03}) {
} else if k, v = mc.Next(); k != 3 || v.(int) != 3 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x02}) || !bytes.Equal(v, []byte{0xF2}) {
} else if k, v = mc.Next(); k != 2 || v.(int) != 0xF2 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); !bytes.Equal(k, []byte{0x00}) || !bytes.Equal(v, []byte{0x00}) {
} else if k, v = mc.Next(); k != 0 || v.(int) != 0 {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = mc.Next(); k != nil {
} else if k, v = mc.Next(); k != tsdb.EOF {
t.Fatalf("expected eof, got: %x / %x", k, v)
}
}
// Ensure the multi-cursor can handle randomly generated data.
func TestMultiCursor_Quick(t *testing.T) {
quick.Check(func(seek uint64, cursors []Cursor) bool {
var got, exp [][]byte
seek %= 100
quick.Check(func(useek uint64, cursors []Cursor) bool {
var got, exp []CursorItem
seek := int64(useek) % 100
// Merge all cursor data to determine expected output.
// First seen key overrides all other items with the same key.
m := make(map[string][]byte)
m := make(map[int64]CursorItem)
for _, c := range cursors {
for _, item := range c.items {
if bytes.Compare(item.Key, u64tob(seek)) == -1 {
if item.Key < seek {
continue
}
if _, ok := m[string(item.Key)]; ok {
if _, ok := m[item.Key]; ok {
continue
}
m[string(item.Key)] = item.Value
m[item.Key] = item
}
}
// Convert map back to single item list.
for k, v := range m {
exp = append(exp, append([]byte(k), v...))
for _, item := range m {
exp = append(exp, item)
}
sort.Sort(byteSlices(exp))
sort.Sort(CursorItems(exp))
// Create multi-cursor and iterate over all items.
mc := tsdb.MultiCursor(tsdb.Forward, tsdbCursorSlice(cursors)...)
for k, v := mc.Seek(u64tob(seek)); k != nil; k, v = mc.Next() {
got = append(got, append(k, v...))
mc := tsdb.MultiCursor(tsdbCursorSlice(cursors)...)
for k, v := mc.SeekTo(seek); k != tsdb.EOF; k, v = mc.Next() {
got = append(got, CursorItem{k, v.(int)})
}
// Verify results.
@ -212,65 +208,69 @@ func TestMultiCursor_Quick(t *testing.T) {
// Cursor represents an in-memory test cursor.
type Cursor struct {
direction tsdb.Direction
items []CursorItem
index int
ascending bool
}
// NewCursor returns a new instance of Cursor.
func NewCursor(direction tsdb.Direction, items []CursorItem) *Cursor {
func NewCursor(items []CursorItem, ascending bool) *Cursor {
index := 0
sort.Sort(CursorItems(items))
if direction.Reverse() {
if !ascending {
index = len(items)
}
return &Cursor{direction: direction, items: items, index: index}
return &Cursor{
items: items,
index: index,
ascending: ascending,
}
}
func (c *Cursor) Direction() tsdb.Direction { return c.direction }
func (c *Cursor) Ascending() bool { return c.ascending }
// Seek seeks to an item by key.
func (c *Cursor) Seek(seek []byte) (key, value []byte) {
if c.direction.Forward() {
func (c *Cursor) SeekTo(seek int64) (key int64, value interface{}) {
if c.ascending {
return c.seekForward(seek)
}
return c.seekReverse(seek)
}
func (c *Cursor) seekForward(seek []byte) (key, value []byte) {
func (c *Cursor) seekForward(seek int64) (key int64, value interface{}) {
for c.index = 0; c.index < len(c.items); c.index++ {
if bytes.Compare(c.items[c.index].Key, seek) == -1 { // skip keys less than seek
if c.items[c.index].Key < seek { // skip keys less than seek
continue
}
return c.items[c.index].Key, c.items[c.index].Value
}
return nil, nil
return tsdb.EOF, nil
}
func (c *Cursor) seekReverse(seek []byte) (key, value []byte) {
func (c *Cursor) seekReverse(seek int64) (key int64, value interface{}) {
for c.index = len(c.items) - 1; c.index >= 0; c.index-- {
if bytes.Compare(c.items[c.index].Key, seek) == 1 { // skip keys greater than seek
if c.items[c.index].Key > seek { // skip keys greater than seek
continue
}
return c.items[c.index].Key, c.items[c.index].Value
}
return nil, nil
return tsdb.EOF, nil
}
// Next returns the next key/value pair.
func (c *Cursor) Next() (key, value []byte) {
if c.direction.Reverse() && c.index < 0 {
return nil, nil
func (c *Cursor) Next() (key int64, value interface{}) {
if !c.ascending && c.index < 0 {
return tsdb.EOF, nil
}
if c.direction.Forward() && c.index >= len(c.items) {
return nil, nil
if c.ascending && c.index >= len(c.items) {
return tsdb.EOF, nil
}
k, v := c.items[c.index].Key, c.items[c.index].Value
if c.direction.Forward() {
if c.ascending {
c.index++
} else {
c.index--
@ -281,15 +281,13 @@ func (c *Cursor) Next() (key, value []byte) {
// Generate returns a randomly generated cursor. Implements quick.Generator.
func (c Cursor) Generate(rand *rand.Rand, size int) reflect.Value {
c.index = 0
c.direction = tsdb.Forward
c.ascending = true
c.items = make([]CursorItem, rand.Intn(size))
for i := range c.items {
value, _ := quick.Value(reflect.TypeOf([]byte(nil)), rand)
c.items[i] = CursorItem{
Key: u64tob(uint64(rand.Intn(size))),
Value: value.Interface().([]byte),
Key: rand.Int63n(int64(size)),
Value: rand.Int(),
}
}
@ -310,15 +308,15 @@ func tsdbCursorSlice(a []Cursor) []tsdb.Cursor {
// CursorItem represents a key/value pair in a cursor.
type CursorItem struct {
Key []byte
Value []byte
Key int64
Value int
}
type CursorItems []CursorItem
func (a CursorItems) Len() int { return len(a) }
func (a CursorItems) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a CursorItems) Less(i, j int) bool { return bytes.Compare(a[i].Key, a[j].Key) == -1 }
func (a CursorItems) Less(i, j int) bool { return a[i].Key < a[j].Key }
// byteSlices represents a sortable slice of byte slices.
type byteSlices [][]byte


@ -121,17 +121,11 @@ func NewEngineOptions() EngineOptions {
type Tx interface {
io.WriterTo
Cursor(series string, direction Direction) Cursor
Size() int64
Commit() error
Rollback() error
}
// Cursor represents an iterator over a series.
type Cursor interface {
Seek(seek []byte) (key, value []byte)
Next() (key, value []byte)
Direction() Direction
Cursor(series string, fields []string, dec *FieldCodec, ascending bool) Cursor
}
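For reference, a sketch of reading one decoded field through the new Tx.Cursor signature (the series key and field name are illustrative; tx and codec are assumed to come from an open shard):

func firstValue(tx tsdb.Tx, codec *tsdb.FieldCodec) (int64, interface{}) {
	c := tx.Cursor("cpu,host=server01", []string{"value"}, codec, true)
	if c == nil {
		return tsdb.EOF, nil // series not present in this shard
	}
	return c.SeekTo(0) // the cursor decodes the value via the codec
}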
// DedupeEntries returns slices with unique keys (the first 8 bytes).


@ -550,32 +550,37 @@ type Tx struct {
engine *Engine
}
// Cursor returns an iterator for a key.
func (tx *Tx) Cursor(key string, direction tsdb.Direction) tsdb.Cursor {
// Retrieve key bucket.
b := tx.Bucket([]byte(key))
// Cursor returns an iterator for a key over a single field.
func (tx *Tx) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor {
// Retrieve series bucket.
b := tx.Bucket([]byte(series))
tx.engine.mu.RLock()
defer tx.engine.mu.RUnlock()
// Ignore if there is no bucket or points in the cache.
partitionID := WALPartition([]byte(key))
if b == nil && len(tx.engine.cache[partitionID][key]) == 0 {
partitionID := WALPartition([]byte(series))
if b == nil && len(tx.engine.cache[partitionID][series]) == 0 {
return nil
}
// Retrieve a copy of the in-cache points for the key.
cache := make([][]byte, len(tx.engine.cache[partitionID][key]))
copy(cache, tx.engine.cache[partitionID][key])
// Retrieve a copy of the in-cache points for the series.
cache := make([][]byte, len(tx.engine.cache[partitionID][series]))
copy(cache, tx.engine.cache[partitionID][series])
// Build a cursor that merges the bucket and cache together.
cur := &Cursor{cache: cache, direction: direction}
cur := &Cursor{
cache: cache,
fields: fields,
dec: dec,
ascending: ascending,
}
if b != nil {
cur.cursor = b.Cursor()
}
// If it's a reverse cursor, set the current location to the end.
if direction.Reverse() {
if !ascending {
cur.index = len(cache) - 1
if cur.cursor != nil {
cur.cursor.Last()
@ -592,6 +597,10 @@ type Cursor struct {
key, value []byte
}
// Fields and codec.
fields []string
dec *tsdb.FieldCodec
// Cache and current cache index.
cache [][]byte
index int
@ -600,26 +609,27 @@ type Cursor struct {
prev []byte
// The direction the cursor pointer moves after each call to Next()
direction tsdb.Direction
ascending bool
}
func (c *Cursor) Direction() tsdb.Direction { return c.direction }
func (c *Cursor) Ascending() bool { return c.ascending }
// Seek moves the cursor to a position and returns the closest key/value pair.
func (c *Cursor) Seek(seek []byte) (key, value []byte) {
func (c *Cursor) SeekTo(seek int64) (key int64, value interface{}) {
// Seek bolt cursor.
seekBytes := u64tob(uint64(seek))
if c.cursor != nil {
c.buf.key, c.buf.value = c.cursor.Seek(seek)
c.buf.key, c.buf.value = c.cursor.Seek(seekBytes)
}
// Seek cache index.
c.index = sort.Search(len(c.cache), func(i int) bool {
return bytes.Compare(c.cache[i][0:8], seek) != -1
return bytes.Compare(c.cache[i][0:8], seekBytes) != -1
})
// Search will return an index after the length of cache if the seek value is greater
// than all the values. Clamp it to the end of the cache.
if c.direction.Reverse() && c.index >= len(c.cache) {
if !c.ascending && c.index >= len(c.cache) {
c.index = len(c.cache) - 1
}
@ -628,28 +638,54 @@ func (c *Cursor) Seek(seek []byte) (key, value []byte) {
}
// Next returns the next key/value pair from the cursor.
func (c *Cursor) Next() (key, value []byte) {
func (c *Cursor) Next() (key int64, value interface{}) {
return c.read()
}
// read returns the next key/value in the cursor buffer or cache.
func (c *Cursor) read() (key, value []byte) {
func (c *Cursor) read() (key int64, value interface{}) {
// Continue skipping ahead through duplicate keys in the cache list.
var k, v []byte
for {
if c.direction.Forward() {
key, value = c.readForward()
if c.ascending {
k, v = c.readForward()
} else {
key, value = c.readReverse()
k, v = c.readReverse()
}
// Exit loop if we're at the end of the cache or the next key is different.
if key == nil || !bytes.Equal(key, c.prev) {
if k == nil || !bytes.Equal(k, c.prev) {
break
}
}
c.prev = key
return
// Save key so it's not re-read.
c.prev = k
// Exit if no keys left.
if k == nil {
return tsdb.EOF, nil
}
// Convert key to timestamp.
key = int64(btou64(k))
// Decode fields. Optimize for single field, if possible.
if len(c.fields) == 1 {
decValue, err := c.dec.DecodeByName(c.fields[0], v)
if err != nil {
return key, nil
}
return key, decValue
} else if len(c.fields) > 1 {
m, err := c.dec.DecodeFieldsWithNames(v)
if err != nil {
return key, nil
}
return key, m
} else {
return key, nil
}
}
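A consequence of the decode branch above: the dynamic type a caller sees depends on how many fields were requested (sketch; field names are illustrative and the series is assumed to exist):

func decodedShapes(tx tsdb.Tx, codec *tsdb.FieldCodec) {
	// One field requested: the cursor returns the bare decoded value.
	c := tx.Cursor("cpu,host=server01", []string{"value"}, codec, true)
	if _, v := c.SeekTo(0); v != nil {
		_ = v.(float64)
	}

	// Several fields requested: the cursor returns a map keyed by field name.
	c = tx.Cursor("cpu,host=server01", []string{"value", "count"}, codec, true)
	if _, v := c.SeekTo(0); v != nil {
		_ = v.(map[string]interface{})["count"]
	}
}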
// readForward returns the next key/value from the cursor and moves the current location forward.
@ -757,6 +793,9 @@ func u64tob(v uint64) []byte {
return b
}
// btou64 converts an 8-byte slice to a uint64.
func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) }
// byteSlices represents a sortable slice of byte slices.
type byteSlices [][]byte


@ -1,7 +1,6 @@
package b1_test
import (
"bytes"
"encoding/binary"
"io/ioutil"
"math"
@ -66,16 +65,14 @@ func TestEngine_WritePoints(t *testing.T) {
tx := e.MustBegin(false)
defer tx.Rollback()
c := tx.Cursor("temperature", tsdb.Forward)
if k, v := c.Seek([]byte{0}); !bytes.Equal(k, u64tob(uint64(time.Unix(1434059627, 0).UnixNano()))) {
c := tx.Cursor("temperature", []string{"value"}, mf.Codec, true)
if k, v := c.SeekTo(0); k != 1434059627000000000 {
t.Fatalf("unexpected key: %#v", k)
} else if m, err := mf.Codec.DecodeFieldsWithNames(v); err != nil {
t.Fatal(err)
} else if m["value"] != float64(200) {
t.Errorf("unexpected value: %#v", m)
} else if v == nil || v.(float64) != 200 {
t.Errorf("unexpected value: %#v", v)
}
if k, v := c.Next(); k != nil {
if k, v := c.Next(); k != tsdb.EOF {
t.Fatalf("unexpected key/value: %#v / %#v", k, v)
}
}
@ -131,18 +128,16 @@ func TestEngine_WritePoints_Reverse(t *testing.T) {
tx := e.MustBegin(false)
defer tx.Rollback()
c := tx.Cursor("temperature", tsdb.Reverse)
if k, _ := c.Seek(u64tob(math.MaxInt64)); !bytes.Equal(k, u64tob(uint64(time.Unix(1, 0).UnixNano()))) {
t.Fatalf("unexpected key: %v", btou64(k))
} else if k, v := c.Next(); !bytes.Equal(k, u64tob(uint64(time.Unix(0, 0).UnixNano()))) {
t.Fatalf("unexpected key: %#v", k)
} else if m, err := mf.Codec.DecodeFieldsWithNames(v); err != nil {
t.Fatal(err)
} else if m["value"] != float64(100) {
t.Errorf("unexpected value: %#v", m)
c := tx.Cursor("temperature", []string{"value"}, mf.Codec, false)
if k, _ := c.SeekTo(math.MaxInt64); k != time.Unix(1, 0).UnixNano() {
t.Fatalf("unexpected key: %v", k)
} else if k, v := c.Next(); k != time.Unix(0, 0).UnixNano() {
t.Fatalf("unexpected key: %v", k)
} else if v == nil || v.(float64) != 100 {
t.Errorf("unexpected value: %#v", v)
}
if k, v := c.Next(); k != nil {
if k, v := c.Next(); k != tsdb.EOF {
t.Fatalf("unexpected key/value: %#v / %#v", k, v)
}
}


@ -74,7 +74,7 @@ type WAL interface {
WritePoints(points []models.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error
DeleteSeries(keys []string) error
Cursor(key string, direction tsdb.Direction) tsdb.Cursor
Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor
Open() error
Close() error
Flush() error
@ -623,25 +623,27 @@ type Tx struct {
}
// Cursor returns an iterator for a key.
func (tx *Tx) Cursor(key string, direction tsdb.Direction) tsdb.Cursor {
walCursor := tx.wal.Cursor(key, direction)
func (tx *Tx) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor {
walCursor := tx.wal.Cursor(series, fields, dec, ascending)
// Retrieve points bucket. Ignore if there is no bucket.
b := tx.Bucket([]byte("points")).Bucket([]byte(key))
b := tx.Bucket([]byte("points")).Bucket([]byte(series))
if b == nil {
return walCursor
}
c := &Cursor{
cursor: b.Cursor(),
direction: direction,
fields: fields,
dec: dec,
ascending: ascending,
}
if direction.Reverse() {
if !ascending {
c.last()
}
return tsdb.MultiCursor(direction, walCursor, c)
return tsdb.MultiCursor(walCursor, c)
}
// Cursor provides ordered iteration across a series.
@ -649,9 +651,12 @@ type Cursor struct {
cursor *bolt.Cursor
buf []byte // uncompressed buffer
off int // buffer offset
direction tsdb.Direction
ascending bool
fieldIndices []int
index int
fields []string
dec *tsdb.FieldCodec
}
func (c *Cursor) last() {
@ -659,26 +664,28 @@ func (c *Cursor) last() {
c.setBuf(v)
}
func (c *Cursor) Direction() tsdb.Direction { return c.direction }
func (c *Cursor) Ascending() bool { return c.ascending }
// Seek moves the cursor to a position and returns the closest key/value pair.
func (c *Cursor) Seek(seek []byte) (key, value []byte) {
func (c *Cursor) SeekTo(seek int64) (key int64, value interface{}) {
seekBytes := u64tob(uint64(seek))
// Move cursor to appropriate block and set to buffer.
k, v := c.cursor.Seek(seek)
k, v := c.cursor.Seek(seekBytes)
if v == nil { // get the last block, it might have this time
_, v = c.cursor.Last()
} else if bytes.Compare(seek, k) == -1 { // the seek key is less than this block, go back one and check
} else if seek < int64(btou64(k)) { // the seek key is less than this block, go back one and check
_, v = c.cursor.Prev()
// if the previous block max time is less than the seek value, reset to where we were originally
if v == nil || bytes.Compare(seek, v[0:8]) > 0 {
_, v = c.cursor.Seek(seek)
if v == nil || seek > int64(btou64(v[0:8])) {
_, v = c.cursor.Seek(seekBytes)
}
}
c.setBuf(v)
// Read current block up to seek position.
c.seekBuf(seek)
c.seekBuf(seekBytes)
// Return current entry.
return c.read()
@ -695,13 +702,13 @@ func (c *Cursor) seekBuf(seek []byte) (key, value []byte) {
return
}
if c.direction.Forward() && bytes.Compare(buf[0:8], seek) != -1 {
if c.ascending && bytes.Compare(buf[0:8], seek) != -1 {
return
} else if c.direction.Reverse() && bytes.Compare(buf[0:8], seek) != 1 {
} else if !c.ascending && bytes.Compare(buf[0:8], seek) != 1 {
return
}
if c.direction.Forward() {
if c.ascending {
// Otherwise skip ahead to the next entry.
c.off += entryHeaderSize + entryDataSize(buf)
} else {
@ -715,13 +722,13 @@ func (c *Cursor) seekBuf(seek []byte) (key, value []byte) {
}
// Next returns the next key/value pair from the cursor.
func (c *Cursor) Next() (key, value []byte) {
func (c *Cursor) Next() (key int64, value interface{}) {
// Ignore if there is no buffer.
if len(c.buf) == 0 {
return nil, nil
return tsdb.EOF, nil
}
if c.direction.Forward() {
if c.ascending {
// Move forward to next entry.
c.off += entryHeaderSize + entryDataSize(c.buf[c.off:])
} else {
@ -762,7 +769,7 @@ func (c *Cursor) setBuf(block []byte) {
log.Printf("block decode error: %s", err)
}
if c.direction.Forward() {
if c.ascending {
c.buf, c.off = buf, 0
} else {
c.buf, c.off = buf, 0
@ -789,16 +796,17 @@ func (c *Cursor) setBuf(block []byte) {
}
// read reads the current key and value from the current block.
func (c *Cursor) read() (key, value []byte) {
func (c *Cursor) read() (key int64, value interface{}) {
// Return EOF if the offset is at the end of the buffer.
if c.off >= len(c.buf) {
return nil, nil
return tsdb.EOF, nil
}
// Otherwise read the current entry.
buf := c.buf[c.off:]
dataSize := entryDataSize(buf)
return buf[0:8], buf[entryHeaderSize : entryHeaderSize+dataSize]
return wal.DecodeKeyValue(c.fields, c.dec, buf[0:8], buf[entryHeaderSize:entryHeaderSize+dataSize])
}
// MarshalEntry encodes point data into a single byte slice.


@ -1,39 +1,23 @@
package bz1_test
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"path/filepath"
"reflect"
"sort"
"strconv"
"strings"
"testing"
"testing/quick"
"time"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/tsdb"
"github.com/influxdb/influxdb/tsdb/engine/bz1"
"github.com/influxdb/influxdb/tsdb/engine/wal"
)
var QuickConfig quick.Config
func init() {
// Limit the number of iterations on CI so it doesn't take so long.
if os.Getenv("CI") == "true" {
QuickConfig.MaxCount = 10
fmt.Fprintf(os.Stderr, "Limiting quickchecks to %d iterations (CI)\n", QuickConfig.MaxCount)
}
}
// Ensure the engine can write series metadata and reload it.
func TestEngine_LoadMetadataIndex_Series(t *testing.T) {
e := OpenDefaultEngine()
@ -159,14 +143,19 @@ func TestEngine_WriteIndex_Append(t *testing.T) {
e := OpenDefaultEngine()
defer e.Close()
// Create codec.
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {ID: uint8(1), Name: "value", Type: influxql.Float},
})
// Append points to index.
if err := e.WriteIndex(map[string][][]byte{
"cpu": [][]byte{
append(u64tob(1), 0x10),
append(u64tob(2), 0x20),
append(u64tob(1), MustEncodeFields(codec, models.Fields{"value": float64(10)})...),
append(u64tob(2), MustEncodeFields(codec, models.Fields{"value": float64(20)})...),
},
"mem": [][]byte{
append(u64tob(0), 0x30),
append(u64tob(0), MustEncodeFields(codec, models.Fields{"value": float64(30)})...),
},
}, nil, nil); err != nil {
t.Fatal(err)
@ -177,20 +166,20 @@ func TestEngine_WriteIndex_Append(t *testing.T) {
defer tx.Rollback()
// Iterate over "cpu" series.
c := tx.Cursor("cpu", tsdb.Forward)
if k, v := c.Seek(u64tob(0)); !reflect.DeepEqual(k, []byte{0, 0, 0, 0, 0, 0, 0, 1}) || !reflect.DeepEqual(v, []byte{0x10}) {
c := tx.Cursor("cpu", []string{"value"}, codec, true)
if k, v := c.SeekTo(0); k != 1 || v.(float64) != float64(10) {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = c.Next(); !reflect.DeepEqual(k, []byte{0, 0, 0, 0, 0, 0, 0, 2}) || !reflect.DeepEqual(v, []byte{0x20}) {
} else if k, v = c.Next(); k != 2 || v.(float64) != float64(20) {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, _ = c.Next(); k != nil {
} else if k, _ = c.Next(); k != tsdb.EOF {
t.Fatalf("unexpected key/value: %x / %x", k, v)
}
// Iterate over "mem" series.
c = tx.Cursor("mem", tsdb.Forward)
if k, v := c.Seek(u64tob(0)); !reflect.DeepEqual(k, []byte{0, 0, 0, 0, 0, 0, 0, 0}) || !reflect.DeepEqual(v, []byte{0x30}) {
c = tx.Cursor("mem", []string{"value"}, codec, true)
if k, v := c.SeekTo(0); k != 0 || v.(float64) != float64(30) {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, _ = c.Next(); k != nil {
} else if k, _ = c.Next(); k != tsdb.EOF {
t.Fatalf("unexpected key/value: %x / %x", k, v)
}
}
@ -200,12 +189,17 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
e := OpenDefaultEngine()
defer e.Close()
// Create codec.
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {ID: uint8(1), Name: "value", Type: influxql.Float},
})
// Write initial points to index.
if err := e.WriteIndex(map[string][][]byte{
"cpu": [][]byte{
append(u64tob(10), 0x10),
append(u64tob(20), 0x20),
append(u64tob(30), 0x30),
append(u64tob(10), MustEncodeFields(codec, models.Fields{"value": float64(10)})...),
append(u64tob(20), MustEncodeFields(codec, models.Fields{"value": float64(20)})...),
append(u64tob(30), MustEncodeFields(codec, models.Fields{"value": float64(30)})...),
},
}, nil, nil); err != nil {
t.Fatal(err)
@ -214,10 +208,10 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
// Write overlapping points to index.
if err := e.WriteIndex(map[string][][]byte{
"cpu": [][]byte{
append(u64tob(9), 0x09),
append(u64tob(10), 0xFF),
append(u64tob(25), 0x25),
append(u64tob(31), 0x31),
append(u64tob(9), MustEncodeFields(codec, models.Fields{"value": float64(9)})...),
append(u64tob(10), MustEncodeFields(codec, models.Fields{"value": float64(255)})...),
append(u64tob(25), MustEncodeFields(codec, models.Fields{"value": float64(25)})...),
append(u64tob(31), MustEncodeFields(codec, models.Fields{"value": float64(31)})...),
},
}, nil, nil); err != nil {
t.Fatal(err)
@ -226,7 +220,7 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
// Write overlapping points to index again.
if err := e.WriteIndex(map[string][][]byte{
"cpu": [][]byte{
append(u64tob(31), 0xFF),
append(u64tob(31), MustEncodeFields(codec, models.Fields{"value": float64(255)})...),
},
}, nil, nil); err != nil {
t.Fatal(err)
@ -237,18 +231,18 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
defer tx.Rollback()
// Iterate over "cpu" series.
c := tx.Cursor("cpu", tsdb.Forward)
if k, v := c.Seek(u64tob(0)); btou64(k) != 9 || !bytes.Equal(v, []byte{0x09}) {
c := tx.Cursor("cpu", []string{"value"}, codec, true)
if k, v := c.SeekTo(0); k != 9 || v.(float64) != float64(9) {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = c.Next(); btou64(k) != 10 || !bytes.Equal(v, []byte{0xFF}) {
} else if k, v = c.Next(); k != 10 || v.(float64) != float64(255) {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = c.Next(); btou64(k) != 20 || !bytes.Equal(v, []byte{0x20}) {
} else if k, v = c.Next(); k != 20 || v.(float64) != float64(20) {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = c.Next(); btou64(k) != 25 || !bytes.Equal(v, []byte{0x25}) {
} else if k, v = c.Next(); k != 25 || v.(float64) != float64(25) {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = c.Next(); btou64(k) != 30 || !bytes.Equal(v, []byte{0x30}) {
} else if k, v = c.Next(); k != 30 || v.(float64) != float64(30) {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = c.Next(); btou64(k) != 31 || !bytes.Equal(v, []byte{0xFF}) {
} else if k, v = c.Next(); k != 31 || v.(float64) != float64(255) {
t.Fatalf("unexpected key/value: %x / %x", k, v)
}
}
@ -258,12 +252,17 @@ func TestEngine_Cursor_Reverse(t *testing.T) {
e := OpenDefaultEngine()
defer e.Close()
// Create codec.
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {ID: uint8(1), Name: "value", Type: influxql.Float},
})
// Write initial points to index.
if err := e.WriteIndex(map[string][][]byte{
"cpu": [][]byte{
append(u64tob(10), 0x10),
append(u64tob(20), 0x20),
append(u64tob(30), 0x30),
append(u64tob(10), MustEncodeFields(codec, models.Fields{"value": float64(10)})...),
append(u64tob(20), MustEncodeFields(codec, models.Fields{"value": float64(20)})...),
append(u64tob(30), MustEncodeFields(codec, models.Fields{"value": float64(30)})...),
},
}, nil, nil); err != nil {
t.Fatal(err)
@ -272,10 +271,10 @@ func TestEngine_Cursor_Reverse(t *testing.T) {
// Write overlapping points to index.
if err := e.WriteIndex(map[string][][]byte{
"cpu": [][]byte{
append(u64tob(9), 0x09),
append(u64tob(10), 0xFF),
append(u64tob(25), 0x25),
append(u64tob(31), 0x31),
append(u64tob(9), MustEncodeFields(codec, models.Fields{"value": float64(9)})...),
append(u64tob(10), MustEncodeFields(codec, models.Fields{"value": float64(255)})...),
append(u64tob(25), MustEncodeFields(codec, models.Fields{"value": float64(25)})...),
append(u64tob(31), MustEncodeFields(codec, models.Fields{"value": float64(31)})...),
},
}, nil, nil); err != nil {
t.Fatal(err)
@ -284,7 +283,7 @@ func TestEngine_Cursor_Reverse(t *testing.T) {
// Write overlapping points to index again.
if err := e.WriteIndex(map[string][][]byte{
"cpu": [][]byte{
append(u64tob(31), 0xFF),
append(u64tob(31), MustEncodeFields(codec, models.Fields{"value": float64(255)})...),
},
}, nil, nil); err != nil {
t.Fatal(err)
@ -295,18 +294,18 @@ func TestEngine_Cursor_Reverse(t *testing.T) {
defer tx.Rollback()
// Iterate over "cpu" series.
c := tx.Cursor("cpu", tsdb.Reverse)
if k, v := c.Seek(u64tob(math.MaxUint64)); btou64(k) != 31 || !bytes.Equal(v, []byte{0xFF}) {
c := tx.Cursor("cpu", []string{"value"}, codec, false)
if k, v := c.SeekTo(math.MaxInt64); k != 31 || v.(float64) != float64(255) {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = c.Next(); btou64(k) != 30 || !bytes.Equal(v, []byte{0x30}) {
} else if k, v = c.Next(); k != 30 || v.(float64) != float64(30) {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = c.Next(); btou64(k) != 25 || !bytes.Equal(v, []byte{0x25}) {
} else if k, v = c.Next(); k != 25 || v.(float64) != float64(25) {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = c.Next(); btou64(k) != 20 || !bytes.Equal(v, []byte{0x20}) {
} else if k, v = c.Next(); k != 20 || v.(float64) != float64(20) {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = c.Next(); btou64(k) != 10 || !bytes.Equal(v, []byte{0xFF}) {
} else if k, v = c.Next(); k != 10 || v.(float64) != float64(255) {
t.Fatalf("unexpected key/value: %x / %x", k, v)
} else if k, v = c.Seek(u64tob(0)); btou64(k) != 9 || !bytes.Equal(v, []byte{0x09}) {
} else if k, v = c.SeekTo(0); k != 9 || v.(float64) != float64(9) {
t.Fatalf("unexpected key/value: %x / %x", k, v)
}
}
@ -316,16 +315,22 @@ func TestEngine_WriteIndex_SeekAgainstInBlockValue(t *testing.T) {
e := OpenDefaultEngine()
defer e.Close()
// Create codec.
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {ID: uint8(1), Name: "value", Type: influxql.String},
})
// make sure we have data split across two blocks
dataSize := (bz1.DefaultBlockSize - 16) / 2
data := make([]byte, dataSize, dataSize)
data := strings.Repeat("*", dataSize)
// Write initial points to index.
if err := e.WriteIndex(map[string][][]byte{
"cpu": [][]byte{
append(u64tob(10), data...),
append(u64tob(20), data...),
append(u64tob(30), data...),
append(u64tob(40), data...),
append(u64tob(10), MustEncodeFields(codec, models.Fields{"value": data})...),
append(u64tob(20), MustEncodeFields(codec, models.Fields{"value": data})...),
append(u64tob(30), MustEncodeFields(codec, models.Fields{"value": data})...),
append(u64tob(40), MustEncodeFields(codec, models.Fields{"value": data})...),
},
}, nil, nil); err != nil {
t.Fatal(err)
@ -336,13 +341,13 @@ func TestEngine_WriteIndex_SeekAgainstInBlockValue(t *testing.T) {
defer tx.Rollback()
// Ensure that we can seek to a block in the middle
c := tx.Cursor("cpu", tsdb.Forward)
if k, _ := c.Seek(u64tob(15)); btou64(k) != 20 {
t.Fatalf("expected to seek to time 20, but got %d", btou64(k))
c := tx.Cursor("cpu", []string{"value"}, codec, true)
if k, _ := c.SeekTo(15); k != 20 {
t.Fatalf("expected to seek to time 20, but got %d", k)
}
// Ensure that we can seek to the block on the end
if k, _ := c.Seek(u64tob(35)); btou64(k) != 40 {
t.Fatalf("expected to seek to time 40, but got %d", btou64(k))
if k, _ := c.SeekTo(35); k != 40 {
t.Fatalf("expected to seek to time 40, but got %d", k)
}
}
@ -364,156 +369,6 @@ func TestEngine_WriteIndex_NoPoints(t *testing.T) {
}
}
// Ensure the engine can accept randomly generated points.
func TestEngine_WriteIndex_Quick(t *testing.T) {
if testing.Short() {
t.Skip("short mode")
}
quick.Check(func(sets []Points, blockSize uint) bool {
e := OpenDefaultEngine()
e.BlockSize = int(blockSize % 1024) // 1KB max block size
defer e.Close()
// Write points to index in multiple sets.
for _, set := range sets {
if err := e.WriteIndex(map[string][][]byte(set), nil, nil); err != nil {
t.Fatal(err)
}
}
// Merge all points together.
points := MergePoints(sets)
// Retrieve a sorted list of keys so results are deterministic.
keys := points.Keys()
// Start transaction to read index.
tx := e.MustBegin(false)
defer tx.Rollback()
// Iterate over results to ensure they are correct.
for _, key := range keys {
c := tx.Cursor(key, tsdb.Forward)
// Read list of key/values.
var got [][]byte
for k, v := c.Seek(u64tob(0)); k != nil; k, v = c.Next() {
got = append(got, append(copyBytes(k), v...))
}
if !reflect.DeepEqual(got, points[key]) {
t.Fatalf("points: block size=%d, key=%s:\n\ngot=%x\n\nexp=%x\n\n", e.BlockSize, key, got, points[key])
}
}
return true
}, &QuickConfig)
}
// Ensure the engine can accept randomly generated append-only points.
func TestEngine_WriteIndex_Quick_Append(t *testing.T) {
if testing.Short() {
t.Skip("short mode")
}
quick.Check(func(sets appendPointSets, blockSize uint) bool {
e := OpenDefaultEngine()
e.BlockSize = int(blockSize % 1024) // 1KB max block size
defer e.Close()
// Write points to index in multiple sets.
for _, set := range sets {
if err := e.WriteIndex(map[string][][]byte(set), nil, nil); err != nil {
t.Fatal(err)
}
}
// Merge all points together.
points := MergePoints([]Points(sets))
// Retrieve a sorted list of keys so results are deterministic.
keys := points.Keys()
// Start transaction to read index.
tx := e.MustBegin(false)
defer tx.Rollback()
// Iterate over results to ensure they are correct.
for _, key := range keys {
c := tx.Cursor(key, tsdb.Forward)
// Read list of key/values.
var got [][]byte
for k, v := c.Seek(u64tob(0)); k != nil; k, v = c.Next() {
got = append(got, append(copyBytes(k), v...))
}
if !reflect.DeepEqual(got, points[key]) {
t.Fatalf("points: block size=%d, key=%s:\n\ngot=%x\n\nexp=%x\n\n", e.BlockSize, key, got, points[key])
}
}
return true
}, &QuickConfig)
}
func BenchmarkEngine_WriteIndex_512b(b *testing.B) { benchmarkEngine_WriteIndex(b, 512) }
func BenchmarkEngine_WriteIndex_1KB(b *testing.B) { benchmarkEngine_WriteIndex(b, 1*1024) }
func BenchmarkEngine_WriteIndex_4KB(b *testing.B) { benchmarkEngine_WriteIndex(b, 4*1024) }
func BenchmarkEngine_WriteIndex_16KB(b *testing.B) { benchmarkEngine_WriteIndex(b, 16*1024) }
func BenchmarkEngine_WriteIndex_32KB(b *testing.B) { benchmarkEngine_WriteIndex(b, 32*1024) }
func BenchmarkEngine_WriteIndex_64KB(b *testing.B) { benchmarkEngine_WriteIndex(b, 64*1024) }
func BenchmarkEngine_WriteIndex_128KB(b *testing.B) { benchmarkEngine_WriteIndex(b, 128*1024) }
func BenchmarkEngine_WriteIndex_256KB(b *testing.B) { benchmarkEngine_WriteIndex(b, 256*1024) }
func benchmarkEngine_WriteIndex(b *testing.B, blockSize int) {
// Skip small iterations.
if b.N < 1000000 {
return
}
// Create a simple engine.
e := OpenDefaultEngine()
e.BlockSize = blockSize
defer e.Close()
// Create codec.
codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
"value": {
ID: uint8(1),
Name: "value",
Type: influxql.Float,
},
})
// Generate points.
a := make(map[string][][]byte)
a["cpu"] = make([][]byte, b.N)
for i := 0; i < b.N; i++ {
a["cpu"][i] = wal.MarshalEntry(int64(i), MustEncodeFields(codec, models.Fields{"value": float64(i)}))
}
b.ResetTimer()
// Insert into engine.
if err := e.WriteIndex(a, nil, nil); err != nil {
b.Fatal(err)
}
// Calculate on-disk size per point.
bs, _ := e.SeriesBucketStats("cpu")
stats, err := e.Stats()
if err != nil {
b.Fatal(err)
}
b.Logf("pts=%9d bytes/pt=%4.01f leaf-util=%3.0f%%",
b.N,
float64(stats.Size)/float64(b.N),
(float64(bs.LeafInuse)/float64(bs.LeafAlloc))*100.0,
)
}
// Engine represents a test wrapper for bz1.Engine.
type Engine struct {
*bz1.Engine
@ -583,96 +438,22 @@ func (w *EnginePointsWriter) Open() error { return nil }
func (w *EnginePointsWriter) Close() error { return nil }
func (w *EnginePointsWriter) Cursor(key string, direction tsdb.Direction) tsdb.Cursor {
return &Cursor{direction: direction}
func (w *EnginePointsWriter) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor {
return &Cursor{ascending: ascending}
}
func (w *EnginePointsWriter) Flush() error { return nil }
// Cursor represents a mock that implements tsdb.Cursor.
type Cursor struct {
direction tsdb.Direction
ascending bool
}
func (c *Cursor) Direction() tsdb.Direction { return c.direction }
func (c *Cursor) Ascending() bool { return c.ascending }
func (c *Cursor) Seek(key []byte) ([]byte, []byte) { return nil, nil }
func (c *Cursor) SeekTo(key int64) (int64, interface{}) { return tsdb.EOF, nil }
func (c *Cursor) Next() ([]byte, []byte) { return nil, nil }
// Points represents a set of encoded points by key. Implements quick.Generator.
type Points map[string][][]byte
// Keys returns a sorted list of keys.
func (m Points) Keys() []string {
var keys []string
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys)
return keys
}
func (Points) Generate(rand *rand.Rand, size int) reflect.Value {
return reflect.ValueOf(Points(GeneratePoints(rand, size,
rand.Intn(size),
func(_ int) time.Time { return time.Unix(0, 0).Add(time.Duration(rand.Intn(100))) },
)))
}
// appendPointSets represents sets of sequential points. Implements quick.Generator.
type appendPointSets []Points
func (appendPointSets) Generate(rand *rand.Rand, size int) reflect.Value {
sets := make([]Points, 0)
for i, n := 0, rand.Intn(size); i < n; i++ {
sets = append(sets, GeneratePoints(rand, size,
rand.Intn(size),
func(j int) time.Time {
return time.Unix(0, 0).Add((time.Duration(i) * time.Second) + (time.Duration(j) * time.Nanosecond))
},
))
}
return reflect.ValueOf(appendPointSets(sets))
}
func GeneratePoints(rand *rand.Rand, size, seriesN int, timestampFn func(int) time.Time) Points {
// Generate series with a random number of points in each.
m := make(Points)
for i := 0; i < seriesN; i++ {
key := strconv.Itoa(i)
// Generate points for the series.
for j, pointN := 0, rand.Intn(size); j < pointN; j++ {
timestamp := timestampFn(j)
data, ok := quick.Value(reflect.TypeOf([]byte(nil)), rand)
if !ok {
panic("cannot generate data")
}
m[key] = append(m[key], bz1.MarshalEntry(timestamp.UnixNano(), data.Interface().([]byte)))
}
}
return m
}
// MergePoints returns a map of all points merged together by key.
// Later points will overwrite earlier ones.
func MergePoints(a []Points) Points {
// Combine all points into one set.
m := make(Points)
for _, set := range a {
for key, values := range set {
m[key] = append(m[key], values...)
}
}
// Dedupe points.
for key, values := range m {
m[key] = tsdb.DedupeEntries(values)
}
return m
}
func (c *Cursor) Next() (int64, interface{}) { return tsdb.EOF, nil }
// MustEncodeFields encodes fields with codec. Panic on error.
func MustEncodeFields(codec *tsdb.FieldCodec, fields models.Fields) []byte {

View File

@ -255,11 +255,10 @@ func (l *Log) DiskSize() (int64, error) {
}
// Cursor will return a cursor object to SeekTo and iterate with Next for the WAL cache for the given series
func (l *Log) Cursor(key string, direction tsdb.Direction) tsdb.Cursor {
func (l *Log) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor {
l.mu.RLock()
defer l.mu.RUnlock()
return l.partition.cursor(key, direction)
return l.partition.cursor(series, fields, dec, ascending)
}
func (l *Log) WritePoints(points []models.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error {
@ -1215,11 +1214,11 @@ func (p *Partition) addToCache(key, data []byte, timestamp int64) {
}
// cursor will combine the in-memory cache and flush cache (if a flush is currently happening) to give a single ordered cursor for the given series
func (p *Partition) cursor(key string, direction tsdb.Direction) *cursor {
func (p *Partition) cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) *cursor {
p.mu.Lock()
defer p.mu.Unlock()
entry := p.cache[key]
entry := p.cache[series]
if entry == nil {
entry = &cacheEntry{}
}
@ -1227,13 +1226,13 @@ func (p *Partition) cursor(key string, direction tsdb.Direction) *cursor {
// if we're in the middle of a flush, combine the previous cache
// with this one for the cursor
if p.flushCache != nil {
if fc, ok := p.flushCache[key]; ok {
if fc, ok := p.flushCache[series]; ok {
c := make([][]byte, len(fc), len(fc)+len(entry.points))
copy(c, fc)
c = append(c, entry.points...)
dedupe := tsdb.DedupeEntries(c)
return newCursor(dedupe, direction)
return newCursor(dedupe, fields, dec, ascending)
}
}
@ -1242,10 +1241,11 @@ func (p *Partition) cursor(key string, direction tsdb.Direction) *cursor {
entry.isDirtySort = false
}
// build a copy so modifications to the partition don't change the result set
// Build a copy so modifications to the partition don't change the result set
a := make([][]byte, len(entry.points))
copy(a, entry.points)
return newCursor(a, direction)
return newCursor(a, fields, dec, ascending)
}
// idFromFileName parses the segment file ID from its name
@ -1408,51 +1408,62 @@ type entry struct {
type cursor struct {
cache [][]byte
position int
direction tsdb.Direction
ascending bool
fields []string
dec *tsdb.FieldCodec
}
func newCursor(cache [][]byte, direction tsdb.Direction) *cursor {
func newCursor(cache [][]byte, fields []string, dec *tsdb.FieldCodec, ascending bool) *cursor {
// position is set such that a call to Next will successfully advance
// to the next position and return the value.
c := &cursor{cache: cache, direction: direction, position: -1}
if direction.Reverse() {
c := &cursor{
cache: cache,
ascending: ascending,
position: -1,
fields: fields,
dec: dec,
}
if !ascending {
c.position = len(c.cache)
}
return c
}
func (c *cursor) Direction() tsdb.Direction { return c.direction }
func (c *cursor) Ascending() bool { return c.ascending }
// SeekTo will point the cursor to the given time (or key)
func (c *cursor) Seek(seek []byte) (key, value []byte) {
func (c *cursor) SeekTo(seek int64) (key int64, value interface{}) {
seekBytes := u64tob(uint64(seek))
// Seek cache index
c.position = sort.Search(len(c.cache), func(i int) bool {
return bytes.Compare(c.cache[i][0:8], seek) != -1
return bytes.Compare(c.cache[i][0:8], seekBytes) != -1
})
// If seek is not in the cache, return the last value in the cache
if c.direction.Reverse() && c.position >= len(c.cache) {
if !c.ascending && c.position >= len(c.cache) {
c.position = len(c.cache)
}
// Make sure our position points to something in the cache
if c.position < 0 || c.position >= len(c.cache) {
return nil, nil
return tsdb.EOF, nil
}
v := c.cache[c.position]
if v == nil {
return nil, nil
return tsdb.EOF, nil
}
return v[0:8], v[8:]
return DecodeKeyValue(c.fields, c.dec, v[0:8], v[8:])
}
// Next moves the cursor to the next key/value. It returns tsdb.EOF when the end is reached.
func (c *cursor) Next() (key, value []byte) {
func (c *cursor) Next() (key int64, value interface{}) {
var v []byte
if c.direction.Forward() {
if c.ascending {
v = c.nextForward()
} else {
v = c.nextReverse()
@ -1460,11 +1471,11 @@ func (c *cursor) Next() (key, value []byte) {
// Iterated past the end of the cursor
if v == nil {
return nil, nil
return tsdb.EOF, nil
}
// Split v into key/value
return v[0:8], v[8:]
return DecodeKeyValue(c.fields, c.dec, v[0:8], v[8:])
}
// nextForward advances the cursor forward returning the next value
@ -1575,3 +1586,27 @@ func u64tob(v uint64) []byte {
func btou64(b []byte) uint64 {
return binary.BigEndian.Uint64(b)
}
// DecodeKeyValue decodes the key and value from bytes.
func DecodeKeyValue(fields []string, dec *tsdb.FieldCodec, k, v []byte) (key int64, value interface{}) {
// Convert key to a timestamp.
key = int64(btou64(k[0:8]))
// Decode values. Optimize for single field.
switch len(fields) {
case 0:
return
case 1:
decValue, err := dec.DecodeByName(fields[0], v)
if err != nil {
return
}
return key, decValue
default:
m, err := dec.DecodeFieldsWithNames(v)
if err != nil {
return
}
return key, m
}
}
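For reference, a minimal sketch of how the new cursor decoding fits together. The package paths, the EncodeFields call, and the sample timestamp and value are illustrative assumptions, not part of this change:

package main

import (
    "encoding/binary"
    "fmt"

    "github.com/influxdb/influxdb/influxql"
    "github.com/influxdb/influxdb/models"
    "github.com/influxdb/influxdb/tsdb"
    "github.com/influxdb/influxdb/tsdb/engine/wal"
)

func main() {
    // Codec for a single float field named "value".
    codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
        "value": {ID: uint8(1), Name: "value", Type: influxql.Float},
    })

    // A WAL cache entry is an 8-byte big-endian timestamp followed by the
    // codec-encoded field values (EncodeFields is assumed here).
    fieldData, err := codec.EncodeFields(models.Fields{"value": 23.2})
    if err != nil {
        panic(err)
    }
    key := make([]byte, 8)
    binary.BigEndian.PutUint64(key, 10)

    // With one requested field the decoded value is the bare field value;
    // with several, it is a map of field name to value.
    k, v := wal.DecodeKeyValue([]string{"value"}, codec, key, fieldData)
    fmt.Println(k, v) // 10 23.2
}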

View File

@ -2,7 +2,6 @@ package wal
import (
"bytes"
"encoding/binary"
"fmt"
"io/ioutil"
"math/rand"
@ -33,7 +32,7 @@ func TestWAL_WritePoints(t *testing.T) {
},
})
// test that we can write to two different series
// Test that we can write to two different series
p1 := parsePoint("cpu,host=A value=23.2 1", codec)
p2 := parsePoint("cpu,host=A value=25.3 4", codec)
p3 := parsePoint("cpu,host=B value=1.0 1", codec)
@ -41,27 +40,27 @@ func TestWAL_WritePoints(t *testing.T) {
t.Fatalf("failed to write points: %s", err.Error())
}
c := log.Cursor("cpu,host=A", tsdb.Forward)
k, v := c.Seek(inttob(1))
c := log.Cursor("cpu,host=A", []string{"value"}, codec, true)
k, v := c.SeekTo(1)
// ensure the series are there and points are in order
if bytes.Compare(v, p1.Data()) != 0 {
if v.(float64) != 23.2 {
t.Fatalf("expected to seek to first point but got key and value: %v %v", k, v)
}
k, v = c.Next()
if bytes.Compare(v, p2.Data()) != 0 {
if v.(float64) != 25.3 {
t.Fatalf("expected to seek to first point but got key and value: %v %v", k, v)
}
k, v = c.Next()
if k != nil {
if k != tsdb.EOF {
t.Fatalf("expected nil on last seek: %v %v", k, v)
}
c = log.Cursor("cpu,host=B", tsdb.Forward)
c = log.Cursor("cpu,host=B", []string{"value"}, codec, true)
k, v = c.Next()
if bytes.Compare(v, p3.Data()) != 0 {
if v.(float64) != 1.0 {
t.Fatalf("expected to seek to first point but got key and value: %v %v", k, v)
}
@ -98,19 +97,16 @@ func TestWAL_WritePoints(t *testing.T) {
t.Fatalf("failed to write points: %s", err.Error())
}
c = log.Cursor("cpu,host=A", tsdb.Forward)
k, v = c.Next()
if bytes.Compare(v, p6.Data()) != 0 {
c = log.Cursor("cpu,host=A", []string{"value"}, codec, true)
if _, v := c.Next(); v.(float64) != 1.3 {
t.Fatal("order wrong, expected p6")
}
_, v = c.Next()
if bytes.Compare(v, p4.Data()) != 0 {
if _, v := c.Next(); v.(float64) != 1.0 {
t.Fatal("order wrong, expected p6")
}
c = log.Cursor("cpu,host=C", tsdb.Forward)
_, v = c.Next()
if bytes.Compare(v, p5.Data()) != 0 {
c = log.Cursor("cpu,host=C", []string{"value"}, codec, true)
if _, v := c.Next(); v.(float64) != 1.4 {
t.Fatal("order wrong, expected p6")
}
@ -156,17 +152,14 @@ func TestWAL_CorruptDataLengthSize(t *testing.T) {
t.Fatalf("failed to write points: %s", err.Error())
}
c := log.Cursor("cpu,host=A", tsdb.Forward)
_, v := c.Next()
if bytes.Compare(v, p1.Data()) != 0 {
c := log.Cursor("cpu,host=A", []string{"value"}, codec, true)
if _, v := c.Next(); v.(float64) != 23.2 {
t.Fatal("p1 value wrong")
}
_, v = c.Next()
if bytes.Compare(v, p2.Data()) != 0 {
if _, v := c.Next(); v.(float64) != 25.3 {
t.Fatal("p2 value wrong")
}
_, v = c.Next()
if v != nil {
if _, v := c.Next(); v != nil {
t.Fatal("expected cursor to return nil")
}
@ -184,8 +177,7 @@ func TestWAL_CorruptDataLengthSize(t *testing.T) {
log.Open()
p := points[0]
if len(p["cpu,host=A"]) != 2 {
if p := points[0]; len(p["cpu,host=A"]) != 2 {
t.Fatal("expected two points for cpu,host=A")
}
@ -195,9 +187,8 @@ func TestWAL_CorruptDataLengthSize(t *testing.T) {
t.Fatalf("failed to write point: %s", err.Error())
}
c = log.Cursor("cpu,host=A", tsdb.Forward)
_, v = c.Next()
if bytes.Compare(v, p3.Data()) != 0 {
c = log.Cursor("cpu,host=A", []string{"value"}, codec, true)
if _, v := c.Next(); v.(float64) != 29.2 {
t.Fatal("p3 value wrong")
}
@ -205,8 +196,7 @@ func TestWAL_CorruptDataLengthSize(t *testing.T) {
points = make([]map[string][][]byte, 0)
log.Open()
p = points[0]
if len(p["cpu,host=A"]) != 1 {
if p := points[0]; len(p["cpu,host=A"]) != 1 {
t.Fatal("expected two points for cpu,host=A")
}
}
@ -235,17 +225,14 @@ func TestWAL_CorruptDataBlock(t *testing.T) {
t.Fatalf("failed to write points: %s", err.Error())
}
c := log.Cursor("cpu,host=A", tsdb.Forward)
_, v := c.Next()
if bytes.Compare(v, p1.Data()) != 0 {
c := log.Cursor("cpu,host=A", []string{"value"}, codec, true)
if _, v := c.Next(); v.(float64) != 23.2 {
t.Fatal("p1 value wrong")
}
_, v = c.Next()
if bytes.Compare(v, p2.Data()) != 0 {
if _, v := c.Next(); v.(float64) != 25.3 {
t.Fatal("p2 value wrong")
}
_, v = c.Next()
if v != nil {
if _, v := c.Next(); v != nil {
t.Fatal("expected cursor to return nil")
}
@ -268,9 +255,7 @@ func TestWAL_CorruptDataBlock(t *testing.T) {
}}
log.Open()
p := points[0]
if len(p["cpu,host=A"]) != 2 {
if p := points[0]; len(p["cpu,host=A"]) != 2 {
t.Fatal("expected two points for cpu,host=A")
}
@ -280,9 +265,8 @@ func TestWAL_CorruptDataBlock(t *testing.T) {
t.Fatalf("failed to write point: %s", err.Error())
}
c = log.Cursor("cpu,host=A", tsdb.Forward)
_, v = c.Next()
if bytes.Compare(v, p3.Data()) != 0 {
c = log.Cursor("cpu,host=A", []string{"value"}, codec, true)
if _, v := c.Next(); v.(float64) != 29.2 {
t.Fatal("p3 value wrong", p3.Data(), v)
}
@ -290,8 +274,7 @@ func TestWAL_CorruptDataBlock(t *testing.T) {
points = make([]map[string][][]byte, 0)
log.Open()
p = points[0]
if len(p["cpu,host=A"]) != 1 {
if p := points[0]; len(p["cpu,host=A"]) != 1 {
t.Fatal("expected two points for cpu,host=A")
}
}
@ -341,9 +324,9 @@ func TestWAL_CompactAfterTimeWithoutWrite(t *testing.T) {
}
// ensure we have some data
c := log.Cursor("cpu,host=A,region=uswest10", tsdb.Forward)
c := log.Cursor("cpu,host=A,region=uswest10", []string{"value"}, codec, true)
k, _ := c.Next()
if btou64(k) != 1 {
if k != 1 {
t.Fatalf("expected first data point but got one with key: %v", k)
}
@ -524,13 +507,13 @@ func TestWAL_DeleteSeries(t *testing.T) {
}
// ensure data is there
c := log.Cursor("cpu,host=A", tsdb.Forward)
if k, _ := c.Next(); btou64(k) != 1 {
c := log.Cursor("cpu,host=A", []string{"value"}, codec, true)
if k, _ := c.Next(); k != 1 {
t.Fatal("expected data point for cpu,host=A")
}
c = log.Cursor("cpu,host=B", tsdb.Forward)
if k, _ := c.Next(); btou64(k) != 2 {
c = log.Cursor("cpu,host=B", []string{"value"}, codec, true)
if k, _ := c.Next(); k != 2 {
t.Fatal("expected data point for cpu,host=B")
}
@ -546,14 +529,14 @@ func TestWAL_DeleteSeries(t *testing.T) {
if len(points["cpu,host=B"]) != 0 {
t.Fatal("expected cpu,host=B to have no points in index")
}
c = log.Cursor("cpu,host=A", tsdb.Forward)
if k, _ := c.Next(); k != nil {
c = log.Cursor("cpu,host=A", []string{"value"}, codec, true)
if k, _ := c.Next(); k != tsdb.EOF {
t.Fatal("expected data to be out of the cache cpu,host=A")
}
// ensure series is deleted
c = log.Cursor("cpu,host=B", tsdb.Forward)
if k, _ := c.Next(); k != nil {
c = log.Cursor("cpu,host=B", []string{"value"}, codec, true)
if k, _ := c.Next(); k != tsdb.EOF {
t.Fatal("expected no data for cpu,host=B")
}
@ -618,10 +601,10 @@ func TestWAL_QueryDuringCompaction(t *testing.T) {
}
verify := func() {
c := log.Cursor("cpu,host=A", tsdb.Forward)
k, v := c.Seek(inttob(1))
c := log.Cursor("cpu,host=A", []string{"value"}, codec, true)
k, v := c.SeekTo(1)
// ensure the series are there and points are in order
if bytes.Compare(v, p1.Data()) != 0 {
if v.(float64) != 23.2 {
<-finishCompaction
t.Fatalf("expected to seek to first point but got key and value: %v %v", k, v)
}
@ -664,21 +647,17 @@ func TestWAL_PointsSorted(t *testing.T) {
t.Fatalf("failed to write points: %s", err.Error())
}
c := log.Cursor("cpu,host=A", tsdb.Forward)
k, _ := c.Next()
if btou64(k) != 1 {
c := log.Cursor("cpu,host=A", []string{"value"}, codec, true)
if k, _ := c.Next(); k != 1 {
t.Fatal("points out of order")
}
k, _ = c.Next()
if btou64(k) != 2 {
if k, _ := c.Next(); k != 2 {
t.Fatal("points out of order")
}
k, _ = c.Next()
if btou64(k) != 4 {
if k, _ := c.Next(); k != 4 {
t.Fatal("points out of order")
}
k, _ = c.Next()
if btou64(k) != 6 {
if k, _ := c.Next(); k != 6 {
t.Fatal("points out of order")
}
}
@ -709,21 +688,18 @@ func TestWAL_Cursor_Reverse(t *testing.T) {
t.Fatalf("failed to write points: %s", err.Error())
}
c := log.Cursor("cpu,host=A", tsdb.Reverse)
c := log.Cursor("cpu,host=A", []string{"value"}, codec, false)
k, _ := c.Next()
if btou64(k) != 6 {
if k != 6 {
t.Fatal("points out of order")
}
k, _ = c.Next()
if btou64(k) != 4 {
if k, _ := c.Next(); k != 4 {
t.Fatal("points out of order")
}
k, _ = c.Next()
if btou64(k) != 2 {
if k, _ := c.Next(); k != 2 {
t.Fatal("points out of order")
}
k, _ = c.Next()
if btou64(k) != 1 {
if k, _ := c.Next(); k != 1 {
t.Fatal("points out of order")
}
}
@ -762,9 +738,3 @@ func parsePoints(buf string, codec *tsdb.FieldCodec) []models.Point {
func parsePoint(buf string, codec *tsdb.FieldCodec) models.Point {
return parsePoints(buf, codec)[0]
}
func inttob(v int) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(v))
return b
}

View File

@ -27,39 +27,6 @@ type Executor interface {
Execute() <-chan *models.Row
}
// Mapper is the interface all Mapper types must implement.
type Mapper interface {
Open() error
SetRemote(m Mapper) error
TagSets() []string
Fields() []string
NextChunk() (interface{}, error)
Close()
}
// StatefulMapper encapsulates a Mapper and some state that the executor needs to
// track for that mapper.
type StatefulMapper struct {
Mapper
bufferedChunk *MapperOutput // Last read chunk.
drained bool
}
// NextChunk returns the next chunk read from the underlying mapper as a *MapperOutput.
func (sm *StatefulMapper) NextChunk() (*MapperOutput, error) {
c, err := sm.Mapper.NextChunk()
if err != nil {
return nil, err
}
chunk, ok := c.(*MapperOutput)
if !ok {
if chunk == interface{}(nil) {
return nil, nil
}
}
return chunk, nil
}
type SelectExecutor struct {
stmt *influxql.SelectStatement
mappers []*StatefulMapper

View File

@ -36,7 +36,7 @@ type reduceFunc func([]interface{}) interface{}
// UnmarshalFunc represents a function that can take bytes from a mapper on a
// remote server and unmarshal them into an interface the reducer can use
type unmarshalFunc func([]byte) (interface{}, error)
type UnmarshalFunc func([]byte) (interface{}, error)
// initializeMapFunc takes an aggregate call from the query and returns the mapFunc
func initializeMapFunc(c *influxql.Call) (mapFunc, error) {
@ -149,7 +149,7 @@ func initializeReduceFunc(c *influxql.Call) (reduceFunc, error) {
}
}
func initializeUnmarshaller(c *influxql.Call) (unmarshalFunc, error) {
func InitializeUnmarshaller(c *influxql.Call) (UnmarshalFunc, error) {
// if c is nil it's a raw data query
if c == nil {
return func(b []byte) (interface{}, error) {

File diff suppressed because it is too large Load Diff

View File

@ -416,7 +416,7 @@ func TestShardMapper_WriteAndSingleMapperAggregateQuery(t *testing.T) {
for _, tt := range tests {
stmt := mustParseSelectStatement(tt.stmt)
mapper := openSelectMapperOrFail(t, shard, stmt)
mapper := openAggregateMapperOrFail(t, shard, stmt)
for i := range tt.expected {
got := aggIntervalAsJson(t, mapper)
@ -491,7 +491,7 @@ func TestShardMapper_SelectMapperTagSetsFields(t *testing.T) {
for _, tt := range tests {
stmt := mustParseSelectStatement(tt.stmt)
mapper := openSelectMapperOrFail(t, shard, stmt)
mapper := openAggregateMapperOrFail(t, shard, stmt)
fields := mapper.Fields()
if !reflect.DeepEqual(fields, tt.expectedFields) {
@ -537,12 +537,12 @@ func mustParseStatement(s string) influxql.Statement {
}
func openRawMapperOrFail(t *testing.T, shard *tsdb.Shard, stmt *influxql.SelectStatement, chunkSize int) tsdb.Mapper {
mapper := tsdb.NewSelectMapper(shard, stmt, chunkSize)
if err := mapper.Open(); err != nil {
m := tsdb.NewRawMapper(shard, stmt)
m.ChunkSize = chunkSize
if err := m.Open(); err != nil {
t.Fatalf("failed to open raw mapper: %s", err.Error())
}
return mapper
return m
}
func nextRawChunkAsJson(t *testing.T, mapper tsdb.Mapper) string {
@ -553,16 +553,15 @@ func nextRawChunkAsJson(t *testing.T, mapper tsdb.Mapper) string {
return mustMarshalMapperOutput(r)
}
func openSelectMapperOrFail(t *testing.T, shard *tsdb.Shard, stmt *influxql.SelectStatement) *tsdb.SelectMapper {
mapper := tsdb.NewSelectMapper(shard, stmt, 0)
if err := mapper.Open(); err != nil {
func openAggregateMapperOrFail(t *testing.T, shard *tsdb.Shard, stmt *influxql.SelectStatement) *tsdb.AggregateMapper {
m := tsdb.NewAggregateMapper(shard, stmt)
if err := m.Open(); err != nil {
t.Fatalf("failed to open aggregate mapper: %s", err.Error())
}
return mapper
return m
}
func aggIntervalAsJson(t *testing.T, mapper *tsdb.SelectMapper) string {
func aggIntervalAsJson(t *testing.T, mapper *tsdb.AggregateMapper) string {
r, err := mapper.NextChunk()
if err != nil {
t.Fatalf("failed to get next chunk from aggregate mapper: %s", err.Error())

View File

@ -59,6 +59,20 @@ func (d *DatabaseIndex) Measurement(name string) *Measurement {
return d.measurements[name]
}
// MeasurementsByName returns a list of the measurements in the index that match the given names.
func (d *DatabaseIndex) MeasurementsByName(names []string) []*Measurement {
d.mu.RLock()
defer d.mu.RUnlock()
a := make([]*Measurement, 0, len(names))
for _, name := range names {
if m := d.measurements[name]; m != nil {
a = append(a, m)
}
}
return a
}
// MeasurementSeriesCounts returns the number of measurements and series currently indexed by the database.
// Useful for reporting and monitoring.
func (d *DatabaseIndex) MeasurementSeriesCounts() (nMeasurements int, nSeries int) {
@ -282,6 +296,142 @@ func (db *DatabaseIndex) DropSeries(keys []string) {
}
}
// RewriteSelectStatement performs any necessary query re-writing.
func (db *DatabaseIndex) RewriteSelectStatement(stmt *influxql.SelectStatement) (*influxql.SelectStatement, error) {
// Expand regex expressions in the FROM clause.
sources, err := db.ExpandSources(stmt.Sources)
if err != nil {
return nil, err
}
stmt.Sources = sources
// Expand wildcards in the fields or GROUP BY.
stmt, err = db.ExpandWildcards(stmt)
if err != nil {
return nil, err
}
stmt.RewriteDistinct()
return stmt, nil
}
// ExpandWildcards returns a new SelectStatement with wildcards expanded.
// If only a `SELECT *` is present, without a `GROUP BY *`, both tags and fields are expanded in the SELECT.
// If a `SELECT *` and a `GROUP BY *` are both present, then only fields are expanded in the `SELECT` and only
// tags are expanded in the `GROUP BY`.
func (db *DatabaseIndex) ExpandWildcards(stmt *influxql.SelectStatement) (*influxql.SelectStatement, error) {
// If there are no wildcards in the statement, return it as-is.
if !stmt.HasWildcard() {
return stmt, nil
}
// Use sets to avoid duplicate field names.
fieldSet := map[string]struct{}{}
dimensionSet := map[string]struct{}{}
// keep track of where the wildcards are in the select statement
hasFieldWildcard := stmt.HasFieldWildcard()
hasDimensionWildcard := stmt.HasDimensionWildcard()
// Iterate measurements in the FROM clause getting the fields & dimensions for each.
var fields influxql.Fields
var dimensions influxql.Dimensions
for _, src := range stmt.Sources {
if m, ok := src.(*influxql.Measurement); ok {
// Lookup the measurement in the database.
mm := db.Measurement(m.Name)
if mm == nil {
// This shard has never received data for the measurement,
// so no Mapper is required.
return stmt, nil
}
// Get the fields for this measurement.
for _, name := range mm.FieldNames() {
if _, ok := fieldSet[name]; ok {
continue
}
fieldSet[name] = struct{}{}
fields = append(fields, &influxql.Field{Expr: &influxql.VarRef{Val: name}})
}
// Add tags to fields if a field wildcard was provided and a dimension wildcard was not.
if hasFieldWildcard && !hasDimensionWildcard {
for _, t := range mm.TagKeys() {
if _, ok := fieldSet[t]; ok {
continue
}
fieldSet[t] = struct{}{}
fields = append(fields, &influxql.Field{Expr: &influxql.VarRef{Val: t}})
}
}
// Get the dimensions for this measurement.
if hasDimensionWildcard {
for _, t := range mm.TagKeys() {
if _, ok := dimensionSet[t]; ok {
continue
}
dimensionSet[t] = struct{}{}
dimensions = append(dimensions, &influxql.Dimension{Expr: &influxql.VarRef{Val: t}})
}
}
}
}
// Return a new SelectStatement with the wildcards rewritten.
return stmt.RewriteWildcards(fields, dimensions), nil
}
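A small illustration of the two wildcard flags this rewrite branches on. The query strings are arbitrary examples, and influxql.ParseStatement is assumed to be available:

package main

import (
    "fmt"

    "github.com/influxdb/influxdb/influxql"
)

func main() {
    q1, _ := influxql.ParseStatement(`SELECT * FROM cpu`)
    q2, _ := influxql.ParseStatement(`SELECT * FROM cpu GROUP BY *`)
    s1 := q1.(*influxql.SelectStatement)
    s2 := q2.(*influxql.SelectStatement)

    // SELECT * alone: both fields and tags are expanded into the SELECT list.
    fmt.Println(s1.HasFieldWildcard(), s1.HasDimensionWildcard()) // true false
    // SELECT * plus GROUP BY *: fields stay in SELECT, tags go to GROUP BY.
    fmt.Println(s2.HasFieldWildcard(), s2.HasDimensionWildcard()) // true true
}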
// ExpandSources expands regex sources and removes duplicates.
// NOTE: sources must be normalized (db and rp set) before calling this function.
func (di *DatabaseIndex) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
// Use a map as a set to prevent duplicates. Two regexes might produce
// duplicates when expanded.
set := map[string]influxql.Source{}
names := []string{}
// Iterate all sources, expanding regexes when they're found.
for _, source := range sources {
switch src := source.(type) {
case *influxql.Measurement:
if src.Regex == nil {
name := src.String()
set[name] = src
names = append(names, name)
continue
}
// Get measurements from the database that match the regex.
measurements := di.measurementsByRegex(src.Regex.Val)
// Add those measurements to the set.
for _, m := range measurements {
m2 := &influxql.Measurement{
Database: src.Database,
RetentionPolicy: src.RetentionPolicy,
Name: m.Name,
}
name := m2.String()
if _, ok := set[name]; !ok {
set[name] = m2
names = append(names, name)
}
}
default:
return nil, fmt.Errorf("expandSources: unsuported source type: %T", source)
}
}
// Sort the list of source names.
sort.Strings(names)
// Convert set to a list of Sources.
expanded := make(influxql.Sources, 0, len(set))
for _, name := range names {
expanded = append(expanded, set[name])
}
return expanded, nil
}
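Roughly, the inputs ExpandSources distinguishes look like this (parsing only; the actual expansion needs a populated index and normalized db/rp, and the query is a made-up example):

package main

import (
    "fmt"

    "github.com/influxdb/influxdb/influxql"
)

func main() {
    q, _ := influxql.ParseStatement(`SELECT value FROM /cpu.*/`)
    src := q.(*influxql.SelectStatement).Sources[0].(*influxql.Measurement)

    // A regex source carries no Name, only a Regex. ExpandSources replaces it
    // with one concrete *influxql.Measurement per matching measurement in the
    // index, then returns the set sorted and deduplicated.
    fmt.Println(src.Name == "", src.Regex != nil) // true true
}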
// Measurement represents a collection of time series in a database. It also contains in memory
// structures for indexing tags. Exported functions are goroutine safe while un-exported functions
// assume the caller will use the appropriate locks
@ -903,6 +1053,64 @@ func (m *Measurement) uniqueTagValues(expr influxql.Expr) map[string][]string {
return out
}
// SelectFields returns a list of fields in the SELECT section of stmt.
func (m *Measurement) SelectFields(stmt *influxql.SelectStatement) []string {
set := newStringSet()
for _, name := range stmt.NamesInSelect() {
if m.HasField(name) {
set.add(name)
continue
}
}
return set.list()
}
// SelectTags returns a list of non-field tags in the SELECT section of stmt.
func (m *Measurement) SelectTags(stmt *influxql.SelectStatement) []string {
set := newStringSet()
for _, name := range stmt.NamesInSelect() {
if !m.HasField(name) && m.HasTagKey(name) {
set.add(name)
}
}
return set.list()
}
// WhereFields returns a list of non-"time" fields in the WHERE section of stmt.
func (m *Measurement) WhereFields(stmt *influxql.SelectStatement) []string {
set := newStringSet()
for _, name := range stmt.NamesInWhere() {
if name != "time" && m.HasField(name) {
set.add(name)
}
}
return set.list()
}
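A hedged sketch of the statement helpers these methods filter on; the query and the printed orderings are illustrative only:

package main

import (
    "fmt"

    "github.com/influxdb/influxdb/influxql"
)

func main() {
    q, _ := influxql.ParseStatement(
        `SELECT value, host FROM cpu WHERE region = 'uswest' AND time > now() - 1h GROUP BY host`)
    s := q.(*influxql.SelectStatement)

    // SelectFields and SelectTags split these names by HasField vs HasTagKey.
    fmt.Println(s.NamesInSelect()) // e.g. [value host]
    // WhereFields drops "time" and anything that is not a known field.
    fmt.Println(s.NamesInWhere()) // e.g. [region time]
}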
// DimensionTagSets returns the list of tag sets from the GROUP BY section of stmt.
func (m *Measurement) DimensionTagSets(stmt *influxql.SelectStatement) ([]*influxql.TagSet, error) {
_, tagKeys := stmt.Dimensions.Normalize()
for _, n := range stmt.NamesInDimension() {
if m.HasTagKey(n) {
tagKeys = append(tagKeys, n)
}
}
// Get the sorted unique tag sets for this statement.
tagSets, err := m.TagSets(stmt, tagKeys)
if err != nil {
return nil, err
}
return tagSets, nil
}
type SelectInfo struct {
SelectFields []string
SelectTags []string
WhereFields []string
}
// Measurements represents a list of *Measurement.
type Measurements []*Measurement
@ -910,6 +1118,45 @@ func (a Measurements) Len() int { return len(a) }
func (a Measurements) Less(i, j int) bool { return a[i].Name < a[j].Name }
func (a Measurements) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// SelectFields returns a list of fields in the SELECT section of stmt.
func (a Measurements) SelectFields(stmt *influxql.SelectStatement) []string {
set := newStringSet()
for _, name := range stmt.NamesInSelect() {
for _, m := range a {
if m.HasField(name) {
set.add(name)
}
}
}
return set.list()
}
// SelectTags returns a list of non-field tags in the SELECT section of stmt.
func (a Measurements) SelectTags(stmt *influxql.SelectStatement) []string {
set := newStringSet()
for _, name := range stmt.NamesInSelect() {
for _, m := range a {
if !m.HasField(name) && m.HasTagKey(name) {
set.add(name)
}
}
}
return set.list()
}
// WhereFields returns a list of non-"time" fields in the WHERE section of stmt.
func (a Measurements) WhereFields(stmt *influxql.SelectStatement) []string {
set := newStringSet()
for _, name := range stmt.NamesInWhere() {
for _, m := range a {
if name != "time" && m.HasField(name) {
set.add(name)
}
}
}
return set.list()
}
func (a Measurements) intersect(other Measurements) Measurements {
l := a
r := other

View File

@ -120,19 +120,19 @@ func (e *ShowMeasurementsExecutor) close() {
// ShowMeasurementsMapper is a mapper for collecting measurement names from a shard.
type ShowMeasurementsMapper struct {
remote Mapper
shard *Shard
stmt *influxql.ShowMeasurementsStatement
chunkSize int
state interface{}
remote Mapper
shard *Shard
stmt *influxql.ShowMeasurementsStatement
state interface{}
ChunkSize int
}
// NewShowMeasurementsMapper returns a mapper for the given shard, which will return data for the meta statement.
func NewShowMeasurementsMapper(shard *Shard, stmt *influxql.ShowMeasurementsStatement, chunkSize int) *ShowMeasurementsMapper {
func NewShowMeasurementsMapper(shard *Shard, stmt *influxql.ShowMeasurementsStatement) *ShowMeasurementsMapper {
return &ShowMeasurementsMapper{
shard: shard,
stmt: stmt,
chunkSize: chunkSize,
shard: shard,
stmt: stmt,
}
}
@ -176,10 +176,7 @@ func (m *ShowMeasurementsMapper) Open() error {
}
// SetRemote sets the remote mapper to use.
func (m *ShowMeasurementsMapper) SetRemote(remote Mapper) error {
m.remote = remote
return nil
}
func (m *ShowMeasurementsMapper) SetRemote(remote Mapper) { m.remote = remote }
// TagSets is only implemented on this mapper to satisfy the Mapper interface.
func (m *ShowMeasurementsMapper) TagSets() []string { return nil }
@ -212,13 +209,15 @@ func (m *ShowMeasurementsMapper) NextChunk() (interface{}, error) {
// nextChunk implements next chunk logic for a local shard.
func (m *ShowMeasurementsMapper) nextChunk() (interface{}, error) {
// Allocate array to hold measurement names.
names := make([]string, 0, m.chunkSize)
names := make([]string, 0, m.ChunkSize)
// Get the channel of measurement names from the state.
measurementNames := m.state.(chan string)
// Get the next chunk of names.
for n := range measurementNames {
names = append(names, n)
if len(names) == m.chunkSize {
if len(names) == m.ChunkSize {
break
}
}

View File

@ -205,7 +205,7 @@ func (m *ShowTagKeysMapper) Open() error {
// Expand regex expressions in the FROM clause.
if m.stmt.Sources != nil {
var err error
sources, err = expandSources(m.stmt.Sources, m.shard.index)
sources, err = m.shard.index.ExpandSources(m.stmt.Sources)
if err != nil {
return err
}

View File

@ -343,15 +343,23 @@ func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
func (s *Store) CreateMapper(shardID uint64, stmt influxql.Statement, chunkSize int) (Mapper, error) {
shard := s.Shard(shardID)
switch st := stmt.(type) {
switch stmt := stmt.(type) {
case *influxql.SelectStatement:
return NewSelectMapper(shard, st, chunkSize), nil
if (stmt.IsRawQuery && !stmt.HasDistinct()) || stmt.IsSimpleDerivative() {
m := NewRawMapper(shard, stmt)
m.ChunkSize = chunkSize
return m, nil
}
return NewAggregateMapper(shard, stmt), nil
case *influxql.ShowMeasurementsStatement:
return NewShowMeasurementsMapper(shard, st, chunkSize), nil
m := NewShowMeasurementsMapper(shard, stmt)
m.ChunkSize = chunkSize
return m, nil
case *influxql.ShowTagKeysStatement:
return NewShowTagKeysMapper(shard, st, chunkSize), nil
return NewShowTagKeysMapper(shard, stmt, chunkSize), nil
default:
return nil, fmt.Errorf("can't create mapper for statement type: %v", st)
return nil, fmt.Errorf("can't create mapper for statement type: %T", stmt)
}
}
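To make the dispatch concrete, a minimal sketch of which mapper each statement kind now maps to (shard plumbing omitted; the query strings are illustrative):

package main

import (
    "fmt"

    "github.com/influxdb/influxdb/influxql"
)

func main() {
    for _, q := range []string{
        `SELECT value FROM cpu WHERE time > now() - 1h`, // raw query
        `SELECT mean(value) FROM cpu GROUP BY time(1m)`, // aggregate query
        `SHOW MEASUREMENTS`,                             // meta query
    } {
        stmt, err := influxql.ParseStatement(q)
        if err != nil {
            panic(err)
        }
        switch stmt := stmt.(type) {
        case *influxql.SelectStatement:
            if (stmt.IsRawQuery && !stmt.HasDistinct()) || stmt.IsSimpleDerivative() {
                fmt.Println("RawMapper:", q)
            } else {
                fmt.Println("AggregateMapper:", q)
            }
        case *influxql.ShowMeasurementsStatement:
            fmt.Println("ShowMeasurementsMapper:", q)
        case *influxql.ShowTagKeysStatement:
            fmt.Println("ShowTagKeysMapper:", q)
        default:
            fmt.Printf("no mapper for %T\n", stmt)
        }
    }
}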