2015-05-22 20:08:43 +00:00
package tsdb
import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
2015-06-18 15:07:51 +00:00
"io"
2015-05-23 22:06:07 +00:00
"math"
2015-06-18 15:07:51 +00:00
"os"
2015-05-22 20:08:43 +00:00
"sync"
"github.com/influxdb/influxdb/influxql"
2015-06-02 23:08:48 +00:00
"github.com/influxdb/influxdb/tsdb/internal"
2015-05-22 20:08:43 +00:00
"github.com/boltdb/bolt"
2015-06-02 23:08:48 +00:00
"github.com/gogo/protobuf/proto"
2015-05-22 20:08:43 +00:00
)
2015-06-18 15:07:51 +00:00
// Sentinel errors reported by field creation, lookup and decoding.
var (
	// ErrFieldOverflow reports that a measurement cannot take any more fields.
	ErrFieldOverflow = errors.New("field overflow")

	// ErrFieldTypeConflict reports that a field with the same name already
	// exists with a different type.
	ErrFieldTypeConflict = errors.New("field type conflict")

	// ErrFieldNotFound reports that a requested field does not exist.
	ErrFieldNotFound = errors.New("field not found")

	// ErrFieldUnmappedID reports that, during decode, the data referenced a
	// field ID for which no mapping is known.
	ErrFieldUnmappedID = errors.New("field ID not mapped")
)
// Shard represents a self-contained time series database. An inverted index of
// the measurement and tag data is kept along with the raw time series data.
// Data can be split across many shards. The query engine in TSDB is responsible
// for combining the output of many shards into a single query result.
2015-05-22 20:08:43 +00:00
type Shard struct {
2015-05-23 22:06:07 +00:00
db * bolt . DB // underlying data store
2015-05-24 11:39:45 +00:00
index * DatabaseIndex
2015-05-26 15:41:15 +00:00
path string
2015-08-16 19:45:09 +00:00
id uint64
2015-06-18 15:07:51 +00:00
2015-07-22 14:53:20 +00:00
engine Engine
options EngineOptions
2015-05-23 22:06:07 +00:00
mu sync . RWMutex
2015-07-22 14:53:20 +00:00
measurementFields map [ string ] * MeasurementFields // measurement name to their fields
2015-06-18 15:07:51 +00:00
// The writer used by the logger.
LogOutput io . Writer
2015-05-22 20:08:43 +00:00
}
// NewShard returns a new initialized Shard
2015-08-16 19:45:09 +00:00
func NewShard ( id uint64 , index * DatabaseIndex , path string , options EngineOptions ) * Shard {
2015-07-22 14:53:20 +00:00
return & Shard {
2015-05-24 11:39:45 +00:00
index : index ,
2015-05-26 15:41:15 +00:00
path : path ,
2015-08-16 19:45:09 +00:00
id : id ,
2015-07-22 14:53:20 +00:00
options : options ,
measurementFields : make ( map [ string ] * MeasurementFields ) ,
2015-06-18 15:07:51 +00:00
LogOutput : os . Stderr ,
2015-05-22 20:08:43 +00:00
}
}
2015-06-08 19:07:05 +00:00
// Path reports the path the shard was given when it was created.
func (s *Shard) Path() string {
	return s.path
}
2015-05-22 20:08:43 +00:00
// open initializes and opens the shard's store.
2015-05-26 15:41:15 +00:00
func ( s * Shard ) Open ( ) error {
2015-06-18 15:07:51 +00:00
if err := func ( ) error {
s . mu . Lock ( )
defer s . mu . Unlock ( )
2015-05-22 20:08:43 +00:00
2015-07-22 14:53:20 +00:00
s . index . mu . Lock ( )
defer s . index . mu . Unlock ( )
2015-06-18 15:07:51 +00:00
// Return if the shard is already open
2015-07-22 14:53:20 +00:00
if s . engine != nil {
2015-06-18 15:07:51 +00:00
return nil
}
2015-05-22 20:08:43 +00:00
2015-07-22 14:53:20 +00:00
// Initialize underlying engine.
e , err := NewEngine ( s . path , s . options )
2015-06-18 15:07:51 +00:00
if err != nil {
2015-07-22 14:53:20 +00:00
return fmt . Errorf ( "new engine: %s" , err )
2015-06-18 15:07:51 +00:00
}
2015-07-22 14:53:20 +00:00
s . engine = e
2015-05-22 20:08:43 +00:00
2015-07-22 14:53:20 +00:00
// Set log output on the engine.
s . engine . SetLogOutput ( s . LogOutput )
2015-06-18 15:07:51 +00:00
2015-07-22 14:53:20 +00:00
// Open engine.
if err := s . engine . Open ( ) ; err != nil {
return fmt . Errorf ( "open engine: %s" , err )
2015-06-18 15:07:51 +00:00
}
2015-07-22 14:53:20 +00:00
// Load metadata index.
if err := s . engine . LoadMetadataIndex ( s . index , s . measurementFields ) ; err != nil {
2015-06-18 15:07:51 +00:00
return fmt . Errorf ( "load metadata index: %s" , err )
}
2015-05-22 20:08:43 +00:00
return nil
2015-06-18 15:07:51 +00:00
} ( ) ; err != nil {
s . close ( )
return err
}
return nil
2015-05-22 20:08:43 +00:00
}
2015-06-18 15:07:51 +00:00
// Close shuts down the shard's store.
2015-05-22 20:08:43 +00:00
func ( s * Shard ) Close ( ) error {
2015-06-04 18:50:32 +00:00
s . mu . Lock ( )
2015-07-22 14:53:20 +00:00
defer s . mu . Unlock ( )
return s . close ( )
2015-06-18 15:07:51 +00:00
}
func ( s * Shard ) close ( ) error {
2015-07-22 14:53:20 +00:00
if s . engine != nil {
return s . engine . Close ( )
2015-05-22 20:08:43 +00:00
}
return nil
}
2015-05-28 22:02:12 +00:00
// TODO: this is temporarily exported to make tx.go work. When the query engine gets refactored
// into the tsdb package this should be removed. No one outside tsdb should know the underlying field encoding scheme.
func ( s * Shard ) FieldCodec ( measurementName string ) * FieldCodec {
s . mu . RLock ( )
defer s . mu . RUnlock ( )
m := s . measurementFields [ measurementName ]
if m == nil {
2015-08-13 19:12:12 +00:00
return NewFieldCodec ( nil )
2015-05-28 22:02:12 +00:00
}
2015-07-22 14:53:20 +00:00
return m . Codec
2015-05-28 22:02:12 +00:00
}
2015-05-22 20:08:43 +00:00
// struct to hold information for a field to create on a measurement
2015-07-22 14:53:20 +00:00
type FieldCreate struct {
Measurement string
2015-07-23 16:33:37 +00:00
Field * Field
2015-05-22 20:08:43 +00:00
}
// struct to hold information for a series to create
2015-07-22 14:53:20 +00:00
type SeriesCreate struct {
Measurement string
Series * Series
2015-05-22 20:08:43 +00:00
}
// WritePoints will write the raw data points and any new metadata to the index in the shard
2015-05-22 21:39:55 +00:00
func ( s * Shard ) WritePoints ( points [ ] Point ) error {
2015-05-23 22:06:07 +00:00
seriesToCreate , fieldsToCreate , err := s . validateSeriesAndFields ( points )
if err != nil {
return err
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// add any new series to the in-memory index
if len ( seriesToCreate ) > 0 {
s . index . mu . Lock ( )
2015-05-22 20:08:43 +00:00
for _ , ss := range seriesToCreate {
2015-07-22 14:53:20 +00:00
s . index . CreateSeriesIndexIfNotExists ( ss . Measurement , ss . Series )
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
s . index . mu . Unlock ( )
}
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
// add any new fields and keep track of what needs to be saved
measurementFieldsToSave , err := s . createFieldsAndMeasurements ( fieldsToCreate )
if err != nil {
return err
2015-05-22 20:08:43 +00:00
}
// make sure all data is encoded before attempting to save to bolt
for _ , p := range points {
2015-07-22 14:53:20 +00:00
// Ignore if raw data has already been marshaled.
if p . Data ( ) != nil {
continue
2015-06-18 15:07:51 +00:00
}
2015-07-22 14:53:20 +00:00
// This was populated earlier, don't need to validate that it's there.
s . mu . RLock ( )
mf := s . measurementFields [ p . Name ( ) ]
s . mu . RUnlock ( )
2015-06-18 15:07:51 +00:00
2015-07-22 14:53:20 +00:00
// If a measurement is dropped while writes for it are in progress, this could be nil
if mf == nil {
return ErrFieldNotFound
2015-06-18 15:07:51 +00:00
}
2015-07-22 14:53:20 +00:00
data , err := mf . Codec . EncodeFields ( p . Fields ( ) )
if err != nil {
return err
2015-05-22 20:08:43 +00:00
}
2015-07-22 14:53:20 +00:00
p . SetData ( data )
2015-05-22 20:08:43 +00:00
}
2015-07-22 14:53:20 +00:00
// Write to the engine.
if err := s . engine . WritePoints ( points , measurementFieldsToSave , seriesToCreate ) ; err != nil {
return fmt . Errorf ( "engine: %s" , err )
2015-06-18 15:07:51 +00:00
}
2015-05-22 20:08:43 +00:00
return nil
}
2015-05-28 22:02:12 +00:00
// ValidateAggregateFieldsInStatement checks that every numerical aggregate in
// stmt is applied to a numeric (float or integer) field of the named
// measurement, either directly or through a nested aggregate such as
// `derivative(mean(value), 1d)`.
//
// It returns an error when the measurement is unknown to this shard, when a
// function call has no field argument, when a distinct argument is used with
// anything other than count, or when a numerical aggregate targets a
// non-numeric field.
func (s *Shard) ValidateAggregateFieldsInStatement(measurementName string, stmt *influxql.SelectStatement) error {
	s.mu.RLock()
	defer s.mu.RUnlock()

	m := s.measurementFields[measurementName]
	if m == nil {
		return fmt.Errorf("measurement not found: %s", measurementName)
	}

	// validateField reports an error if the named field is known to this
	// shard and is not numeric.
	validateField := func(aname, fname string) error {
		f := m.Fields[fname]
		if f == nil {
			// The name is not a field of this measurement on this shard, so
			// there is no type to check. Previously this dereferenced nil.
			// NOTE(review): skipping is the conservative choice; confirm
			// whether callers would rather see ErrFieldNotFound here.
			return nil
		}
		if f.Type != influxql.Float && f.Type != influxql.Integer {
			return fmt.Errorf("aggregate '%s' requires numerical field values. Field '%s' is of type %s",
				aname, f.Name, f.Type)
		}
		return nil
	}

	// If a numerical aggregate is requested, ensure it is only performed on numeric data or on a
	// nested aggregate on numeric data.
	for _, a := range stmt.FunctionCalls() {
		// A call without arguments has no field to validate.
		if len(a.Args) == 0 {
			return fmt.Errorf("aggregate call didn't contain a field %s", a.String())
		}

		// Check for fields like `derivative(mean(value), 1d)`
		nested := a
		if fn, ok := a.Args[0].(*influxql.Call); ok {
			nested = fn
		}
		if len(nested.Args) == 0 {
			return fmt.Errorf("aggregate call didn't contain a field %s", a.String())
		}

		var fieldName string
		switch lit := nested.Args[0].(type) {
		case *influxql.VarRef:
			fieldName = lit.Val
		case *influxql.Distinct:
			if nested.Name != "count" {
				return fmt.Errorf("aggregate call didn't contain a field %s", a.String())
			}
			fieldName = lit.Val
		default:
			return fmt.Errorf("aggregate call didn't contain a field %s", a.String())
		}

		if influxql.IsNumeric(nested) {
			if err := validateField(a.Name, fieldName); err != nil {
				return err
			}
		}
	}
	return nil
}
2015-07-22 14:53:20 +00:00
// DeleteSeries deletes a list of series.
func ( s * Shard ) DeleteSeries ( keys [ ] string ) error {
return s . engine . DeleteSeries ( keys )
2015-06-02 15:20:20 +00:00
}
2015-07-22 14:53:20 +00:00
// DeleteMeasurement deletes a measurement and all underlying series.
func ( s * Shard ) DeleteMeasurement ( name string , seriesKeys [ ] string ) error {
2015-06-18 15:07:51 +00:00
s . mu . Lock ( )
defer s . mu . Unlock ( )
2015-07-22 14:53:20 +00:00
if err := s . engine . DeleteMeasurement ( name , seriesKeys ) ; err != nil {
2015-06-03 15:32:50 +00:00
return err
}
2015-06-16 18:40:36 +00:00
// Remove entry from shard index.
delete ( s . measurementFields , name )
2015-07-22 14:53:20 +00:00
2015-06-03 15:32:50 +00:00
return nil
}
2015-07-22 14:53:20 +00:00
func ( s * Shard ) createFieldsAndMeasurements ( fieldsToCreate [ ] * FieldCreate ) ( map [ string ] * MeasurementFields , error ) {
2015-05-23 22:06:07 +00:00
if len ( fieldsToCreate ) == 0 {
return nil , nil
}
s . index . mu . Lock ( )
s . mu . Lock ( )
defer s . index . mu . Unlock ( )
defer s . mu . Unlock ( )
// add fields
2015-07-22 14:53:20 +00:00
measurementsToSave := make ( map [ string ] * MeasurementFields )
2015-05-23 22:06:07 +00:00
for _ , f := range fieldsToCreate {
2015-07-22 14:53:20 +00:00
m := s . measurementFields [ f . Measurement ]
2015-05-23 22:06:07 +00:00
if m == nil {
2015-07-22 14:53:20 +00:00
m = measurementsToSave [ f . Measurement ]
2015-05-23 22:06:07 +00:00
if m == nil {
2015-07-23 16:33:37 +00:00
m = & MeasurementFields { Fields : make ( map [ string ] * Field ) }
2015-05-23 22:06:07 +00:00
}
2015-07-22 14:53:20 +00:00
s . measurementFields [ f . Measurement ] = m
2015-05-23 22:06:07 +00:00
}
2015-07-22 14:53:20 +00:00
measurementsToSave [ f . Measurement ] = m
2015-05-23 22:06:07 +00:00
// add the field to the in memory index
2015-08-10 18:46:57 +00:00
if err := m . CreateFieldIfNotExists ( f . Field . Name , f . Field . Type ) ; err != nil {
2015-05-23 22:06:07 +00:00
return nil , err
}
// ensure the measurement is in the index and the field is there
2015-07-22 14:53:20 +00:00
measurement := s . index . CreateMeasurementIndexIfNotExists ( f . Measurement )
measurement . fieldNames [ f . Field . Name ] = struct { } { }
2015-05-23 22:06:07 +00:00
}
return measurementsToSave , nil
}
// validateSeriesAndFields checks which series and fields are new and whose metadata should be saved and indexed
2015-07-22 14:53:20 +00:00
func ( s * Shard ) validateSeriesAndFields ( points [ ] Point ) ( [ ] * SeriesCreate , [ ] * FieldCreate , error ) {
var seriesToCreate [ ] * SeriesCreate
var fieldsToCreate [ ] * FieldCreate
2015-05-23 22:06:07 +00:00
// get the mutex for the in memory index, which is shared across shards
s . index . mu . RLock ( )
defer s . index . mu . RUnlock ( )
// get the shard mutex for locally defined fields
s . mu . RLock ( )
defer s . mu . RUnlock ( )
for _ , p := range points {
// see if the series should be added to the index
2015-05-28 21:47:52 +00:00
if ss := s . index . series [ string ( p . Key ( ) ) ] ; ss == nil {
2015-08-16 19:45:09 +00:00
series := NewSeries ( string ( p . Key ( ) ) , p . Tags ( ) )
2015-07-22 14:53:20 +00:00
seriesToCreate = append ( seriesToCreate , & SeriesCreate { p . Name ( ) , series } )
2015-08-16 19:45:09 +00:00
} else if ! ss . shardIDs [ s . id ] {
// this is the first time this series is being written into this shard, persist it
ss . shardIDs [ s . id ] = true
seriesToCreate = append ( seriesToCreate , & SeriesCreate { p . Name ( ) , ss } )
2015-05-23 22:06:07 +00:00
}
// see if the field definitions need to be saved to the shard
mf := s . measurementFields [ p . Name ( ) ]
if mf == nil {
for name , value := range p . Fields ( ) {
2015-07-23 16:33:37 +00:00
fieldsToCreate = append ( fieldsToCreate , & FieldCreate { p . Name ( ) , & Field { Name : name , Type : influxql . InspectDataType ( value ) } } )
2015-05-23 22:06:07 +00:00
}
continue // skip validation since all fields are new
}
// validate field types and encode data
for name , value := range p . Fields ( ) {
if f := mf . Fields [ name ] ; f != nil {
2015-06-09 21:57:19 +00:00
// Field present in shard metadata, make sure there is no type conflict.
2015-05-23 22:06:07 +00:00
if f . Type != influxql . InspectDataType ( value ) {
2015-06-11 22:14:49 +00:00
return nil , nil , fmt . Errorf ( "field type conflict: input field \"%s\" on measurement \"%s\" is type %T, already exists as type %s" , name , p . Name ( ) , value , f . Type )
2015-05-23 22:06:07 +00:00
}
continue // Field is present, and it's of the same type. Nothing more to do.
}
2015-07-23 16:33:37 +00:00
fieldsToCreate = append ( fieldsToCreate , & FieldCreate { p . Name ( ) , & Field { Name : name , Type : influxql . InspectDataType ( value ) } } )
2015-05-23 22:06:07 +00:00
}
}
return seriesToCreate , fieldsToCreate , nil
}
2015-06-18 15:07:51 +00:00
// SeriesCount returns the number of series buckets on the shard.
2015-07-22 14:53:20 +00:00
func ( s * Shard ) SeriesCount ( ) ( int , error ) { return s . engine . SeriesCount ( ) }
2015-06-18 15:07:51 +00:00
2015-07-22 14:53:20 +00:00
type MeasurementFields struct {
2015-07-23 16:33:37 +00:00
Fields map [ string ] * Field ` json:"fields" `
2015-07-22 14:53:20 +00:00
Codec * FieldCodec
2015-05-23 22:06:07 +00:00
}
2015-06-02 23:08:48 +00:00
// MarshalBinary encodes the object to a binary format.
2015-07-22 14:53:20 +00:00
func ( m * MeasurementFields ) MarshalBinary ( ) ( [ ] byte , error ) {
2015-06-02 23:08:48 +00:00
var pb internal . MeasurementFields
for _ , f := range m . Fields {
id := int32 ( f . ID )
name := f . Name
2015-06-10 20:02:26 +00:00
t := int32 ( f . Type )
2015-06-02 23:08:48 +00:00
pb . Fields = append ( pb . Fields , & internal . Field { ID : & id , Name : & name , Type : & t } )
}
return proto . Marshal ( & pb )
}
2015-06-03 11:36:39 +00:00
// UnmarshalBinary decodes the object from a binary format.
2015-07-22 14:53:20 +00:00
func ( m * MeasurementFields ) UnmarshalBinary ( buf [ ] byte ) error {
2015-06-02 23:08:48 +00:00
var pb internal . MeasurementFields
if err := proto . Unmarshal ( buf , & pb ) ; err != nil {
return err
}
2015-07-23 16:33:37 +00:00
m . Fields = make ( map [ string ] * Field )
2015-06-02 23:08:48 +00:00
for _ , f := range pb . Fields {
2015-07-23 16:33:37 +00:00
m . Fields [ f . GetName ( ) ] = & Field { ID : uint8 ( f . GetID ( ) ) , Name : f . GetName ( ) , Type : influxql . DataType ( f . GetType ( ) ) }
2015-06-02 23:08:48 +00:00
}
return nil
}
2015-08-10 18:46:57 +00:00
// CreateFieldIfNotExists creates a new field with an autoincrementing ID.
2015-05-23 22:06:07 +00:00
// Returns an error if 255 fields have already been created on the measurement or
// the fields already exists with a different type.
2015-08-10 18:46:57 +00:00
func ( m * MeasurementFields ) CreateFieldIfNotExists ( name string , typ influxql . DataType ) error {
2015-05-23 22:06:07 +00:00
// Ignore if the field already exists.
if f := m . Fields [ name ] ; f != nil {
if f . Type != typ {
return ErrFieldTypeConflict
}
return nil
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// Only 255 fields are allowed. If we go over that then return an error.
if len ( m . Fields ) + 1 > math . MaxUint8 {
return ErrFieldOverflow
}
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
// Create and append a new field.
2015-07-23 16:33:37 +00:00
f := & Field {
2015-05-23 22:06:07 +00:00
ID : uint8 ( len ( m . Fields ) + 1 ) ,
Name : name ,
Type : typ ,
}
m . Fields [ name ] = f
2015-07-22 14:53:20 +00:00
m . Codec = NewFieldCodec ( m . Fields )
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
return nil
}
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
// Field represents a series field.
2015-07-23 16:33:37 +00:00
type Field struct {
2015-05-23 22:06:07 +00:00
ID uint8 ` json:"id,omitempty" `
Name string ` json:"name,omitempty" `
Type influxql . DataType ` json:"type,omitempty" `
}
2015-05-22 20:08:43 +00:00
2015-06-28 06:54:34 +00:00
// FieldCodec provides encoding and decoding functionality for the fields of a given
2015-05-23 22:06:07 +00:00
// Measurement. It is a distinct type to avoid locking writes on this node while
// potentially long-running queries are executing.
//
// It is not affected by changes to the Measurement object after codec creation.
2015-05-28 22:02:12 +00:00
// TODO: this shouldn't be exported. nothing outside the shard should know about field encodings.
// However, this is here until tx.go and the engine get refactored into tsdb.
type FieldCodec struct {
2015-07-23 16:33:37 +00:00
fieldsByID map [ uint8 ] * Field
fieldsByName map [ string ] * Field
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// NewFieldCodec returns a FieldCodec for the given Measurement. Must be called with
// a RLock that protects the Measurement.
2015-07-23 16:33:37 +00:00
func NewFieldCodec ( fields map [ string ] * Field ) * FieldCodec {
fieldsByID := make ( map [ uint8 ] * Field , len ( fields ) )
fieldsByName := make ( map [ string ] * Field , len ( fields ) )
2015-05-23 22:06:07 +00:00
for _ , f := range fields {
fieldsByID [ f . ID ] = f
fieldsByName [ f . Name ] = f
2015-05-22 20:08:43 +00:00
}
2015-05-28 22:02:12 +00:00
return & FieldCodec { fieldsByID : fieldsByID , fieldsByName : fieldsByName }
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// EncodeFields converts a map of values with string keys to a byte slice of field
// IDs and values.
//
// If a field exists in the codec, but its type is different, an error is returned. If
// a field is not present in the codec, the system panics.
2015-05-28 22:02:12 +00:00
func ( f * FieldCodec ) EncodeFields ( values map [ string ] interface { } ) ( [ ] byte , error ) {
2015-05-23 22:06:07 +00:00
// Allocate byte slice
b := make ( [ ] byte , 0 , 10 )
for k , v := range values {
field := f . fieldsByName [ k ]
if field == nil {
panic ( fmt . Sprintf ( "field does not exist for %s" , k ) )
} else if influxql . InspectDataType ( v ) != field . Type {
2015-06-10 21:57:27 +00:00
return nil , fmt . Errorf ( "field \"%s\" is type %T, mapped as type %s" , k , v , field . Type )
2015-05-23 22:06:07 +00:00
}
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
var buf [ ] byte
switch field . Type {
case influxql . Float :
value := v . ( float64 )
buf = make ( [ ] byte , 9 )
binary . BigEndian . PutUint64 ( buf [ 1 : 9 ] , math . Float64bits ( value ) )
case influxql . Integer :
var value uint64
switch v . ( type ) {
case int :
value = uint64 ( v . ( int ) )
case int32 :
value = uint64 ( v . ( int32 ) )
case int64 :
value = uint64 ( v . ( int64 ) )
default :
panic ( fmt . Sprintf ( "invalid integer type: %T" , v ) )
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
buf = make ( [ ] byte , 9 )
binary . BigEndian . PutUint64 ( buf [ 1 : 9 ] , value )
case influxql . Boolean :
value := v . ( bool )
// Only 1 byte need for a boolean.
buf = make ( [ ] byte , 2 )
if value {
buf [ 1 ] = byte ( 1 )
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
case influxql . String :
value := v . ( string )
if len ( value ) > maxStringLength {
value = value [ : maxStringLength ]
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// Make a buffer for field ID (1 bytes), the string length (2 bytes), and the string.
buf = make ( [ ] byte , len ( value ) + 3 )
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
// Set the string length, then copy the string itself.
binary . BigEndian . PutUint16 ( buf [ 1 : 3 ] , uint16 ( len ( value ) ) )
for i , c := range [ ] byte ( value ) {
buf [ i + 3 ] = byte ( c )
2015-05-22 20:08:43 +00:00
}
default :
2015-05-23 22:06:07 +00:00
panic ( fmt . Sprintf ( "unsupported value type during encode fields: %T" , v ) )
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// Always set the field ID as the leading byte.
buf [ 0 ] = field . ID
// Append temp buffer to the end.
b = append ( b , buf ... )
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
return b , nil
2015-05-22 20:08:43 +00:00
}
2015-05-28 22:02:12 +00:00
// TODO: this shouldn't be exported. remove when tx.go and engine.go get refactored into tsdb
func ( f * FieldCodec ) FieldIDByName ( s string ) ( uint8 , error ) {
fi := f . fieldsByName [ s ]
if fi == nil {
2015-06-30 17:29:09 +00:00
return 0 , ErrFieldNotFound
2015-05-28 22:02:12 +00:00
}
return fi . ID , nil
}
2015-05-23 22:06:07 +00:00
// DecodeFields decodes a byte slice into a set of field ids and values.
2015-05-28 22:02:12 +00:00
func ( f * FieldCodec ) DecodeFields ( b [ ] byte ) ( map [ uint8 ] interface { } , error ) {
2015-05-23 22:06:07 +00:00
if len ( b ) == 0 {
return nil , nil
}
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
// Create a map to hold the decoded data.
values := make ( map [ uint8 ] interface { } , 0 )
for {
if len ( b ) < 1 {
// No more bytes.
break
}
// First byte is the field identifier.
fieldID := b [ 0 ]
field := f . fieldsByID [ fieldID ]
if field == nil {
// See note in DecodeByID() regarding field-mapping failures.
return nil , ErrFieldUnmappedID
}
var value interface { }
switch field . Type {
case influxql . Float :
value = math . Float64frombits ( binary . BigEndian . Uint64 ( b [ 1 : 9 ] ) )
// Move bytes forward.
b = b [ 9 : ]
case influxql . Integer :
value = int64 ( binary . BigEndian . Uint64 ( b [ 1 : 9 ] ) )
// Move bytes forward.
b = b [ 9 : ]
case influxql . Boolean :
if b [ 1 ] == 1 {
value = true
} else {
value = false
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// Move bytes forward.
b = b [ 2 : ]
case influxql . String :
size := binary . BigEndian . Uint16 ( b [ 1 : 3 ] )
value = string ( b [ 3 : size + 3 ] )
// Move bytes forward.
b = b [ size + 3 : ]
default :
panic ( fmt . Sprintf ( "unsupported value type during decode fields: %T" , f . fieldsByID [ fieldID ] ) )
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
values [ fieldID ] = value
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
return values , nil
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// DecodeFieldsWithNames decodes a byte slice into a set of field names and values
2015-05-28 22:02:12 +00:00
// TODO: shouldn't be exported. refactor engine
func ( f * FieldCodec ) DecodeFieldsWithNames ( b [ ] byte ) ( map [ string ] interface { } , error ) {
2015-05-23 22:06:07 +00:00
fields , err := f . DecodeFields ( b )
if err != nil {
return nil , err
}
m := make ( map [ string ] interface { } )
for id , v := range fields {
field := f . fieldsByID [ id ]
if field != nil {
m [ field . Name ] = v
2015-05-22 20:08:43 +00:00
}
}
2015-05-23 22:06:07 +00:00
return m , nil
2015-05-22 20:08:43 +00:00
}
2015-05-28 22:02:12 +00:00
// DecodeByID scans a byte slice for a field with the given ID, converts it to its
// expected type, and return that value.
// TODO: shouldn't be exported. refactor engine
func ( f * FieldCodec ) DecodeByID ( targetID uint8 , b [ ] byte ) ( interface { } , error ) {
if len ( b ) == 0 {
return 0 , ErrFieldNotFound
}
for {
if len ( b ) < 1 {
// No more bytes.
break
}
field , ok := f . fieldsByID [ b [ 0 ] ]
if ! ok {
// This can happen, though is very unlikely. If this node receives encoded data, to be written
// to disk, and is queried for that data before its metastore is updated, there will be no field
// mapping for the data during decode. All this can happen because data is encoded by the node
// that first received the write request, not the node that actually writes the data to disk.
// So if this happens, the read must be aborted.
return 0 , ErrFieldUnmappedID
}
var value interface { }
switch field . Type {
case influxql . Float :
// Move bytes forward.
value = math . Float64frombits ( binary . BigEndian . Uint64 ( b [ 1 : 9 ] ) )
b = b [ 9 : ]
case influxql . Integer :
value = int64 ( binary . BigEndian . Uint64 ( b [ 1 : 9 ] ) )
b = b [ 9 : ]
case influxql . Boolean :
if b [ 1 ] == 1 {
value = true
} else {
value = false
}
// Move bytes forward.
b = b [ 2 : ]
case influxql . String :
size := binary . BigEndian . Uint16 ( b [ 1 : 3 ] )
value = string ( b [ 3 : 3 + size ] )
// Move bytes forward.
b = b [ size + 3 : ]
default :
panic ( fmt . Sprintf ( "unsupported value type during decode by id: %T" , field . Type ) )
}
if field . ID == targetID {
return value , nil
}
}
return 0 , ErrFieldNotFound
}
2015-06-30 17:29:09 +00:00
// DecodeByName scans a byte slice for a field with the given name, converts it to its
// expected type, and return that value.
func ( f * FieldCodec ) DecodeByName ( name string , b [ ] byte ) ( interface { } , error ) {
2015-07-09 18:24:06 +00:00
fi := f . fieldByName ( name )
if fi == nil {
2015-06-30 18:17:04 +00:00
return 0 , ErrFieldNotFound
2015-06-30 17:29:09 +00:00
}
2015-07-09 18:24:06 +00:00
return f . DecodeByID ( fi . ID , b )
2015-06-30 17:29:09 +00:00
}
2015-05-23 22:06:07 +00:00
// FieldByName returns the field by its name. It will return a nil if not found
2015-07-23 16:33:37 +00:00
func ( f * FieldCodec ) fieldByName ( name string ) * Field {
2015-05-23 22:06:07 +00:00
return f . fieldsByName [ name ]
2015-05-22 20:08:43 +00:00
}
// mustMarshalJSON encodes a value to JSON and panics, with a message prefixed
// by "marshal: ", if encoding fails. It is meant for internal use only, where
// an invalid marshal would cause corruption and a panic is appropriate.
func mustMarshalJSON(v interface{}) []byte {
	encoded, err := json.Marshal(v)
	if err == nil {
		return encoded
	}
	panic("marshal: " + err.Error())
}
// mustUnmarshalJSON decodes the JSON in b into v and panics, with a message
// prefixed by "unmarshal: ", if decoding fails. It is meant for internal use
// only, where an invalid unmarshal would cause corruption and a panic is
// appropriate.
func mustUnmarshalJSON(b []byte, v interface{}) {
	err := json.Unmarshal(b, v)
	if err == nil {
		return
	}
	panic("unmarshal: " + err.Error())
}
// u64tob returns v as a freshly allocated 8-byte slice in big-endian order.
func u64tob(v uint64) []byte {
	var out [8]byte
	binary.BigEndian.PutUint64(out[:], v)
	return out[:]
}