2015-05-22 20:08:43 +00:00
package tsdb
import (
"errors"
"fmt"
2015-06-18 15:07:51 +00:00
"io"
2016-04-30 04:56:57 +00:00
"log"
2015-06-18 15:07:51 +00:00
"os"
2016-05-18 14:34:06 +00:00
"path/filepath"
2016-02-04 15:12:52 +00:00
"sort"
2016-06-30 16:49:53 +00:00
"strings"
2015-05-22 20:08:43 +00:00
"sync"
2016-07-07 16:13:56 +00:00
"sync/atomic"
2016-04-30 04:56:57 +00:00
"time"
2015-05-22 20:08:43 +00:00
2015-11-04 21:06:06 +00:00
"github.com/gogo/protobuf/proto"
2016-02-10 17:26:18 +00:00
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
2016-04-05 12:54:11 +00:00
internal "github.com/influxdata/influxdb/tsdb/internal"
2015-05-22 20:08:43 +00:00
)
2016-05-18 15:04:50 +00:00
// monitorStatInterval is the interval at which the shard is inspected
// for the purpose of determining certain monitoring statistics.
const monitorStatInterval = 30 * time . Second
2015-09-04 22:43:57 +00:00
const (
2016-08-17 19:33:46 +00:00
statWriteReq = "writeReq"
statWriteReqOK = "writeReqOk"
statWriteReqErr = "writeReqErr"
statSeriesCreate = "seriesCreate"
statFieldsCreate = "fieldsCreate"
statWritePointsErr = "writePointsErr"
statWritePointsOK = "writePointsOk"
statWriteBytes = "writeBytes"
statDiskBytes = "diskBytes"
2015-09-04 22:43:57 +00:00
)
2015-06-18 15:07:51 +00:00
var (
// ErrFieldOverflow is returned when too many fields are created on a measurement.
ErrFieldOverflow = errors . New ( "field overflow" )
// ErrFieldTypeConflict is returned when a new field already exists with a different type.
ErrFieldTypeConflict = errors . New ( "field type conflict" )
// ErrFieldNotFound is returned when a field cannot be found.
ErrFieldNotFound = errors . New ( "field not found" )
// ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID
// there is no mapping for.
ErrFieldUnmappedID = errors . New ( "field ID not mapped" )
2016-03-29 22:32:34 +00:00
// ErrEngineClosed is returned when a caller attempts indirectly to
// access the shard's underlying engine.
ErrEngineClosed = errors . New ( "engine is closed" )
2016-05-27 22:47:33 +00:00
// ErrShardDisabled is returned when a the shard is not available for
// queries or writes.
ErrShardDisabled = errors . New ( "shard is disabled" )
2015-06-18 15:07:51 +00:00
)
2016-02-24 13:33:07 +00:00
// A ShardError implements the error interface, and contains extra
// context about the shard that generated the error.
type ShardError struct {
id uint64
Err error
}
// NewShardError returns a new ShardError.
2016-02-24 13:34:19 +00:00
func NewShardError ( id uint64 , err error ) error {
2016-02-24 13:33:07 +00:00
if err == nil {
2016-02-24 13:34:19 +00:00
return nil
2016-02-24 13:33:07 +00:00
}
return ShardError { id : id , Err : err }
}
func ( e ShardError ) Error ( ) string {
return fmt . Sprintf ( "[shard %d] %s" , e . id , e . Err )
}
2015-06-18 15:07:51 +00:00
// Shard represents a self-contained time series database. An inverted index of
// the measurement and tag data is kept along with the raw time series data.
// Data can be split across many shards. The query engine in TSDB is responsible
// for combining the output of many shards into a single query result.
2015-05-22 20:08:43 +00:00
type Shard struct {
2016-02-26 19:41:54 +00:00
index * DatabaseIndex
path string
walPath string
id uint64
2016-02-23 20:07:21 +00:00
2016-02-26 19:41:54 +00:00
database string
retentionPolicy string
2015-06-18 15:07:51 +00:00
2015-07-22 14:53:20 +00:00
options EngineOptions
2015-05-23 22:06:07 +00:00
2016-05-18 14:34:06 +00:00
mu sync . RWMutex
engine Engine
closing chan struct { }
2016-05-27 22:47:33 +00:00
enabled bool
2015-06-18 15:07:51 +00:00
2015-09-04 22:43:57 +00:00
// expvar-based stats.
2016-08-24 10:40:13 +00:00
stats * ShardStatistics
defaultTags models . StatisticTags
2015-09-04 22:43:57 +00:00
2016-04-30 04:56:57 +00:00
logger * log . Logger
2016-07-07 14:48:12 +00:00
// used by logger. Referenced so it can be passed down to new caches.
logOutput io . Writer
2016-04-30 04:56:57 +00:00
2016-06-01 22:17:18 +00:00
EnableOnOpen bool
2015-05-22 20:08:43 +00:00
}
2015-08-21 15:22:04 +00:00
// NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index
2016-02-26 19:41:54 +00:00
func NewShard ( id uint64 , index * DatabaseIndex , path string , walPath string , options EngineOptions ) * Shard {
db , rp := DecodeStorePath ( path )
2016-04-30 04:56:57 +00:00
s := & Shard {
2016-04-01 19:30:09 +00:00
index : index ,
id : id ,
path : path ,
walPath : walPath ,
options : options ,
2016-05-18 14:34:06 +00:00
closing : make ( chan struct { } ) ,
2015-06-18 15:07:51 +00:00
2016-07-07 16:13:56 +00:00
stats : & ShardStatistics { } ,
2016-08-24 10:40:13 +00:00
defaultTags : models . StatisticTags {
2016-07-07 16:13:56 +00:00
"path" : path ,
"id" : fmt . Sprintf ( "%d" , id ) ,
"database" : db ,
"retentionPolicy" : rp ,
2016-08-19 10:12:35 +00:00
"engine" : options . EngineVersion ,
} ,
2016-07-07 16:13:56 +00:00
2016-02-26 19:41:54 +00:00
database : db ,
retentionPolicy : rp ,
2016-07-07 14:48:12 +00:00
logger : log . New ( os . Stderr , "[shard] " , log . LstdFlags ) ,
logOutput : os . Stderr ,
2016-06-01 22:17:18 +00:00
EnableOnOpen : true ,
2015-05-22 20:08:43 +00:00
}
2016-04-30 04:56:57 +00:00
return s
2015-05-22 20:08:43 +00:00
}
2016-07-07 14:48:12 +00:00
// SetLogOutput sets the writer to which log output will be written. It is safe
// for concurrent use.
2016-04-20 20:07:08 +00:00
func ( s * Shard ) SetLogOutput ( w io . Writer ) {
2016-07-07 14:48:12 +00:00
s . logger . SetOutput ( w )
2016-05-27 22:47:33 +00:00
if err := s . ready ( ) ; err == nil {
2016-04-20 20:07:08 +00:00
s . engine . SetLogOutput ( w )
}
2016-07-07 14:48:12 +00:00
s . mu . Lock ( )
s . logOutput = w
s . mu . Unlock ( )
2016-04-20 20:07:08 +00:00
}
2016-05-27 22:47:33 +00:00
// SetEnabled enables the shard for queries and write. When disabled, all
// writes and queries return an error and compactions are stopped for the shard.
func ( s * Shard ) SetEnabled ( enabled bool ) {
s . mu . Lock ( )
// Prevent writes and queries
s . enabled = enabled
2016-06-01 22:17:18 +00:00
if s . engine != nil {
// Disable background compactions and snapshotting
s . engine . SetEnabled ( enabled )
}
2016-05-27 22:47:33 +00:00
s . mu . Unlock ( )
}
2016-07-07 16:13:56 +00:00
// ShardStatistics maintains statistics for a shard.
type ShardStatistics struct {
2016-08-17 19:33:46 +00:00
WriteReq int64
WriteReqOK int64
WriteReqErr int64
SeriesCreated int64
FieldsCreated int64
WritePointsErr int64
WritePointsOK int64
BytesWritten int64
DiskBytes int64
2016-07-07 16:13:56 +00:00
}
// Statistics returns statistics for periodic monitoring.
func ( s * Shard ) Statistics ( tags map [ string ] string ) [ ] models . Statistic {
2016-07-15 20:53:06 +00:00
if err := s . ready ( ) ; err != nil {
return nil
}
2016-08-17 19:33:46 +00:00
seriesN , _ := s . engine . SeriesCount ( )
2016-09-09 22:16:53 +00:00
tags = s . defaultTags . Merge ( tags )
2016-07-07 16:13:56 +00:00
statistics := [ ] models . Statistic { {
Name : "shard" ,
2016-09-09 22:16:53 +00:00
Tags : tags ,
2016-07-07 16:13:56 +00:00
Values : map [ string ] interface { } {
2016-08-17 19:33:46 +00:00
statWriteReq : atomic . LoadInt64 ( & s . stats . WriteReq ) ,
statWriteReqOK : atomic . LoadInt64 ( & s . stats . WriteReqOK ) ,
statWriteReqErr : atomic . LoadInt64 ( & s . stats . WriteReqErr ) ,
statSeriesCreate : seriesN ,
statWritePointsErr : atomic . LoadInt64 ( & s . stats . WritePointsErr ) ,
statWritePointsOK : atomic . LoadInt64 ( & s . stats . WritePointsOK ) ,
statWriteBytes : atomic . LoadInt64 ( & s . stats . BytesWritten ) ,
statDiskBytes : atomic . LoadInt64 ( & s . stats . DiskBytes ) ,
2016-07-07 16:13:56 +00:00
} ,
} }
statistics = append ( statistics , s . engine . Statistics ( tags ) ... )
return statistics
}
2015-06-08 19:07:05 +00:00
// Path returns the path set on the shard when it was created.
2016-02-26 19:41:54 +00:00
func ( s * Shard ) Path ( ) string { return s . path }
2015-06-08 19:07:05 +00:00
2016-02-10 20:04:18 +00:00
// Open initializes and opens the shard's store.
2015-05-26 15:41:15 +00:00
func ( s * Shard ) Open ( ) error {
2015-06-18 15:07:51 +00:00
if err := func ( ) error {
s . mu . Lock ( )
defer s . mu . Unlock ( )
2015-05-22 20:08:43 +00:00
2015-06-18 15:07:51 +00:00
// Return if the shard is already open
2015-07-22 14:53:20 +00:00
if s . engine != nil {
2015-06-18 15:07:51 +00:00
return nil
}
2015-05-22 20:08:43 +00:00
2015-07-22 14:53:20 +00:00
// Initialize underlying engine.
2016-02-26 19:41:54 +00:00
e , err := NewEngine ( s . path , s . walPath , s . options )
2015-06-18 15:07:51 +00:00
if err != nil {
2016-02-24 13:33:07 +00:00
return err
2015-06-18 15:07:51 +00:00
}
2015-05-22 20:08:43 +00:00
2015-07-22 14:53:20 +00:00
// Set log output on the engine.
2016-07-07 14:48:12 +00:00
e . SetLogOutput ( s . logOutput )
2015-06-18 15:07:51 +00:00
2016-06-01 22:17:18 +00:00
// Disable compactions while loading the index
e . SetEnabled ( false )
2015-07-22 14:53:20 +00:00
// Open engine.
2016-05-27 12:45:16 +00:00
if err := e . Open ( ) ; err != nil {
2016-02-24 13:33:07 +00:00
return err
2015-06-18 15:07:51 +00:00
}
2015-07-22 14:53:20 +00:00
// Load metadata index.
2016-04-30 04:56:57 +00:00
start := time . Now ( )
2016-05-27 12:45:16 +00:00
if err := e . LoadMetadataIndex ( s . id , s . index ) ; err != nil {
2016-02-24 13:33:07 +00:00
return err
2015-06-18 15:07:51 +00:00
}
2016-05-18 21:21:57 +00:00
count := s . index . SeriesShardN ( s . id )
2016-07-07 16:13:56 +00:00
atomic . AddInt64 ( & s . stats . SeriesCreated , int64 ( count ) )
2016-05-18 21:21:57 +00:00
2016-05-27 12:45:16 +00:00
s . engine = e
2016-04-30 04:56:57 +00:00
s . logger . Printf ( "%s database index loaded in %s" , s . path , time . Now ( ) . Sub ( start ) )
2015-06-18 15:07:51 +00:00
2016-05-18 14:34:06 +00:00
go s . monitorSize ( )
2015-05-22 20:08:43 +00:00
return nil
2015-06-18 15:07:51 +00:00
} ( ) ; err != nil {
s . close ( )
2016-02-24 13:33:07 +00:00
return NewShardError ( s . id , err )
2015-06-18 15:07:51 +00:00
}
2016-06-01 22:17:18 +00:00
if s . EnableOnOpen {
// enable writes, queries and compactions
s . SetEnabled ( true )
}
2016-05-27 22:47:33 +00:00
2015-06-18 15:07:51 +00:00
return nil
2015-05-22 20:08:43 +00:00
}
2015-06-18 15:07:51 +00:00
// Close shuts down the shard's store.
2015-05-22 20:08:43 +00:00
func ( s * Shard ) Close ( ) error {
2015-06-04 18:50:32 +00:00
s . mu . Lock ( )
2015-07-22 14:53:20 +00:00
defer s . mu . Unlock ( )
return s . close ( )
2015-06-18 15:07:51 +00:00
}
func ( s * Shard ) close ( ) error {
2016-02-02 15:33:20 +00:00
if s . engine == nil {
return nil
2015-05-22 20:08:43 +00:00
}
2016-02-02 15:33:20 +00:00
2016-05-18 14:34:06 +00:00
// Close the closing channel at most once.
select {
case <- s . closing :
default :
close ( s . closing )
}
2016-04-27 20:59:00 +00:00
// Don't leak our shard ID and series keys in the index
s . index . RemoveShard ( s . id )
2016-02-02 15:33:20 +00:00
err := s . engine . Close ( )
if err == nil {
s . engine = nil
}
return err
2015-05-22 20:08:43 +00:00
}
2016-05-27 22:47:33 +00:00
// ready determines if the Shard is ready for queries or writes.
// It returns nil if ready, otherwise ErrShardClosed or ErrShardDiabled
func ( s * Shard ) ready ( ) error {
var err error
2016-03-29 22:32:34 +00:00
s . mu . RLock ( )
2016-05-27 22:47:33 +00:00
if s . engine == nil {
err = ErrEngineClosed
} else if ! s . enabled {
err = ErrShardDisabled
}
2016-03-29 22:32:34 +00:00
s . mu . RUnlock ( )
2016-05-27 22:47:33 +00:00
return err
2016-03-29 22:32:34 +00:00
}
2015-08-25 21:44:42 +00:00
// DiskSize returns the size on disk of this shard
func ( s * Shard ) DiskSize ( ) ( int64 , error ) {
2016-05-18 14:34:06 +00:00
var size int64
2016-05-18 15:04:50 +00:00
err := filepath . Walk ( s . path , func ( _ string , fi os . FileInfo , err error ) error {
2016-05-20 05:19:44 +00:00
if err != nil {
return err
}
2016-05-18 15:04:50 +00:00
if ! fi . IsDir ( ) {
size += fi . Size ( )
2016-05-18 14:34:06 +00:00
}
return err
} )
2015-08-25 21:44:42 +00:00
if err != nil {
return 0 , err
}
2016-05-18 14:34:06 +00:00
2016-05-18 15:04:50 +00:00
err = filepath . Walk ( s . walPath , func ( _ string , fi os . FileInfo , err error ) error {
2016-05-20 05:19:44 +00:00
if err != nil {
return err
}
2016-05-18 15:04:50 +00:00
if ! fi . IsDir ( ) {
size += fi . Size ( )
2016-05-18 14:34:06 +00:00
}
return err
} )
return size , err
2015-08-25 21:44:42 +00:00
}
2016-02-10 20:04:18 +00:00
// FieldCreate holds information for a field to create on a measurement
2015-07-22 14:53:20 +00:00
type FieldCreate struct {
Measurement string
2015-07-23 16:33:37 +00:00
Field * Field
2015-05-22 20:08:43 +00:00
}
2016-02-10 20:04:18 +00:00
// SeriesCreate holds information for a series to create
2015-07-22 14:53:20 +00:00
type SeriesCreate struct {
Measurement string
Series * Series
2015-05-22 20:08:43 +00:00
}
// WritePoints will write the raw data points and any new metadata to the index in the shard
2015-09-16 20:33:08 +00:00
func ( s * Shard ) WritePoints ( points [ ] models . Point ) error {
2016-05-27 22:47:33 +00:00
if err := s . ready ( ) ; err != nil {
return err
2016-03-29 22:32:34 +00:00
}
2016-04-01 19:30:09 +00:00
s . mu . RLock ( )
defer s . mu . RUnlock ( )
2016-07-07 16:13:56 +00:00
atomic . AddInt64 ( & s . stats . WriteReq , 1 )
2015-09-04 22:43:57 +00:00
2016-04-01 19:30:09 +00:00
fieldsToCreate , err := s . validateSeriesAndFields ( points )
2015-05-23 22:06:07 +00:00
if err != nil {
return err
2015-05-22 20:08:43 +00:00
}
2016-07-07 16:13:56 +00:00
atomic . AddInt64 ( & s . stats . FieldsCreated , int64 ( len ( fieldsToCreate ) ) )
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
// add any new fields and keep track of what needs to be saved
2016-04-01 19:30:09 +00:00
if err := s . createFieldsAndMeasurements ( fieldsToCreate ) ; err != nil {
2015-05-23 22:06:07 +00:00
return err
2015-05-22 20:08:43 +00:00
}
2015-07-22 14:53:20 +00:00
// Write to the engine.
2016-04-01 19:30:09 +00:00
if err := s . engine . WritePoints ( points ) ; err != nil {
2016-08-17 19:33:46 +00:00
atomic . AddInt64 ( & s . stats . WritePointsErr , int64 ( len ( points ) ) )
atomic . AddInt64 ( & s . stats . WriteReqErr , 1 )
2015-07-22 14:53:20 +00:00
return fmt . Errorf ( "engine: %s" , err )
2015-06-18 15:07:51 +00:00
}
2016-07-07 16:13:56 +00:00
atomic . AddInt64 ( & s . stats . WritePointsOK , int64 ( len ( points ) ) )
2016-08-17 19:33:46 +00:00
atomic . AddInt64 ( & s . stats . WriteReqOK , 1 )
2015-06-18 15:07:51 +00:00
2015-05-22 20:08:43 +00:00
return nil
}
2016-04-29 22:31:57 +00:00
func ( s * Shard ) ContainsSeries ( seriesKeys [ ] string ) ( map [ string ] bool , error ) {
2016-05-27 22:47:33 +00:00
if err := s . ready ( ) ; err != nil {
return nil , err
2016-04-29 22:31:57 +00:00
}
return s . engine . ContainsSeries ( seriesKeys )
}
2015-07-22 14:53:20 +00:00
// DeleteSeries deletes a list of series.
2016-04-27 19:01:07 +00:00
func ( s * Shard ) DeleteSeries ( seriesKeys [ ] string ) error {
2016-05-27 22:47:33 +00:00
if err := s . ready ( ) ; err != nil {
return err
2016-03-29 22:32:34 +00:00
}
2016-04-29 22:31:57 +00:00
if err := s . engine . DeleteSeries ( seriesKeys ) ; err != nil {
return err
}
return nil
2016-04-27 19:01:07 +00:00
}
// DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive)
func ( s * Shard ) DeleteSeriesRange ( seriesKeys [ ] string , min , max int64 ) error {
2016-05-27 22:47:33 +00:00
if err := s . ready ( ) ; err != nil {
return err
2016-04-27 19:01:07 +00:00
}
2016-05-27 22:47:33 +00:00
2016-04-29 22:31:57 +00:00
if err := s . engine . DeleteSeriesRange ( seriesKeys , min , max ) ; err != nil {
return err
}
return nil
2015-06-02 15:20:20 +00:00
}
2015-07-22 14:53:20 +00:00
// DeleteMeasurement deletes a measurement and all underlying series.
func ( s * Shard ) DeleteMeasurement ( name string , seriesKeys [ ] string ) error {
2016-05-27 22:47:33 +00:00
if err := s . ready ( ) ; err != nil {
return err
2016-03-29 22:32:34 +00:00
}
2015-06-18 15:07:51 +00:00
2015-07-22 14:53:20 +00:00
if err := s . engine . DeleteMeasurement ( name , seriesKeys ) ; err != nil {
2015-06-03 15:32:50 +00:00
return err
}
return nil
}
2016-04-01 19:30:09 +00:00
func ( s * Shard ) createFieldsAndMeasurements ( fieldsToCreate [ ] * FieldCreate ) error {
2015-05-23 22:06:07 +00:00
if len ( fieldsToCreate ) == 0 {
2016-04-01 19:30:09 +00:00
return nil
2015-05-23 22:06:07 +00:00
}
// add fields
for _ , f := range fieldsToCreate {
2016-04-01 19:30:09 +00:00
m := s . engine . MeasurementFields ( f . Measurement )
2015-05-23 22:06:07 +00:00
2015-11-04 21:06:06 +00:00
// Add the field to the in memory index
if err := m . CreateFieldIfNotExists ( f . Field . Name , f . Field . Type , false ) ; err != nil {
2016-04-01 19:30:09 +00:00
return err
2015-05-23 22:06:07 +00:00
}
// ensure the measurement is in the index and the field is there
2015-07-22 14:53:20 +00:00
measurement := s . index . CreateMeasurementIndexIfNotExists ( f . Measurement )
2015-12-03 19:52:27 +00:00
measurement . SetFieldName ( f . Field . Name )
2015-05-23 22:06:07 +00:00
}
2016-04-01 19:30:09 +00:00
return nil
2015-05-23 22:06:07 +00:00
}
// validateSeriesAndFields checks which series and fields are new and whose metadata should be saved and indexed
2016-04-01 19:30:09 +00:00
func ( s * Shard ) validateSeriesAndFields ( points [ ] models . Point ) ( [ ] * FieldCreate , error ) {
2015-07-22 14:53:20 +00:00
var fieldsToCreate [ ] * FieldCreate
2015-05-23 22:06:07 +00:00
// get the shard mutex for locally defined fields
for _ , p := range points {
2016-08-10 15:01:59 +00:00
// verify the tags and fields
tags := p . Tags ( )
2016-06-30 16:49:53 +00:00
if v := tags . Get ( [ ] byte ( "time" ) ) ; v != nil {
2016-08-10 15:01:59 +00:00
s . logger . Printf ( "dropping tag 'time' from '%s'\n" , p . PrecisionString ( "" ) )
2016-06-30 16:49:53 +00:00
tags . Delete ( [ ] byte ( "time" ) )
2016-08-10 15:01:59 +00:00
p . SetTags ( tags )
}
fields := p . Fields ( )
if _ , ok := fields [ "time" ] ; ok {
s . logger . Printf ( "dropping field 'time' from '%s'\n" , p . PrecisionString ( "" ) )
delete ( fields , "time" )
if len ( fields ) == 0 {
continue
}
}
2015-05-23 22:06:07 +00:00
// see if the series should be added to the index
2016-04-01 15:49:08 +00:00
key := string ( p . Key ( ) )
ss := s . index . Series ( key )
2016-04-01 19:30:09 +00:00
if ss == nil {
2016-07-21 12:11:06 +00:00
if s . options . Config . MaxSeriesPerDatabase > 0 && len ( s . index . series ) + 1 > s . options . Config . MaxSeriesPerDatabase {
return nil , fmt . Errorf ( "max series per database exceeded: %s" , key )
}
2016-08-10 15:01:59 +00:00
ss = NewSeries ( key , tags )
2016-07-07 16:13:56 +00:00
atomic . AddInt64 ( & s . stats . SeriesCreated , 1 )
2015-05-23 22:06:07 +00:00
}
2016-04-01 19:30:09 +00:00
ss = s . index . CreateSeriesIndexIfNotExists ( p . Name ( ) , ss )
2016-04-29 22:31:57 +00:00
s . index . AssignShard ( ss . Key , s . id )
2016-04-01 19:30:09 +00:00
2015-05-23 22:06:07 +00:00
// see if the field definitions need to be saved to the shard
2016-04-01 19:30:09 +00:00
mf := s . engine . MeasurementFields ( p . Name ( ) )
2016-03-29 22:32:34 +00:00
2015-05-23 22:06:07 +00:00
if mf == nil {
2016-08-10 15:01:59 +00:00
for name , value := range fields {
2015-07-23 16:33:37 +00:00
fieldsToCreate = append ( fieldsToCreate , & FieldCreate { p . Name ( ) , & Field { Name : name , Type : influxql . InspectDataType ( value ) } } )
2015-05-23 22:06:07 +00:00
}
continue // skip validation since all fields are new
}
// validate field types and encode data
2016-08-10 15:01:59 +00:00
for name , value := range fields {
2016-04-01 19:30:09 +00:00
if f := mf . Field ( name ) ; f != nil {
2015-06-09 21:57:19 +00:00
// Field present in shard metadata, make sure there is no type conflict.
2015-05-23 22:06:07 +00:00
if f . Type != influxql . InspectDataType ( value ) {
2016-08-31 12:00:57 +00:00
return nil , fmt . Errorf ( "%s: input field \"%s\" on measurement \"%s\" is type %T, already exists as type %s" , ErrFieldTypeConflict , name , p . Name ( ) , value , f . Type )
2015-05-23 22:06:07 +00:00
}
continue // Field is present, and it's of the same type. Nothing more to do.
}
2015-07-23 16:33:37 +00:00
fieldsToCreate = append ( fieldsToCreate , & FieldCreate { p . Name ( ) , & Field { Name : name , Type : influxql . InspectDataType ( value ) } } )
2015-05-23 22:06:07 +00:00
}
}
2016-04-01 19:30:09 +00:00
return fieldsToCreate , nil
2015-05-23 22:06:07 +00:00
}
2015-06-18 15:07:51 +00:00
// SeriesCount returns the number of series buckets on the shard.
2016-03-29 21:55:09 +00:00
func ( s * Shard ) SeriesCount ( ) ( int , error ) {
2016-05-27 22:47:33 +00:00
if err := s . ready ( ) ; err != nil {
return 0 , err
2016-03-29 22:32:34 +00:00
}
2016-03-29 21:55:09 +00:00
return s . engine . SeriesCount ( )
}
2015-06-18 15:07:51 +00:00
2015-09-03 16:48:37 +00:00
// WriteTo writes the shard's data to w.
2015-09-04 22:43:57 +00:00
func ( s * Shard ) WriteTo ( w io . Writer ) ( int64 , error ) {
2016-05-27 22:47:33 +00:00
if err := s . ready ( ) ; err != nil {
return 0 , err
2016-03-29 22:32:34 +00:00
}
2015-09-04 22:43:57 +00:00
n , err := s . engine . WriteTo ( w )
2016-07-07 16:13:56 +00:00
atomic . AddInt64 ( & s . stats . BytesWritten , int64 ( n ) )
2015-09-04 22:43:57 +00:00
return n , err
}
2015-09-03 16:48:37 +00:00
2015-11-04 21:06:06 +00:00
// CreateIterator returns an iterator for the data in the shard.
func ( s * Shard ) CreateIterator ( opt influxql . IteratorOptions ) ( influxql . Iterator , error ) {
2016-05-27 22:47:33 +00:00
if err := s . ready ( ) ; err != nil {
return nil , err
2016-03-29 22:32:34 +00:00
}
2016-02-19 20:38:02 +00:00
if influxql . Sources ( opt . Sources ) . HasSystemSource ( ) {
return s . createSystemIterator ( opt )
}
2016-05-23 19:32:52 +00:00
opt . Sources = influxql . Sources ( opt . Sources ) . Filter ( s . database , s . retentionPolicy )
2015-11-04 21:06:06 +00:00
return s . engine . CreateIterator ( opt )
2015-11-04 21:06:06 +00:00
}
2016-02-19 20:38:02 +00:00
// createSystemIterator returns an iterator for a system source.
func ( s * Shard ) createSystemIterator ( opt influxql . IteratorOptions ) ( influxql . Iterator , error ) {
// Only support a single system source.
if len ( opt . Sources ) > 1 {
return nil , errors . New ( "cannot select from multiple system sources" )
}
m := opt . Sources [ 0 ] . ( * influxql . Measurement )
switch m . Name {
2016-02-23 23:43:19 +00:00
case "_fieldKeys" :
return NewFieldKeysIterator ( s , opt )
2016-02-25 21:28:45 +00:00
case "_series" :
return NewSeriesIterator ( s , opt )
2016-02-19 20:38:02 +00:00
case "_tagKeys" :
return NewTagKeysIterator ( s , opt )
default :
return nil , fmt . Errorf ( "unknown system source: %s" , m . Name )
}
}
2015-11-04 21:06:06 +00:00
// FieldDimensions returns unique sets of fields and dimensions across a list of sources.
2016-05-16 16:08:28 +00:00
func ( s * Shard ) FieldDimensions ( sources influxql . Sources ) ( fields map [ string ] influxql . DataType , dimensions map [ string ] struct { } , err error ) {
Fix panic:runtime error: invalid memory address or nil pointer dereference
github.com/influxdata/influxdb/tsdb.(*Shard).FieldDimensions(0xc820244000, 0xc821b70fb0, 0x1, 0x1, 0xc822b9cc00, 0xc822b9cc30, 0x0, 0x0)
/Users/jason/go/src/github.com/influxdata/influxdb/tsdb/shard.go:588 +0xa62
github.com/influxdata/influxdb/tsdb.(*shardIteratorCreator).FieldDimensions(0xc8202b6078, 0xc821b70fb0, 0x1, 0x1, 0xc822b9cbd0, 0x0, 0x0, 0x0)
/Users/jason/go/src/github.com/influxdata/influxdb/tsdb/shard.go:818 +0x53
github.com/influxdata/influxdb/influxql.IteratorCreators.FieldDimensions(0xc821b71250, 0x1, 0x1, 0xc821b70fb0, 0x1, 0x1, 0xc822b9cba0, 0xc822b9cbd0, 0x0, 0x0)
/Users/jason/go/src/github.com/influxdata/influxdb/influxql/iterator.go:639 +0x15a
github.com/influxdata/influxdb/influxql.(*IteratorCreators).FieldDimensions(0xc822a32ae0, 0xc821b70fb0, 0x1, 0x1, 0x20, 0x18, 0x0, 0x0)
<autogenerated>:163 +0xd3
2016-07-18 22:35:33 +00:00
if err := s . ready ( ) ; err != nil {
return nil , nil , err
}
2016-07-28 22:38:08 +00:00
if sources . HasSystemSource ( ) {
2016-05-16 16:08:28 +00:00
// Only support a single system source.
if len ( sources ) > 1 {
return nil , nil , errors . New ( "cannot select from multiple system sources" )
}
switch m := sources [ 0 ] . ( type ) {
case * influxql . Measurement :
switch m . Name {
case "_fieldKeys" :
return map [ string ] influxql . DataType {
"fieldKey" : influxql . String ,
"fieldType" : influxql . String ,
} , nil , nil
case "_series" :
2016-07-28 22:38:08 +00:00
return map [ string ] influxql . DataType {
"key" : influxql . String ,
} , nil , nil
2016-05-16 16:08:28 +00:00
case "_tagKeys" :
return map [ string ] influxql . DataType {
2016-07-28 22:38:08 +00:00
"tagKey" : influxql . String ,
2016-05-16 16:08:28 +00:00
} , nil , nil
}
}
return nil , nil , nil
}
fields = make ( map [ string ] influxql . DataType )
2015-11-04 21:06:06 +00:00
dimensions = make ( map [ string ] struct { } )
for _ , src := range sources {
switch m := src . ( type ) {
case * influxql . Measurement :
// Retrieve measurement.
mm := s . index . Measurement ( m . Name )
if mm == nil {
continue
}
// Append fields and dimensions.
2016-05-18 18:43:48 +00:00
mf := s . engine . MeasurementFields ( m . Name )
if mf != nil {
for name , typ := range mf . FieldSet ( ) {
fields [ name ] = typ
2016-05-16 16:08:28 +00:00
}
2015-11-04 21:06:06 +00:00
}
for _ , key := range mm . TagKeys ( ) {
dimensions [ key ] = struct { } { }
}
}
}
return
}
2016-03-04 18:01:41 +00:00
// ExpandSources expands regex sources and removes duplicates.
// NOTE: sources must be normalized (db and rp set) before calling this function.
func ( s * Shard ) ExpandSources ( sources influxql . Sources ) ( influxql . Sources , error ) {
// Use a map as a set to prevent duplicates.
set := map [ string ] influxql . Source { }
// Iterate all sources, expanding regexes when they're found.
for _ , source := range sources {
switch src := source . ( type ) {
case * influxql . Measurement :
// Add non-regex measurements directly to the set.
if src . Regex == nil {
set [ src . String ( ) ] = src
continue
}
// Loop over matching measurements.
for _ , m := range s . index . MeasurementsByRegex ( src . Regex . Val ) {
other := & influxql . Measurement {
Database : src . Database ,
RetentionPolicy : src . RetentionPolicy ,
Name : m . Name ,
}
set [ other . String ( ) ] = other
}
default :
return nil , fmt . Errorf ( "expandSources: unsupported source type: %T" , source )
}
}
// Convert set to sorted slice.
names := make ( [ ] string , 0 , len ( set ) )
for name := range set {
names = append ( names , name )
}
sort . Strings ( names )
// Convert set to a list of Sources.
expanded := make ( influxql . Sources , 0 , len ( set ) )
for _ , name := range names {
expanded = append ( expanded , set [ name ] )
}
return expanded , nil
}
2016-04-29 00:29:09 +00:00
// Restore restores data to the underlying engine for the shard.
// The shard is reopened after restore.
func ( s * Shard ) Restore ( r io . Reader , basePath string ) error {
s . mu . Lock ( )
// Restore to engine.
if err := s . engine . Restore ( r , basePath ) ; err != nil {
s . mu . Unlock ( )
return err
}
s . mu . Unlock ( )
// Close shard.
if err := s . Close ( ) ; err != nil {
return err
}
// Reopen engine.
2016-05-02 17:47:31 +00:00
return s . Open ( )
2016-04-29 00:29:09 +00:00
}
2016-05-09 15:53:34 +00:00
// CreateSnapshot will return a path to a temp directory
// containing hard links to the underlying shard files
func ( s * Shard ) CreateSnapshot ( ) ( string , error ) {
s . mu . RLock ( )
defer s . mu . RUnlock ( )
return s . engine . CreateSnapshot ( )
}
2016-05-18 14:34:06 +00:00
func ( s * Shard ) monitorSize ( ) {
2016-05-18 15:04:50 +00:00
t := time . NewTicker ( monitorStatInterval )
2016-05-18 14:34:06 +00:00
defer t . Stop ( )
for {
select {
case <- s . closing :
return
case <- t . C :
size , err := s . DiskSize ( )
if err != nil {
2016-05-18 15:04:50 +00:00
s . logger . Printf ( "Error collecting shard size: %v" , err )
2016-05-18 14:34:06 +00:00
continue
}
2016-07-07 16:13:56 +00:00
atomic . StoreInt64 ( & s . stats . DiskBytes , size )
2016-05-18 14:34:06 +00:00
}
}
}
2016-02-19 20:38:02 +00:00
// Shards represents a sortable list of shards.
type Shards [ ] * Shard
2015-11-04 21:06:06 +00:00
2016-02-19 20:38:02 +00:00
func ( a Shards ) Len ( ) int { return len ( a ) }
func ( a Shards ) Less ( i , j int ) bool { return a [ i ] . id < a [ j ] . id }
func ( a Shards ) Swap ( i , j int ) { a [ i ] , a [ j ] = a [ j ] , a [ i ] }
2015-11-04 21:06:06 +00:00
2016-02-10 20:04:18 +00:00
// MeasurementFields holds the fields of a measurement and their codec.
2015-07-22 14:53:20 +00:00
type MeasurementFields struct {
2016-04-01 19:30:09 +00:00
mu sync . RWMutex
2016-05-18 12:34:11 +00:00
fields map [ string ] * Field
2015-05-23 22:06:07 +00:00
}
2016-04-01 19:30:09 +00:00
func NewMeasurementFields ( ) * MeasurementFields {
return & MeasurementFields { fields : make ( map [ string ] * Field ) }
}
2015-06-02 23:08:48 +00:00
// MarshalBinary encodes the object to a binary format.
2015-07-22 14:53:20 +00:00
func ( m * MeasurementFields ) MarshalBinary ( ) ( [ ] byte , error ) {
2016-04-01 19:30:09 +00:00
m . mu . RLock ( )
defer m . mu . RUnlock ( )
2015-06-02 23:08:48 +00:00
var pb internal . MeasurementFields
2016-04-01 19:30:09 +00:00
for _ , f := range m . fields {
2015-06-02 23:08:48 +00:00
id := int32 ( f . ID )
name := f . Name
2015-06-10 20:02:26 +00:00
t := int32 ( f . Type )
2015-06-02 23:08:48 +00:00
pb . Fields = append ( pb . Fields , & internal . Field { ID : & id , Name : & name , Type : & t } )
}
return proto . Marshal ( & pb )
}
2015-06-03 11:36:39 +00:00
// UnmarshalBinary decodes the object from a binary format.
2015-07-22 14:53:20 +00:00
func ( m * MeasurementFields ) UnmarshalBinary ( buf [ ] byte ) error {
2016-04-01 19:30:09 +00:00
m . mu . Lock ( )
defer m . mu . Unlock ( )
2015-06-02 23:08:48 +00:00
var pb internal . MeasurementFields
if err := proto . Unmarshal ( buf , & pb ) ; err != nil {
return err
}
2016-04-01 19:30:09 +00:00
m . fields = make ( map [ string ] * Field , len ( pb . Fields ) )
2015-06-02 23:08:48 +00:00
for _ , f := range pb . Fields {
2016-04-01 19:30:09 +00:00
m . fields [ f . GetName ( ) ] = & Field { ID : uint8 ( f . GetID ( ) ) , Name : f . GetName ( ) , Type : influxql . DataType ( f . GetType ( ) ) }
2015-06-02 23:08:48 +00:00
}
return nil
}
2015-08-10 18:46:57 +00:00
// CreateFieldIfNotExists creates a new field with an autoincrementing ID.
2015-05-23 22:06:07 +00:00
// Returns an error if 255 fields have already been created on the measurement or
// the fields already exists with a different type.
2015-10-06 22:49:37 +00:00
func ( m * MeasurementFields ) CreateFieldIfNotExists ( name string , typ influxql . DataType , limitCount bool ) error {
2016-04-02 02:41:22 +00:00
m . mu . RLock ( )
2016-04-01 19:30:09 +00:00
2015-05-23 22:06:07 +00:00
// Ignore if the field already exists.
2016-04-01 19:30:09 +00:00
if f := m . fields [ name ] ; f != nil {
2015-05-23 22:06:07 +00:00
if f . Type != typ {
2016-04-02 02:41:22 +00:00
m . mu . RUnlock ( )
2015-05-23 22:06:07 +00:00
return ErrFieldTypeConflict
}
2016-04-02 02:41:22 +00:00
m . mu . RUnlock ( )
2015-05-23 22:06:07 +00:00
return nil
2015-05-22 20:08:43 +00:00
}
2016-04-02 02:41:22 +00:00
m . mu . RUnlock ( )
2015-05-22 20:08:43 +00:00
2016-04-02 02:41:22 +00:00
m . mu . Lock ( )
2016-04-25 14:45:14 +00:00
defer m . mu . Unlock ( )
2016-04-02 02:41:22 +00:00
if f := m . fields [ name ] ; f != nil {
return nil
2015-05-23 22:06:07 +00:00
}
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
// Create and append a new field.
2015-07-23 16:33:37 +00:00
f := & Field {
2016-04-01 19:30:09 +00:00
ID : uint8 ( len ( m . fields ) + 1 ) ,
2015-05-23 22:06:07 +00:00
Name : name ,
Type : typ ,
}
2016-04-01 19:30:09 +00:00
m . fields [ name ] = f
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
return nil
}
2015-05-22 20:08:43 +00:00
2016-04-01 19:30:09 +00:00
func ( m * MeasurementFields ) Field ( name string ) * Field {
m . mu . RLock ( )
f := m . fields [ name ]
m . mu . RUnlock ( )
return f
}
2016-05-16 16:08:28 +00:00
func ( m * MeasurementFields ) FieldSet ( ) map [ string ] influxql . DataType {
m . mu . RLock ( )
defer m . mu . RUnlock ( )
fields := make ( map [ string ] influxql . DataType )
for name , f := range m . fields {
fields [ name ] = f . Type
}
return fields
}
2015-05-23 22:06:07 +00:00
// Field represents a series field.
2015-07-23 16:33:37 +00:00
type Field struct {
2015-05-23 22:06:07 +00:00
ID uint8 ` json:"id,omitempty" `
Name string ` json:"name,omitempty" `
Type influxql . DataType ` json:"type,omitempty" `
}
2015-05-22 20:08:43 +00:00
2016-02-19 20:38:02 +00:00
// shardIteratorCreator creates iterators for a local shard.
// This simply wraps the shard so that Close() does not close the underlying shard.
type shardIteratorCreator struct {
2016-08-08 16:39:38 +00:00
sh * Shard
maxSeriesN int
2016-02-19 20:38:02 +00:00
}
func ( ic * shardIteratorCreator ) Close ( ) error { return nil }
func ( ic * shardIteratorCreator ) CreateIterator ( opt influxql . IteratorOptions ) ( influxql . Iterator , error ) {
2016-08-08 16:39:38 +00:00
itr , err := ic . sh . CreateIterator ( opt )
if err != nil {
return nil , err
} else if itr == nil {
return nil , nil
}
// Enforce series limit at creation time.
if ic . maxSeriesN > 0 {
stats := itr . Stats ( )
if stats . SeriesN > ic . maxSeriesN {
itr . Close ( )
return nil , fmt . Errorf ( "max select series count exceeded: %d series" , stats . SeriesN )
}
}
return itr , nil
2016-02-19 20:38:02 +00:00
}
2016-05-16 16:08:28 +00:00
func ( ic * shardIteratorCreator ) FieldDimensions ( sources influxql . Sources ) ( fields map [ string ] influxql . DataType , dimensions map [ string ] struct { } , err error ) {
2016-02-19 20:38:02 +00:00
return ic . sh . FieldDimensions ( sources )
}
2016-03-04 18:01:41 +00:00
func ( ic * shardIteratorCreator ) ExpandSources ( sources influxql . Sources ) ( influxql . Sources , error ) {
return ic . sh . ExpandSources ( sources )
}
2016-02-19 20:38:02 +00:00
2016-02-23 23:43:19 +00:00
func NewFieldKeysIterator ( sh * Shard , opt influxql . IteratorOptions ) ( influxql . Iterator , error ) {
2016-05-10 17:16:55 +00:00
itr := & fieldKeysIterator { sh : sh }
// Retrieve measurements from shard. Filter if condition specified.
if opt . Condition == nil {
itr . mms = sh . index . Measurements ( )
} else {
mms , _ , err := sh . index . measurementsByExpr ( opt . Condition )
if err != nil {
return nil , err
}
itr . mms = mms
}
// Sort measurements by name.
sort . Sort ( itr . mms )
return itr , nil
}
// fieldKeysIterator iterates over measurements and gets field keys from each measurement.
type fieldKeysIterator struct {
sh * Shard
mms Measurements // remaining measurements
buf struct {
mm * Measurement // current measurement
2016-05-18 18:43:48 +00:00
fields [ ] Field // current measurement's fields
2016-05-10 17:16:55 +00:00
}
}
// Stats returns stats about the points processed.
func ( itr * fieldKeysIterator ) Stats ( ) influxql . IteratorStats { return influxql . IteratorStats { } }
// Close closes the iterator.
func ( itr * fieldKeysIterator ) Close ( ) error { return nil }
// Next emits the next tag key name.
func ( itr * fieldKeysIterator ) Next ( ) ( * influxql . FloatPoint , error ) {
for {
// If there are no more keys then move to the next measurements.
if len ( itr . buf . fields ) == 0 {
if len ( itr . mms ) == 0 {
return nil , nil
}
itr . buf . mm = itr . mms [ 0 ]
2016-05-18 18:43:48 +00:00
mf := itr . sh . engine . MeasurementFields ( itr . buf . mm . Name )
if mf != nil {
fset := mf . FieldSet ( )
if len ( fset ) == 0 {
itr . mms = itr . mms [ 1 : ]
continue
}
keys := make ( [ ] string , 0 , len ( fset ) )
for k := range fset {
keys = append ( keys , k )
}
2016-05-10 17:16:55 +00:00
sort . Strings ( keys )
2016-05-18 18:43:48 +00:00
itr . buf . fields = make ( [ ] Field , len ( keys ) )
2016-05-10 17:16:55 +00:00
for i , name := range keys {
2016-05-18 18:43:48 +00:00
itr . buf . fields [ i ] = Field { Name : name , Type : fset [ name ] }
2016-05-10 17:16:55 +00:00
}
}
itr . mms = itr . mms [ 1 : ]
continue
}
// Return next key.
field := itr . buf . fields [ 0 ]
p := & influxql . FloatPoint {
Name : itr . buf . mm . Name ,
Aux : [ ] interface { } { field . Name , field . Type . String ( ) } ,
}
itr . buf . fields = itr . buf . fields [ 1 : ]
return p , nil
2016-02-23 23:43:19 +00:00
}
}
2016-02-25 21:28:45 +00:00
// seriesIterator emits series ids.
type seriesIterator struct {
2016-05-02 15:17:08 +00:00
mms Measurements
keys struct {
buf [ ] string
i int
}
point influxql . FloatPoint // reusable point
opt influxql . IteratorOptions
2016-02-25 21:28:45 +00:00
}
// NewSeriesIterator returns a new instance of SeriesIterator.
func NewSeriesIterator ( sh * Shard , opt influxql . IteratorOptions ) ( influxql . Iterator , error ) {
// Only equality operators are allowed.
var err error
influxql . WalkFunc ( opt . Condition , func ( n influxql . Node ) {
switch n := n . ( type ) {
case * influxql . BinaryExpr :
switch n . Op {
case influxql . EQ , influxql . NEQ , influxql . EQREGEX , influxql . NEQREGEX ,
influxql . OR , influxql . AND :
default :
err = errors . New ( "invalid tag comparison operator" )
}
}
} )
if err != nil {
return nil , err
}
2016-05-02 15:17:08 +00:00
// Read and sort all measurements.
mms := sh . index . Measurements ( )
sort . Sort ( mms )
2016-02-25 21:28:45 +00:00
return & seriesIterator {
2016-05-02 15:17:08 +00:00
mms : mms ,
point : influxql . FloatPoint {
Aux : make ( [ ] interface { } , len ( opt . Aux ) ) ,
} ,
opt : opt ,
2016-02-25 21:28:45 +00:00
} , nil
}
2016-03-17 15:55:37 +00:00
// Stats returns stats about the points processed.
func ( itr * seriesIterator ) Stats ( ) influxql . IteratorStats { return influxql . IteratorStats { } }
2016-02-25 21:28:45 +00:00
// Close closes the iterator.
func ( itr * seriesIterator ) Close ( ) error { return nil }
// Next emits the next point in the iterator.
2016-04-17 20:00:59 +00:00
func ( itr * seriesIterator ) Next ( ) ( * influxql . FloatPoint , error ) {
2016-05-02 15:17:08 +00:00
for {
// Load next measurement's keys if there are no more remaining.
if itr . keys . i >= len ( itr . keys . buf ) {
if err := itr . nextKeys ( ) ; err != nil {
return nil , err
}
if len ( itr . keys . buf ) == 0 {
return nil , nil
}
}
2016-02-25 21:28:45 +00:00
2016-05-02 15:17:08 +00:00
// Read the next key.
key := itr . keys . buf [ itr . keys . i ]
itr . keys . i ++
// Write auxiliary fields.
for i , f := range itr . opt . Aux {
2016-05-16 16:08:28 +00:00
switch f . Val {
2016-05-02 15:17:08 +00:00
case "key" :
itr . point . Aux [ i ] = key
}
2016-02-25 21:28:45 +00:00
}
2016-05-02 15:17:08 +00:00
return & itr . point , nil
2016-02-25 21:28:45 +00:00
}
2016-05-02 15:17:08 +00:00
}
2016-02-25 21:28:45 +00:00
2016-05-02 15:17:08 +00:00
// nextKeys reads all keys for the next measurement.
func ( itr * seriesIterator ) nextKeys ( ) error {
for {
// Ensure previous keys are cleared out.
itr . keys . i , itr . keys . buf = 0 , itr . keys . buf [ : 0 ]
// Read next measurement.
if len ( itr . mms ) == 0 {
return nil
}
mm := itr . mms [ 0 ]
itr . mms = itr . mms [ 1 : ]
// Read all series keys.
ids , err := mm . seriesIDsAllOrByExpr ( itr . opt . Condition )
if err != nil {
return err
} else if len ( ids ) == 0 {
continue
}
itr . keys . buf = mm . AppendSeriesKeysByID ( itr . keys . buf , ids )
sort . Strings ( itr . keys . buf )
2016-02-25 21:28:45 +00:00
2016-05-02 15:17:08 +00:00
return nil
}
2016-02-25 21:28:45 +00:00
}
2016-02-23 23:43:19 +00:00
// NewTagKeysIterator returns a new instance of TagKeysIterator.
func NewTagKeysIterator ( sh * Shard , opt influxql . IteratorOptions ) ( influxql . Iterator , error ) {
fn := func ( m * Measurement ) [ ] string {
return m . TagKeys ( )
2016-02-04 18:00:50 +00:00
}
2016-02-23 23:43:19 +00:00
return newMeasurementKeysIterator ( sh , fn , opt )
2016-02-04 18:00:50 +00:00
}
2016-06-30 16:49:53 +00:00
// tagValuesIterator emits key/tag values
type tagValuesIterator struct {
series [ ] * Series // remaining series
keys [ ] string // tag keys to select from a series
fields [ ] string // fields to emit (key or value)
buf struct {
s * Series // current series
keys [ ] string // current tag's keys
}
}
// NewTagValuesIterator returns a new instance of TagValuesIterator.
func NewTagValuesIterator ( sh * Shard , opt influxql . IteratorOptions ) ( influxql . Iterator , error ) {
if opt . Condition == nil {
return nil , errors . New ( "a condition is required" )
}
measurementExpr := influxql . CloneExpr ( opt . Condition )
measurementExpr = influxql . Reduce ( influxql . RewriteExpr ( measurementExpr , func ( e influxql . Expr ) influxql . Expr {
switch e := e . ( type ) {
case * influxql . BinaryExpr :
switch e . Op {
case influxql . EQ , influxql . NEQ , influxql . EQREGEX , influxql . NEQREGEX :
tag , ok := e . LHS . ( * influxql . VarRef )
if ! ok || tag . Val != "_name" {
return nil
}
}
}
return e
} ) , nil )
mms , ok , err := sh . index . measurementsByExpr ( measurementExpr )
if err != nil {
return nil , err
} else if ! ok {
mms = sh . index . Measurements ( )
sort . Sort ( mms )
}
// If there are no measurements, return immediately.
if len ( mms ) == 0 {
return & tagValuesIterator { } , nil
}
filterExpr := influxql . CloneExpr ( opt . Condition )
filterExpr = influxql . Reduce ( influxql . RewriteExpr ( filterExpr , func ( e influxql . Expr ) influxql . Expr {
switch e := e . ( type ) {
case * influxql . BinaryExpr :
switch e . Op {
case influxql . EQ , influxql . NEQ , influxql . EQREGEX , influxql . NEQREGEX :
tag , ok := e . LHS . ( * influxql . VarRef )
if ! ok || strings . HasPrefix ( tag . Val , "_" ) {
return nil
}
}
}
return e
} ) , nil )
var series [ ] * Series
keys := newStringSet ( )
for _ , mm := range mms {
ss , ok , err := mm . TagKeysByExpr ( opt . Condition )
if err != nil {
return nil , err
} else if ! ok {
keys . add ( mm . TagKeys ( ) ... )
} else {
keys = keys . union ( ss )
}
ids , err := mm . seriesIDsAllOrByExpr ( filterExpr )
if err != nil {
return nil , err
}
for _ , id := range ids {
series = append ( series , mm . SeriesByID ( id ) )
}
}
return & tagValuesIterator {
series : series ,
keys : keys . list ( ) ,
fields : influxql . VarRefs ( opt . Aux ) . Strings ( ) ,
} , nil
}
// Stats returns stats about the points processed.
func ( itr * tagValuesIterator ) Stats ( ) influxql . IteratorStats { return influxql . IteratorStats { } }
// Close closes the iterator.
func ( itr * tagValuesIterator ) Close ( ) error { return nil }
// Next emits the next point in the iterator.
func ( itr * tagValuesIterator ) Next ( ) ( * influxql . FloatPoint , error ) {
for {
// If there are no more values then move to the next key.
if len ( itr . buf . keys ) == 0 {
if len ( itr . series ) == 0 {
return nil , nil
}
itr . buf . s = itr . series [ 0 ]
itr . buf . keys = itr . keys
itr . series = itr . series [ 1 : ]
continue
}
key := itr . buf . keys [ 0 ]
value := itr . buf . s . Tags . GetString ( key )
if value == "" {
itr . buf . keys = itr . buf . keys [ 1 : ]
continue
}
// Prepare auxiliary fields.
auxFields := make ( [ ] interface { } , len ( itr . fields ) )
for i , f := range itr . fields {
switch f {
case "_tagKey" :
auxFields [ i ] = key
case "value" :
auxFields [ i ] = value
}
}
// Return next key.
p := & influxql . FloatPoint {
Name : itr . buf . s . measurement . Name ,
Aux : auxFields ,
}
itr . buf . keys = itr . buf . keys [ 1 : ]
return p , nil
}
}
2016-02-23 23:43:19 +00:00
// measurementKeyFunc is the function called by measurementKeysIterator.
type measurementKeyFunc func ( m * Measurement ) [ ] string
func newMeasurementKeysIterator ( sh * Shard , fn measurementKeyFunc , opt influxql . IteratorOptions ) ( * measurementKeysIterator , error ) {
itr := & measurementKeysIterator { fn : fn }
2016-02-04 18:00:50 +00:00
// Retrieve measurements from shard. Filter if condition specified.
if opt . Condition == nil {
itr . mms = sh . index . Measurements ( )
} else {
2016-03-06 14:52:34 +00:00
mms , _ , err := sh . index . measurementsByExpr ( opt . Condition )
2016-02-04 18:00:50 +00:00
if err != nil {
return nil , err
}
itr . mms = mms
}
// Sort measurements by name.
sort . Sort ( itr . mms )
return itr , nil
}
2016-02-23 23:43:19 +00:00
// measurementKeysIterator iterates over measurements and gets keys from each measurement.
type measurementKeysIterator struct {
mms Measurements // remaining measurements
buf struct {
mm * Measurement // current measurement
keys [ ] string // current measurement's keys
}
fn measurementKeyFunc
}
2016-03-17 15:55:37 +00:00
// Stats returns stats about the points processed.
func ( itr * measurementKeysIterator ) Stats ( ) influxql . IteratorStats { return influxql . IteratorStats { } }
2016-02-04 18:00:50 +00:00
// Close closes the iterator.
2016-02-23 23:43:19 +00:00
func ( itr * measurementKeysIterator ) Close ( ) error { return nil }
2016-02-04 18:00:50 +00:00
// Next emits the next tag key name.
2016-04-17 20:00:59 +00:00
func ( itr * measurementKeysIterator ) Next ( ) ( * influxql . FloatPoint , error ) {
2016-02-04 18:00:50 +00:00
for {
// If there are no more keys then move to the next measurements.
if len ( itr . buf . keys ) == 0 {
if len ( itr . mms ) == 0 {
2016-04-17 20:00:59 +00:00
return nil , nil
2016-02-04 18:00:50 +00:00
}
itr . buf . mm = itr . mms [ 0 ]
2016-02-23 23:43:19 +00:00
itr . buf . keys = itr . fn ( itr . buf . mm )
2016-02-04 18:00:50 +00:00
itr . mms = itr . mms [ 1 : ]
continue
}
// Return next key.
p := & influxql . FloatPoint {
Name : itr . buf . mm . Name ,
Aux : [ ] interface { } { itr . buf . keys [ 0 ] } ,
}
itr . buf . keys = itr . buf . keys [ 1 : ]
2016-04-17 20:00:59 +00:00
return p , nil
2016-02-04 18:00:50 +00:00
}
}