2015-05-22 20:08:43 +00:00
package tsdb
import (
2015-06-18 15:07:51 +00:00
"bytes"
2015-05-22 20:08:43 +00:00
"encoding/binary"
"encoding/json"
"errors"
"fmt"
2015-06-18 15:07:51 +00:00
"hash/fnv"
"io"
"log"
2015-05-23 22:06:07 +00:00
"math"
2015-06-18 15:07:51 +00:00
"os"
"sort"
2015-05-22 20:08:43 +00:00
"sync"
"time"
"github.com/influxdb/influxdb/influxql"
2015-06-02 23:08:48 +00:00
"github.com/influxdb/influxdb/tsdb/internal"
2015-05-22 20:08:43 +00:00
"github.com/boltdb/bolt"
2015-06-02 23:08:48 +00:00
"github.com/gogo/protobuf/proto"
2015-05-22 20:08:43 +00:00
)
2015-06-18 15:07:51 +00:00
var (
// ErrFieldOverflow is returned when too many fields are created on a measurement.
ErrFieldOverflow = errors . New ( "field overflow" )
// ErrFieldTypeConflict is returned when a new field already exists with a different type.
ErrFieldTypeConflict = errors . New ( "field type conflict" )
// ErrFieldNotFound is returned when a field cannot be found.
ErrFieldNotFound = errors . New ( "field not found" )
// ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID
// there is no mapping for.
ErrFieldUnmappedID = errors . New ( "field ID not mapped" )
// ErrWALPartitionNotFound is returns when flushing a WAL partition that
// does not exist.
ErrWALPartitionNotFound = errors . New ( "wal partition not found" )
)
// topLevelBucketN is the number of non-series buckets in the bolt db.
const topLevelBucketN = 3
// Shard represents a self-contained time series database. An inverted index of
// the measurement and tag data is kept along with the raw time series data.
// Data can be split across many shards. The query engine in TSDB is responsible
// for combining the output of many shards into a single query result.
2015-05-22 20:08:43 +00:00
type Shard struct {
2015-05-23 22:06:07 +00:00
db * bolt . DB // underlying data store
2015-05-24 11:39:45 +00:00
index * DatabaseIndex
2015-05-26 15:41:15 +00:00
path string
2015-06-18 15:07:51 +00:00
cache map [ uint8 ] map [ string ] [ ] [ ] byte // values by <wal partition,series>
walSize int // approximate size of the WAL, in bytes
flush chan struct { } // signals background flush
flushTimer * time . Timer // signals time-based flush
2015-05-23 22:06:07 +00:00
mu sync . RWMutex
measurementFields map [ string ] * measurementFields // measurement name to their fields
2015-06-18 15:07:51 +00:00
// These coordinate closing and waiting for running goroutines.
wg sync . WaitGroup
closing chan struct { }
// Used for out-of-band error messages.
logger * log . Logger
// The maximum size and time thresholds for flushing the WAL.
2015-07-02 14:05:29 +00:00
MaxWALSize int
WALFlushInterval time . Duration
WALPartitionFlushDelay time . Duration
2015-06-18 15:07:51 +00:00
// The writer used by the logger.
LogOutput io . Writer
2015-05-22 20:08:43 +00:00
}
// NewShard returns a new initialized Shard
2015-05-26 15:41:15 +00:00
func NewShard ( index * DatabaseIndex , path string ) * Shard {
2015-06-18 15:07:51 +00:00
s := & Shard {
2015-05-24 11:39:45 +00:00
index : index ,
2015-05-26 15:41:15 +00:00
path : path ,
2015-06-18 15:07:51 +00:00
flush : make ( chan struct { } , 1 ) ,
2015-05-23 22:06:07 +00:00
measurementFields : make ( map [ string ] * measurementFields ) ,
2015-06-18 15:07:51 +00:00
2015-07-02 14:05:29 +00:00
MaxWALSize : DefaultMaxWALSize ,
WALFlushInterval : DefaultWALFlushInterval ,
WALPartitionFlushDelay : DefaultWALPartitionFlushDelay ,
2015-06-18 15:07:51 +00:00
LogOutput : os . Stderr ,
2015-05-22 20:08:43 +00:00
}
2015-06-18 15:07:51 +00:00
// Initialize all partitions of the cache.
s . cache = make ( map [ uint8 ] map [ string ] [ ] [ ] byte )
for i := uint8 ( 0 ) ; i < WALPartitionN ; i ++ {
s . cache [ i ] = make ( map [ string ] [ ] [ ] byte )
}
return s
2015-05-22 20:08:43 +00:00
}
2015-06-08 19:07:05 +00:00
// Path returns the path set on the shard when it was created.
func ( s * Shard ) Path ( ) string { return s . path }
2015-05-22 20:08:43 +00:00
// open initializes and opens the shard's store.
2015-05-26 15:41:15 +00:00
func ( s * Shard ) Open ( ) error {
2015-06-18 15:07:51 +00:00
if err := func ( ) error {
s . mu . Lock ( )
defer s . mu . Unlock ( )
2015-05-22 20:08:43 +00:00
2015-06-18 15:07:51 +00:00
// Return if the shard is already open
if s . db != nil {
return nil
}
2015-05-22 20:08:43 +00:00
2015-06-18 15:07:51 +00:00
// Open store on shard.
store , err := bolt . Open ( s . path , 0666 , & bolt . Options { Timeout : 1 * time . Second } )
if err != nil {
return err
}
s . db = store
2015-05-22 20:08:43 +00:00
2015-06-18 15:07:51 +00:00
// Initialize store.
if err := s . db . Update ( func ( tx * bolt . Tx ) error {
_ , _ = tx . CreateBucketIfNotExists ( [ ] byte ( "series" ) )
_ , _ = tx . CreateBucketIfNotExists ( [ ] byte ( "fields" ) )
_ , _ = tx . CreateBucketIfNotExists ( [ ] byte ( "wal" ) )
return nil
} ) ; err != nil {
return fmt . Errorf ( "init: %s" , err )
}
if err := s . loadMetadataIndex ( ) ; err != nil {
return fmt . Errorf ( "load metadata index: %s" , err )
}
// Initialize logger.
s . logger = log . New ( s . LogOutput , "[shard] " , log . LstdFlags )
// Start flush interval timer.
s . flushTimer = time . NewTimer ( s . WALFlushInterval )
// Start background goroutines.
s . wg . Add ( 1 )
s . closing = make ( chan struct { } )
go s . autoflusher ( s . closing )
2015-05-22 20:08:43 +00:00
return nil
2015-06-18 15:07:51 +00:00
} ( ) ; err != nil {
s . close ( )
return err
}
// Flush on-disk WAL before we return to the caller.
2015-07-07 11:11:01 +00:00
if err := s . Flush ( 0 ) ; err != nil {
2015-06-18 15:07:51 +00:00
return fmt . Errorf ( "flush: %s" , err )
2015-05-22 20:08:43 +00:00
}
2015-06-18 15:07:51 +00:00
return nil
2015-05-22 20:08:43 +00:00
}
2015-06-18 15:07:51 +00:00
// Close shuts down the shard's store.
2015-05-22 20:08:43 +00:00
func ( s * Shard ) Close ( ) error {
2015-06-04 18:50:32 +00:00
s . mu . Lock ( )
2015-06-18 15:07:51 +00:00
err := s . close ( )
s . mu . Unlock ( )
// Wait for open goroutines to finish.
s . wg . Wait ( )
2015-06-04 18:50:32 +00:00
2015-06-18 15:07:51 +00:00
return err
}
func ( s * Shard ) close ( ) error {
2015-05-22 20:08:43 +00:00
if s . db != nil {
2015-06-18 15:07:51 +00:00
s . db . Close ( )
}
if s . closing != nil {
close ( s . closing )
s . closing = nil
2015-05-22 20:08:43 +00:00
}
return nil
}
2015-05-28 22:02:12 +00:00
// TODO: this is temporarily exported to make tx.go work. When the query engine gets refactored
2015-06-04 18:50:32 +00:00
// into the tsdb package this should be removed. No one outside tsdb should know the underlying store.
2015-05-28 22:02:12 +00:00
func ( s * Shard ) DB ( ) * bolt . DB {
return s . db
}
// TODO: this is temporarily exported to make tx.go work. When the query engine gets refactored
// into the tsdb package this should be removed. No one outside tsdb should know the underlying field encoding scheme.
func ( s * Shard ) FieldCodec ( measurementName string ) * FieldCodec {
s . mu . RLock ( )
defer s . mu . RUnlock ( )
m := s . measurementFields [ measurementName ]
if m == nil {
return nil
}
return m . codec
}
2015-05-22 20:08:43 +00:00
// struct to hold information for a field to create on a measurement
type fieldCreate struct {
measurement string
2015-05-23 22:06:07 +00:00
field * field
2015-05-22 20:08:43 +00:00
}
// struct to hold information for a series to create
type seriesCreate struct {
measurement string
series * Series
}
// WritePoints will write the raw data points and any new metadata to the index in the shard
2015-05-22 21:39:55 +00:00
func ( s * Shard ) WritePoints ( points [ ] Point ) error {
2015-05-23 22:06:07 +00:00
seriesToCreate , fieldsToCreate , err := s . validateSeriesAndFields ( points )
if err != nil {
return err
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// add any new series to the in-memory index
if len ( seriesToCreate ) > 0 {
s . index . mu . Lock ( )
2015-05-22 20:08:43 +00:00
for _ , ss := range seriesToCreate {
2015-07-20 19:59:46 +00:00
s . index . CreateSeriesIndexIfNotExists ( ss . measurement , ss . series )
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
s . index . mu . Unlock ( )
}
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
// add any new fields and keep track of what needs to be saved
measurementFieldsToSave , err := s . createFieldsAndMeasurements ( fieldsToCreate )
if err != nil {
return err
2015-05-22 20:08:43 +00:00
}
// make sure all data is encoded before attempting to save to bolt
for _ , p := range points {
// marshal the raw data if it hasn't been marshaled already
2015-05-22 21:39:55 +00:00
if p . Data ( ) == nil {
2015-05-22 20:08:43 +00:00
// this was populated earlier, don't need to validate that it's there.
2015-06-07 09:09:47 +00:00
s . mu . RLock ( )
mf := s . measurementFields [ p . Name ( ) ]
s . mu . RUnlock ( )
2015-06-26 22:47:10 +00:00
// If a measurement is dropped while writes for it are in progress, this could be nil
if mf == nil {
return ErrFieldNotFound
}
2015-06-07 09:09:47 +00:00
data , err := mf . codec . EncodeFields ( p . Fields ( ) )
2015-05-22 20:08:43 +00:00
if err != nil {
return err
}
2015-05-22 21:39:55 +00:00
p . SetData ( data )
2015-05-22 20:08:43 +00:00
}
}
// save to the underlying bolt instance
if err := s . db . Update ( func ( tx * bolt . Tx ) error {
// save any new metadata
if len ( seriesToCreate ) > 0 {
b := tx . Bucket ( [ ] byte ( "series" ) )
for _ , sc := range seriesToCreate {
2015-06-02 23:08:48 +00:00
data , err := sc . series . MarshalBinary ( )
if err != nil {
return err
}
if err := b . Put ( [ ] byte ( sc . series . Key ) , data ) ; err != nil {
2015-05-22 20:08:43 +00:00
return err
}
}
}
2015-05-23 22:06:07 +00:00
if len ( measurementFieldsToSave ) > 0 {
b := tx . Bucket ( [ ] byte ( "fields" ) )
for name , m := range measurementFieldsToSave {
2015-06-02 23:08:48 +00:00
data , err := m . MarshalBinary ( )
if err != nil {
return err
}
if err := b . Put ( [ ] byte ( name ) , data ) ; err != nil {
2015-05-22 20:08:43 +00:00
return err
}
}
}
2015-06-18 15:07:51 +00:00
// Write points to WAL bucket.
wal := tx . Bucket ( [ ] byte ( "wal" ) )
2015-05-22 20:08:43 +00:00
for _ , p := range points {
2015-06-18 15:07:51 +00:00
// Retrieve partition bucket.
key := p . Key ( )
b , err := wal . CreateBucketIfNotExists ( [ ] byte { WALPartition ( key ) } )
2015-05-22 20:08:43 +00:00
if err != nil {
2015-06-18 15:07:51 +00:00
return fmt . Errorf ( "create WAL partition bucket: %s" , err )
2015-05-22 20:08:43 +00:00
}
2015-06-18 15:07:51 +00:00
// Generate an autoincrementing index for the WAL partition.
id , _ := b . NextSequence ( )
// Append points sequentially to the WAL bucket.
v := marshalWALEntry ( key , p . UnixNano ( ) , p . Data ( ) )
if err := b . Put ( u64tob ( id ) , v ) ; err != nil {
return fmt . Errorf ( "put wal: %s" , err )
}
}
return nil
} ) ; err != nil {
return err
}
// If successful then save points to in-memory cache.
if err := func ( ) error {
s . mu . Lock ( )
defer s . mu . Unlock ( )
2015-06-29 16:12:15 +00:00
// tracks which in-memory caches need to be resorted
resorts := map [ uint8 ] map [ string ] struct { } { }
2015-06-18 15:07:51 +00:00
for _ , p := range points {
// Generate in-memory cache entry of <timestamp,data>.
key , data := p . Key ( ) , p . Data ( )
v := make ( [ ] byte , 8 + len ( data ) )
binary . BigEndian . PutUint64 ( v [ 0 : 8 ] , uint64 ( p . UnixNano ( ) ) )
copy ( v [ 8 : ] , data )
// Determine if we are appending.
partitionID := WALPartition ( key )
a := s . cache [ partitionID ] [ string ( key ) ]
appending := ( len ( a ) == 0 || bytes . Compare ( a [ len ( a ) - 1 ] , v ) == - 1 )
// Append to cache list.
a = append ( a , v )
2015-06-29 16:12:15 +00:00
// If not appending, keep track of cache lists that need to be resorted.
2015-06-18 15:07:51 +00:00
if ! appending {
2015-06-29 16:12:15 +00:00
series := resorts [ partitionID ]
if series == nil {
series = map [ string ] struct { } { }
resorts [ partitionID ] = series
}
series [ string ( key ) ] = struct { } { }
2015-06-18 15:07:51 +00:00
}
s . cache [ partitionID ] [ string ( key ) ] = a
// Calculate estimated WAL size.
s . walSize += len ( key ) + len ( v )
}
2015-06-29 16:12:15 +00:00
// Sort by timestamp if not appending.
for partitionID , cache := range resorts {
for key , _ := range cache {
sort . Sort ( byteSlices ( s . cache [ partitionID ] [ key ] ) )
}
}
2015-06-18 15:07:51 +00:00
// Check for flush threshold.
s . triggerAutoFlush ( )
return nil
} ( ) ; err != nil {
return err
}
return nil
}
// Flush writes all points from the write ahead log to the index.
2015-07-07 11:11:01 +00:00
func ( s * Shard ) Flush ( partitionFlushDelay time . Duration ) error {
2015-06-18 15:07:51 +00:00
// Retrieve a list of WAL buckets.
var partitionIDs [ ] uint8
if err := s . db . View ( func ( tx * bolt . Tx ) error {
return tx . Bucket ( [ ] byte ( "wal" ) ) . ForEach ( func ( key , _ [ ] byte ) error {
partitionIDs = append ( partitionIDs , uint8 ( key [ 0 ] ) )
return nil
} )
} ) ; err != nil {
return err
}
// Continue flushing until there are no more partition buckets.
for _ , partitionID := range partitionIDs {
if err := s . FlushPartition ( partitionID ) ; err != nil {
return fmt . Errorf ( "flush partition: id=%d, err=%s" , partitionID , err )
}
// Wait momentarily so other threads can process.
2015-07-07 11:11:01 +00:00
time . Sleep ( partitionFlushDelay )
2015-06-18 15:07:51 +00:00
}
s . mu . Lock ( )
defer s . mu . Unlock ( )
// Reset WAL size.
s . walSize = 0
// Reset the timer.
s . flushTimer . Reset ( s . WALFlushInterval )
return nil
}
// FlushPartition flushes a single WAL partition.
func ( s * Shard ) FlushPartition ( partitionID uint8 ) error {
s . mu . Lock ( )
defer s . mu . Unlock ( )
startTime := time . Now ( )
var pointN int
if err := s . db . Update ( func ( tx * bolt . Tx ) error {
// Retrieve partition bucket. Exit if it doesn't exist.
pb := tx . Bucket ( [ ] byte ( "wal" ) ) . Bucket ( [ ] byte { byte ( partitionID ) } )
if pb == nil {
return ErrWALPartitionNotFound
}
// Iterate over keys in the WAL partition bucket.
c := pb . Cursor ( )
for k , v := c . First ( ) ; k != nil ; k , v = c . Next ( ) {
key , timestamp , data := unmarshalWALEntry ( v )
// Create bucket for entry.
b , err := tx . CreateBucketIfNotExists ( key )
if err != nil {
return fmt . Errorf ( "create bucket: %s" , err )
}
// Write point to bucket.
if err := b . Put ( u64tob ( uint64 ( timestamp ) ) , data ) ; err != nil {
return fmt . Errorf ( "put: %s" , err )
2015-05-22 20:08:43 +00:00
}
2015-06-18 15:07:51 +00:00
// Remove entry in the WAL.
if err := c . Delete ( ) ; err != nil {
return fmt . Errorf ( "delete: %s" , err )
}
pointN ++
2015-05-22 20:08:43 +00:00
}
return nil
} ) ; err != nil {
return err
}
2015-06-18 15:07:51 +00:00
// Reset cache.
s . cache [ partitionID ] = make ( map [ string ] [ ] [ ] byte )
if pointN > 0 {
s . logger . Printf ( "flush %d points in %.3fs" , pointN , time . Since ( startTime ) . Seconds ( ) )
}
2015-05-22 20:08:43 +00:00
return nil
}
2015-06-18 15:07:51 +00:00
// autoflusher waits for notification of a flush and kicks it off in the background.
// This method runs in a separate goroutine.
func ( s * Shard ) autoflusher ( closing chan struct { } ) {
defer s . wg . Done ( )
for {
// Wait for close or flush signal.
select {
case <- closing :
return
case <- s . flushTimer . C :
2015-07-07 11:11:01 +00:00
if err := s . Flush ( s . WALPartitionFlushDelay ) ; err != nil {
2015-06-18 15:07:51 +00:00
s . logger . Printf ( "flush error: %s" , err )
}
case <- s . flush :
2015-07-07 11:11:01 +00:00
if err := s . Flush ( s . WALPartitionFlushDelay ) ; err != nil {
2015-06-18 15:07:51 +00:00
s . logger . Printf ( "flush error: %s" , err )
}
}
}
}
// triggerAutoFlush signals that a flush should occur if the size is above the threshold.
// This function must be called within the context of a lock.
func ( s * Shard ) triggerAutoFlush ( ) {
// Ignore if we haven't reached the threshold.
if s . walSize < s . MaxWALSize {
return
}
// Otherwise send a non-blocking signal.
select {
case s . flush <- struct { } { } :
default :
}
}
2015-05-28 22:02:12 +00:00
func ( s * Shard ) ValidateAggregateFieldsInStatement ( measurementName string , stmt * influxql . SelectStatement ) error {
s . mu . RLock ( )
defer s . mu . RUnlock ( )
validateType := func ( aname , fname string , t influxql . DataType ) error {
if t != influxql . Float && t != influxql . Integer {
return fmt . Errorf ( "aggregate '%s' requires numerical field values. Field '%s' is of type %s" ,
aname , fname , t )
}
return nil
}
m := s . measurementFields [ measurementName ]
if m == nil {
return fmt . Errorf ( "measurement not found: %s" , measurementName )
}
// If a numerical aggregate is requested, ensure it is only performed on numeric data or on a
// nested aggregate on numeric data.
for _ , a := range stmt . FunctionCalls ( ) {
// Check for fields like `derivative(mean(value), 1d)`
var nested * influxql . Call = a
if fn , ok := nested . Args [ 0 ] . ( * influxql . Call ) ; ok {
nested = fn
}
switch lit := nested . Args [ 0 ] . ( type ) {
case * influxql . VarRef :
if influxql . IsNumeric ( nested ) {
f := m . Fields [ lit . Val ]
if err := validateType ( a . Name , f . Name , f . Type ) ; err != nil {
return err
}
}
case * influxql . Distinct :
if nested . Name != "count" {
return fmt . Errorf ( "aggregate call didn't contain a field %s" , a . String ( ) )
}
if influxql . IsNumeric ( nested ) {
f := m . Fields [ lit . Val ]
if err := validateType ( a . Name , f . Name , f . Type ) ; err != nil {
return err
}
}
default :
return fmt . Errorf ( "aggregate call didn't contain a field %s" , a . String ( ) )
}
}
return nil
}
2015-06-02 15:20:20 +00:00
// deleteSeries deletes the buckets and the metadata for the given series keys
func ( s * Shard ) deleteSeries ( keys [ ] string ) error {
2015-06-18 15:07:51 +00:00
s . mu . Lock ( )
defer s . mu . Unlock ( )
2015-06-02 15:20:20 +00:00
if err := s . db . Update ( func ( tx * bolt . Tx ) error {
b := tx . Bucket ( [ ] byte ( "series" ) )
for _ , k := range keys {
if err := b . Delete ( [ ] byte ( k ) ) ; err != nil {
return err
}
2015-06-18 15:07:51 +00:00
if err := tx . DeleteBucket ( [ ] byte ( k ) ) ; err != nil && err != bolt . ErrBucketNotFound {
2015-06-02 15:20:20 +00:00
return err
}
2015-06-18 15:07:51 +00:00
delete ( s . cache [ WALPartition ( [ ] byte ( k ) ) ] , k )
2015-06-02 15:20:20 +00:00
}
return nil
} ) ; err != nil {
return err
}
return nil
}
2015-06-03 15:32:50 +00:00
// deleteMeasurement deletes the measurement field encoding information and all underlying series from the shard
func ( s * Shard ) deleteMeasurement ( name string , seriesKeys [ ] string ) error {
2015-06-18 15:07:51 +00:00
s . mu . Lock ( )
defer s . mu . Unlock ( )
2015-06-03 15:32:50 +00:00
if err := s . db . Update ( func ( tx * bolt . Tx ) error {
bm := tx . Bucket ( [ ] byte ( "fields" ) )
if err := bm . Delete ( [ ] byte ( name ) ) ; err != nil {
return err
}
b := tx . Bucket ( [ ] byte ( "series" ) )
for _ , k := range seriesKeys {
if err := b . Delete ( [ ] byte ( k ) ) ; err != nil {
return err
}
2015-06-18 15:07:51 +00:00
if err := tx . DeleteBucket ( [ ] byte ( k ) ) ; err != nil && err != bolt . ErrBucketNotFound {
2015-06-03 15:32:50 +00:00
return err
}
2015-06-18 15:07:51 +00:00
delete ( s . cache [ WALPartition ( [ ] byte ( k ) ) ] , k )
2015-06-03 15:32:50 +00:00
}
return nil
} ) ; err != nil {
return err
}
2015-06-16 18:40:36 +00:00
// Remove entry from shard index.
delete ( s . measurementFields , name )
2015-06-03 15:32:50 +00:00
return nil
}
2015-05-23 22:06:07 +00:00
func ( s * Shard ) createFieldsAndMeasurements ( fieldsToCreate [ ] * fieldCreate ) ( map [ string ] * measurementFields , error ) {
if len ( fieldsToCreate ) == 0 {
return nil , nil
}
s . index . mu . Lock ( )
s . mu . Lock ( )
defer s . index . mu . Unlock ( )
defer s . mu . Unlock ( )
// add fields
measurementsToSave := make ( map [ string ] * measurementFields )
for _ , f := range fieldsToCreate {
m := s . measurementFields [ f . measurement ]
if m == nil {
m = measurementsToSave [ f . measurement ]
if m == nil {
m = & measurementFields { Fields : make ( map [ string ] * field ) }
}
s . measurementFields [ f . measurement ] = m
}
measurementsToSave [ f . measurement ] = m
// add the field to the in memory index
if err := m . createFieldIfNotExists ( f . field . Name , f . field . Type ) ; err != nil {
return nil , err
}
// ensure the measurement is in the index and the field is there
measurement := s . index . createMeasurementIndexIfNotExists ( f . measurement )
2015-06-04 18:50:32 +00:00
measurement . fieldNames [ f . field . Name ] = struct { } { }
2015-05-23 22:06:07 +00:00
}
return measurementsToSave , nil
}
// validateSeriesAndFields checks which series and fields are new and whose metadata should be saved and indexed
func ( s * Shard ) validateSeriesAndFields ( points [ ] Point ) ( [ ] * seriesCreate , [ ] * fieldCreate , error ) {
var seriesToCreate [ ] * seriesCreate
var fieldsToCreate [ ] * fieldCreate
// get the mutex for the in memory index, which is shared across shards
s . index . mu . RLock ( )
defer s . index . mu . RUnlock ( )
// get the shard mutex for locally defined fields
s . mu . RLock ( )
defer s . mu . RUnlock ( )
for _ , p := range points {
// see if the series should be added to the index
2015-05-28 21:47:52 +00:00
if ss := s . index . series [ string ( p . Key ( ) ) ] ; ss == nil {
series := & Series { Key : string ( p . Key ( ) ) , Tags : p . Tags ( ) }
2015-05-23 22:06:07 +00:00
seriesToCreate = append ( seriesToCreate , & seriesCreate { p . Name ( ) , series } )
}
// see if the field definitions need to be saved to the shard
mf := s . measurementFields [ p . Name ( ) ]
if mf == nil {
for name , value := range p . Fields ( ) {
fieldsToCreate = append ( fieldsToCreate , & fieldCreate { p . Name ( ) , & field { Name : name , Type : influxql . InspectDataType ( value ) } } )
}
continue // skip validation since all fields are new
}
// validate field types and encode data
for name , value := range p . Fields ( ) {
if f := mf . Fields [ name ] ; f != nil {
2015-06-09 21:57:19 +00:00
// Field present in shard metadata, make sure there is no type conflict.
2015-05-23 22:06:07 +00:00
if f . Type != influxql . InspectDataType ( value ) {
2015-06-11 22:14:49 +00:00
return nil , nil , fmt . Errorf ( "field type conflict: input field \"%s\" on measurement \"%s\" is type %T, already exists as type %s" , name , p . Name ( ) , value , f . Type )
2015-05-23 22:06:07 +00:00
}
continue // Field is present, and it's of the same type. Nothing more to do.
}
fieldsToCreate = append ( fieldsToCreate , & fieldCreate { p . Name ( ) , & field { Name : name , Type : influxql . InspectDataType ( value ) } } )
}
}
return seriesToCreate , fieldsToCreate , nil
}
2015-05-22 20:08:43 +00:00
// loadsMetadataIndex loads the shard metadata into memory. This should only be called by Open
func ( s * Shard ) loadMetadataIndex ( ) error {
return s . db . View ( func ( tx * bolt . Tx ) error {
2015-05-23 22:06:07 +00:00
s . index . mu . Lock ( )
defer s . index . mu . Unlock ( )
2015-05-22 20:08:43 +00:00
// load measurement metadata
2015-05-23 22:06:07 +00:00
meta := tx . Bucket ( [ ] byte ( "fields" ) )
2015-05-22 20:08:43 +00:00
c := meta . Cursor ( )
for k , v := c . First ( ) ; k != nil ; k , v = c . Next ( ) {
2015-05-23 22:06:07 +00:00
m := s . index . createMeasurementIndexIfNotExists ( string ( k ) )
mf := & measurementFields { }
2015-06-02 23:08:48 +00:00
if err := mf . UnmarshalBinary ( v ) ; err != nil {
return err
}
2015-05-23 22:06:07 +00:00
for name , _ := range mf . Fields {
2015-06-04 18:50:32 +00:00
m . fieldNames [ name ] = struct { } { }
2015-05-23 22:06:07 +00:00
}
mf . codec = newFieldCodec ( mf . Fields )
2015-05-29 21:15:05 +00:00
s . measurementFields [ string ( k ) ] = mf
2015-05-22 20:08:43 +00:00
}
// load series metadata
meta = tx . Bucket ( [ ] byte ( "series" ) )
c = meta . Cursor ( )
for k , v := c . First ( ) ; k != nil ; k , v = c . Next ( ) {
2015-06-02 23:08:48 +00:00
series := & Series { }
if err := series . UnmarshalBinary ( v ) ; err != nil {
return err
}
2015-07-20 19:59:46 +00:00
s . index . CreateSeriesIndexIfNotExists ( measurementFromSeriesKey ( string ( k ) ) , series )
2015-05-22 20:08:43 +00:00
}
return nil
} )
}
2015-06-18 15:07:51 +00:00
// SeriesCount returns the number of series buckets on the shard.
// This does not include a count from the WAL.
func ( s * Shard ) SeriesCount ( ) ( n int , err error ) {
err = s . db . View ( func ( tx * bolt . Tx ) error {
return tx . ForEach ( func ( _ [ ] byte , _ * bolt . Bucket ) error {
n ++
return nil
} )
} )
// Remove top-level buckets.
n -= topLevelBucketN
return
}
2015-05-23 22:06:07 +00:00
type measurementFields struct {
Fields map [ string ] * field ` json:"fields" `
2015-05-28 22:02:12 +00:00
codec * FieldCodec
2015-05-23 22:06:07 +00:00
}
2015-06-02 23:08:48 +00:00
// MarshalBinary encodes the object to a binary format.
func ( m * measurementFields ) MarshalBinary ( ) ( [ ] byte , error ) {
var pb internal . MeasurementFields
for _ , f := range m . Fields {
id := int32 ( f . ID )
name := f . Name
2015-06-10 20:02:26 +00:00
t := int32 ( f . Type )
2015-06-02 23:08:48 +00:00
pb . Fields = append ( pb . Fields , & internal . Field { ID : & id , Name : & name , Type : & t } )
}
return proto . Marshal ( & pb )
}
2015-06-03 11:36:39 +00:00
// UnmarshalBinary decodes the object from a binary format.
2015-06-02 23:08:48 +00:00
func ( m * measurementFields ) UnmarshalBinary ( buf [ ] byte ) error {
var pb internal . MeasurementFields
if err := proto . Unmarshal ( buf , & pb ) ; err != nil {
return err
}
m . Fields = make ( map [ string ] * field )
for _ , f := range pb . Fields {
m . Fields [ f . GetName ( ) ] = & field { ID : uint8 ( f . GetID ( ) ) , Name : f . GetName ( ) , Type : influxql . DataType ( f . GetType ( ) ) }
}
return nil
}
2015-05-23 22:06:07 +00:00
// createFieldIfNotExists creates a new field with an autoincrementing ID.
// Returns an error if 255 fields have already been created on the measurement or
// the fields already exists with a different type.
func ( m * measurementFields ) createFieldIfNotExists ( name string , typ influxql . DataType ) error {
// Ignore if the field already exists.
if f := m . Fields [ name ] ; f != nil {
if f . Type != typ {
return ErrFieldTypeConflict
}
return nil
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// Only 255 fields are allowed. If we go over that then return an error.
if len ( m . Fields ) + 1 > math . MaxUint8 {
return ErrFieldOverflow
}
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
// Create and append a new field.
f := & field {
ID : uint8 ( len ( m . Fields ) + 1 ) ,
Name : name ,
Type : typ ,
}
m . Fields [ name ] = f
m . codec = newFieldCodec ( m . Fields )
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
return nil
}
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
// Field represents a series field.
type field struct {
ID uint8 ` json:"id,omitempty" `
Name string ` json:"name,omitempty" `
Type influxql . DataType ` json:"type,omitempty" `
}
2015-05-22 20:08:43 +00:00
2015-06-28 06:54:34 +00:00
// FieldCodec provides encoding and decoding functionality for the fields of a given
2015-05-23 22:06:07 +00:00
// Measurement. It is a distinct type to avoid locking writes on this node while
// potentially long-running queries are executing.
//
// It is not affected by changes to the Measurement object after codec creation.
2015-05-28 22:02:12 +00:00
// TODO: this shouldn't be exported. nothing outside the shard should know about field encodings.
// However, this is here until tx.go and the engine get refactored into tsdb.
type FieldCodec struct {
2015-05-23 22:06:07 +00:00
fieldsByID map [ uint8 ] * field
fieldsByName map [ string ] * field
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// NewFieldCodec returns a FieldCodec for the given Measurement. Must be called with
// a RLock that protects the Measurement.
2015-05-28 22:02:12 +00:00
func newFieldCodec ( fields map [ string ] * field ) * FieldCodec {
2015-05-23 22:06:07 +00:00
fieldsByID := make ( map [ uint8 ] * field , len ( fields ) )
fieldsByName := make ( map [ string ] * field , len ( fields ) )
for _ , f := range fields {
fieldsByID [ f . ID ] = f
fieldsByName [ f . Name ] = f
2015-05-22 20:08:43 +00:00
}
2015-05-28 22:02:12 +00:00
return & FieldCodec { fieldsByID : fieldsByID , fieldsByName : fieldsByName }
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// EncodeFields converts a map of values with string keys to a byte slice of field
// IDs and values.
//
// If a field exists in the codec, but its type is different, an error is returned. If
// a field is not present in the codec, the system panics.
2015-05-28 22:02:12 +00:00
func ( f * FieldCodec ) EncodeFields ( values map [ string ] interface { } ) ( [ ] byte , error ) {
2015-05-23 22:06:07 +00:00
// Allocate byte slice
b := make ( [ ] byte , 0 , 10 )
for k , v := range values {
field := f . fieldsByName [ k ]
if field == nil {
panic ( fmt . Sprintf ( "field does not exist for %s" , k ) )
} else if influxql . InspectDataType ( v ) != field . Type {
2015-06-10 21:57:27 +00:00
return nil , fmt . Errorf ( "field \"%s\" is type %T, mapped as type %s" , k , v , field . Type )
2015-05-23 22:06:07 +00:00
}
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
var buf [ ] byte
switch field . Type {
case influxql . Float :
value := v . ( float64 )
buf = make ( [ ] byte , 9 )
binary . BigEndian . PutUint64 ( buf [ 1 : 9 ] , math . Float64bits ( value ) )
case influxql . Integer :
var value uint64
switch v . ( type ) {
case int :
value = uint64 ( v . ( int ) )
case int32 :
value = uint64 ( v . ( int32 ) )
case int64 :
value = uint64 ( v . ( int64 ) )
default :
panic ( fmt . Sprintf ( "invalid integer type: %T" , v ) )
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
buf = make ( [ ] byte , 9 )
binary . BigEndian . PutUint64 ( buf [ 1 : 9 ] , value )
case influxql . Boolean :
value := v . ( bool )
// Only 1 byte need for a boolean.
buf = make ( [ ] byte , 2 )
if value {
buf [ 1 ] = byte ( 1 )
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
case influxql . String :
value := v . ( string )
if len ( value ) > maxStringLength {
value = value [ : maxStringLength ]
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// Make a buffer for field ID (1 bytes), the string length (2 bytes), and the string.
buf = make ( [ ] byte , len ( value ) + 3 )
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
// Set the string length, then copy the string itself.
binary . BigEndian . PutUint16 ( buf [ 1 : 3 ] , uint16 ( len ( value ) ) )
for i , c := range [ ] byte ( value ) {
buf [ i + 3 ] = byte ( c )
2015-05-22 20:08:43 +00:00
}
default :
2015-05-23 22:06:07 +00:00
panic ( fmt . Sprintf ( "unsupported value type during encode fields: %T" , v ) )
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// Always set the field ID as the leading byte.
buf [ 0 ] = field . ID
// Append temp buffer to the end.
b = append ( b , buf ... )
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
return b , nil
2015-05-22 20:08:43 +00:00
}
2015-05-28 22:02:12 +00:00
// TODO: this shouldn't be exported. remove when tx.go and engine.go get refactored into tsdb
func ( f * FieldCodec ) FieldIDByName ( s string ) ( uint8 , error ) {
fi := f . fieldsByName [ s ]
if fi == nil {
2015-06-30 17:29:09 +00:00
return 0 , ErrFieldNotFound
2015-05-28 22:02:12 +00:00
}
return fi . ID , nil
}
2015-05-23 22:06:07 +00:00
// DecodeFields decodes a byte slice into a set of field ids and values.
2015-05-28 22:02:12 +00:00
func ( f * FieldCodec ) DecodeFields ( b [ ] byte ) ( map [ uint8 ] interface { } , error ) {
2015-05-23 22:06:07 +00:00
if len ( b ) == 0 {
return nil , nil
}
2015-05-22 20:08:43 +00:00
2015-05-23 22:06:07 +00:00
// Create a map to hold the decoded data.
values := make ( map [ uint8 ] interface { } , 0 )
for {
if len ( b ) < 1 {
// No more bytes.
break
}
// First byte is the field identifier.
fieldID := b [ 0 ]
field := f . fieldsByID [ fieldID ]
if field == nil {
// See note in DecodeByID() regarding field-mapping failures.
return nil , ErrFieldUnmappedID
}
var value interface { }
switch field . Type {
case influxql . Float :
value = math . Float64frombits ( binary . BigEndian . Uint64 ( b [ 1 : 9 ] ) )
// Move bytes forward.
b = b [ 9 : ]
case influxql . Integer :
value = int64 ( binary . BigEndian . Uint64 ( b [ 1 : 9 ] ) )
// Move bytes forward.
b = b [ 9 : ]
case influxql . Boolean :
if b [ 1 ] == 1 {
value = true
} else {
value = false
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// Move bytes forward.
b = b [ 2 : ]
case influxql . String :
size := binary . BigEndian . Uint16 ( b [ 1 : 3 ] )
value = string ( b [ 3 : size + 3 ] )
// Move bytes forward.
b = b [ size + 3 : ]
default :
panic ( fmt . Sprintf ( "unsupported value type during decode fields: %T" , f . fieldsByID [ fieldID ] ) )
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
values [ fieldID ] = value
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
return values , nil
2015-05-22 20:08:43 +00:00
}
2015-05-23 22:06:07 +00:00
// DecodeFieldsWithNames decodes a byte slice into a set of field names and values
2015-05-28 22:02:12 +00:00
// TODO: shouldn't be exported. refactor engine
func ( f * FieldCodec ) DecodeFieldsWithNames ( b [ ] byte ) ( map [ string ] interface { } , error ) {
2015-05-23 22:06:07 +00:00
fields , err := f . DecodeFields ( b )
if err != nil {
return nil , err
}
m := make ( map [ string ] interface { } )
for id , v := range fields {
field := f . fieldsByID [ id ]
if field != nil {
m [ field . Name ] = v
2015-05-22 20:08:43 +00:00
}
}
2015-05-23 22:06:07 +00:00
return m , nil
2015-05-22 20:08:43 +00:00
}
2015-05-28 22:02:12 +00:00
// DecodeByID scans a byte slice for a field with the given ID, converts it to its
// expected type, and return that value.
// TODO: shouldn't be exported. refactor engine
func ( f * FieldCodec ) DecodeByID ( targetID uint8 , b [ ] byte ) ( interface { } , error ) {
if len ( b ) == 0 {
return 0 , ErrFieldNotFound
}
for {
if len ( b ) < 1 {
// No more bytes.
break
}
field , ok := f . fieldsByID [ b [ 0 ] ]
if ! ok {
// This can happen, though is very unlikely. If this node receives encoded data, to be written
// to disk, and is queried for that data before its metastore is updated, there will be no field
// mapping for the data during decode. All this can happen because data is encoded by the node
// that first received the write request, not the node that actually writes the data to disk.
// So if this happens, the read must be aborted.
return 0 , ErrFieldUnmappedID
}
var value interface { }
switch field . Type {
case influxql . Float :
// Move bytes forward.
value = math . Float64frombits ( binary . BigEndian . Uint64 ( b [ 1 : 9 ] ) )
b = b [ 9 : ]
case influxql . Integer :
value = int64 ( binary . BigEndian . Uint64 ( b [ 1 : 9 ] ) )
b = b [ 9 : ]
case influxql . Boolean :
if b [ 1 ] == 1 {
value = true
} else {
value = false
}
// Move bytes forward.
b = b [ 2 : ]
case influxql . String :
size := binary . BigEndian . Uint16 ( b [ 1 : 3 ] )
value = string ( b [ 3 : 3 + size ] )
// Move bytes forward.
b = b [ size + 3 : ]
default :
panic ( fmt . Sprintf ( "unsupported value type during decode by id: %T" , field . Type ) )
}
if field . ID == targetID {
return value , nil
}
}
return 0 , ErrFieldNotFound
}
2015-06-30 17:29:09 +00:00
// DecodeByName scans a byte slice for a field with the given name, converts it to its
// expected type, and return that value.
func ( f * FieldCodec ) DecodeByName ( name string , b [ ] byte ) ( interface { } , error ) {
2015-07-09 18:24:06 +00:00
fi := f . fieldByName ( name )
if fi == nil {
2015-06-30 18:17:04 +00:00
return 0 , ErrFieldNotFound
2015-06-30 17:29:09 +00:00
}
2015-07-09 18:24:06 +00:00
return f . DecodeByID ( fi . ID , b )
2015-06-30 17:29:09 +00:00
}
2015-05-23 22:06:07 +00:00
// FieldByName returns the field by its name. It will return a nil if not found
2015-05-28 22:02:12 +00:00
func ( f * FieldCodec ) fieldByName ( name string ) * field {
2015-05-23 22:06:07 +00:00
return f . fieldsByName [ name ]
2015-05-22 20:08:43 +00:00
}
// mustMarshal encodes a value to JSON.
// This will panic if an error occurs. This should only be used internally when
// an invalid marshal will cause corruption and a panic is appropriate.
func mustMarshalJSON ( v interface { } ) [ ] byte {
b , err := json . Marshal ( v )
if err != nil {
panic ( "marshal: " + err . Error ( ) )
}
return b
}
// mustUnmarshalJSON decodes a value from JSON.
// This will panic if an error occurs. This should only be used internally when
// an invalid unmarshal will cause corruption and a panic is appropriate.
func mustUnmarshalJSON ( b [ ] byte , v interface { } ) {
if err := json . Unmarshal ( b , v ) ; err != nil {
panic ( "unmarshal: " + err . Error ( ) )
}
}
// u64tob converts a uint64 into an 8-byte slice.
func u64tob ( v uint64 ) [ ] byte {
b := make ( [ ] byte , 8 )
binary . BigEndian . PutUint64 ( b , v )
return b
}
2015-06-18 15:07:51 +00:00
// marshalWALEntry encodes point data into a single byte slice.
//
// The format of the byte slice is:
//
// uint64 timestamp
// uint32 key length
// []byte key
// []byte data
//
func marshalWALEntry ( key [ ] byte , timestamp int64 , data [ ] byte ) [ ] byte {
v := make ( [ ] byte , 8 + 4 , 8 + 4 + len ( key ) + len ( data ) )
binary . BigEndian . PutUint64 ( v [ 0 : 8 ] , uint64 ( timestamp ) )
binary . BigEndian . PutUint32 ( v [ 8 : 12 ] , uint32 ( len ( key ) ) )
v = append ( v , key ... )
v = append ( v , data ... )
return v
}
2015-05-22 20:08:43 +00:00
2015-06-18 15:07:51 +00:00
// unmarshalWALEntry decodes a WAL entry into it's separate parts.
// Returned byte slices point to the original slice.
func unmarshalWALEntry ( v [ ] byte ) ( key [ ] byte , timestamp int64 , data [ ] byte ) {
keyLen := binary . BigEndian . Uint32 ( v [ 8 : 12 ] )
key = v [ 12 : 12 + keyLen ]
timestamp = int64 ( binary . BigEndian . Uint64 ( v [ 0 : 8 ] ) )
data = v [ 12 + keyLen : ]
return
}
2015-05-22 20:08:43 +00:00
2015-06-18 15:07:51 +00:00
// marshalCacheEntry encodes the timestamp and data to a single byte slice.
//
// The format of the byte slice is:
//
// uint64 timestamp
// []byte data
//
func marshalCacheEntry ( timestamp int64 , data [ ] byte ) [ ] byte {
buf := make ( [ ] byte , 8 , 8 + len ( data ) )
binary . BigEndian . PutUint64 ( buf [ 0 : 8 ] , uint64 ( timestamp ) )
return append ( buf , data ... )
}
2015-05-22 20:08:43 +00:00
2015-06-18 15:07:51 +00:00
// unmarshalCacheEntry returns the timestamp and data from an encoded byte slice.
func unmarshalCacheEntry ( buf [ ] byte ) ( timestamp int64 , data [ ] byte ) {
timestamp = int64 ( binary . BigEndian . Uint64 ( buf [ 0 : 8 ] ) )
data = buf [ 8 : ]
return
}
// byteSlices represents a sortable slice of byte slices.
type byteSlices [ ] [ ] byte
func ( a byteSlices ) Len ( ) int { return len ( a ) }
func ( a byteSlices ) Less ( i , j int ) bool { return bytes . Compare ( a [ i ] , a [ j ] ) == - 1 }
func ( a byteSlices ) Swap ( i , j int ) { a [ i ] , a [ j ] = a [ j ] , a [ i ] }
// shardCursor provides ordered iteration across a Bolt bucket and shard cache.
type shardCursor struct {
// Bolt cursor and readahead buffer.
cursor * bolt . Cursor
buf struct {
key , value [ ] byte
}
// Cache and current cache index.
cache [ ] [ ] byte
index int
}
// Seek moves the cursor to a position and returns the closest key/value pair.
func ( sc * shardCursor ) Seek ( seek [ ] byte ) ( key , value [ ] byte ) {
// Seek bolt cursor.
if sc . cursor != nil {
sc . buf . key , sc . buf . value = sc . cursor . Seek ( seek )
}
// Seek cache index.
sc . index = sort . Search ( len ( sc . cache ) , func ( i int ) bool {
return bytes . Compare ( sc . cache [ i ] [ 0 : 8 ] , seek ) != - 1
} )
return sc . read ( )
}
// Next returns the next key/value pair from the cursor.
func ( sc * shardCursor ) Next ( ) ( key , value [ ] byte ) {
// Read next bolt key/value if not bufferred.
if sc . buf . key == nil && sc . cursor != nil {
sc . buf . key , sc . buf . value = sc . cursor . Next ( )
}
return sc . read ( )
}
// read returns the next key/value in the cursor buffer or cache.
func ( sc * shardCursor ) read ( ) ( key , value [ ] byte ) {
// If neither a buffer or cache exists then return nil.
if sc . buf . key == nil && sc . index >= len ( sc . cache ) {
return nil , nil
}
// Use the buffer if it exists and there's no cache or if it is lower than the cache.
if sc . buf . key != nil && ( sc . index >= len ( sc . cache ) || bytes . Compare ( sc . buf . key , sc . cache [ sc . index ] [ 0 : 8 ] ) == - 1 ) {
key , value = sc . buf . key , sc . buf . value
sc . buf . key , sc . buf . value = nil , nil
return
}
// Otherwise read from the cache.
// Continue skipping ahead through duplicate keys in the cache list.
for {
// Read the current cache key/value pair.
key , value = sc . cache [ sc . index ] [ 0 : 8 ] , sc . cache [ sc . index ] [ 8 : ]
sc . index ++
// Exit loop if we're at the end of the cache or the next key is different.
if sc . index >= len ( sc . cache ) || ! bytes . Equal ( key , sc . cache [ sc . index ] [ 0 : 8 ] ) {
break
}
}
return
}
// WALPartitionN is the number of partitions in the write ahead log.
const WALPartitionN = 8
// WALPartition returns the partition number that key belongs to.
func WALPartition ( key [ ] byte ) uint8 {
h := fnv . New64a ( )
h . Write ( key )
return uint8 ( h . Sum64 ( ) % WALPartitionN )
}