influxdb/tsdb/shard.go

1521 lines
38 KiB
Go
Raw Normal View History

package tsdb
import (
"bytes"
"errors"
"fmt"
"io"
2016-09-14 13:55:44 +00:00
"math"
"os"
2016-05-18 14:34:06 +00:00
"path/filepath"
"regexp"
2016-02-04 15:12:52 +00:00
"sort"
"strings"
"sync"
"sync/atomic"
"time"
2015-11-04 21:06:06 +00:00
"github.com/gogo/protobuf/proto"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
2017-03-15 12:16:28 +00:00
"github.com/influxdata/influxdb/pkg/estimator"
2016-04-05 12:54:11 +00:00
internal "github.com/influxdata/influxdb/tsdb/internal"
"github.com/uber-go/zap"
)
2016-05-18 15:04:50 +00:00
// monitorStatInterval is the interval at which the shard is inspected
// for the purpose of determining certain monitoring statistics. It is the
// period of the ticker in Shard.monitor that refreshes the disk size statistic.
const monitorStatInterval = 30 * time.Second
2015-09-04 22:43:57 +00:00
// Keys of the values reported under the "shard" statistic by Shard.Statistics.
const (
	statWriteReq           = "writeReq"           // ShardStatistics.WriteReq
	statWriteReqOK         = "writeReqOk"         // ShardStatistics.WriteReqOK
	statWriteReqErr        = "writeReqErr"        // ShardStatistics.WriteReqErr
	statSeriesCreate       = "seriesCreate"       // derived from the engine's series sketches
	statFieldsCreate       = "fieldsCreate"       // ShardStatistics.FieldsCreated
	statWritePointsErr     = "writePointsErr"     // ShardStatistics.WritePointsErr
	statWritePointsDropped = "writePointsDropped" // ShardStatistics.WritePointsDropped
	statWritePointsOK      = "writePointsOk"      // ShardStatistics.WritePointsOK
	statWriteBytes         = "writeBytes"         // ShardStatistics.BytesWritten
	statDiskBytes          = "diskBytes"          // ShardStatistics.DiskBytes
)
var (
	// ErrFieldOverflow is returned when too many fields are created on a measurement.
	ErrFieldOverflow = errors.New("field overflow")

	// ErrFieldTypeConflict is returned when a new field already exists with a different type.
	ErrFieldTypeConflict = errors.New("field type conflict")

	// ErrFieldNotFound is returned when a field cannot be found.
	ErrFieldNotFound = errors.New("field not found")

	// ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID
	// there is no mapping for.
	ErrFieldUnmappedID = errors.New("field ID not mapped")

	// ErrEngineClosed is returned when a caller attempts indirectly to
	// access the shard's underlying engine while the engine is not open.
	ErrEngineClosed = errors.New("engine is closed")

	// ErrShardDisabled is returned when the shard is not available for
	// queries or writes.
	ErrShardDisabled = errors.New("shard is disabled")
)
// timeBytes is the reserved "time" key as a byte slice. It is built once at
// package level so that comparisons against it need no per-call allocation.
var timeBytes = []byte("time")
// A ShardError implements the error interface, and contains extra
// context about the shard that generated the error.
type ShardError struct {
	id  uint64 // ID of the shard the error originated from
	Err error  // underlying error
}

// NewShardError returns a new ShardError wrapping err with the shard id.
// It returns nil when err is nil, so it can wrap a result unconditionally.
func NewShardError(id uint64, err error) error {
	if err == nil {
		return nil
	}
	return ShardError{id: id, Err: err}
}

// Error returns the string representation of the error, to satisfy the error interface.
// The format is "[shard <id>] <underlying error>".
func (e ShardError) Error() string {
	return fmt.Sprintf("[shard %d] %s", e.id, e.Err)
}
2016-11-17 13:24:32 +00:00
// PartialWriteError indicates a write request could only write a portion of the
// requested values.
type PartialWriteError struct {
	Reason  string // human-readable explanation of why points were dropped
	Dropped int    // number of points that were dropped

	// The set of series keys that were dropped. Can be nil.
	DroppedKeys map[string]struct{}
}

// Error returns the string representation of the error, to satisfy the error
// interface. DroppedKeys is not included in the message.
func (e PartialWriteError) Error() string {
	return fmt.Sprintf("partial write: %s dropped=%d", e.Reason, e.Dropped)
}
// Shard represents a self-contained time series database. An inverted index of
// the measurement and tag data is kept along with the raw time series data.
// Data can be split across many shards. The query engine in TSDB is responsible
// for combining the output of many shards into a single query result.
type Shard struct {
path string
walPath string
id uint64
database string
retentionPolicy string
2016-11-16 18:57:55 +00:00
options EngineOptions
2016-11-29 12:26:52 +00:00
mu sync.RWMutex
engine Engine
index Index
2016-05-18 14:34:06 +00:00
closing chan struct{}
enabled bool
2015-09-04 22:43:57 +00:00
// expvar-based stats.
stats *ShardStatistics
defaultTags models.StatisticTags
2015-09-04 22:43:57 +00:00
baseLogger zap.Logger
logger zap.Logger
EnableOnOpen bool
}
2016-11-15 16:20:00 +00:00
// NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index
2016-11-16 18:57:55 +00:00
func NewShard(id uint64, path string, walPath string, opt EngineOptions) *Shard {
2016-09-14 13:55:44 +00:00
db, rp := decodeStorePath(path)
logger := zap.New(zap.NullEncoder())
2016-09-14 13:55:44 +00:00
s := &Shard{
2016-11-16 18:57:55 +00:00
id: id,
path: path,
walPath: walPath,
options: opt,
closing: make(chan struct{}),
stats: &ShardStatistics{},
defaultTags: models.StatisticTags{
"path": path,
"walPath": walPath,
"id": fmt.Sprintf("%d", id),
"database": db,
"retentionPolicy": rp,
2016-11-16 18:57:55 +00:00
"engine": opt.EngineVersion,
},
database: db,
retentionPolicy: rp,
logger: logger,
baseLogger: logger,
EnableOnOpen: true,
}
return s
}
2016-12-31 05:12:37 +00:00
// WithLogger installs log as the shard's base logger. When the shard is
// ready, the same logger is forwarded to the engine. The shard's own logger
// becomes the base logger annotated with service=shard.
func (s *Shard) WithLogger(log zap.Logger) {
	s.baseLogger = log

	// Only a ready shard has an engine that can accept the logger.
	if s.ready() == nil {
		s.engine.WithLogger(s.baseLogger)
	}

	serviceField := zap.String("service", "shard")
	s.logger = s.baseLogger.With(serviceField)
}
// SetEnabled enables the shard for queries and write. When disabled, all
// writes and queries return an error and compactions are stopped for the shard.
func (s *Shard) SetEnabled(enabled bool) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Gate writes and queries.
	s.enabled = enabled

	if s.engine == nil {
		return
	}
	// Propagate to the engine so background compactions and snapshotting follow.
	s.engine.SetEnabled(enabled)
}
// ShardStatistics maintains statistics for a shard. The fields are updated
// and read with the sync/atomic functions.
type ShardStatistics struct {
	WriteReq           int64 // write requests received by WritePoints
	WriteReqOK         int64 // write requests the engine accepted
	WriteReqErr        int64 // write requests the engine rejected
	FieldsCreated      int64 // fields queued for creation by writes
	WritePointsErr     int64 // points in writes the engine rejected
	WritePointsDropped int64 // points dropped during validation
	WritePointsOK      int64 // points the engine accepted
	BytesWritten       int64 // bytes streamed out by WriteTo
	DiskBytes          int64 // last size computed by the monitor goroutine
}
// Statistics returns statistics for periodic monitoring.
func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
if err := s.ready(); err != nil {
return nil
}
2016-11-29 12:26:52 +00:00
// TODO(edd): Should statSeriesCreate be the current number of series in the
// shard, or the total number of series ever created?
sSketch, tSketch, err := s.engine.SeriesSketches()
seriesN := int64(sSketch.Count() - tSketch.Count())
2016-09-21 15:04:37 +00:00
if err != nil {
2017-01-06 16:31:25 +00:00
s.logger.Error("cannot compute series sketch", zap.Error(err))
2016-09-21 15:04:37 +00:00
seriesN = 0
}
2016-09-09 22:16:53 +00:00
tags = s.defaultTags.Merge(tags)
statistics := []models.Statistic{{
Name: "shard",
2016-09-09 22:16:53 +00:00
Tags: tags,
Values: map[string]interface{}{
statWriteReq: atomic.LoadInt64(&s.stats.WriteReq),
statWriteReqOK: atomic.LoadInt64(&s.stats.WriteReqOK),
statWriteReqErr: atomic.LoadInt64(&s.stats.WriteReqErr),
2016-11-29 12:26:52 +00:00
statSeriesCreate: seriesN,
statFieldsCreate: atomic.LoadInt64(&s.stats.FieldsCreated),
statWritePointsErr: atomic.LoadInt64(&s.stats.WritePointsErr),
statWritePointsDropped: atomic.LoadInt64(&s.stats.WritePointsDropped),
statWritePointsOK: atomic.LoadInt64(&s.stats.WritePointsOK),
statWriteBytes: atomic.LoadInt64(&s.stats.BytesWritten),
statDiskBytes: atomic.LoadInt64(&s.stats.DiskBytes),
},
}}
2016-09-01 12:40:16 +00:00
// Add the index and engine statistics.
statistics = append(statistics, s.engine.Statistics(tags)...)
return statistics
}
// Path reports the data path the shard was constructed with.
func (s *Shard) Path() string {
	return s.path
}
2016-02-10 20:04:18 +00:00
// Open initializes and opens the shard's store.
func (s *Shard) Open() error {
if err := func() error {
s.mu.Lock()
defer s.mu.Unlock()
// Return if the shard is already open
2015-07-22 14:53:20 +00:00
if s.engine != nil {
return nil
}
2016-11-15 16:20:00 +00:00
// Initialize underlying index.
ipath := filepath.Join(s.path, "index")
2016-11-16 18:57:55 +00:00
idx, err := NewIndex(s.id, ipath, s.options)
2016-11-15 16:20:00 +00:00
if err != nil {
return err
}
// Open index.
if err := idx.Open(); err != nil {
return err
}
s.index = idx
2015-07-22 14:53:20 +00:00
// Initialize underlying engine.
2016-11-16 18:57:55 +00:00
e, err := NewEngine(s.id, idx, s.path, s.walPath, s.options)
if err != nil {
return err
}
2015-07-22 14:53:20 +00:00
// Set log output on the engine.
e.WithLogger(s.baseLogger)
// Disable compactions while loading the index
e.SetEnabled(false)
2015-07-22 14:53:20 +00:00
// Open engine.
if err := e.Open(); err != nil {
return err
}
2016-11-16 18:57:55 +00:00
2017-02-09 17:59:14 +00:00
// Load metadata index for the inmem index only.
2016-11-16 18:57:55 +00:00
if err := e.LoadMetadataIndex(s.id, s.index); err != nil {
return err
}
2017-02-09 17:59:14 +00:00
s.engine = e
2016-11-16 18:57:55 +00:00
2016-10-05 05:52:49 +00:00
go s.monitor()
2016-05-18 14:34:06 +00:00
return nil
}(); err != nil {
2017-03-10 14:45:26 +00:00
s.close(true)
return NewShardError(s.id, err)
}
if s.EnableOnOpen {
// enable writes, queries and compactions
s.SetEnabled(true)
}
return nil
}
// Close shuts down the shard's store.
func (s *Shard) Close() error {
s.mu.Lock()
2015-07-22 14:53:20 +00:00
defer s.mu.Unlock()
2017-03-10 14:45:26 +00:00
return s.close(true)
}
2017-03-10 14:45:26 +00:00
// CloseFast closes the shard without cleaning up the shard ID or any of the
// shard's series keys from the index it belongs to.
//
// CloseFast can be called when the entire index is being removed, e.g., when
// the database the shard belongs to is being dropped.
//
// Like Close, it holds the shard's write lock while closing.
func (s *Shard) CloseFast() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.close(false)
}
// close closes the shard an removes reference to the shard from associated
// indexes, unless clean is false.
func (s *Shard) close(clean bool) error {
2016-02-02 15:33:20 +00:00
if s.engine == nil {
return nil
}
2016-02-02 15:33:20 +00:00
2016-05-18 14:34:06 +00:00
// Close the closing channel at most once.
select {
case <-s.closing:
default:
close(s.closing)
}
2017-03-10 14:45:26 +00:00
if clean {
// Don't leak our shard ID and series keys in the index
s.UnloadIndex()
}
2016-12-19 16:57:05 +00:00
2016-02-02 15:33:20 +00:00
err := s.engine.Close()
if err == nil {
s.engine = nil
}
2016-11-15 16:20:00 +00:00
if e := s.index.Close(); e == nil {
s.index = nil
}
2016-02-02 15:33:20 +00:00
return err
}
// ready determines if the Shard is ready for queries or writes.
// It returns nil if ready, otherwise ErrShardClosed or ErrShardDiabled
func (s *Shard) ready() error {
var err error
2016-03-29 22:32:34 +00:00
s.mu.RLock()
if s.engine == nil {
err = ErrEngineClosed
} else if !s.enabled {
err = ErrShardDisabled
}
2016-03-29 22:32:34 +00:00
s.mu.RUnlock()
return err
2016-03-29 22:32:34 +00:00
}
2016-12-31 05:12:37 +00:00
// LastModified reports when the shard was last modified, as given by the
// engine. A shard that is not ready yields the zero time.
func (s *Shard) LastModified() time.Time {
	if s.ready() != nil {
		var never time.Time
		return never
	}
	return s.engine.LastModified()
}
2016-12-19 16:57:05 +00:00
// UnloadIndex asks the shard's index to drop every reference to this shard,
// identified by its id.
func (s *Shard) UnloadIndex() {
	idx := s.index
	idx.RemoveShard(s.id)
}
2016-11-16 18:57:55 +00:00
// DiskSize returns the size on disk of this shard
func (s *Shard) DiskSize() (int64, error) {
2016-05-18 14:34:06 +00:00
var size int64
2016-05-18 15:04:50 +00:00
err := filepath.Walk(s.path, func(_ string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
2016-05-18 15:04:50 +00:00
if !fi.IsDir() {
size += fi.Size()
2016-05-18 14:34:06 +00:00
}
return err
})
if err != nil {
return 0, err
}
2016-05-18 14:34:06 +00:00
2016-05-18 15:04:50 +00:00
err = filepath.Walk(s.walPath, func(_ string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
2016-05-18 15:04:50 +00:00
if !fi.IsDir() {
size += fi.Size()
2016-05-18 14:34:06 +00:00
}
return err
})
return size, err
}
2016-12-31 05:12:37 +00:00
// FieldCreate holds information for a field to create on a measurement.
2015-07-22 14:53:20 +00:00
type FieldCreate struct {
Measurement string
2015-07-23 16:33:37 +00:00
Field *Field
}
2016-12-31 05:12:37 +00:00
// SeriesCreate holds information for a series to create.
2015-07-22 14:53:20 +00:00
type SeriesCreate struct {
Measurement string
Series *Series
}
2016-12-31 05:12:37 +00:00
// WritePoints will write the raw data points and any new metadata to the index in the shard.
func (s *Shard) WritePoints(points []models.Point) error {
if err := s.ready(); err != nil {
return err
2016-03-29 22:32:34 +00:00
}
var writeError error
s.mu.RLock()
defer s.mu.RUnlock()
atomic.AddInt64(&s.stats.WriteReq, 1)
2015-09-04 22:43:57 +00:00
points, fieldsToCreate, err := s.validateSeriesAndFields(points)
if err != nil {
if _, ok := err.(PartialWriteError); !ok {
return err
}
// There was a partial write (points dropped), hold onto the error to return
// to the caller, but continue on writing the remaining points.
writeError = err
}
atomic.AddInt64(&s.stats.FieldsCreated, int64(len(fieldsToCreate)))
// add any new fields and keep track of what needs to be saved
if err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil {
return err
}
2015-07-22 14:53:20 +00:00
// Write to the engine.
if err := s.engine.WritePoints(points); err != nil {
atomic.AddInt64(&s.stats.WritePointsErr, int64(len(points)))
atomic.AddInt64(&s.stats.WriteReqErr, 1)
2015-07-22 14:53:20 +00:00
return fmt.Errorf("engine: %s", err)
}
atomic.AddInt64(&s.stats.WritePointsOK, int64(len(points)))
atomic.AddInt64(&s.stats.WriteReqOK, 1)
return writeError
}
2015-07-22 14:53:20 +00:00
// DeleteSeries deletes a list of series.
2016-09-29 09:39:13 +00:00
func (s *Shard) DeleteSeries(seriesKeys [][]byte) error {
2016-09-14 13:55:44 +00:00
return s.DeleteSeriesRange(seriesKeys, math.MinInt64, math.MaxInt64)
2016-04-27 19:01:07 +00:00
}
2016-09-29 09:39:13 +00:00
// DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive)
func (s *Shard) DeleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
if err := s.ready(); err != nil {
return err
2016-04-27 19:01:07 +00:00
}
if err := s.engine.DeleteSeriesRange(seriesKeys, min, max); err != nil {
return err
}
return nil
}
2015-07-22 14:53:20 +00:00
// DeleteMeasurement deletes a measurement and all underlying series.
2016-09-29 09:39:13 +00:00
func (s *Shard) DeleteMeasurement(name []byte) error {
if err := s.ready(); err != nil {
return err
2016-03-29 22:32:34 +00:00
}
2016-11-16 18:57:55 +00:00
return s.engine.DeleteMeasurement(name)
}
2016-12-16 14:21:42 +00:00
// SeriesN returns the unique number of series in the shard, as reported by
// the engine. It returns 0 when the shard has no open engine, mirroring the
// nil-engine guard used by SeriesSketches and MeasurementsSketches.
func (s *Shard) SeriesN() int64 {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if s.engine == nil {
		return 0
	}
	return s.engine.SeriesN()
}
2017-03-15 12:16:28 +00:00
// SeriesSketches returns the series sketches for the shard.
func (s *Shard) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
s.mu.RLock()
defer s.mu.RUnlock()
2017-03-20 21:37:17 +00:00
if s.engine == nil {
return nil, nil, nil
}
2017-03-15 12:16:28 +00:00
return s.engine.SeriesSketches()
}
// MeasurementsSketches returns the measurement sketches for the shard.
func (s *Shard) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
s.mu.RLock()
defer s.mu.RUnlock()
2017-03-20 21:37:17 +00:00
if s.engine == nil {
return nil, nil, nil
}
2017-03-15 12:16:28 +00:00
return s.engine.MeasurementsSketches()
}
func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error {
if len(fieldsToCreate) == 0 {
return nil
}
// add fields
for _, f := range fieldsToCreate {
2016-11-15 16:20:00 +00:00
mf := s.engine.MeasurementFields(f.Measurement)
if err := mf.CreateFieldIfNotExists(f.Field.Name, f.Field.Type, false); err != nil {
return err
}
2016-11-16 18:57:55 +00:00
s.index.SetFieldName(f.Measurement, f.Field.Name)
}
return nil
}
2016-12-31 05:12:37 +00:00
// validateSeriesAndFields checks which series and fields are new and whose metadata should be saved and indexed.
func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point, []*FieldCreate, error) {
var (
fieldsToCreate []*FieldCreate
err error
2017-02-06 18:14:13 +00:00
dropped int
reason string // only first error reason is set unless returned from CreateSeriesListIfNotExists
)
2016-11-11 16:25:53 +00:00
// Create all series against the index in bulk.
keys := make([][]byte, len(points))
names := make([][]byte, len(points))
tagsSlice := make([]models.Tags, len(points))
2017-04-14 20:36:54 +00:00
// Drop any series w/ a "time" tag, these are illegal
var j int
for i, p := range points {
tags := p.Tags()
if v := tags.Get(timeBytes); v != nil {
dropped++
if reason == "" {
2017-04-28 18:21:57 +00:00
reason = fmt.Sprintf("invalid tag key: input tag \"%s\" on measurement \"%s\" is invalid", "time", p.Name())
}
continue
}
2017-04-14 20:36:54 +00:00
keys[j] = p.Key()
names[j] = []byte(p.Name())
tagsSlice[j] = tags
points[j] = points[i]
j++
}
2017-04-14 20:36:54 +00:00
points, keys, names, tagsSlice = points[:j], keys[:j], names[:j], tagsSlice[:j]
2017-02-06 18:14:13 +00:00
// Add new series. Check for partial writes.
2017-03-24 15:48:10 +00:00
var droppedKeys map[string]struct{}
if err := s.engine.CreateSeriesListIfNotExists(keys, names, tagsSlice); err != nil {
2017-02-06 18:14:13 +00:00
switch err := err.(type) {
case *PartialWriteError:
reason = err.Reason
dropped += err.Dropped
2017-03-24 15:48:10 +00:00
droppedKeys = err.DroppedKeys
2017-02-06 18:14:13 +00:00
atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped))
default:
return nil, nil, err
}
}
// get the shard mutex for locally defined fields
2017-02-06 18:14:13 +00:00
n := 0
// mfCache is a local cache of MeasurementFields to reduce lock contention when validating
// field types.
mfCache := make(map[string]*MeasurementFields, 16)
for i, p := range points {
var skip bool
var validField bool
iter := p.FieldIterator()
for iter.Next() {
if bytes.Equal(iter.FieldKey(), timeBytes) {
continue
}
validField = true
break
}
if !validField {
dropped++
if reason == "" {
2017-04-28 18:21:57 +00:00
reason = fmt.Sprintf("invalid field name: input field \"%s\" on measurement \"%s\" is invalid", "time", p.Name())
}
continue
}
iter.Reset()
2017-03-24 15:48:10 +00:00
// Skip points if keys have been dropped.
// The drop count has already been incremented during series creation.
if droppedKeys != nil {
if _, ok := droppedKeys[string(keys[i])]; ok {
continue
}
2016-11-29 12:26:52 +00:00
}
name := p.Name()
// see if the field definitions need to be saved to the shard
mf := mfCache[name]
if mf == nil {
mf = s.engine.MeasurementFields(name).Clone()
mfCache[name] = mf
}
iter.Reset()
// validate field types and encode data
for iter.Next() {
// Skip fields name "time", they are illegal
if bytes.Equal(iter.FieldKey(), timeBytes) {
continue
}
var fieldType influxql.DataType
switch iter.Type() {
case models.Float:
fieldType = influxql.Float
case models.Integer:
fieldType = influxql.Integer
case models.Boolean:
fieldType = influxql.Boolean
case models.String:
fieldType = influxql.String
default:
continue
}
if f := mf.FieldBytes(iter.FieldKey()); f != nil {
// Field present in shard metadata, make sure there is no type conflict.
if f.Type != fieldType {
atomic.AddInt64(&s.stats.WritePointsDropped, 1)
dropped++
if reason == "" {
reason = fmt.Sprintf("%s: input field \"%s\" on measurement \"%s\" is type %s, already exists as type %s", ErrFieldTypeConflict, iter.FieldKey(), name, fieldType, f.Type)
}
skip = true
} else {
continue // Field is present, and it's of the same type. Nothing more to do.
}
}
if !skip {
fieldsToCreate = append(fieldsToCreate, &FieldCreate{p.Name(), &Field{Name: string(iter.FieldKey()), Type: fieldType}})
}
}
if !skip {
points[n] = points[i]
n++
}
}
points = points[:n]
if dropped > 0 {
err = PartialWriteError{Reason: reason, Dropped: dropped}
}
return points, fieldsToCreate, err
}
2016-12-05 17:51:06 +00:00
// MeasurementNamesByExpr returns names of measurements matching the condition.
// If cond is nil then all measurement names are returned.
func (s *Shard) MeasurementNamesByExpr(cond influxql.Expr) ([][]byte, error) {
return s.engine.MeasurementNamesByExpr(cond)
2016-09-01 12:40:16 +00:00
}
2016-11-16 18:57:55 +00:00
// MeasurementFields looks up the engine's field set for the measurement
// called name.
func (s *Shard) MeasurementFields(name []byte) *MeasurementFields {
	measurement := string(name)
	return s.engine.MeasurementFields(measurement)
}
2017-04-14 20:36:54 +00:00
func (s *Shard) MeasurementExists(name []byte) (bool, error) {
return s.engine.MeasurementExists(name)
2016-03-29 21:55:09 +00:00
}
// WriteTo writes the shard's data to w.
2015-09-04 22:43:57 +00:00
func (s *Shard) WriteTo(w io.Writer) (int64, error) {
if err := s.ready(); err != nil {
return 0, err
2016-03-29 22:32:34 +00:00
}
2015-09-04 22:43:57 +00:00
n, err := s.engine.WriteTo(w)
atomic.AddInt64(&s.stats.BytesWritten, int64(n))
2015-09-04 22:43:57 +00:00
return n, err
}
2015-11-04 21:06:06 +00:00
// CreateIterator returns an iterator for the data in the shard.
func (s *Shard) CreateIterator(measurement string, opt influxql.IteratorOptions) (influxql.Iterator, error) {
if err := s.ready(); err != nil {
return nil, err
2016-03-29 22:32:34 +00:00
}
if strings.HasPrefix(measurement, "_") {
if itr, ok, err := s.createSystemIterator(measurement, opt); ok {
return itr, err
}
// Unknown system source so pass this to the engine.
}
return s.engine.CreateIterator(measurement, opt)
2015-11-04 21:06:06 +00:00
}
// createSystemIterator returns an iterator for a system source.
func (s *Shard) createSystemIterator(measurement string, opt influxql.IteratorOptions) (influxql.Iterator, bool, error) {
switch measurement {
case "_fieldKeys":
itr, err := NewFieldKeysIterator(s, opt)
return itr, true, err
2016-02-25 21:28:45 +00:00
case "_series":
itr, err := s.createSeriesIterator(opt)
return itr, true, err
case "_tagKeys":
itr, err := NewTagKeysIterator(s, opt)
return itr, true, err
}
return nil, false, nil
}
2016-11-28 16:59:36 +00:00
// createSeriesIterator builds the engine's series point iterator for opt.
// The condition is walked first: a binary expression whose operator is not
// one of =, !=, =~, !~, OR or AND causes an "invalid tag comparison operator"
// error and no iterator is created.
func (s *Shard) createSeriesIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
	var opErr error
	checkOperator := func(node influxql.Node) {
		expr, isBinary := node.(*influxql.BinaryExpr)
		if !isBinary {
			return
		}
		switch expr.Op {
		case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX,
			influxql.OR, influxql.AND:
			// Permitted operator.
		default:
			opErr = errors.New("invalid tag comparison operator")
		}
	}
	influxql.WalkFunc(opt.Condition, checkOperator)

	if opErr != nil {
		return nil, opErr
	}
	return s.engine.SeriesPointIterator(opt)
}
2015-11-04 21:06:06 +00:00
// FieldDimensions returns unique sets of fields and dimensions across a list of sources.
func (s *Shard) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
if err := s.ready(); err != nil {
return nil, nil, err
}
fields = make(map[string]influxql.DataType)
dimensions = make(map[string]struct{})
for _, name := range measurements {
// Handle system sources.
if strings.HasPrefix(name, "_") {
var keys []string
switch name {
case "_fieldKeys":
keys = []string{"fieldKey", "fieldType"}
case "_series":
keys = []string{"key"}
case "_tagKeys":
keys = []string{"tagKey"}
}
if len(keys) > 0 {
for _, k := range keys {
if _, ok := fields[k]; !ok || influxql.String < fields[k] {
fields[k] = influxql.String
}
}
continue
}
// Unknown system source so default to looking for a measurement.
}
// Retrieve measurement.
if exists, err := s.engine.MeasurementExists([]byte(name)); err != nil {
return nil, nil, err
} else if !exists {
continue
}
2015-11-04 21:06:06 +00:00
// Append fields and dimensions.
mf := s.engine.MeasurementFields(name)
if mf != nil {
for k, typ := range mf.FieldSet() {
if _, ok := fields[k]; !ok || typ < fields[k] {
fields[k] = typ
}
2015-11-04 21:06:06 +00:00
}
}
2016-12-05 17:51:06 +00:00
if err := s.engine.ForEachMeasurementTagKey([]byte(name), func(key []byte) error {
dimensions[string(key)] = struct{}{}
return nil
}); err != nil {
return nil, nil, err
2015-11-04 21:06:06 +00:00
}
}
return fields, dimensions, nil
2015-11-04 21:06:06 +00:00
}
// MeasurementsByRegex returns, as strings, the measurement names the engine
// reports as matching re. An error from the engine lookup is discarded; the
// result then contains whatever names the engine returned alongside it.
func (s *Shard) MeasurementsByRegex(re *regexp.Regexp) []string {
	matches, _ := s.engine.MeasurementNamesByRegex(re)

	names := make([]string, 0, len(matches))
	for _, m := range matches {
		names = append(names, string(m))
	}
	return names
}
// MapType returns the data type for the field within the measurement.
func (s *Shard) MapType(measurement, field string) influxql.DataType {
// Process system measurements.
if strings.HasPrefix(measurement, "_") {
switch measurement {
case "_fieldKeys":
if field == "fieldKey" || field == "fieldType" {
return influxql.String
}
return influxql.Unknown
case "_series":
if field == "key" {
return influxql.String
2015-11-04 21:06:06 +00:00
}
return influxql.Unknown
case "_tagKeys":
if field == "tagKey" {
return influxql.String
2015-11-04 21:06:06 +00:00
}
return influxql.Unknown
2015-11-04 21:06:06 +00:00
}
// Unknown system source so default to looking for a measurement.
2015-11-04 21:06:06 +00:00
}
if exists, _ := s.engine.MeasurementExists([]byte(measurement)); !exists {
return influxql.Unknown
}
mf := s.engine.MeasurementFields(measurement)
if mf != nil {
f := mf.Field(field)
if f != nil {
return f.Type
}
}
if exists, _ := s.engine.HasTagKey([]byte(measurement), []byte(field)); exists {
return influxql.Tag
}
return influxql.Unknown
2015-11-04 21:06:06 +00:00
}
// ExpandSources expands regex sources and removes duplicates.
// NOTE: sources must be normalized (db and rp set) before calling this function.
func (s *Shard) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
// Use a map as a set to prevent duplicates.
set := map[string]influxql.Source{}
// Iterate all sources, expanding regexes when they're found.
for _, source := range sources {
switch src := source.(type) {
case *influxql.Measurement:
// Add non-regex measurements directly to the set.
if src.Regex == nil {
set[src.String()] = src
continue
}
// Loop over matching measurements.
2016-11-11 16:25:53 +00:00
names, err := s.engine.MeasurementNamesByRegex(src.Regex.Val)
if err != nil {
return nil, err
}
2016-11-11 16:25:53 +00:00
for _, name := range names {
other := &influxql.Measurement{
Database: src.Database,
RetentionPolicy: src.RetentionPolicy,
2016-11-11 16:25:53 +00:00
Name: string(name),
}
set[other.String()] = other
}
default:
return nil, fmt.Errorf("expandSources: unsupported source type: %T", source)
}
}
// Convert set to sorted slice.
names := make([]string, 0, len(set))
for name := range set {
names = append(names, name)
}
sort.Strings(names)
// Convert set to a list of Sources.
expanded := make(influxql.Sources, 0, len(set))
for _, name := range names {
expanded = append(expanded, set[name])
}
return expanded, nil
}
2016-04-29 00:29:09 +00:00
// Restore restores data to the underlying engine for the shard.
// The shard is reopened after restore.
func (s *Shard) Restore(r io.Reader, basePath string) error {
s.mu.Lock()
// Restore to engine.
if err := s.engine.Restore(r, basePath); err != nil {
s.mu.Unlock()
return err
}
s.mu.Unlock()
// Close shard.
if err := s.Close(); err != nil {
return err
}
// Reopen engine.
2016-05-02 17:47:31 +00:00
return s.Open()
2016-04-29 00:29:09 +00:00
}
2016-05-09 15:53:34 +00:00
// CreateSnapshot will return a path to a temp directory
2016-12-31 05:12:37 +00:00
// containing hard links to the underlying shard files.
2016-05-09 15:53:34 +00:00
func (s *Shard) CreateSnapshot() (string, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.engine.CreateSnapshot()
}
2016-10-05 05:52:49 +00:00
func (s *Shard) monitor() {
2016-05-18 15:04:50 +00:00
t := time.NewTicker(monitorStatInterval)
2016-05-18 14:34:06 +00:00
defer t.Stop()
2016-10-05 05:52:49 +00:00
t2 := time.NewTicker(time.Minute)
defer t2.Stop()
var changed time.Time
2016-05-18 14:34:06 +00:00
for {
select {
case <-s.closing:
return
case <-t.C:
// Checking DiskSize can be expensive with a lot of shards and TSM files, only
// check if something has changed.
lm := s.LastModified()
if lm.Equal(changed) {
continue
}
2016-05-18 14:34:06 +00:00
size, err := s.DiskSize()
if err != nil {
s.logger.Info(fmt.Sprintf("Error collecting shard size: %v", err))
2016-05-18 14:34:06 +00:00
continue
}
atomic.StoreInt64(&s.stats.DiskBytes, size)
changed = lm
2016-10-05 05:52:49 +00:00
case <-t2.C:
2016-11-16 18:57:55 +00:00
if s.options.Config.MaxValuesPerTag == 0 {
2016-10-05 05:52:49 +00:00
continue
}
2016-12-05 17:51:06 +00:00
names, err := s.MeasurementNamesByExpr(nil)
if err != nil {
2017-01-06 16:31:25 +00:00
s.logger.Warn("cannot retrieve measurement names", zap.Error(err))
2016-12-05 17:51:06 +00:00
continue
}
for _, name := range names {
s.engine.ForEachMeasurementTagKey(name, func(k []byte) error {
2017-03-24 15:48:10 +00:00
n := s.engine.TagKeyCardinality(name, k)
perc := int(float64(n) / float64(s.options.Config.MaxValuesPerTag) * 100)
if perc > 100 {
perc = 100
}
// Log at 80, 85, 90-100% levels
if perc == 80 || perc == 85 || perc >= 90 {
s.logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s",
perc, n, s.options.Config.MaxValuesPerTag, s.database, s.id, name, k))
}
2016-12-05 17:51:06 +00:00
return nil
})
2016-10-05 05:52:49 +00:00
}
2016-05-18 14:34:06 +00:00
}
}
}
// ShardGroup is the set of query-side operations implemented in this file by
// both a single *Shard and a list of shards (Shards).
type ShardGroup interface {
	// MeasurementsByRegex returns the names of measurements matching re.
	MeasurementsByRegex(re *regexp.Regexp) []string
	// FieldDimensions returns the fields (with their types) and the
	// dimensions found across the given measurements.
	FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
	// MapType returns the data type of field within measurement.
	MapType(measurement, field string) influxql.DataType
	// CreateIterator returns an iterator over measurement for the given options.
	CreateIterator(measurement string, opt influxql.IteratorOptions) (influxql.Iterator, error)
	// ExpandSources expands regex sources and removes duplicates.
	ExpandSources(sources influxql.Sources) (influxql.Sources, error)
}
// Shards represents a sortable list of shards. Its sort.Interface methods
// order shards by ascending id.
type Shards []*Shard
2015-11-04 21:06:06 +00:00
2016-12-31 05:12:37 +00:00
// Len reports the number of shards in the list; part of sort.Interface.
func (a Shards) Len() int {
	return len(a)
}
// Less reports whether the shard at index i has a smaller id than the shard
// at index j; part of sort.Interface.
func (a Shards) Less(i, j int) bool {
	left, right := a[i], a[j]
	return left.id < right.id
}
2016-12-31 05:12:37 +00:00
// Swap exchanges the shards at indexes i and j; part of sort.Interface.
func (a Shards) Swap(i, j int) {
	a[i], a[j] = a[j], a[i]
}
2015-11-04 21:06:06 +00:00
// MeasurementsByRegex collects the measurement names matching re from every
// shard, removes duplicates and returns them sorted. It returns nil when no
// shard reports a match.
func (a Shards) MeasurementsByRegex(re *regexp.Regexp) []string {
	seen := make(map[string]struct{})
	for _, sh := range a {
		for _, name := range sh.MeasurementsByRegex(re) {
			seen[name] = struct{}{}
		}
	}

	if len(seen) == 0 {
		return nil
	}

	merged := make([]string, 0, len(seen))
	for name := range seen {
		merged = append(merged, name)
	}
	sort.Strings(merged)
	return merged
}
// FieldDimensions merges the fields and dimensions reported by every shard
// for the given measurements. Fields of type influxql.Unknown are ignored;
// when several shards report the same field, the smallest DataType value is
// kept. The first shard error aborts the merge and is returned.
func (a Shards) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) {
	fields = make(map[string]influxql.DataType)
	dimensions = make(map[string]struct{})

	for _, sh := range a {
		shardFields, shardDims, shardErr := sh.FieldDimensions(measurements)
		if shardErr != nil {
			return nil, nil, shardErr
		}

		for k, typ := range shardFields {
			if typ == influxql.Unknown {
				continue
			}
			if current, ok := fields[k]; !ok || typ < current {
				fields[k] = typ
			}
		}

		for k := range shardDims {
			dimensions[k] = struct{}{}
		}
	}
	return fields, dimensions, nil
}
// MapType asks every shard for the type of field within measurement and
// returns the greatest one according to DataType.LessThan, starting from the
// zero DataType.
func (a Shards) MapType(measurement, field string) influxql.DataType {
	var best influxql.DataType
	for _, sh := range a {
		candidate := sh.MapType(measurement, field)
		if best.LessThan(candidate) {
			best = candidate
		}
	}
	return best
}
// CreateIterator creates an iterator on every shard and merges them into one.
//
// Shards that return a nil iterator are skipped. If a shard fails, the
// interrupt channel fires, or a shard's iterator exceeds opt.MaxSeriesN (when
// positive), every iterator created so far is closed and an error is
// returned. An interrupt is reported as influxql.ErrQueryInterrupted.
func (a Shards) CreateIterator(measurement string, opt influxql.IteratorOptions) (influxql.Iterator, error) {
	itrs := make([]influxql.Iterator, 0, len(a))
	for _, sh := range a {
		itr, err := sh.CreateIterator(measurement, opt)
		if err != nil {
			influxql.Iterators(itrs).Close()
			return nil, err
		} else if itr == nil {
			continue
		}
		itrs = append(itrs, itr)

		select {
		case <-opt.InterruptCh:
			influxql.Iterators(itrs).Close()
			// err is always nil at this point; returning it would hand the
			// caller a nil iterator with no error and hide the interruption.
			return nil, influxql.ErrQueryInterrupted
		default:
		}

		// Enforce series limit at creation time.
		if opt.MaxSeriesN > 0 {
			stats := itr.Stats()
			if stats.SeriesN > opt.MaxSeriesN {
				influxql.Iterators(itrs).Close()
				return nil, fmt.Errorf("max-select-series limit exceeded: (%d/%d)", stats.SeriesN, opt.MaxSeriesN)
			}
		}
	}
	return influxql.Iterators(itrs).Merge(opt)
}
// ExpandSources expands sources on every shard and returns the union of the
// results, de-duplicated and sorted by each source's String form. The first
// shard error is returned as is; an expanded source that is not an
// *influxql.Measurement yields an error.
func (a Shards) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
	// Keyed by the source's String form so duplicates collapse.
	unique := map[string]influxql.Source{}

	for _, sh := range a {
		shardSources, err := sh.ExpandSources(sources)
		if err != nil {
			return nil, err
		}

		for _, src := range shardSources {
			m, ok := src.(*influxql.Measurement)
			if !ok {
				return nil, fmt.Errorf("Store.ExpandSources: unsupported source type: %T", src)
			}
			unique[m.String()] = m
		}
	}

	// Order the keys so the output is deterministic.
	keys := make([]string, 0, len(unique))
	for key := range unique {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	result := make([]influxql.Source, 0, len(unique))
	for _, key := range keys {
		result = append(result, unique[key])
	}
	return result, nil
}
2016-02-10 20:04:18 +00:00
// MeasurementFields holds the fields of a measurement and their codec.
2015-07-22 14:53:20 +00:00
type MeasurementFields struct {
mu sync.RWMutex
2016-05-18 12:34:11 +00:00
fields map[string]*Field
}
2016-11-17 13:24:32 +00:00
// NewMeasurementFields returns a *MeasurementFields whose field map is
// allocated and empty.
func NewMeasurementFields() *MeasurementFields {
	mf := new(MeasurementFields)
	mf.fields = make(map[string]*Field)
	return mf
}
// MarshalBinary encodes the object to a binary format.
2015-07-22 14:53:20 +00:00
func (m *MeasurementFields) MarshalBinary() ([]byte, error) {
m.mu.RLock()
defer m.mu.RUnlock()
var pb internal.MeasurementFields
for _, f := range m.fields {
id := int32(f.ID)
name := f.Name
t := int32(f.Type)
pb.Fields = append(pb.Fields, &internal.Field{ID: &id, Name: &name, Type: &t})
}
return proto.Marshal(&pb)
}
// UnmarshalBinary decodes the object from a binary format.
2015-07-22 14:53:20 +00:00
func (m *MeasurementFields) UnmarshalBinary(buf []byte) error {
m.mu.Lock()
defer m.mu.Unlock()
var pb internal.MeasurementFields
if err := proto.Unmarshal(buf, &pb); err != nil {
return err
}
m.fields = make(map[string]*Field, len(pb.Fields))
for _, f := range pb.Fields {
m.fields[f.GetName()] = &Field{ID: uint8(f.GetID()), Name: f.GetName(), Type: influxql.DataType(f.GetType())}
}
return nil
}
2015-08-10 18:46:57 +00:00
// CreateFieldIfNotExists creates a new field with an autoincrementing ID.
// Returns an error if 255 fields have already been created on the measurement or
// the fields already exists with a different type.
func (m *MeasurementFields) CreateFieldIfNotExists(name string, typ influxql.DataType, limitCount bool) error {
m.mu.RLock()
// Ignore if the field already exists.
if f := m.fields[name]; f != nil {
if f.Type != typ {
m.mu.RUnlock()
return ErrFieldTypeConflict
}
m.mu.RUnlock()
return nil
}
m.mu.RUnlock()
m.mu.Lock()
defer m.mu.Unlock()
2017-03-03 16:27:01 +00:00
// Re-check field and type under write lock.
if f := m.fields[name]; f != nil {
2017-03-03 16:27:01 +00:00
if f.Type != typ {
return ErrFieldTypeConflict
}
return nil
}
// Create and append a new field.
2015-07-23 16:33:37 +00:00
f := &Field{
ID: uint8(len(m.fields) + 1),
Name: name,
Type: typ,
}
m.fields[name] = f
return nil
}
2016-11-16 18:57:55 +00:00
// FieldN returns the number of fields registered on the measurement.
func (m *MeasurementFields) FieldN() int {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return len(m.fields)
}
2016-11-17 13:24:32 +00:00
// Field looks up the field registered under name and returns it, or nil when
// no such field exists.
func (m *MeasurementFields) Field(name string) *Field {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.fields[name]
}
2016-11-27 20:15:32 +00:00
// HasField reports whether a non-nil field is registered under name.
func (m *MeasurementFields) HasField(name string) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.fields[name] != nil
}
2016-11-17 13:24:32 +00:00
// FieldBytes is the []byte counterpart of Field: it returns the field
// registered under name, or nil when there is none.
//
// Prefer it over Field when the caller already holds a []byte. The conversion
// is written directly inside the map index expression so that the lookup does
// not need to allocate a string, which converting first and calling Field
// would.
func (m *MeasurementFields) FieldBytes(name []byte) *Field {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.fields[string(name)]
}
2016-11-17 13:24:32 +00:00
// FieldSet returns a freshly allocated map from each field name on the
// measurement to that field's data type.
func (m *MeasurementFields) FieldSet() map[string]influxql.DataType {
	m.mu.RLock()
	defer m.mu.RUnlock()

	types := make(map[string]influxql.DataType, len(m.fields))
	for fieldName, field := range m.fields {
		types[fieldName] = field.Type
	}
	return types
}
// Clone returns a copy of the MeasurementFields with its own map and its own
// lock. The copy is shallow: the *Field values are shared with the receiver.
func (m *MeasurementFields) Clone() *MeasurementFields {
	m.mu.RLock()
	defer m.mu.RUnlock()

	dup := &MeasurementFields{
		fields: make(map[string]*Field, len(m.fields)),
	}
	for name, f := range m.fields {
		dup.fields[name] = f
	}
	return dup
}
2016-11-28 16:59:36 +00:00
// MeasurementFieldSet represents a collection of fields by measurement.
// It is safe for concurrent use: its methods take mu before touching the map.
type MeasurementFieldSet struct {
	// mu guards fields.
	mu sync.RWMutex

	// fields maps a measurement name to that measurement's fields.
	fields map[string]*MeasurementFields
}
// NewMeasurementFieldSet returns an empty MeasurementFieldSet with its map
// already allocated.
func NewMeasurementFieldSet() *MeasurementFieldSet {
	fs := new(MeasurementFieldSet)
	fs.fields = make(map[string]*MeasurementFields)
	return fs
}
// Fields returns the fields stored for the measurement called name, or nil
// when the set has no entry for it.
func (fs *MeasurementFieldSet) Fields(name string) *MeasurementFields {
	fs.mu.RLock()
	defer fs.mu.RUnlock()
	return fs.fields[name]
}
// CreateFieldsIfNotExists returns the fields for the measurement called name,
// creating and storing an empty MeasurementFields first if none exists.
func (fs *MeasurementFieldSet) CreateFieldsIfNotExists(name string) *MeasurementFields {
	// Try with the read lock only before taking the write lock.
	fs.mu.RLock()
	existing := fs.fields[name]
	fs.mu.RUnlock()
	if existing != nil {
		return existing
	}

	fs.mu.Lock()
	defer fs.mu.Unlock()

	// Check again: the entry may have been created while no lock was held.
	if current := fs.fields[name]; current != nil {
		return current
	}
	created := NewMeasurementFields()
	fs.fields[name] = created
	return created
}
// Delete removes the entry for the measurement called name, if any.
func (fs *MeasurementFieldSet) Delete(name string) {
	fs.mu.Lock()
	defer fs.mu.Unlock()
	delete(fs.fields, name)
}
// DeleteWithLock runs fn while holding the write lock and, if fn succeeds,
// removes the entry for the measurement called name. If fn returns an error
// the entry is kept and that error is returned.
func (fs *MeasurementFieldSet) DeleteWithLock(name string, fn func() error) error {
	fs.mu.Lock()
	defer fs.mu.Unlock()

	err := fn()
	if err == nil {
		delete(fs.fields, name)
	}
	return err
}
// Field represents a series field.
2015-07-23 16:33:37 +00:00
type Field struct {
ID uint8 `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Type influxql.DataType `json:"type,omitempty"`
}
2016-11-17 13:24:32 +00:00
// NewFieldKeysIterator returns an iterator that can be iterated over to
// retrieve field keys.
func NewFieldKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
itr := &fieldKeysIterator{sh: sh}
// Retrieve measurements from shard. Filter if condition specified.
2016-12-05 17:51:06 +00:00
names, err := sh.engine.MeasurementNamesByExpr(opt.Condition)
if err != nil {
return nil, err
}
2016-12-05 17:51:06 +00:00
itr.names = names
return itr, nil
}
// fieldKeysIterator iterates over measurements and gets field keys from each measurement.
type fieldKeysIterator struct {
2016-12-05 17:51:06 +00:00
sh *Shard
names [][]byte // remaining measurement names
buf struct {
name []byte // current measurement name
fields []Field // current measurement's fields
}
}
// Stats returns iterator statistics; this iterator tracks none, so the result
// is always the zero influxql.IteratorStats.
func (itr *fieldKeysIterator) Stats() influxql.IteratorStats {
	var stats influxql.IteratorStats
	return stats
}
// Close closes the iterator. There is nothing to release, so it always
// returns nil.
func (itr *fieldKeysIterator) Close() error {
	return nil
}
// Next emits the next tag key name.
func (itr *fieldKeysIterator) Next() (*influxql.FloatPoint, error) {
for {
// If there are no more keys then move to the next measurements.
if len(itr.buf.fields) == 0 {
2016-12-05 17:51:06 +00:00
if len(itr.names) == 0 {
return nil, nil
}
2016-12-05 17:51:06 +00:00
itr.buf.name = itr.names[0]
mf := itr.sh.engine.MeasurementFields(string(itr.buf.name))
if mf != nil {
fset := mf.FieldSet()
if len(fset) == 0 {
2016-12-05 17:51:06 +00:00
itr.names = itr.names[1:]
continue
}
keys := make([]string, 0, len(fset))
for k := range fset {
keys = append(keys, k)
}
sort.Strings(keys)
itr.buf.fields = make([]Field, len(keys))
for i, name := range keys {
itr.buf.fields[i] = Field{Name: name, Type: fset[name]}
}
}
2016-12-05 17:51:06 +00:00
itr.names = itr.names[1:]
continue
}
// Return next key.
field := itr.buf.fields[0]
p := &influxql.FloatPoint{
2016-12-05 17:51:06 +00:00
Name: string(itr.buf.name),
Aux: []interface{}{field.Name, field.Type.String()},
}
itr.buf.fields = itr.buf.fields[1:]
return p, nil
}
}
// NewTagKeysIterator returns a new instance of TagKeysIterator.
func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
2016-12-05 17:51:06 +00:00
fn := func(name []byte) ([][]byte, error) {
var keys [][]byte
if err := sh.engine.ForEachMeasurementTagKey(name, func(key []byte) error {
keys = append(keys, key)
return nil
}); err != nil {
return nil, err
}
2016-12-05 17:51:06 +00:00
return keys, nil
}
2016-12-05 17:51:06 +00:00
return newMeasurementKeysIterator(sh, fn, opt)
}
// measurementKeyFunc returns the keys of the measurement called name.
// measurementKeysIterator invokes it once for each measurement it visits.
type measurementKeyFunc func(name []byte) ([][]byte, error)
func newMeasurementKeysIterator(sh *Shard, fn measurementKeyFunc, opt influxql.IteratorOptions) (*measurementKeysIterator, error) {
itr := &measurementKeysIterator{fn: fn}
2016-02-04 18:00:50 +00:00
2016-12-05 17:51:06 +00:00
names, err := sh.engine.MeasurementNamesByExpr(opt.Condition)
if err != nil {
return nil, err
2016-02-04 18:00:50 +00:00
}
2016-12-05 17:51:06 +00:00
itr.names = names
2016-02-04 18:00:50 +00:00
return itr, nil
}
// measurementKeysIterator iterates over measurements and gets keys from each measurement.
type measurementKeysIterator struct {
2016-12-05 17:51:06 +00:00
names [][]byte // remaining measurement names
buf struct {
name []byte // current measurement name
keys [][]byte // current measurement's keys
}
fn measurementKeyFunc
}
// Stats returns iterator statistics; this iterator tracks none, so the result
// is always the zero influxql.IteratorStats.
func (itr *measurementKeysIterator) Stats() influxql.IteratorStats {
	var stats influxql.IteratorStats
	return stats
}
2016-02-04 18:00:50 +00:00
// Close closes the iterator. There is nothing to release, so it always
// returns nil.
func (itr *measurementKeysIterator) Close() error {
	return nil
}
2016-02-04 18:00:50 +00:00
// Next emits the next tag key name.
func (itr *measurementKeysIterator) Next() (*influxql.FloatPoint, error) {
2016-02-04 18:00:50 +00:00
for {
// If there are no more keys then move to the next measurements.
if len(itr.buf.keys) == 0 {
2016-12-05 17:51:06 +00:00
if len(itr.names) == 0 {
return nil, nil
2016-02-04 18:00:50 +00:00
}
2016-12-05 17:51:06 +00:00
itr.buf.name, itr.names = itr.names[0], itr.names[1:]
keys, err := itr.fn(itr.buf.name)
if err != nil {
return nil, err
}
itr.buf.keys = keys
2016-02-04 18:00:50 +00:00
continue
}
// Return next key.
p := &influxql.FloatPoint{
2016-12-05 17:51:06 +00:00
Name: string(itr.buf.name),
Aux: []interface{}{string(itr.buf.keys[0])},
2016-02-04 18:00:50 +00:00
}
itr.buf.keys = itr.buf.keys[1:]
return p, nil
2016-02-04 18:00:50 +00:00
}
}
2016-11-15 16:20:00 +00:00
// LimitError represents an error caused by a configurable limit.
type LimitError struct {
	// Reason is the message reported by Error.
	Reason string
}

// Error implements the error interface by returning Reason unchanged.
func (e *LimitError) Error() string {
	return e.Reason
}