Wire up field expression evaluation

pull/1369/head
Paul Dix 2015-01-27 20:07:55 -05:00
parent 861163c03e
commit f2419caeb5
2 changed files with 86 additions and 73 deletions

View File

@ -221,9 +221,7 @@ func (m *Measurement) seriesIDsAndFilters(stmt *influxql.SelectStatement) (serie
if stmt.Condition == nil {
return m.seriesIDs, nil
}
ids, _, expr := m.walkWhereForSeriesIds(stmt.Condition, seriesIdsToExpr)
warn("ids: ", ids, expr)
warn("foo: ", seriesIdsToExpr)
ids, _, _ := m.walkWhereForSeriesIds(stmt.Condition, seriesIdsToExpr)
return ids, seriesIdsToExpr
}
@ -276,7 +274,6 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (seriesIDs, bool, influ
// if it's a field we can't collapse it so we have to look at all series ids for this
if m.FieldByName(name.Val) != nil {
warn("field: ", name.Val, n.String())
return m.seriesIDs, true, n
}
@ -299,14 +296,11 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (seriesIDs, bool, influ
// The map that it takes maps each series id to the field expression that should be used to evaluate it when iterating over its cursor.
// Series that have no field expressions won't be in the map
func (m *Measurement) walkWhereForSeriesIds(node influxql.Node, filters map[uint32]influxql.Expr) (seriesIDs, bool, influxql.Expr) {
warn(". ", node.String())
switch n := node.(type) {
case *influxql.BinaryExpr:
warn("op: ", n.Op)
// if it's EQ then it's either a field expression or against a tag. we can return this
if n.Op == influxql.EQ {
ids, shouldInclude, expr := m.idsForExpr(n)
warn("eq ", ids, shouldInclude)
return ids, shouldInclude, expr
} else if n.Op == influxql.AND || n.Op == influxql.OR { // if it's an AND or OR we need to union or intersect the results
var ids seriesIDs
@ -327,12 +321,10 @@ func (m *Measurement) walkWhereForSeriesIds(node influxql.Node, filters map[uint
ids = r
}
warn("... ", n.Op, lexpr, rexpr)
if n.Op == influxql.OR && il && ir && (lexpr == nil || rexpr == nil) {
// if it's an OR and we're going to include both sides and one of those expression is nil,
// we need to clear out restrictive filters on series that don't need them anymore
idsToClear := l.intersect(r)
warn("clearning")
for _, id := range idsToClear {
delete(filters, id)
}
@ -365,7 +357,6 @@ func (m *Measurement) walkWhereForSeriesIds(node influxql.Node, filters map[uint
// that is, filters that are no longer part of the end result set
if n.Op == influxql.AND && il && ir {
filtersToClear := l.union(r).reject(ids)
warn("filt ", filtersToClear)
for _, id := range filtersToClear {
delete(filters, id)
}

148
tx.go
View File

@ -1,7 +1,6 @@
package influxdb
import (
"bytes"
"fmt"
"sync"
"time"
@ -127,25 +126,28 @@ func (tx *tx) CreateIterators(stmt *influxql.SelectStatement) ([]influxql.Iterat
}
tagSets := m.tagSets(stmt, dimensions)
// Convert time range to bytes.
kmin := u64tob(uint64(tmin.UnixNano()))
kmax := u64tob(uint64(tmax.UnixNano()))
// Create an iterator for every shard.
var itrs []influxql.Iterator
for tag, set := range tagSets {
for _, group := range shardGroups {
// TODO: only create iterators for the shards we actually have to hit in a group
for _, sh := range group.Shards {
// create a series cursor for each unique series id
cursors := make([]*seriesCursor, 0, len(set))
for id, cond := range set {
cursors = append(cursors, &seriesCursor{id: id, condition: cond})
}
// create the shard iterator that will map over all series for the shard
itr := &shardIterator{
fieldID: f.ID,
tags: tag,
conditions: set, // TODO: only pass in conditions for series that are in this shard
db: sh.store,
cur: &multiCursor{
kmin: kmin,
kmax: kmax,
},
fieldName: f.Name,
fieldID: f.ID,
tags: tag,
db: sh.store,
cursors: cursors,
tmin: tmin.UnixNano(),
tmax: tmax.UnixNano(),
}
// Add to tx so the bolt transaction can be opened/closed.
@ -172,28 +174,14 @@ func splitIdent(s string) (db, rp, m string, err error) {
// shardIterator represents an iterator for traversing over a single series.
type shardIterator struct {
fieldName string
fieldID uint8
tags string // encoded dimensional tag values
conditions map[uint32]influxql.Expr
cursors []*seriesCursor
keyValues []keyValue
db *bolt.DB // data stores by shard id
txn *bolt.Tx // read transactions by shard id
cur *multiCursor
}
func (i *shardIterator) Tags() string { return i.tags }
func (i *shardIterator) Next() (key int64, value interface{}) {
// Ignore if there's no more data.
k, v := i.cur.Next()
if k == nil {
return 0, nil
}
// Read timestamp & field value.
key = int64(btou64(k))
value = unmarshalValue(v, i.fieldID)
return key, value
tmin, tmax int64
}
func (i *shardIterator) open() error {
@ -205,17 +193,19 @@ func (i *shardIterator) open() error {
i.txn = txn
// Open cursors for each series id
for id, _ := range i.conditions {
b := i.txn.Bucket(u32tob(id))
for _, c := range i.cursors {
b := i.txn.Bucket(u32tob(c.id))
if b == nil {
continue
}
// add the cursor fo this series on the shard
i.cur.cursors = append(i.cur.cursors, b.Cursor())
c.cur = b.Cursor()
}
i.cur.initialize()
i.keyValues = make([]keyValue, len(i.cursors))
for j, cur := range i.cursors {
i.keyValues[j].key, i.keyValues[j].value = cur.Next(i.fieldName, i.fieldID, i.tmin, i.tmax)
}
return nil
}
@ -225,46 +215,78 @@ func (i *shardIterator) close() error {
return nil
}
type multiCursor struct {
cursors []*bolt.Cursor
keyValues []keyValue
kmin, kmax []byte // min/max keys
}
func (i *shardIterator) Tags() string { return i.tags }
type keyValue struct {
key []byte
value []byte
}
func (c *multiCursor) initialize() {
c.keyValues = make([]keyValue, len(c.cursors))
for i, cur := range c.cursors {
c.keyValues[i].key, c.keyValues[i].value = cur.Seek(c.kmin)
}
}
func (c *multiCursor) Next() (key, value []byte) {
func (i *shardIterator) Next() (key int64, value interface{}) {
min := -1
for i, kv := range c.keyValues {
if kv.key != nil && bytes.Compare(kv.key, c.kmax) == -1 {
min = i
for ind, kv := range i.keyValues {
if kv.key != 0 && kv.key < i.tmax {
min = ind
}
}
// if min is -1 we've exhausted all cursors for the given time range
if min == -1 {
return nil, nil
return 0, nil
}
kv := c.keyValues[min]
kv := i.keyValues[min]
key = kv.key
value = kv.value
c.keyValues[min].key, c.keyValues[min].value = c.cursors[min].Next()
if bytes.Compare(c.keyValues[min].key, c.kmax) == 1 {
c.keyValues[min].key = nil
i.keyValues[min].key, i.keyValues[min].value = i.cursors[min].Next(i.fieldName, i.fieldID, i.tmin, i.tmax)
return key, value
}
type keyValue struct {
key int64
value interface{}
}
type seriesCursor struct {
id uint32
condition influxql.Expr
cur *bolt.Cursor
initialized bool
}
func (c *seriesCursor) Next(fieldName string, fieldID uint8, tmin, tmax int64) (key int64, value interface{}) {
// TODO: clean this up when we make it so series ids are only queried against the shards they exist in.
// Right now we query for all series ids on a query against each shard, even if that shard may not have the
// data, so cur could be nil.
if c.cur == nil {
return 0, nil
}
return
for {
var k, v []byte
if !c.initialized {
k, v = c.cur.Seek(u64tob(uint64(tmin)))
c.initialized = true
} else {
k, v = c.cur.Next()
}
// Exit if there is no more data.
if k == nil {
return 0, nil
}
// Marshal key & value.
key, value = int64(btou64(k)), unmarshalValue(v, fieldID)
if key > tmax {
return 0, nil
}
// Evaluate condition. Move to next key/value if non-true.
if c.condition != nil {
if ok, _ := influxql.Eval(c.condition, map[string]interface{}{fieldName: value}).(bool); !ok {
continue
}
}
return key, value
}
}