2015-05-29 14:31:03 +00:00
package tsdb
2015-01-26 12:19:35 +00:00
import (
2015-05-28 22:02:12 +00:00
"encoding/binary"
2015-01-26 12:19:35 +00:00
"fmt"
2015-02-23 23:07:01 +00:00
"math"
2015-03-02 06:37:09 +00:00
"sort"
2015-01-26 12:19:35 +00:00
"time"
"github.com/boltdb/bolt"
"github.com/influxdb/influxdb/influxql"
2015-05-28 22:02:12 +00:00
"github.com/influxdb/influxdb/meta"
2015-01-26 12:19:35 +00:00
)
// tx represents a transaction that spans multiple shard data stores.
// This transaction will open and close all data stores atomically.
type tx struct {
2015-05-28 22:02:12 +00:00
now time . Time
2015-03-06 19:23:58 +00:00
// used by DecodeFields and FieldIDs. Only used in a raw query, which won't let you select from more than one measurement
2015-05-29 14:31:03 +00:00
measurement * Measurement
2015-05-28 22:02:12 +00:00
meta metaStore
store localStore
}
// metaStore is the slice of the meta service that a tx needs: looking up a
// retention policy by database name and policy name.
type metaStore interface {
	RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error)
}
type localStore interface {
2015-05-29 14:31:03 +00:00
Measurement ( database , name string ) * Measurement
2015-05-28 22:02:12 +00:00
ValidateAggregateFieldsInStatement ( shardID uint64 , measurementName string , stmt * influxql . SelectStatement ) error
2015-05-29 14:31:03 +00:00
Shard ( shardID uint64 ) * Shard
2015-01-26 12:19:35 +00:00
}
// newTx return a new initialized Tx.
2015-05-28 22:02:12 +00:00
func newTx ( meta metaStore , store localStore ) * tx {
2015-01-26 12:19:35 +00:00
return & tx {
2015-05-28 22:02:12 +00:00
meta : meta ,
store : store ,
now : time . Now ( ) ,
2015-01-26 12:19:35 +00:00
}
}
// SetNow overrides the time the transaction treats as "now".
func (tx *tx) SetNow(now time.Time) {
	tx.now = now
}
2015-02-23 23:07:01 +00:00
// CreateMappers will create a set of mappers that need to be run to execute the map phase of a MapReduceJob.
2015-03-02 06:37:09 +00:00
func ( tx * tx ) CreateMapReduceJobs ( stmt * influxql . SelectStatement , tagKeys [ ] string ) ( [ ] * influxql . MapReduceJob , error ) {
2015-03-13 22:35:36 +00:00
jobs := [ ] * influxql . MapReduceJob { }
for _ , src := range stmt . Sources {
2015-03-28 00:40:21 +00:00
mm , ok := src . ( * influxql . Measurement )
if ! ok {
return nil , fmt . Errorf ( "invalid source type: %#v" , src )
2015-03-13 22:35:36 +00:00
}
2015-01-26 12:19:35 +00:00
2015-05-28 22:02:12 +00:00
// get the index and the retention policy
rp , err := tx . meta . RetentionPolicy ( mm . Database , mm . RetentionPolicy )
2015-03-13 22:35:36 +00:00
if err != nil {
return nil , err
}
2015-05-28 22:02:12 +00:00
m := tx . store . Measurement ( mm . Database , mm . Name )
2015-03-13 22:35:36 +00:00
if m == nil {
2015-03-28 00:40:21 +00:00
return nil , ErrMeasurementNotFound ( influxql . QuoteIdent ( [ ] string { mm . Database , "" , mm . Name } ... ) )
2015-03-13 22:35:36 +00:00
}
2015-02-23 23:07:01 +00:00
2015-03-13 22:35:36 +00:00
tx . measurement = m
2015-03-02 06:37:09 +00:00
2015-03-13 22:35:36 +00:00
// Validate the fields and tags asked for exist and keep track of which are in the select vs the where
2015-05-28 22:02:12 +00:00
var selectFields [ ] string
var whereFields [ ] string
2015-03-13 22:35:36 +00:00
var selectTags [ ] string
2015-03-06 19:23:58 +00:00
2015-03-13 22:35:36 +00:00
for _ , n := range stmt . NamesInSelect ( ) {
2015-06-04 18:50:32 +00:00
if m . HasField ( n ) {
2015-05-28 22:02:12 +00:00
selectFields = append ( selectFields , n )
2015-03-13 22:35:36 +00:00
continue
}
if ! m . HasTagKey ( n ) {
return nil , fmt . Errorf ( "unknown field or tag name in select clause: %s" , n )
}
selectTags = append ( selectTags , n )
2015-06-02 20:48:33 +00:00
tagKeys = append ( tagKeys , n )
2015-03-06 19:23:58 +00:00
}
2015-03-13 22:35:36 +00:00
for _ , n := range stmt . NamesInWhere ( ) {
if n == "time" {
continue
}
2015-06-04 18:50:32 +00:00
if m . HasField ( n ) {
2015-05-28 22:02:12 +00:00
whereFields = append ( whereFields , n )
2015-03-13 22:35:36 +00:00
continue
}
if ! m . HasTagKey ( n ) {
return nil , fmt . Errorf ( "unknown field or tag name in where clause: %s" , n )
}
2015-03-06 19:23:58 +00:00
}
2015-03-13 22:35:36 +00:00
2015-05-20 18:34:24 +00:00
if len ( selectFields ) == 0 && len ( stmt . FunctionCalls ( ) ) == 0 {
return nil , fmt . Errorf ( "select statement must include at least one field or function call" )
2015-05-16 00:02:23 +00:00
}
2015-05-20 16:42:58 +00:00
// Validate that group by is not a field
for _ , d := range stmt . Dimensions {
switch e := d . Expr . ( type ) {
case * influxql . VarRef :
if ! m . HasTagKey ( e . Val ) {
return nil , fmt . Errorf ( "can not use field in group by clause: %s" , e . Val )
}
}
}
2015-03-13 22:35:36 +00:00
// Grab time range from statement.
tmin , tmax := influxql . TimeRange ( stmt . Condition )
if tmax . IsZero ( ) {
tmax = tx . now
2015-03-06 19:23:58 +00:00
}
2015-03-13 22:35:36 +00:00
if tmin . IsZero ( ) {
tmin = time . Unix ( 0 , 0 )
2015-03-06 19:23:58 +00:00
}
2015-01-26 12:19:35 +00:00
2015-03-13 22:35:36 +00:00
// Find shard groups within time range.
2015-05-28 22:02:12 +00:00
var shardGroups [ ] * meta . ShardGroupInfo
for _ , group := range rp . ShardGroups {
2015-05-19 16:35:44 +00:00
if group . Overlaps ( tmin , tmax ) {
2015-05-28 22:02:12 +00:00
g := group
shardGroups = append ( shardGroups , & g )
2015-03-13 22:35:36 +00:00
}
2015-01-27 23:55:59 +00:00
}
2015-03-13 22:35:36 +00:00
if len ( shardGroups ) == 0 {
return nil , nil
2015-02-23 23:07:01 +00:00
}
2015-04-05 15:59:16 +00:00
// get the group by interval, if there is one
var interval int64
if d , err := stmt . GroupByInterval ( ) ; err != nil {
return nil , err
} else {
interval = d . Nanoseconds ( )
}
2015-03-13 22:35:36 +00:00
// get the sorted unique tag sets for this query.
2015-05-28 22:02:12 +00:00
tagSets , err := m . TagSets ( stmt , tagKeys )
2015-04-18 16:28:47 +00:00
if err != nil {
return nil , err
}
2015-03-13 22:35:36 +00:00
for _ , t := range tagSets {
// make a job for each tagset
job := & influxql . MapReduceJob {
MeasurementName : m . Name ,
TagSet : t ,
TMin : tmin . UnixNano ( ) ,
TMax : tmax . UnixNano ( ) ,
2015-02-23 23:07:01 +00:00
}
2015-01-26 12:19:35 +00:00
2015-03-13 22:35:36 +00:00
// make a mapper for each shard that must be hit. We may need to hit multiple shards within a shard group
2015-03-30 23:38:26 +00:00
var mappers [ ] influxql . Mapper
2015-03-13 22:35:36 +00:00
// create mappers for each shard we need to hit
for _ , sg := range shardGroups {
2015-05-28 22:02:12 +00:00
// TODO: implement distributed queries
if len ( sg . Shards ) != 1 {
return nil , fmt . Errorf ( "distributed queries aren't supported yet. You have a replication policy with RF < # of servers in cluster" )
}
shard := tx . store . Shard ( sg . Shards [ 0 ] . ID )
if shard == nil {
// the store returned nil which means we haven't written any data into this shard yet, so ignore it
continue
2015-04-18 05:45:49 +00:00
}
2015-03-29 23:21:53 +00:00
2015-06-08 15:14:42 +00:00
// get the codec for this measuremnt. If this is nil it just means this measurement was
// never written into this shard, so we can skip it and continue.
codec := shard . FieldCodec ( m . Name )
if codec == nil {
continue
}
2015-05-28 22:02:12 +00:00
var mapper influxql . Mapper
mapper = & LocalMapper {
seriesKeys : t . SeriesKeys ,
2015-06-18 15:07:51 +00:00
shard : shard ,
2015-05-28 22:02:12 +00:00
db : shard . DB ( ) ,
job : job ,
2015-06-08 15:14:42 +00:00
decoder : codec ,
2015-05-28 22:02:12 +00:00
filters : t . Filters ,
whereFields : whereFields ,
selectFields : selectFields ,
selectTags : selectTags ,
tmin : tmin . UnixNano ( ) ,
tmax : tmax . UnixNano ( ) ,
interval : interval ,
// multiple mappers may need to be merged together to get the results
// for a raw query. So each mapper will have to read at least the
// limit plus the offset in data points to ensure we've hit our mark
limit : uint64 ( stmt . Limit ) + uint64 ( stmt . Offset ) ,
2015-03-13 22:35:36 +00:00
}
2015-05-28 22:02:12 +00:00
mappers = append ( mappers , mapper )
2015-02-23 23:07:01 +00:00
}
2015-03-13 22:35:36 +00:00
job . Mappers = mappers
2015-02-23 23:07:01 +00:00
2015-03-13 22:35:36 +00:00
jobs = append ( jobs , job )
}
2015-01-26 12:19:35 +00:00
}
2015-02-23 23:07:01 +00:00
// always return them in sorted order so the results from running the jobs are returned in a deterministic order
sort . Sort ( influxql . MapReduceJobs ( jobs ) )
return jobs , nil
2015-01-26 12:19:35 +00:00
}
2015-02-23 23:07:01 +00:00
// LocalMapper implements the influxql.Mapper interface for running map tasks over a shard that is local to this server
type LocalMapper struct {
2015-04-04 20:07:20 +00:00
cursorsEmpty bool // boolean that lets us know if the cursors are empty
2015-05-29 14:31:03 +00:00
decoder * FieldCodec // decoder for the raw data bytes
2015-04-04 20:07:20 +00:00
filters [ ] influxql . Expr // filters for each series
2015-06-18 15:07:51 +00:00
cursors [ ] * shardCursor // bolt cursors for each series id
2015-05-28 22:02:12 +00:00
seriesKeys [ ] string // seriesKeys to be read from this shard
2015-06-18 15:07:51 +00:00
shard * Shard // original shard
2015-04-04 20:07:20 +00:00
db * bolt . DB // bolt store for the shard accessed by this mapper
txn * bolt . Tx // read transactions by shard id
job * influxql . MapReduceJob // the MRJob this mapper belongs to
mapFunc influxql . MapFunc // the map func
fieldID uint8 // the field ID associated with the mapFunc curently being run
fieldName string // the field name associated with the mapFunc currently being run
keyBuffer [ ] int64 // the current timestamp key for each cursor
valueBuffer [ ] [ ] byte // the current value for each cursor
tmin int64 // the min of the current group by interval being iterated over
tmax int64 // the max of the current group by interval being iterated over
additionalNames [ ] string // additional field or tag names that might be requested from the map function
2015-05-28 22:02:12 +00:00
whereFields [ ] string // field names that occur in the where clause
selectFields [ ] string // field names that occur in the select clause
2015-04-04 20:07:20 +00:00
selectTags [ ] string // tag keys that occur in the select clause
isRaw bool // if the query is a non-aggregate query
2015-04-05 15:59:16 +00:00
interval int64 // the group by interval of the query, if any
2015-04-04 20:07:20 +00:00
limit uint64 // used for raw queries for LIMIT
perIntervalLimit int // used for raw queries to determine how far into a chunk we are
chunkSize int // used for raw queries to determine how much data to read before flushing to client
2015-01-26 12:19:35 +00:00
}
2015-01-27 02:14:07 +00:00
2015-03-30 23:38:26 +00:00
// Open opens the LocalMapper.
2015-02-23 23:07:01 +00:00
func ( l * LocalMapper ) Open ( ) error {
2015-06-18 15:07:51 +00:00
// Obtain shard lock to copy in-cache points.
l . shard . mu . Lock ( )
defer l . shard . mu . Unlock ( )
2015-01-27 23:55:59 +00:00
// Open the data store
2015-02-23 23:07:01 +00:00
txn , err := l . db . Begin ( false )
2015-01-27 23:55:59 +00:00
if err != nil {
return err
}
2015-02-23 23:07:01 +00:00
l . txn = txn
2015-01-27 23:55:59 +00:00
2015-02-23 23:07:01 +00:00
// create a bolt cursor for each unique series id
2015-06-18 15:07:51 +00:00
l . cursors = make ( [ ] * shardCursor , len ( l . seriesKeys ) )
2015-03-02 06:37:09 +00:00
2015-05-28 22:02:12 +00:00
for i , key := range l . seriesKeys {
2015-06-18 15:07:51 +00:00
// Retrieve key bucket.
2015-05-28 22:02:12 +00:00
b := l . txn . Bucket ( [ ] byte ( key ) )
2015-06-18 15:07:51 +00:00
// Ignore if there is no bucket or points in the cache.
partitionID := WALPartition ( [ ] byte ( key ) )
if b == nil && len ( l . shard . cache [ partitionID ] [ key ] ) == 0 {
2015-01-27 23:55:59 +00:00
continue
}
2015-06-18 15:07:51 +00:00
// Retrieve a copy of the in-cache points for the key.
cache := make ( [ ] [ ] byte , len ( l . shard . cache [ partitionID ] [ key ] ) )
copy ( cache , l . shard . cache [ partitionID ] [ key ] )
// Build a cursor that merges the bucket and cache together.
cur := & shardCursor { cache : cache }
if b != nil {
cur . cursor = b . Cursor ( )
}
l . cursors [ i ] = cur
2015-01-27 23:55:59 +00:00
}
2015-03-02 06:37:09 +00:00
return nil
2015-01-27 23:55:59 +00:00
}
2015-03-30 23:38:26 +00:00
// Close closes the LocalMapper.
2015-02-23 23:07:01 +00:00
func ( l * LocalMapper ) Close ( ) {
2015-06-18 15:07:51 +00:00
if l . txn != nil {
_ = l . txn . Rollback ( )
}
2015-01-27 23:55:59 +00:00
}
2015-03-02 06:37:09 +00:00
// Begin will set up the mapper to run the map function for a given aggregate call starting at the passed in time
2015-04-04 20:07:20 +00:00
func ( l * LocalMapper ) Begin ( c * influxql . Call , startingTime int64 , chunkSize int ) error {
2015-02-23 23:07:01 +00:00
// set up the buffers. These ensure that we return data in time order
2015-03-02 06:37:09 +00:00
mapFunc , err := influxql . InitializeMapFunc ( c )
if err != nil {
return err
}
l . mapFunc = mapFunc
2015-02-23 23:07:01 +00:00
l . keyBuffer = make ( [ ] int64 , len ( l . cursors ) )
l . valueBuffer = make ( [ ] [ ] byte , len ( l . cursors ) )
2015-04-04 20:07:20 +00:00
l . chunkSize = chunkSize
2015-04-05 15:59:16 +00:00
l . tmin = startingTime
2015-01-27 02:14:07 +00:00
2015-05-20 21:02:39 +00:00
var isCountDistinct bool
2015-03-06 19:23:58 +00:00
// determine if this is a raw data query with a single field, multiple fields, or an aggregate
var fieldName string
if c == nil { // its a raw data query
l . isRaw = true
if len ( l . selectFields ) == 1 {
2015-05-28 22:02:12 +00:00
fieldName = l . selectFields [ 0 ]
2015-03-06 19:23:58 +00:00
}
2015-04-04 20:07:20 +00:00
// if they haven't set a limit, just set it to the max int size
if l . limit == 0 {
l . limit = math . MaxUint64
}
2015-03-06 19:23:58 +00:00
} else {
2015-05-12 22:51:19 +00:00
// Check for calls like `derivative(mean(value), 1d)`
2015-05-12 16:37:58 +00:00
var nested * influxql . Call = c
if fn , ok := c . Args [ 0 ] . ( * influxql . Call ) ; ok {
nested = fn
}
2015-05-19 00:13:44 +00:00
switch lit := nested . Args [ 0 ] . ( type ) {
case * influxql . VarRef :
fieldName = lit . Val
case * influxql . Distinct :
if c . Name != "count" {
return fmt . Errorf ( "aggregate call didn't contain a field %s" , c . String ( ) )
}
2015-05-20 21:02:39 +00:00
isCountDistinct = true
2015-05-19 00:13:44 +00:00
fieldName = lit . Val
default :
2015-05-12 16:37:58 +00:00
return fmt . Errorf ( "aggregate call didn't contain a field %s" , c . String ( ) )
}
2015-05-20 20:15:44 +00:00
2015-05-20 21:02:39 +00:00
isCountDistinct = isCountDistinct || ( c . Name == "count" && nested . Name == "distinct" )
2015-03-06 19:23:58 +00:00
}
// set up the field info if a specific field was set for this mapper
if fieldName != "" {
2015-06-02 18:56:20 +00:00
fid , err := l . decoder . FieldIDByName ( fieldName )
if err != nil {
2015-06-03 21:36:18 +00:00
switch {
2015-06-08 20:19:35 +00:00
case c != nil && c . Name == "distinct" :
return fmt . Errorf ( ` %s isn't a field on measurement %s; to query the unique values for a tag use SHOW TAG VALUES FROM %[2]s WITH KEY = "%[1]s ` , fieldName , l . job . MeasurementName )
2015-06-03 21:36:18 +00:00
case isCountDistinct :
return fmt . Errorf ( "%s isn't a field on measurement %s; count(distinct) on tags isn't yet supported" , fieldName , l . job . MeasurementName )
}
2015-03-06 19:23:58 +00:00
}
2015-06-02 18:56:20 +00:00
l . fieldID = fid
2015-05-28 22:02:12 +00:00
l . fieldName = fieldName
2015-03-06 19:23:58 +00:00
}
2015-02-23 23:07:01 +00:00
// seek the bolt cursors and fill the buffers
for i , c := range l . cursors {
2015-03-09 18:17:36 +00:00
// this series may have never been written in this shard group (time range) so the cursor would be nil
if c == nil {
l . keyBuffer [ i ] = 0
l . valueBuffer [ i ] = nil
continue
}
2015-02-23 23:07:01 +00:00
k , v := c . Seek ( u64tob ( uint64 ( l . job . TMin ) ) )
if k == nil {
2015-03-02 06:37:09 +00:00
l . keyBuffer [ i ] = 0
l . valueBuffer [ i ] = nil
2015-02-23 23:07:01 +00:00
continue
}
2015-03-02 06:37:09 +00:00
l . cursorsEmpty = false
2015-02-23 23:07:01 +00:00
t := int64 ( btou64 ( k ) )
2015-03-02 06:37:09 +00:00
l . keyBuffer [ i ] = t
l . valueBuffer [ i ] = v
2015-01-27 23:55:59 +00:00
}
2015-03-02 06:37:09 +00:00
return nil
}
2015-01-27 02:14:07 +00:00
2015-03-02 06:37:09 +00:00
// NextInterval will get the time ordered next interval of the given interval size from the mapper. This is a
// forward only operation from the start time passed into Begin. Will return nil when there is no more data to be read.
2015-03-28 14:17:16 +00:00
// If this is a raw query, interval should be the max time to hit in the query
2015-04-05 15:59:16 +00:00
func ( l * LocalMapper ) NextInterval ( ) ( interface { } , error ) {
2015-03-02 06:37:09 +00:00
if l . cursorsEmpty || l . tmin > l . job . TMax {
return nil , nil
2015-01-27 23:55:59 +00:00
}
2015-01-27 02:14:07 +00:00
2015-04-05 15:59:16 +00:00
// after we call to the mapper, this will be the tmin for the next interval.
nextMin := l . tmin + l . interval
2015-03-02 06:37:09 +00:00
// Set the upper bound of the interval.
2015-03-28 14:17:16 +00:00
if l . isRaw {
2015-04-04 20:07:20 +00:00
l . perIntervalLimit = l . chunkSize
2015-04-05 15:59:16 +00:00
} else if l . interval > 0 {
// Set tmax to ensure that the interval lands on the boundary of the interval
if l . tmin % l . interval != 0 {
// the first interval in a query with a group by may be smaller than the others. This happens when they have a
// where time > clause that is in the middle of the bucket that the group by time creates. That will be the
// case on the first interval when the tmin % the interval isn't equal to zero
nextMin = l . tmin / l . interval * l . interval + l . interval
}
2015-04-06 20:16:53 +00:00
l . tmax = nextMin - 1
2015-03-02 06:37:09 +00:00
}
2015-01-27 02:14:07 +00:00
2015-03-02 06:37:09 +00:00
// Execute the map function. This local mapper acts as the iterator
val := l . mapFunc ( l )
2015-02-14 22:12:38 +00:00
2015-03-02 06:37:09 +00:00
// see if all the cursors are empty
l . cursorsEmpty = true
for _ , k := range l . keyBuffer {
if k != 0 {
l . cursorsEmpty = false
break
2015-02-23 23:07:01 +00:00
}
2015-03-02 06:37:09 +00:00
}
2015-01-28 01:07:55 +00:00
2015-03-28 14:17:16 +00:00
// Move the interval forward if it's not a raw query. For raw queries we use the limit to advance intervals.
if ! l . isRaw {
2015-04-05 15:59:16 +00:00
l . tmin = nextMin
2015-03-28 14:17:16 +00:00
}
2015-02-12 23:56:54 +00:00
2015-03-02 06:37:09 +00:00
return val , nil
2015-01-28 01:07:55 +00:00
}
2015-03-30 23:38:26 +00:00
// Next returns the next matching timestamped value for the LocalMapper.
2015-05-28 22:02:12 +00:00
func ( l * LocalMapper ) Next ( ) ( seriesKey string , timestamp int64 , value interface { } ) {
2015-03-02 06:37:09 +00:00
for {
2015-04-04 20:07:20 +00:00
// if it's a raw query and we've hit the limit of the number of points to read in
// for either this chunk or for the absolute query, bail
if l . isRaw && ( l . limit == 0 || l . perIntervalLimit == 0 ) {
2015-05-28 22:02:12 +00:00
return "" , int64 ( 0 ) , nil
2015-03-28 14:17:16 +00:00
}
2015-03-02 06:37:09 +00:00
// find the minimum timestamp
min := - 1
minKey := int64 ( math . MaxInt64 )
for i , k := range l . keyBuffer {
if k != 0 && k <= l . tmax && k < minKey && k >= l . tmin {
2015-02-23 23:07:01 +00:00
min = i
2015-03-02 06:37:09 +00:00
minKey = k
2015-02-23 23:07:01 +00:00
}
}
2015-01-27 23:55:59 +00:00
2015-03-02 06:37:09 +00:00
// return if there is no more data in this group by interval
if min == - 1 {
2015-05-28 22:02:12 +00:00
return "" , 0 , nil
2015-03-02 06:37:09 +00:00
}
2015-01-28 01:07:55 +00:00
2015-03-06 19:23:58 +00:00
// set the current timestamp and seriesID
2015-03-02 06:37:09 +00:00
timestamp = l . keyBuffer [ min ]
2015-05-28 22:02:12 +00:00
seriesKey = l . seriesKeys [ min ]
2015-03-06 19:23:58 +00:00
// decode either the value, or values we need. Also filter if necessary
var value interface { }
var err error
if l . isRaw && len ( l . selectFields ) > 1 {
2015-04-03 22:18:13 +00:00
if fieldsWithNames , err := l . decoder . DecodeFieldsWithNames ( l . valueBuffer [ min ] ) ; err == nil {
value = fieldsWithNames
2015-03-06 19:23:58 +00:00
2015-04-03 22:18:13 +00:00
// if there's a where clause, make sure we don't need to filter this value
if l . filters [ min ] != nil {
if ! matchesWhere ( l . filters [ min ] , fieldsWithNames ) {
value = nil
}
2015-03-06 19:23:58 +00:00
}
}
} else {
value , err = l . decoder . DecodeByID ( l . fieldID , l . valueBuffer [ min ] )
// if there's a where clase, see if we need to filter
if l . filters [ min ] != nil {
// see if the where is only on this field or on one or more other fields. if the latter, we'll have to decode everything
2015-05-28 22:02:12 +00:00
if len ( l . whereFields ) == 1 && l . whereFields [ 0 ] == l . fieldName {
2015-03-06 19:23:58 +00:00
if ! matchesWhere ( l . filters [ min ] , map [ string ] interface { } { l . fieldName : value } ) {
value = nil
}
} else { // decode everything
2015-04-03 22:18:13 +00:00
fieldsWithNames , err := l . decoder . DecodeFieldsWithNames ( l . valueBuffer [ min ] )
if err != nil || ! matchesWhere ( l . filters [ min ] , fieldsWithNames ) {
2015-03-06 19:23:58 +00:00
value = nil
}
}
}
}
2015-01-28 01:07:55 +00:00
2015-03-02 06:37:09 +00:00
// advance the cursor
nextKey , nextVal := l . cursors [ min ] . Next ( )
if nextKey == nil {
l . keyBuffer [ min ] = 0
} else {
l . keyBuffer [ min ] = int64 ( btou64 ( nextKey ) )
}
l . valueBuffer [ min ] = nextVal
2015-01-28 01:07:55 +00:00
2015-03-06 19:23:58 +00:00
// if the value didn't match our filter or if we didn't find the field keep iterating
if err != nil || value == nil {
2015-03-02 06:37:09 +00:00
continue
}
2015-03-28 14:17:16 +00:00
// if it's a raw query, we always limit the amount we read in
if l . isRaw {
2015-04-04 20:07:20 +00:00
l . limit --
l . perIntervalLimit --
2015-03-28 14:17:16 +00:00
}
2015-05-28 22:02:12 +00:00
return seriesKey , timestamp , value
2015-02-23 23:07:01 +00:00
}
}
2015-02-09 08:39:42 +00:00
2015-04-06 20:16:53 +00:00
// IsEmpty reports whether the mapper has nothing left to return up to and
// including tmax: the cursors are drained, the limit is used up, or every
// buffered timestamp is either absent or later than tmax.
func (l *LocalMapper) IsEmpty(tmax int64) bool {
	if !l.cursorsEmpty && l.limit != 0 {
		// Look at the next time buffered for each cursor.
		for _, ts := range l.keyBuffer {
			if ts == 0 || ts > tmax {
				continue
			}
			// A point at or before tmax is still pending.
			return false
		}
	}
	return true
}
2015-03-06 19:23:58 +00:00
// matchesFilter returns true if the value matches the where clause
func matchesWhere ( f influxql . Expr , fields map [ string ] interface { } ) bool {
if ok , _ := influxql . Eval ( f , fields ) . ( bool ) ; ! ok {
return false
}
return true
2015-02-23 23:07:01 +00:00
}
2015-01-28 01:07:55 +00:00
2015-05-28 22:02:12 +00:00
// btou64 decodes the first 8 bytes of b as a big-endian uint64.
func btou64(b []byte) uint64 {
	order := binary.BigEndian
	return order.Uint64(b)
}