package influxdb

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
	"math"
	"regexp"
	"sort"
	"strings"
	"time"

	"github.com/influxdb/influxdb/influxql"
)

const (
	maxStringLength = 64 * 1024
)

// database is a collection of retention policies and shards. It also has methods
// for keeping an in-memory index of all the measurements, series, and tags in the database.
// Methods on this struct aren't goroutine safe. They assume that the server is handling
// any locking to make things safe.
type database struct {
	name string

	policies          map[string]*RetentionPolicy // retention policies by name
	continuousQueries []*ContinuousQuery          // continuous queries

	defaultRetentionPolicy string

	// in-memory indexing structures
	measurements map[string]*Measurement // measurement name to object and index
	series       map[uint32]*Series      // map series id to the Series object
	names        []string                // sorted list of the measurement names
}

// newDatabase returns an instance of database.
func newDatabase() *database {
	return &database{
		policies:          make(map[string]*RetentionPolicy),
		continuousQueries: make([]*ContinuousQuery, 0),
		measurements:      make(map[string]*Measurement),
		series:            make(map[uint32]*Series),
		names:             make([]string, 0),
	}
}

// shardGroupByTimestamp returns a shard group that owns a given timestamp.
func (db *database) shardGroupByTimestamp(policy string, timestamp time.Time) (*ShardGroup, error) {
	p := db.policies[policy]
	if p == nil {
		return nil, ErrRetentionPolicyNotFound
	}
	return p.shardGroupByTimestamp(timestamp), nil
}

// Series takes a series ID and returns a series.
func (db *database) Series(id uint32) *Series {
	return db.series[id]
}

// MarshalJSON encodes a database into a JSON-encoded byte slice.
func (db *database) MarshalJSON() ([]byte, error) {
	// Copy over properties to intermediate type.
	var o databaseJSON
	o.Name = db.name
	o.DefaultRetentionPolicy = db.defaultRetentionPolicy
	for _, rp := range db.policies {
		o.Policies = append(o.Policies, rp)
	}
	o.ContinuousQueries = db.continuousQueries
	return json.Marshal(&o)
}

// UnmarshalJSON decodes a JSON-encoded byte slice to a database.
func (db *database) UnmarshalJSON(data []byte) error {
	// Decode into intermediate type.
	var o databaseJSON
	if err := json.Unmarshal(data, &o); err != nil {
		return err
	}

	// Copy over properties from intermediate type.
	db.name = o.Name
	db.defaultRetentionPolicy = o.DefaultRetentionPolicy

	// Copy shard policies.
	db.policies = make(map[string]*RetentionPolicy)
	for _, rp := range o.Policies {
		db.policies[rp.Name] = rp
	}

	// we need the parsed continuous queries to be in the in-memory index
	db.continuousQueries = make([]*ContinuousQuery, 0, len(o.ContinuousQueries))
	for _, cq := range o.ContinuousQueries {
		c, _ := NewContinuousQuery(cq.Query)
		db.continuousQueries = append(db.continuousQueries, c)
	}

	return nil
}

// databaseJSON represents the JSON-serialization format for a database.
type databaseJSON struct {
	Name                   string             `json:"name,omitempty"`
	DefaultRetentionPolicy string             `json:"defaultRetentionPolicy,omitempty"`
	Policies               []*RetentionPolicy `json:"policies,omitempty"`
	ContinuousQueries      []*ContinuousQuery `json:"continuousQueries,omitempty"`
}

// Measurement represents a collection of time series in a database. It also contains in-memory
// structures for indexing tags. These structures are accessed through private methods on the Measurement
// object. Generally these methods are only accessed from Index, which is responsible for ensuring
// goroutine-safe access.
type Measurement struct {
	Name   string   `json:"name,omitempty"`
	Fields []*Field `json:"fields,omitempty"`

	// in-memory index fields
	series              map[string]*Series // sorted tagset string to the series object
	seriesByID          map[uint32]*Series // lookup table for series by their id
	measurement         *Measurement
	seriesByTagKeyValue map[string]map[string]seriesIDs // map from tag key to value to sorted set of series ids
	seriesIDs           seriesIDs                       // sorted list of series IDs in this measurement
}

// NewMeasurement allocates and initializes a new Measurement.
func NewMeasurement(name string) *Measurement {
	return &Measurement{
		Name:   name,
		Fields: make([]*Field, 0),

		series:              make(map[string]*Series),
		seriesByID:          make(map[uint32]*Series),
		seriesByTagKeyValue: make(map[string]map[string]seriesIDs),
		seriesIDs:           make(seriesIDs, 0),
	}
}

// createFieldIfNotExists creates a new field with an autoincrementing ID.
// Returns an error if 255 fields have already been created on the measurement or
// the field already exists with a different type.
func (m *Measurement) createFieldIfNotExists(name string, typ influxql.DataType) error {
	// Ignore if the field already exists.
	if f := m.FieldByName(name); f != nil {
		if f.Type != typ {
			return ErrFieldTypeConflict
		}
		return nil
	}

	// Only 255 fields are allowed. If we go over that then return an error.
	if len(m.Fields)+1 > math.MaxUint8 {
		return ErrFieldOverflow
	}

	// Create and append a new field.
	f := &Field{
		ID:   uint8(len(m.Fields) + 1),
		Name: name,
		Type: typ,
	}
	m.Fields = append(m.Fields, f)

	return nil
}
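
// Example (illustrative): field IDs are assigned sequentially starting at 1,
// in creation order, so a measurement holds at most 255 fields:
//
//	m := NewMeasurement("cpu")
//	_ = m.createFieldIfNotExists("value", influxql.Number)      // field gets ID 1
//	_ = m.createFieldIfNotExists("host_count", influxql.Number) // field gets ID 2
//	_ = m.createFieldIfNotExists("value", influxql.String)      // ErrFieldTypeConflict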

// Field returns a field by id.
func (m *Measurement) Field(id uint8) *Field {
	if int(id) > len(m.Fields) {
		return nil
	}
	return m.Fields[id-1]
}

// FieldByName returns a field by name.
func (m *Measurement) FieldByName(name string) *Field {
	for _, f := range m.Fields {
		if f.Name == name {
			return f
		}
	}
	return nil
}

// addSeries will add a series to the measurementIndex. Returns false if already present.
func (m *Measurement) addSeries(s *Series) bool {
	if _, ok := m.seriesByID[s.ID]; ok {
		return false
	}
	m.seriesByID[s.ID] = s
	tagset := string(marshalTags(s.Tags))
	m.series[tagset] = s
	m.seriesIDs = append(m.seriesIDs, s.ID)

	// the series ID should always be higher than all others because it's a new
	// series. So don't do the sort if we don't have to.
	if len(m.seriesIDs) > 1 && m.seriesIDs[len(m.seriesIDs)-1] < m.seriesIDs[len(m.seriesIDs)-2] {
		sort.Sort(m.seriesIDs)
	}

	// add this series id to the tag index on the measurement
	for k, v := range s.Tags {
		valueMap := m.seriesByTagKeyValue[k]
		if valueMap == nil {
			valueMap = make(map[string]seriesIDs)
			m.seriesByTagKeyValue[k] = valueMap
		}
		ids := valueMap[v]
		ids = append(ids, s.ID)

		// most of the time the series ID will be higher than all others because it's a new
		// series. So don't do the sort if we don't have to.
		if len(ids) > 1 && ids[len(ids)-1] < ids[len(ids)-2] {
			sort.Sort(ids)
		}
		valueMap[v] = ids
	}

	return true
}

// seriesByTags returns the Series that matches the given tagset.
func (m *Measurement) seriesByTags(tags map[string]string) *Series {
	return m.series[string(marshalTags(tags))]
}

// seriesIDsAndFilters returns the series IDs that match the statement's condition,
// along with a map from series ID to the field expression that should filter it.
func (m *Measurement) seriesIDsAndFilters(stmt *influxql.SelectStatement) (seriesIDs, map[uint32]influxql.Expr) {
	seriesIdsToExpr := make(map[uint32]influxql.Expr)
	if stmt.Condition == nil {
		return m.seriesIDs, nil
	}
	ids, _, _ := m.walkWhereForSeriesIds(stmt.Condition, seriesIdsToExpr)

	// ids will be empty if the only thing in the where clause was a time constraint,
	// so return all of the measurement's series ids.
	if len(ids) == 0 && stmt.OnlyTimeDimensions() {
		return m.seriesIDs, nil
	}

	return ids, seriesIdsToExpr
}

// tagSets returns the unique tag sets that exist for the given tag keys. This is used to determine
// what composite series will be created by a group by. i.e. "group by region" should return:
// {"region":"uswest"}, {"region":"useast"}
// or region, service returns
// {"region": "uswest", "service": "redis"}, {"region": "uswest", "service": "mysql"}, etc...
func (m *Measurement) tagSets(stmt *influxql.SelectStatement, dimensions []string) map[string]map[uint32]influxql.Expr {
	// get the unique set of series ids and the filters that should be applied to each
	seriesIDs, filters := m.seriesIDsAndFilters(stmt)

	// build the tag sets
	tagSets := make(map[string]map[uint32]influxql.Expr)
	for _, id := range seriesIDs {
		// get the series and set the tag values for the dimensions we care about
		s := m.seriesByID[id]
		tags := make([]string, len(dimensions))
		for i, dim := range dimensions {
			tags[i] = s.Tags[dim]
		}

		// marshal it into a string and put this series and its expr into the tagSets map
		t := string(influxql.MarshalStrings(tags))
		set, ok := tagSets[t]
		if !ok {
			set = make(map[uint32]influxql.Expr)
		}
		set[id] = filters[id]
		tagSets[t] = set
	}

	return tagSets
}
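
// Example (illustrative): for "GROUP BY region" over series tagged region=uswest
// and region=useast, the returned map has one entry per distinct combination of
// dimension values (here two entries), and each entry maps its member series IDs
// to the filter expression that applies to them (nil when there is no per-series
// filter).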

// idsForExpr will return a collection of series ids, a bool indicating if the result should be
// used (it'll be false if it's a time expr) and a field expression if the passed in expression is against a field.
func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (seriesIDs, bool, influxql.Expr) {
	name, ok := n.LHS.(*influxql.VarRef)
	value := n.RHS
	if !ok {
		name, _ = n.RHS.(*influxql.VarRef)
		value = n.LHS
	}

	// ignore time literals
	if _, ok := value.(*influxql.TimeLiteral); ok {
		return nil, false, nil
	}

	// if it's a field we can't collapse it so we have to look at all series ids for this
	if m.FieldByName(name.Val) != nil {
		return m.seriesIDs, true, n
	}

	// tag values can only be strings so if it's not a string this is an empty set
	str, ok := value.(*influxql.StringLiteral)
	if !ok {
		return nil, true, nil
	}

	vals, ok := m.seriesByTagKeyValue[name.Val]
	if !ok {
		return nil, true, nil
	}
	return vals[str.Val], true, nil
}

// walkWhereForSeriesIds will recursively walk the where clause and return a collection of series ids, a boolean indicating if this return
// value should be included in the resulting set, and an expression if the return is a field expression.
// The map that it takes maps each series id to the field expression that should be used to evaluate it when iterating over its cursor.
// Series that have no field expressions won't be in the map.
func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr, filters map[uint32]influxql.Expr) (seriesIDs, bool, influxql.Expr) {
	switch n := expr.(type) {
	case *influxql.BinaryExpr:
		// if it's EQ then it's either a field expression or against a tag. we can return this
		if n.Op == influxql.EQ {
			ids, shouldInclude, expr := m.idsForExpr(n)
			return ids, shouldInclude, expr
		} else if n.Op == influxql.AND || n.Op == influxql.OR { // if it's an AND or OR we need to union or intersect the results
			var ids seriesIDs
			l, il, lexpr := m.walkWhereForSeriesIds(n.LHS, filters)
			r, ir, rexpr := m.walkWhereForSeriesIds(n.RHS, filters)

			if il && ir { // we should include both the LHS and RHS of the BinaryExpr in the return
				if n.Op == influxql.AND {
					ids = l.intersect(r)
				} else if n.Op == influxql.OR {
					ids = l.union(r)
				}
			} else if !il && !ir { // we don't need to include either so return nothing
				return nil, false, nil
			} else if il { // just include the left side
				ids = l
			} else { // just include the right side
				ids = r
			}

			if n.Op == influxql.OR && il && ir && (lexpr == nil || rexpr == nil) {
				// if it's an OR and we're going to include both sides and one of those expressions is nil,
				// we need to clear out restrictive filters on series that don't need them anymore
				idsToClear := l.intersect(r)
				for _, id := range idsToClear {
					delete(filters, id)
				}
			} else {
				// put the LHS field expression into the filters
				if lexpr != nil {
					for _, id := range ids {
						f := filters[id]
						if f == nil {
							filters[id] = lexpr
						} else {
							filters[id] = &influxql.BinaryExpr{LHS: f, RHS: lexpr, Op: n.Op}
						}
					}
				}

				// put the RHS field expression into the filters
				if rexpr != nil {
					for _, id := range ids {
						f := filters[id]
						if f == nil {
							filters[id] = rexpr
						} else {
							filters[id] = &influxql.BinaryExpr{LHS: f, RHS: rexpr, Op: n.Op}
						}
					}
				}

				// if the op is AND and we include both, clear out any of the non-intersecting ids.
				// that is, filters that are no longer part of the end result set
				if n.Op == influxql.AND && il && ir {
					filtersToClear := l.union(r).reject(ids)
					for _, id := range filtersToClear {
						delete(filters, id)
					}
				}
			}

			// finally return the ids and say that we should include them
			return ids, true, nil
		}

		return m.idsForExpr(n)
	case *influxql.ParenExpr:
		// walk down the tree
		return m.walkWhereForSeriesIds(n.Expr, filters)
	default:
		return nil, false, nil
	}
}
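
// Example (illustrative): for a condition such as
//
//	WHERE host = 'serverA' AND value > 5
//
// the walk above intersects the series IDs that carry the tag host=serverA with
// all series IDs of the measurement (because "value" is a field), and records the
// field expression "value > 5" in filters for each ID in the result, to be
// evaluated later when iterating over that series' cursor.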

// expandExpr returns a list of expressions expanded by all possible tag combinations.
func (m *Measurement) expandExpr(expr influxql.Expr) []tagSetExpr {
	// Retrieve list of unique values for each tag.
	valuesByTagKey := m.uniqueTagValues(expr)

	// Convert keys to slices.
	keys := make([]string, 0, len(valuesByTagKey))
	for key := range valuesByTagKey {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	// Order uniques by key.
	uniques := make([][]string, len(keys))
	for i, key := range keys {
		uniques[i] = valuesByTagKey[key]
	}

	// Reduce a condition for each combination of tag values.
	return expandExprWithValues(expr, keys, []tagExpr{}, uniques, 0)
}

func expandExprWithValues(expr influxql.Expr, keys []string, tagExprs []tagExpr, uniques [][]string, index int) []tagSetExpr {
	// If we have no more keys left then execute the reduction and return.
	if index == len(keys) {
		// Create a map of tag key/values.
		m := make(map[string]*string, len(keys))
		for i, key := range keys {
			if tagExprs[i].op == influxql.EQ {
				m[key] = &tagExprs[i].values[0]
			} else {
				m[key] = nil
			}
		}

		// TODO: Rewrite full expressions instead of VarRef replacement.

		// Reduce using the current tag key/value set.
		// Ignore it if it reduces down to "false".
		e := influxql.Reduce(expr, &tagValuer{tags: m})
		if e, ok := e.(*influxql.BooleanLiteral); ok && e.Val == false {
			return nil
		}

		return []tagSetExpr{{values: copyTagExprs(tagExprs), expr: e}}
	}

	// Otherwise expand for each possible equality value of the key.
	var exprs []tagSetExpr
	for _, v := range uniques[index] {
		exprs = append(exprs, expandExprWithValues(expr, keys, append(tagExprs, tagExpr{keys[index], []string{v}, influxql.EQ}), uniques, index+1)...)
	}
	exprs = append(exprs, expandExprWithValues(expr, keys, append(tagExprs, tagExpr{keys[index], uniques[index], influxql.NEQ}), uniques, index+1)...)

	return exprs
}

// seriesIDsAllOrByExpr walks an expression for matching series IDs
// or, if no expression is given, returns all series IDs for the measurement.
func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (seriesIDs, error) {
	// If no expression is given or the measurement has no series,
	// we can just return the ids or nil accordingly.
	if expr == nil {
		return m.seriesIDs, nil
	} else if len(m.seriesIDs) == 0 {
		return nil, nil
	}

	// Get series IDs that match the WHERE clause.
	filters := map[uint32]influxql.Expr{}
	ids, _, _ := m.walkWhereForSeriesIds(expr, filters)

	return ids, nil
}

// tagValuer is used during expression expansion to evaluate all sets of tag values.
type tagValuer struct {
	tags map[string]*string
}

// Value returns the string value of a tag and true if it's listed in the tagset.
func (v *tagValuer) Value(name string) (interface{}, bool) {
	if value, ok := v.tags[name]; ok {
		if value == nil {
			return nil, true
		}
		return *value, true
	}
	return nil, false
}

// tagSetExpr represents a set of tag keys/values and associated expression.
type tagSetExpr struct {
	values []tagExpr
	expr   influxql.Expr
}

// tagExpr represents one or more values assigned to a given tag.
type tagExpr struct {
	key    string
	values []string
	op     influxql.Token // EQ or NEQ
}

func copyTagExprs(a []tagExpr) []tagExpr {
	other := make([]tagExpr, len(a))
	copy(other, a)
	return other
}

// uniqueTagValues returns a list of unique tag values used in an expression.
func (m *Measurement) uniqueTagValues(expr influxql.Expr) map[string][]string {
	// Track unique value per tag.
	tags := make(map[string]map[string]struct{})

	// Find all tag values referenced in the expression.
	influxql.WalkFunc(expr, func(n influxql.Node) {
		switch n := n.(type) {
		case *influxql.BinaryExpr:
			// Ignore operators that are not equality.
			if n.Op != influxql.EQ {
				return
			}

			// Extract ref and string literal.
			var key, value string
			switch lhs := n.LHS.(type) {
			case *influxql.VarRef:
				if rhs, ok := n.RHS.(*influxql.StringLiteral); ok {
					key, value = lhs.Val, rhs.Val
				}
			case *influxql.StringLiteral:
				if rhs, ok := n.RHS.(*influxql.VarRef); ok {
					key, value = rhs.Val, lhs.Val
				}
			}
			if key == "" {
				return
			}

			// Add value to set.
			if tags[key] == nil {
				tags[key] = make(map[string]struct{})
			}
			tags[key][value] = struct{}{}
		}
	})

	// Convert to map of slices.
	out := make(map[string][]string)
	for k, values := range tags {
		out[k] = make([]string, 0, len(values))
		for v := range values {
			out[k] = append(out[k], v)
		}
		sort.Strings(out[k])
	}
	return out
}
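
// Example (illustrative): for the expression
//
//	region = 'uswest' OR region = 'useast'
//
// uniqueTagValues returns map[string][]string{"region": {"useast", "uswest"}}
// (values sorted); non-equality operators and non-string comparisons are ignored.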

// Measurements represents a list of *Measurement.
type Measurements []*Measurement

func (a Measurements) Len() int           { return len(a) }
func (a Measurements) Less(i, j int) bool { return a[i].Name < a[j].Name }
func (a Measurements) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }

func (a Measurements) intersect(other Measurements) Measurements {
	l := a
	r := other

	// we want to iterate through the shortest one and stop
	if len(other) < len(a) {
		l = other
		r = a
	}

	// they're in sorted order so advance the counter as needed.
	// That is, don't run comparisons against lower values that we've already passed
	var i, j int
	result := make(Measurements, 0, len(l))
	for i < len(l) && j < len(r) {
		if l[i].Name == r[j].Name {
			result = append(result, l[i])
			i++
			j++
		} else if l[i].Name < r[j].Name {
			i++
		} else {
			j++
		}
	}

	return result
}

func (a Measurements) union(other Measurements) Measurements {
	result := make(Measurements, 0, len(a)+len(other))
	var i, j int
	for i < len(a) && j < len(other) {
		if a[i].Name == other[j].Name {
			result = append(result, a[i])
			i++
			j++
		} else if a[i].Name < other[j].Name {
			result = append(result, a[i])
			i++
		} else {
			result = append(result, other[j])
			j++
		}
	}

	// now append the remainder
	if i < len(a) {
		result = append(result, a[i:]...)
	} else if j < len(other) {
		result = append(result, other[j:]...)
	}

	return result
}

// Field represents a series field.
type Field struct {
	ID   uint8             `json:"id,omitempty"`
	Name string            `json:"name,omitempty"`
	Type influxql.DataType `json:"type,omitempty"`
}

// Fields represents a list of fields.
type Fields []*Field

// FieldCodec provides encoding and decoding functionality for the fields of a given
// Measurement. It is a distinct type to avoid locking writes on this node while
// potentially long-running queries are executing.
//
// It is not affected by changes to the Measurement object after codec creation.
type FieldCodec struct {
	fieldsByID   map[uint8]*Field
	fieldsByName map[string]*Field
}

// NewFieldCodec returns a FieldCodec for the given Measurement. Must be called with
// a RLock that protects the Measurement.
func NewFieldCodec(m *Measurement) *FieldCodec {
	fieldsByID := make(map[uint8]*Field, len(m.Fields))
	fieldsByName := make(map[string]*Field, len(m.Fields))
	for _, f := range m.Fields {
		fieldsByID[f.ID] = f
		fieldsByName[f.Name] = f
	}
	return &FieldCodec{fieldsByID: fieldsByID, fieldsByName: fieldsByName}
}
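
// Example (illustrative) usage, assuming the measurement already has a Number
// field named "value" (created via createFieldIfNotExists) with field ID 1:
//
//	codec := NewFieldCodec(m)
//	b, err := codec.EncodeFields(map[string]interface{}{"value": 25.3})
//	// ... store b ...
//	fields := codec.DecodeFields(b) // map[uint8]interface{}{1: 25.3}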

// EncodeFields converts a map of values with string keys to a byte slice of field
// IDs and values.
//
// If a field exists in the codec, but its type is different, an error is returned. If
// a field is not present in the codec, the system panics.
func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error) {
	// Allocate byte slice and write field count.
	b := make([]byte, 1, 10)
	b[0] = byte(len(values))

	for k, v := range values {
		field := f.fieldsByName[k]
		if field == nil {
			panic(fmt.Sprintf("field does not exist for %s", k))
		} else if influxql.InspectDataType(v) != field.Type {
			return nil, fmt.Errorf("field \"%s\" is type %T, mapped as type %s", k, v, field.Type)
		}

		var buf []byte
		switch field.Type {
		case influxql.Number:
			var value float64
			// Convert integers to floats.
			if intval, ok := v.(int); ok {
				value = float64(intval)
			} else {
				value = v.(float64)
			}
			buf = make([]byte, 9)
			binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(value))
		case influxql.Boolean:
			value := v.(bool)

			// Only 1 byte is needed for a boolean.
			buf = make([]byte, 2)
			if value {
				buf[1] = byte(1)
			}
		case influxql.String:
			value := v.(string)
			if len(value) > maxStringLength {
				value = value[:maxStringLength]
			}

			// Make a buffer for the field ID (1 byte), the string length (2 bytes), and the string.
			buf = make([]byte, len(value)+3)

			// Set the string length, then copy the string itself.
			binary.BigEndian.PutUint16(buf[1:3], uint16(len(value)))
			for i, c := range []byte(value) {
				buf[i+3] = byte(c)
			}
		default:
			panic(fmt.Sprintf("unsupported value type: %T", v))
		}

		// Always set the field ID as the leading byte.
		buf[0] = field.ID

		// Append temp buffer to the end.
		b = append(b, buf...)
	}

	return b, nil
}
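
// For reference, the encoding produced above is:
//
//	[field count: 1 byte]
//	then, per field:
//	  [field ID: 1 byte]
//	  Number:  [float64 bits, big-endian: 8 bytes]
//	  Boolean: [0 or 1: 1 byte]
//	  String:  [length as big-endian uint16: 2 bytes][string bytes, truncated to maxStringLength]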

// DecodeByID scans a byte slice for a field with the given ID, converts it to its
// expected type, and returns that value.
func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) {
	if len(b) == 0 {
		return 0, ErrFieldNotFound
	}

	// Read the field count from the field byte.
	n := int(b[0])

	// Start from the second byte and iterate until we're done decoding.
	b = b[1:]
	for i := 0; i < n; i++ {
		field, ok := f.fieldsByID[b[0]]
		if !ok {
			panic(fmt.Sprintf("field ID %d has no mapping", b[0]))
		}

		var value interface{}
		switch field.Type {
		case influxql.Number:
			value = math.Float64frombits(binary.BigEndian.Uint64(b[1:9]))
			// Move bytes forward.
			b = b[9:]
		case influxql.Boolean:
			if b[1] == 1 {
				value = true
			} else {
				value = false
			}
			// Move bytes forward.
			b = b[2:]
		case influxql.String:
			size := binary.BigEndian.Uint16(b[1:3])
			value = string(b[3 : 3+size])
			// Move bytes forward.
			b = b[size+3:]
		default:
			panic(fmt.Sprintf("unsupported value type: %T", field.Type))
		}

		if field.ID == targetID {
			return value, nil
		}
	}

	return 0, ErrFieldNotFound
}

// DecodeFields decodes a byte slice into a set of field ids and values.
func (f *FieldCodec) DecodeFields(b []byte) map[uint8]interface{} {
	if len(b) == 0 {
		return nil
	}

	// Read the field count from the field byte.
	n := int(b[0])

	// Create a map to hold the decoded data.
	values := make(map[uint8]interface{}, n)

	// Start from the second byte and iterate until we're done decoding.
	b = b[1:]
	for i := 0; i < n; i++ {
		// First byte is the field identifier.
		fieldID := b[0]
		field := f.fieldsByID[fieldID]
		if field == nil {
			panic(fmt.Sprintf("field ID %d has no mapping", fieldID))
		}

		var value interface{}
		switch field.Type {
		case influxql.Number:
			value = math.Float64frombits(binary.BigEndian.Uint64(b[1:9]))
			// Move bytes forward.
			b = b[9:]
		case influxql.Boolean:
			if b[1] == 1 {
				value = true
			} else {
				value = false
			}
			// Move bytes forward.
			b = b[2:]
		case influxql.String:
			size := binary.BigEndian.Uint16(b[1:3])
			value = string(b[3 : size+3])
			// Move bytes forward.
			b = b[size+3:]
		default:
			panic(fmt.Sprintf("unsupported value type: %T", f.fieldsByID[fieldID]))
		}

		values[fieldID] = value
	}

	return values
}

// Series belongs to a Measurement and represents a unique time series in a database.
type Series struct {
	ID   uint32
	Tags map[string]string

	measurement *Measurement
}

// match returns true if all tags match the series' tags.
func (s *Series) match(tags map[string]string) bool {
	for k, v := range tags {
		if s.Tags[k] != v {
			return false
		}
	}
	return true
}

// seriesIDs is a convenience type for sorting, checking equality, and doing
// union and intersection of collections of series ids.
type seriesIDs []uint32

func (a seriesIDs) Len() int           { return len(a) }
func (a seriesIDs) Less(i, j int) bool { return a[i] < a[j] }
func (a seriesIDs) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }

// equals assumes that both are sorted.
func (a seriesIDs) equals(other seriesIDs) bool {
	if len(a) != len(other) {
		return false
	}
	for i, s := range other {
		if a[i] != s {
			return false
		}
	}
	return true
}

// intersect returns a new collection of series ids in sorted order that is the intersection of the two.
// The two collections must already be sorted.
func (a seriesIDs) intersect(other seriesIDs) seriesIDs {
	l := a
	r := other

	// we want to iterate through the shortest one and stop
	if len(other) < len(a) {
		l = other
		r = a
	}

	// they're in sorted order so advance the counter as needed.
	// That is, don't run comparisons against lower values that we've already passed
	var i, j int
	ids := make([]uint32, 0, len(l))
	for i < len(l) && j < len(r) {
		if l[i] == r[j] {
			ids = append(ids, l[i])
			i++
			j++
		} else if l[i] < r[j] {
			i++
		} else {
			j++
		}
	}

	return seriesIDs(ids)
}

// union returns a new collection of series ids in sorted order that is the union of the two.
// The two collections must already be sorted.
func (a seriesIDs) union(other seriesIDs) seriesIDs {
	l := a
	r := other
	ids := make([]uint32, 0, len(l)+len(r))
	var i, j int
	for i < len(l) && j < len(r) {
		if l[i] == r[j] {
			ids = append(ids, l[i])
			i++
			j++
		} else if l[i] < r[j] {
			ids = append(ids, l[i])
			i++
		} else {
			ids = append(ids, r[j])
			j++
		}
	}

	// now append the remainder
	if i < len(l) {
		ids = append(ids, l[i:]...)
	} else if j < len(r) {
		ids = append(ids, r[j:]...)
	}

	return ids
}

// reject returns a new collection of series ids in sorted order with the passed in set removed from the original.
// This is useful for the NOT operator. The two collections must already be sorted.
func (a seriesIDs) reject(other seriesIDs) seriesIDs {
	l := a
	r := other
	var i, j int
	ids := make([]uint32, 0, len(l))
	for i < len(l) && j < len(r) {
		if l[i] == r[j] {
			i++
			j++
		} else if l[i] < r[j] {
			ids = append(ids, l[i])
			i++
		} else {
			j++
		}
	}

	// Append the remainder
	if i < len(l) {
		ids = append(ids, l[i:]...)
	}

	return seriesIDs(ids)
}
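
// Example (illustrative): with a = seriesIDs{1, 2, 3} and b = seriesIDs{2, 3, 4}
// (both sorted, as required):
//
//	a.intersect(b) // seriesIDs{2, 3}
//	a.union(b)     // seriesIDs{1, 2, 3, 4}
//	a.reject(b)    // seriesIDs{1}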

// RetentionPolicy represents a policy for creating new shards in a database and how long they're kept around for.
type RetentionPolicy struct {
	// Unique name within database. Required.
	Name string `json:"name"`

	// Length of time to keep data around.
	Duration time.Duration `json:"duration"`

	// The number of copies to make of each shard.
	ReplicaN uint32 `json:"replicaN"`

	shardGroups []*ShardGroup
}

// NewRetentionPolicy returns a new instance of RetentionPolicy with defaults set.
func NewRetentionPolicy(name string) *RetentionPolicy {
	return &RetentionPolicy{
		Name:     name,
		ReplicaN: DefaultReplicaN,
		Duration: DefaultShardRetention,
	}
}

// shardGroupByTimestamp returns the group in the policy that owns a timestamp.
// Returns nil if the group does not exist.
func (rp *RetentionPolicy) shardGroupByTimestamp(timestamp time.Time) *ShardGroup {
	for _, g := range rp.shardGroups {
		if timeBetweenInclusive(timestamp, g.StartTime, g.EndTime) {
			return g
		}
	}
	return nil
}

// shardGroupByID returns the group in the policy for the given ID.
// Returns nil if the group does not exist.
func (rp *RetentionPolicy) shardGroupByID(shardID uint64) *ShardGroup {
	for _, g := range rp.shardGroups {
		if g.ID == shardID {
			return g
		}
	}
	return nil
}

// removeShardGroupByID removes the shard group with the given ID from the policy, if present.
func (rp *RetentionPolicy) removeShardGroupByID(shardID uint64) {
	for i, g := range rp.shardGroups {
		if g.ID == shardID {
			rp.shardGroups[i] = nil
			rp.shardGroups = append(rp.shardGroups[:i], rp.shardGroups[i+1:]...)
		}
	}
}

// MarshalJSON encodes a retention policy to a JSON-encoded byte slice.
func (rp *RetentionPolicy) MarshalJSON() ([]byte, error) {
	var o retentionPolicyJSON
	o.Name = rp.Name
	o.Duration = rp.Duration
	o.ReplicaN = rp.ReplicaN
	for _, g := range rp.shardGroups {
		o.ShardGroups = append(o.ShardGroups, g)
	}
	return json.Marshal(&o)
}

// UnmarshalJSON decodes a JSON-encoded byte slice to a retention policy.
func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error {
	// Decode into intermediate type.
	var o retentionPolicyJSON
	if err := json.Unmarshal(data, &o); err != nil {
		return err
	}

	// Copy over properties from intermediate type.
	rp.Name = o.Name
	rp.ReplicaN = o.ReplicaN
	rp.Duration = o.Duration
	rp.shardGroups = o.ShardGroups

	return nil
}

// retentionPolicyJSON represents an intermediate struct for JSON marshaling.
type retentionPolicyJSON struct {
	Name        string        `json:"name"`
	ReplicaN    uint32        `json:"replicaN,omitempty"`
	SplitN      uint32        `json:"splitN,omitempty"`
	Duration    time.Duration `json:"duration,omitempty"`
	ShardGroups []*ShardGroup `json:"shardGroups,omitempty"`
}

// TagFilter represents a tag filter when looking up other tags or measurements.
type TagFilter struct {
	Not   bool
	Key   string
	Value string
	Regex *regexp.Regexp
}

// SeriesIDs is a convenience type for sorting, checking equality, and doing union and
// intersection of collections of series ids.
type SeriesIDs []uint32

func (a SeriesIDs) Len() int           { return len(a) }
func (a SeriesIDs) Less(i, j int) bool { return a[i] < a[j] }
func (a SeriesIDs) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }

// Equals assumes that both are sorted. This is by design, no touchy!
func (a SeriesIDs) Equals(seriesIDs SeriesIDs) bool {
	if len(a) != len(seriesIDs) {
		return false
	}
	for i, s := range seriesIDs {
		if a[i] != s {
			return false
		}
	}
	return true
}

// Intersect returns a new collection of series ids in sorted order that is the intersection of the two.
// The two collections must already be sorted.
func (a SeriesIDs) Intersect(seriesIDs SeriesIDs) SeriesIDs {
	l := a
	r := seriesIDs

	// we want to iterate through the shortest one and stop
	if len(seriesIDs) < len(a) {
		l = seriesIDs
		r = a
	}

	// they're in sorted order so advance the counter as needed.
	// That is, don't run comparisons against lower values that we've already passed
	var i, j int
	ids := make([]uint32, 0, len(l))
	for i < len(l) && j < len(r) {
		if l[i] == r[j] {
			ids = append(ids, l[i])
			i++
			j++
		} else if l[i] < r[j] {
			i++
		} else {
			j++
		}
	}

	return SeriesIDs(ids)
}

// Union returns a new collection of series ids in sorted order that is the union of the two.
// The two collections must already be sorted.
func (a SeriesIDs) Union(other SeriesIDs) SeriesIDs {
	l := a
	r := other
	ids := make([]uint32, 0, len(l)+len(r))
	var i, j int
	for i < len(l) && j < len(r) {
		if l[i] == r[j] {
			ids = append(ids, l[i])
			i++
			j++
		} else if l[i] < r[j] {
			ids = append(ids, l[i])
			i++
		} else {
			ids = append(ids, r[j])
			j++
		}
	}

	// now append the remainder
	if i < len(l) {
		ids = append(ids, l[i:]...)
	} else if j < len(r) {
		ids = append(ids, r[j:]...)
	}

	return ids
}

// Reject returns a new collection of series ids in sorted order with the passed in set removed from the original. This is useful for the NOT operator.
// The two collections must already be sorted.
func (a SeriesIDs) Reject(other SeriesIDs) SeriesIDs {
	l := a
	r := other
	var i, j int
	ids := make([]uint32, 0, len(l))
	for i < len(l) && j < len(r) {
		if l[i] == r[j] {
			i++
			j++
		} else if l[i] < r[j] {
			ids = append(ids, l[i])
			i++
		} else {
			j++
		}
	}

	// append the remainder
	if i < len(l) {
		ids = append(ids, l[i:]...)
	}

	return SeriesIDs(ids)
}

// addSeriesToIndex adds the series for the given measurement to the index. Returns false if already present.
func (db *database) addSeriesToIndex(measurementName string, s *Series) bool {
	// if there is a measurement for this id, it's already been added
	if db.series[s.ID] != nil {
		return false
	}

	// get or create the measurement index and index it globally and in the measurement
	idx := db.createMeasurementIfNotExists(measurementName)

	s.measurement = idx
	db.series[s.ID] = s

	// TODO: add this series to the global tag index

	return idx.addSeries(s)
}

// createMeasurementIfNotExists will either add a measurement object to the index or return the existing one.
func (db *database) createMeasurementIfNotExists(name string) *Measurement {
	idx := db.measurements[name]
	if idx == nil {
		idx = NewMeasurement(name)
		db.measurements[name] = idx
		db.names = append(db.names, name)
		sort.Strings(db.names)
	}
	return idx
}

// MeasurementAndSeries returns the Measurement and the Series for a given measurement name and tag set.
func (db *database) MeasurementAndSeries(name string, tags map[string]string) (*Measurement, *Series) {
	idx := db.measurements[name]
	if idx == nil {
		return nil, nil
	}
	return idx, idx.seriesByTags(tags)
}

// SeriesByID returns the Series that has the given id.
func (d *database) SeriesByID(id uint32) *Series {
	return d.series[id]
}

// MeasurementNames returns all measurement names in sorted order.
func (d *database) MeasurementNames() []string {
	return d.names
}

// DropSeries will clear the index of all references to a series.
func (d *database) DropSeries(id uint32) {
	panic("not implemented")
}

// DropMeasurement will clear the index of all references to a measurement and its child series.
func (d *database) DropMeasurement(name string) {
	panic("not implemented")
}

// continuousQueryByName returns the continuous query with the given name, or nil if none exists.
func (d *database) continuousQueryByName(name string) *ContinuousQuery {
	for _, cq := range d.continuousQueries {
		if cq.cq.Name == name {
			return cq
		}
	}
	return nil
}

// used to convert the tag set to bytes for use as a lookup key
func marshalTags(tags map[string]string) []byte {
	s := make([]string, 0, len(tags))

	// pull out keys to sort
	for k := range tags {
		s = append(s, k)
	}
	sort.Strings(s)

	// now append on the key values in key sorted order
	for _, k := range s {
		s = append(s, tags[k])
	}
	return []byte(strings.Join(s, "|"))
}
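
// Example (illustrative): marshalTags(map[string]string{"region": "uswest", "host": "serverA"})
// sorts the keys and then appends the values in key order, producing the lookup key
// "host|region|serverA|uswest".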

// timeBetweenInclusive returns true if t is between min and max, inclusive.
func timeBetweenInclusive(t, min, max time.Time) bool {
	return (t.Equal(min) || t.After(min)) && (t.Equal(max) || t.Before(max))
}

// SeriesIDs returns an array of series ids for the given measurements and filters to be applied to all.
// Filters are equivalent to an AND operation. If you want to do an OR, get the series IDs for one set,
// then get the series IDs for another set and use the union method to combine the two.
func (db *database) SeriesIDs(names []string, filters []*TagFilter) seriesIDs {
	// they want all ids if no filters are specified
	if len(filters) == 0 {
		ids := seriesIDs(make([]uint32, 0))
		for _, m := range db.measurements {
			ids = ids.union(m.seriesIDs)
		}
		return ids
	}

	ids := seriesIDs(make([]uint32, 0))
	for _, n := range names {
		ids = ids.union(db.seriesIDsByName(n, filters))
	}

	return ids
}

// seriesIDsByName is the same as SeriesIDs, but for a specific measurement.
func (db *database) seriesIDsByName(name string, filters []*TagFilter) seriesIDs {
	m := db.measurements[name]
	if m == nil {
		return nil
	}

	// process the filters one at a time to get the list of ids they return
	idsPerFilter := make([]seriesIDs, len(filters), len(filters))
	for i, filter := range filters {
		idsPerFilter[i] = m.seriesIDsByFilter(filter)
	}

	// collapse the set of ids
	allIDs := idsPerFilter[0]
	for i := 1; i < len(filters); i++ {
		allIDs = allIDs.intersect(idsPerFilter[i])
	}

	return allIDs
}

// seriesIDsByFilter returns the series ids that match the given filter.
func (m *Measurement) seriesIDsByFilter(filter *TagFilter) (ids seriesIDs) {
	values := m.seriesByTagKeyValue[filter.Key]
	if values == nil {
		return
	}

	// handle regex filters
	if filter.Regex != nil {
		for k, v := range values {
			if filter.Regex.MatchString(k) {
				if ids == nil {
					ids = v
				} else {
					ids = ids.union(v)
				}
			}
		}
		if filter.Not {
			ids = m.seriesIDs.reject(ids)
		}
		return
	}

	// this is for the "value is not null" query: collect every series that has
	// any value for this tag key
	if filter.Not && filter.Value == "" {
		for _, v := range values {
			if ids == nil {
				ids = v
			} else {
				ids = ids.union(v)
			}
		}
		return
	}

	// get the ids that have the given key/value tag pair
	ids = seriesIDs(values[filter.Value])

	// filter out these ids from the entire set if it's a not query
	if filter.Not {
		ids = m.seriesIDs.reject(ids)
	}

	return
}

// measurementsByExpr takes an expression containing only tags and returns
// a list of matching *Measurement.
func (db *database) measurementsByExpr(expr influxql.Expr) (Measurements, error) {
	switch e := expr.(type) {
	case *influxql.BinaryExpr:
		switch e.Op {
		case influxql.EQ, influxql.NEQ:
			tag, ok := e.LHS.(*influxql.VarRef)
			if !ok {
				return nil, fmt.Errorf("left side of '=' must be a tag name")
			}

			value, ok := e.RHS.(*influxql.StringLiteral)
			if !ok {
				return nil, fmt.Errorf("right side of '=' must be a tag value string")
			}

			tf := &TagFilter{
				Not:   e.Op == influxql.NEQ,
				Key:   tag.Val,
				Value: value.Val,
			}
			return db.measurementsByTagFilters([]*TagFilter{tf}), nil
		case influxql.OR, influxql.AND:
			lhsIDs, err := db.measurementsByExpr(e.LHS)
			if err != nil {
				return nil, err
			}

			rhsIDs, err := db.measurementsByExpr(e.RHS)
			if err != nil {
				return nil, err
			}

			if e.Op == influxql.OR {
				return lhsIDs.union(rhsIDs), nil
			}

			return lhsIDs.intersect(rhsIDs), nil
		default:
			return nil, fmt.Errorf("invalid operator")
		}
	case *influxql.ParenExpr:
		return db.measurementsByExpr(e.Expr)
	}
	return nil, fmt.Errorf("%#v", expr)
}

// measurementsByTagFilters returns the measurements that match at least one of the given tag filters.
func (db *database) measurementsByTagFilters(filters []*TagFilter) Measurements {
	// If no filters, then return all measurements.
	if len(filters) == 0 {
		measurements := make(Measurements, 0, len(db.measurements))
		for _, m := range db.measurements {
			measurements = append(measurements, m)
		}
		return measurements
	}

	// Build a list of measurements matching the filters.
	var measurements Measurements
	var tagMatch bool

	for _, m := range db.measurements {
		for _, f := range filters {
			tagMatch = false
			if tagVals, ok := m.seriesByTagKeyValue[f.Key]; ok {
				if _, ok := tagVals[f.Value]; ok {
					tagMatch = true
				}
			}

			isEQ := !f.Not

			// tags match | operation is EQ | measurement matches
			// --------------------------------------------------
			//     True   |       True      |      True
			//     True   |       False     |      False
			//     False  |       True      |      False
			//     False  |       False     |      True
			if tagMatch == isEQ {
				measurements = append(measurements, m)
				break
			}
		}
	}

	return measurements
}

// Measurements returns a list of all measurements.
func (db *database) Measurements() Measurements {
	measurements := make(Measurements, 0, len(db.measurements))
	for _, m := range db.measurements {
		measurements = append(measurements, m)
	}
	return measurements
}

// tagKeys returns a list of the measurement's tag names.
func (m *Measurement) tagKeys() []string {
	keys := make([]string, 0, len(m.seriesByTagKeyValue))
	for k := range m.seriesByTagKeyValue {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// tagValuesByKeyAndSeriesID returns the set of tag values found, for the given tag keys,
// across the given series IDs.
func (m *Measurement) tagValuesByKeyAndSeriesID(tagKeys []string, ids seriesIDs) stringSet {
	// If no tag keys were passed, get all tag keys for the measurement.
	if len(tagKeys) == 0 {
		for k := range m.seriesByTagKeyValue {
			tagKeys = append(tagKeys, k)
		}
	}

	// Make a set to hold all tag values found.
	tagValues := newStringSet()

	// Iterate all series to collect tag values.
	for _, id := range ids {
		s, ok := m.seriesByID[id]
		if !ok {
			continue
		}

		// Iterate the tag keys we're interested in and collect values
		// from this series, if they exist.
		for _, tagKey := range tagKeys {
			if tagVal, ok := s.Tags[tagKey]; ok {
				tagValues.add(tagVal)
			}
		}
	}

	return tagValues
}

// stringSet represents a set of strings.
type stringSet map[string]struct{}

// newStringSet returns an empty stringSet.
func newStringSet() stringSet {
	return make(map[string]struct{})
}

// add adds a string to the set.
func (s stringSet) add(ss string) {
	s[ss] = struct{}{}
}

// contains returns whether the set contains the given string.
func (s stringSet) contains(ss string) bool {
	_, ok := s[ss]
	return ok
}

// list returns the set's elements as a slice.
func (s stringSet) list() []string {
	l := make([]string, 0, len(s))
	for k := range s {
		l = append(l, k)
	}
	return l
}

// union returns the union of this set and the other.
func (s stringSet) union(o stringSet) stringSet {
	ns := newStringSet()
	for k := range s {
		ns[k] = struct{}{}
	}
	for k := range o {
		ns[k] = struct{}{}
	}
	return ns
}

// intersect returns the intersection of this set and the other.
func (s stringSet) intersect(o stringSet) stringSet {
	ns := newStringSet()
	for k := range s {
		if _, ok := o[k]; ok {
			ns[k] = struct{}{}
		}
	}
	for k := range o {
		if _, ok := s[k]; ok {
			ns[k] = struct{}{}
		}
	}
	return ns
}