2015-05-22 20:08:43 +00:00
package tsdb
import (
"encoding/binary"
"encoding/json"
"errors"
2015-09-04 22:43:57 +00:00
"expvar"
2015-05-22 20:08:43 +00:00
"fmt"
2015-06-18 15:07:51 +00:00
"io"
2015-05-23 22:06:07 +00:00
"math"
2015-06-18 15:07:51 +00:00
"os"
2016-02-04 15:12:52 +00:00
"sort"
2016-03-06 14:52:34 +00:00
"strings"
2015-05-22 20:08:43 +00:00
"sync"
2015-11-04 21:06:06 +00:00
"github.com/gogo/protobuf/proto"
2016-02-10 17:26:18 +00:00
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb/internal"
2015-05-22 20:08:43 +00:00
)
2015-09-04 22:43:57 +00:00
// Keys of the per-shard statistics published through the shard's expvar map.
const (
	// statWriteReq is incremented once for every call to WritePoints.
	statWriteReq = "writeReq"
	// statSeriesCreate accumulates the number of series a write asked to create.
	statSeriesCreate = "seriesCreate"
	// statFieldsCreate accumulates the number of fields a write asked to create.
	statFieldsCreate = "fieldsCreate"
	// statWritePointsFail is incremented when the engine rejects a write.
	statWritePointsFail = "writePointsFail"
	// statWritePointsOK accumulates the number of points written successfully.
	statWritePointsOK = "writePointsOk"
	// statWriteBytes accumulates the byte count reported by WriteTo.
	statWriteBytes = "writeBytes"
)
2015-06-18 15:07:51 +00:00
// Sentinel errors reported while creating, looking up and decoding fields.
var (
	// ErrFieldOverflow signals that a measurement cannot take any more fields.
	ErrFieldOverflow = errors.New("field overflow")

	// ErrFieldTypeConflict signals that a field is being created with a type
	// that differs from the type it already has.
	ErrFieldTypeConflict = errors.New("field type conflict")

	// ErrFieldNotFound signals that the requested field does not exist.
	ErrFieldNotFound = errors.New("field not found")

	// ErrFieldUnmappedID signals that encoded data referenced a field ID for
	// which no mapping is known at decode time.
	ErrFieldUnmappedID = errors.New("field ID not mapped")
)
2016-02-24 13:33:07 +00:00
// A ShardError implements the error interface, and contains extra
// context about the shard that generated the error.
type ShardError struct {
	id  uint64 // ID of the shard that produced Err
	Err error  // the underlying error
}

// NewShardError returns err annotated with the ID of the shard it came from.
// A nil err yields a nil error, so results can be wrapped unconditionally.
func NewShardError(id uint64, err error) error {
	if err == nil {
		return nil
	}
	return ShardError{id: id, Err: err}
}

// Error returns the underlying error message prefixed with the shard ID.
func (e ShardError) Error() string {
	return fmt.Sprintf("[shard %d] %s", e.id, e.Err)
}

// Unwrap returns the underlying error so that errors.Is and errors.As can
// match against it through the shard annotation.
func (e ShardError) Unwrap() error {
	return e.Err
}
2015-06-18 15:07:51 +00:00
// Shard represents a self-contained time series database. An inverted index of
// the measurement and tag data is kept along with the raw time series data.
// Data can be split across many shards. The query engine in TSDB is responsible
// for combining the output of many shards into a single query result.
2015-05-22 20:08:43 +00:00
type Shard struct {
2016-02-26 19:41:54 +00:00
index * DatabaseIndex
path string
walPath string
id uint64
2016-02-23 20:07:21 +00:00
2016-02-26 19:41:54 +00:00
database string
retentionPolicy string
2015-06-18 15:07:51 +00:00
2015-07-22 14:53:20 +00:00
engine Engine
options EngineOptions
2015-05-23 22:06:07 +00:00
mu sync . RWMutex
2015-07-22 14:53:20 +00:00
measurementFields map [ string ] * MeasurementFields // measurement name to their fields
2015-06-18 15:07:51 +00:00
2015-09-04 22:43:57 +00:00
// expvar-based stats.
statMap * expvar . Map
2015-06-18 15:07:51 +00:00
// The writer used by the logger.
LogOutput io . Writer
2015-05-22 20:08:43 +00:00
}
2015-08-21 15:22:04 +00:00
// NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index
2016-02-26 19:41:54 +00:00
func NewShard ( id uint64 , index * DatabaseIndex , path string , walPath string , options EngineOptions ) * Shard {
2015-09-04 22:43:57 +00:00
// Configure statistics collection.
2016-02-26 19:41:54 +00:00
key := fmt . Sprintf ( "shard:%s:%d" , path , id )
db , rp := DecodeStorePath ( path )
2016-02-23 20:07:21 +00:00
tags := map [ string ] string {
2016-02-26 19:41:54 +00:00
"path" : path ,
2016-02-23 20:07:21 +00:00
"id" : fmt . Sprintf ( "%d" , id ) ,
"engine" : options . EngineVersion ,
2016-02-26 19:41:54 +00:00
"database" : db ,
"retentionPolicy" : rp ,
2016-02-23 20:07:21 +00:00
}
2015-09-04 22:43:57 +00:00
statMap := influxdb . NewStatistics ( key , "shard" , tags )
2015-07-22 14:53:20 +00:00
return & Shard {
2015-05-24 11:39:45 +00:00
index : index ,
2015-08-16 19:45:09 +00:00
id : id ,
2016-02-26 19:41:54 +00:00
path : path ,
walPath : walPath ,
2015-07-22 14:53:20 +00:00
options : options ,
measurementFields : make ( map [ string ] * MeasurementFields ) ,
2015-06-18 15:07:51 +00:00
2016-02-26 19:41:54 +00:00
database : db ,
retentionPolicy : rp ,
2015-09-04 22:43:57 +00:00
statMap : statMap ,
2015-06-18 15:07:51 +00:00
LogOutput : os . Stderr ,
2015-05-22 20:08:43 +00:00
}
}
2015-06-08 19:07:05 +00:00
// Path returns the path set on the shard when it was created.
2016-02-26 19:41:54 +00:00
func ( s * Shard ) Path ( ) string { return s . path }
2015-06-08 19:07:05 +00:00
2016-02-10 20:04:18 +00:00
// Open initializes and opens the shard's store.
2015-05-26 15:41:15 +00:00
func ( s * Shard ) Open ( ) error {
2015-06-18 15:07:51 +00:00
if err := func ( ) error {
s . mu . Lock ( )
defer s . mu . Unlock ( )
2015-05-22 20:08:43 +00:00
2015-06-18 15:07:51 +00:00
// Return if the shard is already open
2015-07-22 14:53:20 +00:00
if s . engine != nil {
2015-06-18 15:07:51 +00:00
return nil
}
2015-05-22 20:08:43 +00:00
2015-07-22 14:53:20 +00:00
// Initialize underlying engine.
2016-02-26 19:41:54 +00:00
e , err := NewEngine ( s . path , s . walPath , s . options )
2015-06-18 15:07:51 +00:00
if err != nil {
2016-02-24 13:33:07 +00:00
return err
2015-06-18 15:07:51 +00:00
}
2015-07-22 14:53:20 +00:00
s . engine = e
2015-05-22 20:08:43 +00:00
2015-07-22 14:53:20 +00:00
// Set log output on the engine.
s . engine . SetLogOutput ( s . LogOutput )
2015-06-18 15:07:51 +00:00
2015-07-22 14:53:20 +00:00
// Open engine.
if err := s . engine . Open ( ) ; err != nil {
2016-02-24 13:33:07 +00:00
return err
2015-06-18 15:07:51 +00:00
}
2015-07-22 14:53:20 +00:00
// Load metadata index.
2015-09-06 22:49:15 +00:00
if err := s . engine . LoadMetadataIndex ( s , s . index , s . measurementFields ) ; err != nil {
2016-02-24 13:33:07 +00:00
return err
2015-06-18 15:07:51 +00:00
}
2015-05-22 20:08:43 +00:00
return nil
2015-06-18 15:07:51 +00:00
} ( ) ; err != nil {
s . close ( )
2016-02-24 13:33:07 +00:00
return NewShardError ( s . id , err )
2015-06-18 15:07:51 +00:00
}
return nil
2015-05-22 20:08:43 +00:00
}
2015-06-18 15:07:51 +00:00
// Close shuts down the shard's store.
2015-05-22 20:08:43 +00:00
func ( s * Shard ) Close ( ) error {
2015-06-04 18:50:32 +00:00
s . mu . Lock ( )
2015-07-22 14:53:20 +00:00
defer s . mu . Unlock ( )
return s . close ( )
2015-06-18 15:07:51 +00:00
}
func ( s * Shard ) close ( ) error {
2016-02-02 15:33:20 +00:00
if s . engine == nil {
return nil
2015-05-22 20:08:43 +00:00
}
2016-02-02 15:33:20 +00:00
err := s . engine . Close ( )
if err == nil {
s . engine = nil
}
return err
2015-05-22 20:08:43 +00:00
}
2015-08-25 21:44:42 +00:00
// DiskSize returns the size on disk of this shard
func ( s * Shard ) DiskSize ( ) ( int64 , error ) {
2016-02-26 19:41:54 +00:00
stats , err := os . Stat ( s . path )
2015-08-25 21:44:42 +00:00
if err != nil {
return 0 , err
}
2015-12-19 19:32:44 +00:00
return stats . Size ( ) , nil
2015-08-25 21:44:42 +00:00
}
2016-02-10 20:04:18 +00:00
// FieldCodec returns the field encoding for a measurement.
2015-05-28 22:02:12 +00:00
// TODO: this is temporarily exported to make tx.go work. When the query engine gets refactored
// into the tsdb package this should be removed. No one outside tsdb should know the underlying field encoding scheme.
func ( s * Shard ) FieldCodec ( measurementName string ) * FieldCodec {
s . mu . RLock ( )
defer s . mu . RUnlock ( )
m := s . measurementFields [ measurementName ]
if m == nil {
2015-08-13 19:12:12 +00:00
return NewFieldCodec ( nil )
2015-05-28 22:02:12 +00:00
}
2015-07-22 14:53:20 +00:00
return m . Codec
2015-05-28 22:02:12 +00:00
}
2016-02-10 20:04:18 +00:00
// FieldCreate holds information for a field to create on a measurement
2015-07-22 14:53:20 +00:00
type FieldCreate struct {
Measurement string
2015-07-23 16:33:37 +00:00
Field * Field
2015-05-22 20:08:43 +00:00
}
2016-02-10 20:04:18 +00:00
// SeriesCreate holds information for a series to create
2015-07-22 14:53:20 +00:00
type SeriesCreate struct {
Measurement string
Series * Series
2015-05-22 20:08:43 +00:00
}
// WritePoints will write the raw data points and any new metadata to the index in the shard
2015-09-16 20:33:08 +00:00
func ( s * Shard ) WritePoints ( points [ ] models . Point ) error {
2015-09-04 22:43:57 +00:00
s . statMap . Add ( statWriteReq , 1 )
2015-08-21 19:27:51 +00:00
seriesToCreate , fieldsToCreate , seriesToAddShardTo , err := s . validateSeriesAndFields ( points )
2015-05-23 22:06:07 +00:00
if err != nil {
return err
2015-05-22 20:08:43 +00:00
}
2015-09-04 22:43:57 +00:00
s . statMap . Add ( statSeriesCreate , int64 ( len ( seriesToCreate ) ) )
s . statMap . Add ( statFieldsCreate , int64 ( len ( fieldsToCreate ) ) )
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
// add any new series to the in-memory index
if len ( seriesToCreate ) > 0 {
2015-05-22 20:08:43 +00:00
for _ , ss := range seriesToCreate {
2015-07-22 14:53:20 +00:00
s . index . CreateSeriesIndexIfNotExists ( ss . Measurement , ss . Series )
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
}
2015-05-22 20:08:43 +00:00
2015-08-21 19:27:51 +00:00
if len ( seriesToAddShardTo ) > 0 {
for _ , k := range seriesToAddShardTo {
2016-01-11 18:00:25 +00:00
ss := s . index . Series ( k )
2015-08-21 19:27:51 +00:00
if ss != nil {
2016-01-11 18:00:25 +00:00
ss . AssignShard ( s . id )
2015-08-21 19:27:51 +00:00
}
}
}
2015-05-23 22:06:07 +00:00
// add any new fields and keep track of what needs to be saved
measurementFieldsToSave , err := s . createFieldsAndMeasurements ( fieldsToCreate )
if err != nil {
return err
2015-05-22 20:08:43 +00:00
}
// make sure all data is encoded before attempting to save to bolt
2015-09-29 02:50:00 +00:00
// only required for the b1 and bz1 formats
2015-10-01 19:30:13 +00:00
if s . engine . Format ( ) != TSM1Format {
2015-09-29 02:50:00 +00:00
for _ , p := range points {
// Ignore if raw data has already been marshaled.
if p . Data ( ) != nil {
continue
}
// This was populated earlier, don't need to validate that it's there.
s . mu . RLock ( )
mf := s . measurementFields [ p . Name ( ) ]
s . mu . RUnlock ( )
// If a measurement is dropped while writes for it are in progress, this could be nil
if mf == nil {
return ErrFieldNotFound
}
data , err := mf . Codec . EncodeFields ( p . Fields ( ) )
if err != nil {
return err
}
p . SetData ( data )
}
}
2015-05-22 20:08:43 +00:00
2015-07-22 14:53:20 +00:00
// Write to the engine.
if err := s . engine . WritePoints ( points , measurementFieldsToSave , seriesToCreate ) ; err != nil {
2015-09-04 22:43:57 +00:00
s . statMap . Add ( statWritePointsFail , 1 )
2015-07-22 14:53:20 +00:00
return fmt . Errorf ( "engine: %s" , err )
2015-06-18 15:07:51 +00:00
}
2015-09-04 22:43:57 +00:00
s . statMap . Add ( statWritePointsOK , int64 ( len ( points ) ) )
2015-06-18 15:07:51 +00:00
2015-05-22 20:08:43 +00:00
return nil
}
2015-07-22 14:53:20 +00:00
// DeleteSeries deletes a list of series.
2016-02-03 17:23:31 +00:00
func ( s * Shard ) DeleteSeries ( seriesKeys [ ] string ) error {
return s . engine . DeleteSeries ( seriesKeys )
2015-06-02 15:20:20 +00:00
}
2015-07-22 14:53:20 +00:00
// DeleteMeasurement deletes a measurement and all underlying series.
func ( s * Shard ) DeleteMeasurement ( name string , seriesKeys [ ] string ) error {
2015-06-18 15:07:51 +00:00
s . mu . Lock ( )
defer s . mu . Unlock ( )
2015-07-22 14:53:20 +00:00
if err := s . engine . DeleteMeasurement ( name , seriesKeys ) ; err != nil {
2015-06-03 15:32:50 +00:00
return err
}
2015-06-16 18:40:36 +00:00
// Remove entry from shard index.
delete ( s . measurementFields , name )
2015-07-22 14:53:20 +00:00
2015-06-03 15:32:50 +00:00
return nil
}
2015-07-22 14:53:20 +00:00
func ( s * Shard ) createFieldsAndMeasurements ( fieldsToCreate [ ] * FieldCreate ) ( map [ string ] * MeasurementFields , error ) {
2015-05-23 22:06:07 +00:00
if len ( fieldsToCreate ) == 0 {
return nil , nil
}
s . mu . Lock ( )
defer s . mu . Unlock ( )
// add fields
2015-07-22 14:53:20 +00:00
measurementsToSave := make ( map [ string ] * MeasurementFields )
2015-05-23 22:06:07 +00:00
for _ , f := range fieldsToCreate {
2015-07-22 14:53:20 +00:00
m := s . measurementFields [ f . Measurement ]
2015-05-23 22:06:07 +00:00
if m == nil {
2015-07-22 14:53:20 +00:00
m = measurementsToSave [ f . Measurement ]
2015-05-23 22:06:07 +00:00
if m == nil {
2015-07-23 16:33:37 +00:00
m = & MeasurementFields { Fields : make ( map [ string ] * Field ) }
2015-05-23 22:06:07 +00:00
}
2015-07-22 14:53:20 +00:00
s . measurementFields [ f . Measurement ] = m
2015-05-23 22:06:07 +00:00
}
2015-07-22 14:53:20 +00:00
measurementsToSave [ f . Measurement ] = m
2015-05-23 22:06:07 +00:00
2015-11-04 21:06:06 +00:00
// Add the field to the in memory index
if err := m . CreateFieldIfNotExists ( f . Field . Name , f . Field . Type , false ) ; err != nil {
2015-05-23 22:06:07 +00:00
return nil , err
}
// ensure the measurement is in the index and the field is there
2015-07-22 14:53:20 +00:00
measurement := s . index . CreateMeasurementIndexIfNotExists ( f . Measurement )
2015-12-03 19:52:27 +00:00
measurement . SetFieldName ( f . Field . Name )
2015-05-23 22:06:07 +00:00
}
return measurementsToSave , nil
}
// validateSeriesAndFields checks which series and fields are new and whose metadata should be saved and indexed
2015-09-16 20:33:08 +00:00
func ( s * Shard ) validateSeriesAndFields ( points [ ] models . Point ) ( [ ] * SeriesCreate , [ ] * FieldCreate , [ ] string , error ) {
2015-07-22 14:53:20 +00:00
var seriesToCreate [ ] * SeriesCreate
var fieldsToCreate [ ] * FieldCreate
2015-08-21 19:27:51 +00:00
var seriesToAddShardTo [ ] string
2015-05-23 22:06:07 +00:00
// get the shard mutex for locally defined fields
s . mu . RLock ( )
defer s . mu . RUnlock ( )
for _ , p := range points {
// see if the series should be added to the index
2016-01-11 18:00:25 +00:00
if ss := s . index . Series ( string ( p . Key ( ) ) ) ; ss == nil {
2015-08-16 19:45:09 +00:00
series := NewSeries ( string ( p . Key ( ) ) , p . Tags ( ) )
2015-07-22 14:53:20 +00:00
seriesToCreate = append ( seriesToCreate , & SeriesCreate { p . Name ( ) , series } )
2015-08-21 19:27:51 +00:00
seriesToAddShardTo = append ( seriesToAddShardTo , series . Key )
2015-08-16 19:45:09 +00:00
} else if ! ss . shardIDs [ s . id ] {
// this is the first time this series is being written into this shard, persist it
seriesToCreate = append ( seriesToCreate , & SeriesCreate { p . Name ( ) , ss } )
2015-08-21 19:27:51 +00:00
seriesToAddShardTo = append ( seriesToAddShardTo , ss . Key )
2015-05-23 22:06:07 +00:00
}
// see if the field definitions need to be saved to the shard
mf := s . measurementFields [ p . Name ( ) ]
if mf == nil {
for name , value := range p . Fields ( ) {
2015-07-23 16:33:37 +00:00
fieldsToCreate = append ( fieldsToCreate , & FieldCreate { p . Name ( ) , & Field { Name : name , Type : influxql . InspectDataType ( value ) } } )
2015-05-23 22:06:07 +00:00
}
continue // skip validation since all fields are new
}
// validate field types and encode data
for name , value := range p . Fields ( ) {
if f := mf . Fields [ name ] ; f != nil {
2015-06-09 21:57:19 +00:00
// Field present in shard metadata, make sure there is no type conflict.
2015-05-23 22:06:07 +00:00
if f . Type != influxql . InspectDataType ( value ) {
2015-08-21 19:27:51 +00:00
return nil , nil , nil , fmt . Errorf ( "field type conflict: input field \"%s\" on measurement \"%s\" is type %T, already exists as type %s" , name , p . Name ( ) , value , f . Type )
2015-05-23 22:06:07 +00:00
}
continue // Field is present, and it's of the same type. Nothing more to do.
}
2015-07-23 16:33:37 +00:00
fieldsToCreate = append ( fieldsToCreate , & FieldCreate { p . Name ( ) , & Field { Name : name , Type : influxql . InspectDataType ( value ) } } )
2015-05-23 22:06:07 +00:00
}
}
2015-08-21 19:27:51 +00:00
return seriesToCreate , fieldsToCreate , seriesToAddShardTo , nil
2015-05-23 22:06:07 +00:00
}
2015-06-18 15:07:51 +00:00
// SeriesCount returns the number of series buckets on the shard.
2015-07-22 14:53:20 +00:00
func ( s * Shard ) SeriesCount ( ) ( int , error ) { return s . engine . SeriesCount ( ) }
2015-06-18 15:07:51 +00:00
2015-09-03 16:48:37 +00:00
// WriteTo writes the shard's data to w.
2015-09-04 22:43:57 +00:00
func ( s * Shard ) WriteTo ( w io . Writer ) ( int64 , error ) {
n , err := s . engine . WriteTo ( w )
s . statMap . Add ( statWriteBytes , int64 ( n ) )
return n , err
}
2015-09-03 16:48:37 +00:00
2015-11-04 21:06:06 +00:00
// CreateIterator returns an iterator for the data in the shard.
func ( s * Shard ) CreateIterator ( opt influxql . IteratorOptions ) ( influxql . Iterator , error ) {
2016-02-19 20:38:02 +00:00
if influxql . Sources ( opt . Sources ) . HasSystemSource ( ) {
return s . createSystemIterator ( opt )
}
2015-11-04 21:06:06 +00:00
return s . engine . CreateIterator ( opt )
2015-11-04 21:06:06 +00:00
}
2016-02-19 20:38:02 +00:00
// createSystemIterator returns an iterator for a system source.
func ( s * Shard ) createSystemIterator ( opt influxql . IteratorOptions ) ( influxql . Iterator , error ) {
// Only support a single system source.
if len ( opt . Sources ) > 1 {
return nil , errors . New ( "cannot select from multiple system sources" )
}
m := opt . Sources [ 0 ] . ( * influxql . Measurement )
switch m . Name {
2016-02-23 23:43:19 +00:00
case "_fieldKeys" :
return NewFieldKeysIterator ( s , opt )
2016-02-19 20:38:02 +00:00
case "_measurements" :
return NewMeasurementIterator ( s , opt )
2016-02-25 21:28:45 +00:00
case "_series" :
return NewSeriesIterator ( s , opt )
2016-02-19 20:38:02 +00:00
case "_tagKeys" :
return NewTagKeysIterator ( s , opt )
2016-03-06 14:52:34 +00:00
case "_tags" :
return NewTagValuesIterator ( s , opt )
2016-02-19 20:38:02 +00:00
default :
return nil , fmt . Errorf ( "unknown system source: %s" , m . Name )
}
}
2015-11-04 21:06:06 +00:00
// FieldDimensions collects the unique field names and tag keys of the given
// sources. Sources that are not measurements, and measurements missing from
// the index, contribute nothing. The returned error is always nil.
func (s *Shard) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error) {
	fields = make(map[string]struct{})
	dimensions = make(map[string]struct{})

	for _, src := range sources {
		m, ok := src.(*influxql.Measurement)
		if !ok {
			continue
		}

		// Retrieve measurement.
		mm := s.index.Measurement(m.Name)
		if mm == nil {
			continue
		}

		// Record its fields and dimensions.
		for _, name := range mm.FieldNames() {
			fields[name] = struct{}{}
		}
		for _, key := range mm.TagKeys() {
			dimensions[key] = struct{}{}
		}
	}

	return fields, dimensions, nil
}
2016-02-10 20:04:18 +00:00
// SeriesKeys returns a list of series in the shard.
2016-02-05 14:49:42 +00:00
func ( s * Shard ) SeriesKeys ( opt influxql . IteratorOptions ) ( influxql . SeriesList , error ) {
2016-02-08 15:02:08 +00:00
if influxql . Sources ( opt . Sources ) . HasSystemSource ( ) {
// Only support a single system source.
if len ( opt . Sources ) > 1 {
return nil , errors . New ( "cannot select from multiple system sources" )
}
2016-03-06 14:52:34 +00:00
// Meta queries don't need to know the series name and
// always have a single series of strings.
auxFields := make ( [ ] influxql . DataType , len ( opt . Aux ) )
for i := range auxFields {
auxFields [ i ] = influxql . String
}
return [ ] influxql . Series { { Aux : auxFields } } , nil
2016-02-08 15:02:08 +00:00
}
2016-02-19 20:38:02 +00:00
return s . engine . SeriesKeys ( opt )
2016-02-04 18:00:50 +00:00
}
2016-03-04 18:01:41 +00:00
// ExpandSources replaces every regex source with the measurements it matches
// in the index and removes duplicates. The result is ordered by the string
// form of the sources. Any source that is not a measurement is an error.
// NOTE: sources must be normalized (db and rp set) before calling this function.
func (s *Shard) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
	// Keyed by the string form of a source, so duplicates collapse.
	unique := map[string]influxql.Source{}

	for _, source := range sources {
		src, ok := source.(*influxql.Measurement)
		if !ok {
			return nil, fmt.Errorf("expandSources: unsupported source type: %T", source)
		}

		// Non-regex measurements are taken as they are.
		if src.Regex == nil {
			unique[src.String()] = src
			continue
		}

		// A regex is replaced by one measurement per match, inheriting the
		// database and retention policy of the regex source.
		for _, m := range s.index.MeasurementsByRegex(src.Regex.Val) {
			match := &influxql.Measurement{
				Database:        src.Database,
				RetentionPolicy: src.RetentionPolicy,
				Name:            m.Name,
			}
			unique[match.String()] = match
		}
	}

	// Order the keys so that the result is deterministic.
	keys := make([]string, 0, len(unique))
	for key := range unique {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	expanded := make(influxql.Sources, 0, len(unique))
	for _, key := range keys {
		expanded = append(expanded, unique[key])
	}
	return expanded, nil
}
2016-02-19 20:38:02 +00:00
// Shards represents a sortable list of shards.
type Shards [ ] * Shard
2015-11-04 21:06:06 +00:00
2016-02-19 20:38:02 +00:00
func ( a Shards ) Len ( ) int { return len ( a ) }
func ( a Shards ) Less ( i , j int ) bool { return a [ i ] . id < a [ j ] . id }
func ( a Shards ) Swap ( i , j int ) { a [ i ] , a [ j ] = a [ j ] , a [ i ] }
2015-11-04 21:06:06 +00:00
2016-02-10 20:04:18 +00:00
// MeasurementFields holds the fields of a measurement and their codec.
2015-07-22 14:53:20 +00:00
type MeasurementFields struct {
2015-07-23 16:33:37 +00:00
Fields map [ string ] * Field ` json:"fields" `
2015-07-22 14:53:20 +00:00
Codec * FieldCodec
2015-05-23 22:06:07 +00:00
}
2015-06-02 23:08:48 +00:00
// MarshalBinary encodes the object to a binary format.
2015-07-22 14:53:20 +00:00
func ( m * MeasurementFields ) MarshalBinary ( ) ( [ ] byte , error ) {
2015-06-02 23:08:48 +00:00
var pb internal . MeasurementFields
for _ , f := range m . Fields {
id := int32 ( f . ID )
name := f . Name
2015-06-10 20:02:26 +00:00
t := int32 ( f . Type )
2015-06-02 23:08:48 +00:00
pb . Fields = append ( pb . Fields , & internal . Field { ID : & id , Name : & name , Type : & t } )
}
return proto . Marshal ( & pb )
}
2015-06-03 11:36:39 +00:00
// UnmarshalBinary decodes the object from a binary format.
2015-07-22 14:53:20 +00:00
func ( m * MeasurementFields ) UnmarshalBinary ( buf [ ] byte ) error {
2015-06-02 23:08:48 +00:00
var pb internal . MeasurementFields
if err := proto . Unmarshal ( buf , & pb ) ; err != nil {
return err
}
2015-12-19 19:32:44 +00:00
m . Fields = make ( map [ string ] * Field , len ( pb . Fields ) )
2015-06-02 23:08:48 +00:00
for _ , f := range pb . Fields {
2015-07-23 16:33:37 +00:00
m . Fields [ f . GetName ( ) ] = & Field { ID : uint8 ( f . GetID ( ) ) , Name : f . GetName ( ) , Type : influxql . DataType ( f . GetType ( ) ) }
2015-06-02 23:08:48 +00:00
}
return nil
}
2015-08-10 18:46:57 +00:00
// CreateFieldIfNotExists creates a new field with an autoincrementing ID.
2015-05-23 22:06:07 +00:00
// Returns an error if 255 fields have already been created on the measurement or
// the fields already exists with a different type.
2015-10-06 22:49:37 +00:00
func ( m * MeasurementFields ) CreateFieldIfNotExists ( name string , typ influxql . DataType , limitCount bool ) error {
2015-05-23 22:06:07 +00:00
// Ignore if the field already exists.
if f := m . Fields [ name ] ; f != nil {
if f . Type != typ {
return ErrFieldTypeConflict
}
return nil
2015-05-22 20:08:43 +00:00
}
2015-10-06 22:49:37 +00:00
// If we're supposed to limit the number of fields, only 255 are allowed. If we go over that then return an error.
if len ( m . Fields ) + 1 > math . MaxUint8 && limitCount {
2015-05-23 22:06:07 +00:00
return ErrFieldOverflow
}
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
// Create and append a new field.
2015-07-23 16:33:37 +00:00
f := & Field {
2015-05-23 22:06:07 +00:00
ID : uint8 ( len ( m . Fields ) + 1 ) ,
Name : name ,
Type : typ ,
}
m . Fields [ name ] = f
2015-07-22 14:53:20 +00:00
m . Codec = NewFieldCodec ( m . Fields )
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
return nil
}
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
// Field represents a series field.
2015-07-23 16:33:37 +00:00
type Field struct {
2015-05-23 22:06:07 +00:00
ID uint8 ` json:"id,omitempty" `
Name string ` json:"name,omitempty" `
Type influxql . DataType ` json:"type,omitempty" `
}
2015-05-22 20:08:43 +00:00
2015-06-28 06:54:34 +00:00
// FieldCodec provides encoding and decoding functionality for the fields of a given
2015-05-23 22:06:07 +00:00
// Measurement. It is a distinct type to avoid locking writes on this node while
// potentially long-running queries are executing.
//
// It is not affected by changes to the Measurement object after codec creation.
2015-05-28 22:02:12 +00:00
// TODO: this shouldn't be exported. nothing outside the shard should know about field encodings.
// However, this is here until tx.go and the engine get refactored into tsdb.
type FieldCodec struct {
2015-07-23 16:33:37 +00:00
fieldsByID map [ uint8 ] * Field
fieldsByName map [ string ] * Field
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// NewFieldCodec returns a FieldCodec for the given Measurement. Must be called with
// a RLock that protects the Measurement.
2015-07-23 16:33:37 +00:00
func NewFieldCodec ( fields map [ string ] * Field ) * FieldCodec {
fieldsByID := make ( map [ uint8 ] * Field , len ( fields ) )
fieldsByName := make ( map [ string ] * Field , len ( fields ) )
2015-05-23 22:06:07 +00:00
for _ , f := range fields {
fieldsByID [ f . ID ] = f
fieldsByName [ f . Name ] = f
2015-05-22 20:08:43 +00:00
}
2015-05-28 22:02:12 +00:00
return & FieldCodec { fieldsByID : fieldsByID , fieldsByName : fieldsByName }
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// EncodeFields converts a map of values with string keys to a byte slice of field
// IDs and values.
//
// If a field exists in the codec, but its type is different, an error is returned. If
// a field is not present in the codec, the system panics.
2015-05-28 22:02:12 +00:00
func ( f * FieldCodec ) EncodeFields ( values map [ string ] interface { } ) ( [ ] byte , error ) {
2015-05-23 22:06:07 +00:00
// Allocate byte slice
b := make ( [ ] byte , 0 , 10 )
for k , v := range values {
field := f . fieldsByName [ k ]
if field == nil {
panic ( fmt . Sprintf ( "field does not exist for %s" , k ) )
} else if influxql . InspectDataType ( v ) != field . Type {
2015-06-10 21:57:27 +00:00
return nil , fmt . Errorf ( "field \"%s\" is type %T, mapped as type %s" , k , v , field . Type )
2015-05-23 22:06:07 +00:00
}
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
var buf [ ] byte
switch field . Type {
case influxql . Float :
value := v . ( float64 )
buf = make ( [ ] byte , 9 )
binary . BigEndian . PutUint64 ( buf [ 1 : 9 ] , math . Float64bits ( value ) )
case influxql . Integer :
var value uint64
switch v . ( type ) {
case int :
value = uint64 ( v . ( int ) )
case int32 :
value = uint64 ( v . ( int32 ) )
case int64 :
value = uint64 ( v . ( int64 ) )
default :
panic ( fmt . Sprintf ( "invalid integer type: %T" , v ) )
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
buf = make ( [ ] byte , 9 )
binary . BigEndian . PutUint64 ( buf [ 1 : 9 ] , value )
case influxql . Boolean :
value := v . ( bool )
// Only 1 byte need for a boolean.
buf = make ( [ ] byte , 2 )
if value {
buf [ 1 ] = byte ( 1 )
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
case influxql . String :
value := v . ( string )
if len ( value ) > maxStringLength {
value = value [ : maxStringLength ]
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// Make a buffer for field ID (1 bytes), the string length (2 bytes), and the string.
buf = make ( [ ] byte , len ( value ) + 3 )
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
// Set the string length, then copy the string itself.
binary . BigEndian . PutUint16 ( buf [ 1 : 3 ] , uint16 ( len ( value ) ) )
for i , c := range [ ] byte ( value ) {
buf [ i + 3 ] = byte ( c )
2015-05-22 20:08:43 +00:00
}
default :
2015-05-23 22:06:07 +00:00
panic ( fmt . Sprintf ( "unsupported value type during encode fields: %T" , v ) )
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// Always set the field ID as the leading byte.
buf [ 0 ] = field . ID
// Append temp buffer to the end.
b = append ( b , buf ... )
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
return b , nil
2015-05-22 20:08:43 +00:00
}
2016-02-10 20:04:18 +00:00
// FieldIDByName returns the ID of the field with the given name s.
2015-05-28 22:02:12 +00:00
// TODO: this shouldn't be exported. remove when tx.go and engine.go get refactored into tsdb
func ( f * FieldCodec ) FieldIDByName ( s string ) ( uint8 , error ) {
fi := f . fieldsByName [ s ]
if fi == nil {
2015-06-30 17:29:09 +00:00
return 0 , ErrFieldNotFound
2015-05-28 22:02:12 +00:00
}
return fi . ID , nil
}
2015-05-23 22:06:07 +00:00
// DecodeFields decodes a byte slice into a set of field ids and values.
2015-05-28 22:02:12 +00:00
func ( f * FieldCodec ) DecodeFields ( b [ ] byte ) ( map [ uint8 ] interface { } , error ) {
2015-05-23 22:06:07 +00:00
if len ( b ) == 0 {
return nil , nil
}
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
// Create a map to hold the decoded data.
values := make ( map [ uint8 ] interface { } , 0 )
for {
if len ( b ) < 1 {
// No more bytes.
break
}
// First byte is the field identifier.
fieldID := b [ 0 ]
field := f . fieldsByID [ fieldID ]
if field == nil {
// See note in DecodeByID() regarding field-mapping failures.
return nil , ErrFieldUnmappedID
}
var value interface { }
switch field . Type {
case influxql . Float :
value = math . Float64frombits ( binary . BigEndian . Uint64 ( b [ 1 : 9 ] ) )
// Move bytes forward.
b = b [ 9 : ]
case influxql . Integer :
value = int64 ( binary . BigEndian . Uint64 ( b [ 1 : 9 ] ) )
// Move bytes forward.
b = b [ 9 : ]
case influxql . Boolean :
if b [ 1 ] == 1 {
value = true
} else {
value = false
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// Move bytes forward.
b = b [ 2 : ]
case influxql . String :
size := binary . BigEndian . Uint16 ( b [ 1 : 3 ] )
value = string ( b [ 3 : size + 3 ] )
// Move bytes forward.
b = b [ size + 3 : ]
default :
panic ( fmt . Sprintf ( "unsupported value type during decode fields: %T" , f . fieldsByID [ fieldID ] ) )
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
values [ fieldID ] = value
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
return values , nil
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// DecodeFieldsWithNames decodes a byte slice into a set of field names and values
2015-05-28 22:02:12 +00:00
// TODO: shouldn't be exported. refactor engine
func ( f * FieldCodec ) DecodeFieldsWithNames ( b [ ] byte ) ( map [ string ] interface { } , error ) {
2015-05-23 22:06:07 +00:00
fields , err := f . DecodeFields ( b )
if err != nil {
return nil , err
}
m := make ( map [ string ] interface { } )
for id , v := range fields {
field := f . fieldsByID [ id ]
if field != nil {
m [ field . Name ] = v
2015-05-22 20:08:43 +00:00
}
}
2015-05-23 22:06:07 +00:00
return m , nil
2015-05-22 20:08:43 +00:00
}
2015-05-28 22:02:12 +00:00
// DecodeByID scans a byte slice for a field with the given ID, converts it to its
// expected type, and return that value.
//
// ErrFieldNotFound is returned when b holds no value for targetID,
// ErrFieldUnmappedID when b references a field ID the codec has no mapping
// for, and io.ErrUnexpectedEOF when b ends in the middle of an encoded value.
// TODO: shouldn't be exported. refactor engine
func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) {
	for len(b) > 0 {
		field := f.fieldsByID[b[0]]
		if field == nil {
			// This can happen, though is very unlikely. If this node receives encoded data, to be written
			// to disk, and is queried for that data before its metastore is updated, there will be no field
			// mapping for the data during decode. All this can happen because data is encoded by the node
			// that first received the write request, not the node that actually writes the data to disk.
			// So if this happens, the read must be aborted.
			return 0, ErrFieldUnmappedID
		}

		// Every value has to be decoded, wanted or not, because its type
		// determines how far to move forward to reach the next one.
		var value interface{}
		switch field.Type {
		case influxql.Float:
			if len(b) < 9 {
				return 0, io.ErrUnexpectedEOF
			}
			value = math.Float64frombits(binary.BigEndian.Uint64(b[1:9]))
			// Move bytes forward.
			b = b[9:]
		case influxql.Integer:
			if len(b) < 9 {
				return 0, io.ErrUnexpectedEOF
			}
			value = int64(binary.BigEndian.Uint64(b[1:9]))
			// Move bytes forward.
			b = b[9:]
		case influxql.Boolean:
			if len(b) < 2 {
				return 0, io.ErrUnexpectedEOF
			}
			value = b[1] == 1
			// Move bytes forward.
			b = b[2:]
		case influxql.String:
			if len(b) < 3 {
				return 0, io.ErrUnexpectedEOF
			}
			// Widen the length before adding the header size: in uint16
			// arithmetic size+3 wraps around for sizes above 65532.
			end := 3 + int(binary.BigEndian.Uint16(b[1:3]))
			if len(b) < end {
				return 0, io.ErrUnexpectedEOF
			}
			value = string(b[3:end])
			// Move bytes forward.
			b = b[end:]
		default:
			panic(fmt.Sprintf("unsupported value type during decode by id: %s", field.Type))
		}

		if field.ID == targetID {
			return value, nil
		}
	}

	return 0, ErrFieldNotFound
}
2015-06-30 17:29:09 +00:00
// DecodeByName scans a byte slice for a field with the given name, converts it to its
// expected type, and return that value.
func ( f * FieldCodec ) DecodeByName ( name string , b [ ] byte ) ( interface { } , error ) {
2015-09-29 02:50:00 +00:00
fi := f . FieldByName ( name )
if fi == nil {
return 0 , ErrFieldNotFound
}
return f . DecodeByID ( fi . ID , b )
2015-06-30 17:29:09 +00:00
}
2016-02-10 20:04:18 +00:00
// Fields returns a unsorted list of the codecs fields.
2015-12-19 19:32:44 +00:00
func ( f * FieldCodec ) Fields ( ) [ ] * Field {
a := make ( [ ] * Field , 0 , len ( f . fieldsByID ) )
2015-09-06 22:49:15 +00:00
for _ , f := range f . fieldsByID {
a = append ( a , f )
}
2015-12-19 19:32:44 +00:00
return a
2015-09-06 22:49:15 +00:00
}
2015-05-23 22:06:07 +00:00
// FieldByName returns the field by its name. It will return a nil if not found
2015-09-06 22:49:15 +00:00
func ( f * FieldCodec ) FieldByName ( name string ) * Field {
2015-05-23 22:06:07 +00:00
return f . fieldsByName [ name ]
2015-05-22 20:08:43 +00:00
}
2016-02-19 20:38:02 +00:00
// shardIteratorCreator creates iterators for a local shard.
// This simply wraps the shard so that Close() does not close the underlying shard.
type shardIteratorCreator struct {
sh * Shard // wrapped shard; every method except Close forwards to it
}
// Close is a no-op that always returns nil; the wrapped shard is not touched.
func ( ic * shardIteratorCreator ) Close ( ) error { return nil }
// CreateIterator forwards opt to the wrapped shard's CreateIterator and
// returns its results unchanged.
func ( ic * shardIteratorCreator ) CreateIterator ( opt influxql . IteratorOptions ) ( influxql . Iterator , error ) {
return ic . sh . CreateIterator ( opt )
}
// FieldDimensions forwards sources to the wrapped shard's FieldDimensions and
// returns its results unchanged.
func ( ic * shardIteratorCreator ) FieldDimensions ( sources influxql . Sources ) ( fields , dimensions map [ string ] struct { } , err error ) {
return ic . sh . FieldDimensions ( sources )
}
// SeriesKeys forwards opt to the wrapped shard's SeriesKeys and returns its
// results unchanged.
func ( ic * shardIteratorCreator ) SeriesKeys ( opt influxql . IteratorOptions ) ( influxql . SeriesList , error ) {
return ic . sh . SeriesKeys ( opt )
}
2016-03-04 18:01:41 +00:00
// ExpandSources forwards sources to the wrapped shard's ExpandSources and
// returns its results unchanged.
func ( ic * shardIteratorCreator ) ExpandSources ( sources influxql . Sources ) ( influxql . Sources , error ) {
return ic . sh . ExpandSources ( sources )
}
2016-02-19 20:38:02 +00:00
2016-02-23 23:43:19 +00:00
// NewFieldKeysIterator returns an iterator that emits, for each measurement
// selected by newMeasurementKeysIterator, that measurement's field names in
// sorted order.
//
// On failure it returns a nil iterator together with the error.
func NewFieldKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
	fn := func(m *Measurement) []string {
		keys := m.FieldNames()
		sort.Strings(keys)
		return keys
	}

	// Do not return the callee's results directly: on error that would wrap a
	// nil *measurementKeysIterator in a non-nil influxql.Iterator.
	itr, err := newMeasurementKeysIterator(sh, fn, opt)
	if err != nil {
		return nil, err
	}
	return itr, nil
}
2016-02-04 15:12:52 +00:00
// MeasurementIterator emits the names of the measurements it holds, one per
// call to Next. Each name is delivered as the single Aux value of a
// FloatPoint rather than as a string point.
type MeasurementIterator struct {
mms Measurements // measurements not yet emitted; Next pops from the front
source * influxql . Measurement // first source of the iterator options when it is a *influxql.Measurement, otherwise nil
}
// NewMeasurementIterator returns a new instance of MeasurementIterator.
func NewMeasurementIterator ( sh * Shard , opt influxql . IteratorOptions ) ( * MeasurementIterator , error ) {
itr := & MeasurementIterator { }
// Extract source.
if len ( opt . Sources ) > 0 {
itr . source , _ = opt . Sources [ 0 ] . ( * influxql . Measurement )
}
// Retrieve measurements from shard. Filter if condition specified.
if opt . Condition == nil {
itr . mms = sh . index . Measurements ( )
} else {
2016-03-06 14:52:34 +00:00
mms , _ , err := sh . index . measurementsByExpr ( opt . Condition )
2016-02-04 15:12:52 +00:00
if err != nil {
return nil , err
}
itr . mms = mms
}
// Sort measurements by name.
sort . Sort ( itr . mms )
return itr , nil
}
2016-03-17 15:55:37 +00:00
// Stats reports iterator statistics. This iterator does not track any, so the
// result is always the zero value.
func (itr *MeasurementIterator) Stats() influxql.IteratorStats {
	var stats influxql.IteratorStats
	return stats
}
2016-02-04 15:12:52 +00:00
// Close closes the iterator. It is a no-op that always returns nil.
func ( itr * MeasurementIterator ) Close ( ) error { return nil }
// Next pops the first remaining measurement and emits a point named
// "measurements" whose only Aux value is that measurement's name.
// It returns nil once no measurements remain.
func (itr *MeasurementIterator) Next() *influxql.FloatPoint {
	if len(itr.mms) == 0 {
		return nil
	}

	head, rest := itr.mms[0], itr.mms[1:]
	itr.mms = rest

	point := influxql.FloatPoint{
		Name: "measurements",
		Aux:  []interface{}{head.Name},
	}
	return &point
}
2016-02-25 21:28:45 +00:00
// seriesIterator emits series keys, one per call to Next.
type seriesIterator struct {
keys [ ] string // series keys not yet emitted; Next pops from the front
fields [ ] string // auxiliary fields to emit per point; only "key" is populated, other entries stay nil
}
// NewSeriesIterator returns an iterator over the keys of the series, across
// all measurements in the shard, that seriesIDsAllOrByExpr selects for
// opt.Condition.
//
// Every binary expression in the condition must use one of the operators EQ,
// NEQ, EQREGEX, NEQREGEX, OR or AND; any other operator yields an
// "invalid tag comparison operator" error.
func NewSeriesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
	// Fetch every measurement up front, ordered by sort.Sort.
	measurements := sh.index.Measurements()
	sort.Sort(measurements)

	// Reject conditions that use anything but the permitted operators.
	var opErr error
	influxql.WalkFunc(opt.Condition, func(node influxql.Node) {
		expr, isBinary := node.(*influxql.BinaryExpr)
		if !isBinary {
			return
		}
		switch expr.Op {
		case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX,
			influxql.OR, influxql.AND:
			// Permitted operator.
		default:
			opErr = errors.New("invalid tag comparison operator")
		}
	})
	if opErr != nil {
		return nil, opErr
	}

	// Collect the key of every matching series into a set.
	seriesKeys := newStringSet()
	for _, m := range measurements {
		ids, err := m.seriesIDsAllOrByExpr(opt.Condition)
		if err != nil {
			return nil, err
		}
		for _, id := range ids {
			series := m.SeriesByID(id)
			seriesKeys.add(series.Key)
		}
	}

	itr := &seriesIterator{
		keys:   seriesKeys.list(),
		fields: opt.Aux,
	}
	return itr, nil
}
2016-03-17 15:55:37 +00:00
// Stats reports iterator statistics. This iterator does not track any, so the
// result is always the zero value.
func (itr *seriesIterator) Stats() influxql.IteratorStats {
	var stats influxql.IteratorStats
	return stats
}
2016-02-25 21:28:45 +00:00
// Close closes the iterator. It is a no-op that always returns nil.
func ( itr * seriesIterator ) Close ( ) error { return nil }
// Next emits a point for the first remaining series key and removes that key
// from the iterator. It returns nil once all keys have been emitted.
//
// The point's Aux slice has one slot per requested field: slots for the field
// "key" carry the series key, all other slots are left nil.
func (itr *seriesIterator) Next() *influxql.FloatPoint {
	if len(itr.keys) == 0 {
		return nil
	}

	current := itr.keys[0]

	values := make([]interface{}, len(itr.fields))
	for i := range itr.fields {
		if itr.fields[i] == "key" {
			values[i] = current
		}
	}

	itr.keys = itr.keys[1:]
	return &influxql.FloatPoint{Aux: values}
}
2016-02-23 23:43:19 +00:00
// NewTagKeysIterator returns a new instance of TagKeysIterator.
func NewTagKeysIterator ( sh * Shard , opt influxql . IteratorOptions ) ( influxql . Iterator , error ) {
fn := func ( m * Measurement ) [ ] string {
return m . TagKeys ( )
2016-02-04 18:00:50 +00:00
}
2016-02-23 23:43:19 +00:00
return newMeasurementKeysIterator ( sh , fn , opt )
2016-02-04 18:00:50 +00:00
}
2016-03-06 14:52:34 +00:00
// tagValuesIterator emits one point per (series, tag key) pair for which the
// series carries that tag, exposing the tag key and its value as auxiliary fields.
type tagValuesIterator struct {
series [ ] * Series // series not yet visited; Next pops from the front
keys [ ] string // tag keys to look up on every series
fields [ ] string // auxiliary fields to emit per point ("_tagKey" or "value"; other entries stay nil)
buf struct {
s * Series // series currently being processed
keys [ ] string // tag keys still to be checked for the current series
}
}
// NewTagValuesIterator returns an iterator over tag key/value pairs of the
// series in the shard. opt.Condition is mandatory; a nil condition yields
// "a condition is required".
//
// Measurements come from measurementsByExpr, falling back to every
// measurement in the index when it reports false as its second result. Tag keys are taken per
// measurement from tagKeysByExpr, falling back to all of the measurement's
// tag keys. Series are selected with a copy of the condition from which
// comparisons on "name", on variables starting with "_", or with a
// non-variable left-hand side have been removed.
func NewTagValuesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
	if opt.Condition == nil {
		return nil, errors.New("a condition is required")
	}

	measurements, matched, err := sh.index.measurementsByExpr(opt.Condition)
	if err != nil {
		return nil, err
	}
	if !matched {
		measurements = sh.index.Measurements()
		sort.Sort(measurements)
	}

	// Build the series filter from a clone so the caller's condition is left intact.
	seriesFilter := influxql.RewriteExpr(influxql.CloneExpr(opt.Condition), func(e influxql.Expr) influxql.Expr {
		binary, isBinary := e.(*influxql.BinaryExpr)
		if !isBinary {
			return e
		}
		switch binary.Op {
		case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
			ref, isRef := binary.LHS.(*influxql.VarRef)
			if !isRef || ref.Val == "name" || strings.HasPrefix(ref.Val, "_") {
				return nil
			}
		}
		return e
	})

	var matches []*Series
	tagKeys := newStringSet()
	for _, m := range measurements {
		selected, restricted, err := m.tagKeysByExpr(opt.Condition)
		switch {
		case err != nil:
			return nil, err
		case restricted:
			tagKeys = tagKeys.union(selected)
		default:
			tagKeys.add(m.TagKeys()...)
		}

		ids, err := m.seriesIDsAllOrByExpr(seriesFilter)
		if err != nil {
			return nil, err
		}
		for _, id := range ids {
			matches = append(matches, m.SeriesByID(id))
		}
	}

	itr := &tagValuesIterator{
		series: matches,
		keys:   tagKeys.list(),
		fields: opt.Aux,
	}
	return itr, nil
}
2016-03-17 15:55:37 +00:00
// Stats reports iterator statistics. This iterator does not track any, so the
// result is always the zero value.
func (itr *tagValuesIterator) Stats() influxql.IteratorStats {
	var stats influxql.IteratorStats
	return stats
}
2016-03-06 14:52:34 +00:00
// Close closes the iterator. It is a no-op that always returns nil.
func ( itr * tagValuesIterator ) Close ( ) error { return nil }
// Next emits the next tag key/value point, or nil when every series has been
// exhausted.
//
// For each series the iterator walks the configured tag keys in order and
// emits a point for every key the series has a tag for. The point is named
// after the series' measurement; Aux slots for the field "_tagKey" carry the
// key, slots for "value" carry the tag value, and any other slot is nil.
func (itr *tagValuesIterator) Next() *influxql.FloatPoint {
	for {
		// Current series has no keys left: advance to the next series and
		// restart from the full key list.
		if len(itr.buf.keys) == 0 {
			if len(itr.series) == 0 {
				return nil
			}
			itr.buf.s = itr.series[0]
			itr.buf.keys = itr.keys
			itr.series = itr.series[1:]
			continue
		}

		tagKey := itr.buf.keys[0]
		if tagValue, found := itr.buf.s.Tags[tagKey]; found {
			aux := make([]interface{}, len(itr.fields))
			for i, name := range itr.fields {
				if name == "_tagKey" {
					aux[i] = tagKey
				} else if name == "value" {
					aux[i] = tagValue
				}
			}

			point := &influxql.FloatPoint{
				Name: itr.buf.s.measurement.Name,
				Aux:  aux,
			}
			itr.buf.keys = itr.buf.keys[1:]
			return point
		}

		// The series does not carry this tag; try the next key.
		itr.buf.keys = itr.buf.keys[1:]
	}
}
2016-02-23 23:43:19 +00:00
// measurementKeyFunc is the function called by measurementKeysIterator to
// obtain the keys to emit for a measurement. It is invoked once per
// measurement, when the iterator advances to it.
type measurementKeyFunc func ( m * Measurement ) [ ] string
func newMeasurementKeysIterator ( sh * Shard , fn measurementKeyFunc , opt influxql . IteratorOptions ) ( * measurementKeysIterator , error ) {
itr := & measurementKeysIterator { fn : fn }
2016-02-04 18:00:50 +00:00
// Retrieve measurements from shard. Filter if condition specified.
if opt . Condition == nil {
itr . mms = sh . index . Measurements ( )
} else {
2016-03-06 14:52:34 +00:00
mms , _ , err := sh . index . measurementsByExpr ( opt . Condition )
2016-02-04 18:00:50 +00:00
if err != nil {
return nil , err
}
itr . mms = mms
}
// Sort measurements by name.
sort . Sort ( itr . mms )
return itr , nil
}
2016-02-23 23:43:19 +00:00
// measurementKeysIterator iterates over measurements and gets keys from each measurement.
// The keys of a measurement are obtained by calling fn when the iterator advances to it.
type measurementKeysIterator struct {
mms Measurements // measurements not yet visited; Next pops from the front
buf struct {
mm * Measurement // measurement currently being emitted
keys [ ] string // keys of the current measurement that are still to be emitted
}
fn measurementKeyFunc // produces the keys to emit for a measurement
}
2016-03-17 15:55:37 +00:00
// Stats reports iterator statistics. This iterator does not track any, so the
// result is always the zero value.
func (itr *measurementKeysIterator) Stats() influxql.IteratorStats {
	var stats influxql.IteratorStats
	return stats
}
2016-02-04 18:00:50 +00:00
// Close closes the iterator.
2016-02-23 23:43:19 +00:00
func ( itr * measurementKeysIterator ) Close ( ) error { return nil }
2016-02-04 18:00:50 +00:00
// Next emits the next tag key name.
2016-02-23 23:43:19 +00:00
func ( itr * measurementKeysIterator ) Next ( ) * influxql . FloatPoint {
2016-02-04 18:00:50 +00:00
for {
// If there are no more keys then move to the next measurements.
if len ( itr . buf . keys ) == 0 {
if len ( itr . mms ) == 0 {
return nil
}
itr . buf . mm = itr . mms [ 0 ]
2016-02-23 23:43:19 +00:00
itr . buf . keys = itr . fn ( itr . buf . mm )
2016-02-04 18:00:50 +00:00
itr . mms = itr . mms [ 1 : ]
continue
}
// Return next key.
p := & influxql . FloatPoint {
Name : itr . buf . mm . Name ,
Aux : [ ] interface { } { itr . buf . keys [ 0 ] } ,
}
itr . buf . keys = itr . buf . keys [ 1 : ]
return p
}
}
2015-11-04 21:06:06 +00:00
// IsNumeric reports whether the aggregate call c is restricted to numeric
// fields. It is false for "count", "first", "last" and "distinct", and true
// for every other call name.
func IsNumeric(c *influxql.Call) bool {
	name := c.Name
	anyType := name == "count" || name == "first" || name == "last" || name == "distinct"
	return !anyType
}
2015-05-22 20:08:43 +00:00
// mustMarshalJSON encodes v as JSON and returns the encoded bytes.
// It panics with a "marshal: "-prefixed message if encoding fails. Use it only
// internally, where a failed marshal would cause corruption and a panic is
// the appropriate response.
func mustMarshalJSON(v interface{}) []byte {
	data, err := json.Marshal(v)
	if err == nil {
		return data
	}
	panic("marshal: " + err.Error())
}
// mustUnmarshalJSON decodes the JSON document b into v.
// It panics with an "unmarshal: "-prefixed message if decoding fails. Use it
// only internally, where a failed unmarshal would cause corruption and a
// panic is the appropriate response.
func mustUnmarshalJSON(b []byte, v interface{}) {
	err := json.Unmarshal(b, v)
	if err == nil {
		return
	}
	panic("unmarshal: " + err.Error())
}