Update godoc for the coordinator package.

pull/7781/head
Mark Rushakoff 2016-12-28 16:12:55 -08:00
parent 6768c6ed3b
commit 1d3da81a7d
3 changed files with 33 additions and 17 deletions

coordinator/config.go

@@ -1,3 +1,5 @@
+// Package coordinator contains abstractions for writing points, executing statements,
+// and accessing meta data.
package coordinator
import (
@@ -24,7 +26,7 @@ const (
DefaultMaxSelectSeriesN = 0
)
-// Config represents the configuration for the clustering service.
+// Config represents the configuration for the coordinator service.
type Config struct {
WriteTimeout toml.Duration `toml:"write-timeout"`
MaxConcurrentQueries int `toml:"max-concurrent-queries"`
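An aside on how these fields get populated: the toml struct tags map keys in the coordinator section of the config file onto the struct. A minimal sketch using BurntSushi/toml, one decoder that honors these tags (the key names come from the tags above; the values are invented):

```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
	"github.com/influxdata/influxdb/coordinator"
)

func main() {
	var c coordinator.Config
	// Keys match the `toml:"..."` tags on the Config fields shown above.
	if _, err := toml.Decode(`
write-timeout = "10s"
max-concurrent-queries = 0
`, &c); err != nil {
		panic(err)
	}
	fmt.Println(c.WriteTimeout, c.MaxConcurrentQueries)
}
```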

coordinator/points_writer.go

@@ -15,7 +15,7 @@ import (
"go.uber.org/zap"
)
-// The statistics generated by the "write" mdoule
+// The keys for statistics generated by the "write" module.
const (
statWriteReq = "req"
statPointWriteReq = "pointReq"
@@ -73,7 +73,7 @@ type PointsWriter struct {
stats *WriteStatistics
}
-// WritePointsRequest represents a request to write point data to the cluster
+// WritePointsRequest represents a request to write point data to the cluster.
type WritePointsRequest struct {
Database string
RetentionPolicy string
@@ -101,13 +101,13 @@ func NewPointsWriter() *PointsWriter {
}
}
-// ShardMapping contains a mapping of a shards to a points.
+// ShardMapping contains a mapping of shards to points.
type ShardMapping struct {
Points map[uint64][]models.Point // The points associated with a shard ID
Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID
}
-// NewShardMapping creates an empty ShardMapping
+// NewShardMapping creates an empty ShardMapping.
func NewShardMapping() *ShardMapping {
return &ShardMapping{
Points: map[uint64][]models.Point{},
@@ -115,13 +115,13 @@ func NewShardMapping() *ShardMapping {
}
}
-// MapPoint maps a point to shard
+// MapPoint adds the point to the ShardMapping, associated with the given shardInfo.
func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point) {
s.Points[shardInfo.ID] = append(s.Points[shardInfo.ID], p)
s.Shards[shardInfo.ID] = shardInfo
}
-// Open opens the communication channel with the point writer
+// Open opens the communication channel with the point writer.
func (w *PointsWriter) Open() error {
w.mu.Lock()
defer w.mu.Unlock()
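For context on MapPoint above: callers typically pick a shard group by the point's timestamp and a shard within the group by the point's hash, then record the pair in the mapping. A hedged sketch of that loop; ShardGroupByTimestamp and ShardFor mirror the influxdb meta API, but treat the exact calls here as assumptions:

```go
package example

import (
	"github.com/influxdata/influxdb/coordinator"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/services/meta"
)

// mapBatch groups points by their owning shard, the way the points
// writer builds a ShardMapping before fanning writes out.
func mapBatch(rp *meta.RetentionPolicyInfo, points []models.Point) *coordinator.ShardMapping {
	sm := coordinator.NewShardMapping()
	for _, p := range points {
		// Pick the shard group whose time window covers the point.
		sg := rp.ShardGroupByTimestamp(p.Time())
		if sg == nil {
			continue // no shard group covers this timestamp
		}
		// Within the group, the point's hash selects a shard.
		sh := sg.ShardFor(p.HashID())
		sm.MapPoint(&sh, p)
	}
	return sm
}
```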
@@ -132,7 +132,7 @@ func (w *PointsWriter) Open() error {
return nil
}
-// Close closes the communication channel with the point writer
+// Close closes the communication channel with the point writer.
func (w *PointsWriter) Close() error {
w.mu.Lock()
defer w.mu.Unlock()
@@ -148,6 +148,7 @@ func (w *PointsWriter) Close() error {
return nil
}
+// WithLogger sets the Logger on w.
func (w *PointsWriter) WithLogger(log zap.Logger) {
w.Logger = log.With(zap.String("service", "write"))
}
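WithLogger expects an already-constructed zap.Logger and derives a child logger tagged with service=write. A rough usage sketch against the pre-1.0 zap API vendored at the time (the constructor call is an assumption; modern zap differs):

```go
package main

import (
	"os"

	"github.com/influxdata/influxdb/coordinator"
	"go.uber.org/zap"
)

func main() {
	// Pre-1.0 zap constructor, matching the zap.Logger interface type
	// taken by WithLogger above; today's zap API looks different.
	log := zap.New(zap.NewTextEncoder(), zap.Output(os.Stderr))

	w := coordinator.NewPointsWriter()
	// Every message logged through w now carries service=write.
	w.WithLogger(log)
}
```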
@@ -249,15 +250,16 @@ func (l sgList) Covers(t time.Time) bool {
return l.ShardGroupAt(t) != nil
}
+// ShardGroupAt attempts to find a shard group that could contain a point
+// at the given time.
+//
+// Shard groups are sorted first according to end time, and then according
+// to start time. Therefore, if there are multiple shard groups that match
+// this point's time they will be preferred in this order:
+//
+//  - a shard group with the earliest end time;
+//  - (assuming identical end times) the shard group with the earliest start time.
func (l sgList) ShardGroupAt(t time.Time) *meta.ShardGroupInfo {
-// Attempt to find a shard group that could contain this point.
-// Shard groups are sorted first according to end time, and then according
-// to start time. Therefore, if there are multiple shard groups that match
-// this point's time they will be preferred in this order:
-//
-//  - a shard group with the earliest end time;
-//  - (assuming identical end times) the shard group with the earliest start
-// time.
idx := sort.Search(len(l), func(i int) bool { return l[i].EndTime.After(t) })
// We couldn't find a shard group the point falls into.
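The comment block moved above the function pins down the search order, and the body leans on sort.Search: because the groups are sorted by end time, the predicate EndTime.After(t) is monotonic, so the first true index is the earliest-ending candidate. A self-contained toy version of the pattern, with an invented group type standing in for meta.ShardGroupInfo:

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// group stands in for meta.ShardGroupInfo; only the fields the search
// needs are included.
type group struct {
	Start, End time.Time
}

// groupAt returns the first group (in end-time order) whose end time is
// after t, or nil if no group covers t.
func groupAt(groups []group, t time.Time) *group {
	idx := sort.Search(len(groups), func(i int) bool { return groups[i].End.After(t) })
	if idx == len(groups) || t.Before(groups[idx].Start) {
		return nil // no shard group covers this point's time
	}
	return &groups[idx]
}

func main() {
	day := 24 * time.Hour
	t0 := time.Date(2016, 12, 1, 0, 0, 0, 0, time.UTC)
	groups := []group{
		{Start: t0, End: t0.Add(day)},
		{Start: t0.Add(day), End: t0.Add(2 * day)},
	}
	fmt.Println(groupAt(groups, t0.Add(36*time.Hour))) // matches the second group
}
```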
@@ -275,7 +277,7 @@ func (l sgList) Append(sgi meta.ShardGroupInfo) sgList {
}
// WritePointsInto is a copy of WritePoints that uses a tsdb structure instead of
-// a cluster structure for information. This is to avoid a circular dependency
+// a cluster structure for information. This is to avoid a circular dependency.
func (w *PointsWriter) WritePointsInto(p *IntoWriteRequest) error {
return w.WritePoints(p.Database, p.RetentionPolicy, models.ConsistencyLevelOne, p.Points)
}
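A usage sketch for WritePointsInto; IntoWriteRequest is assumed to carry the destination database, retention policy, and points, as it does elsewhere in this package, and names like db0 are invented:

```go
package example

import (
	"github.com/influxdata/influxdb/coordinator"
	"github.com/influxdata/influxdb/models"
)

// writeInto shows the shape of a SELECT INTO-style write: the request
// names the destination and carries the points, and the writer resolves
// shards through the tsdb/meta stores rather than a cluster handle.
func writeInto(w *coordinator.PointsWriter, points []models.Point) error {
	return w.WritePointsInto(&coordinator.IntoWriteRequest{
		Database:        "db0",
		RetentionPolicy: "autogen",
		Points:          points,
	})
}
```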

coordinator/statement_executor.go

@@ -17,6 +17,8 @@ import (
"github.com/influxdata/influxdb/tsdb"
)
+// ErrDatabaseNameRequired is returned when executing statements that require a database,
+// when a database has not been provided.
var ErrDatabaseNameRequired = errors.New("database name required")
type pointsWriter interface {
@@ -45,6 +47,7 @@ type StatementExecutor struct {
MaxSelectBucketsN int
}
+// ExecuteStatement executes the given statement with the given execution context.
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
// Select statements are handled separately so that they can be streamed.
if stmt, ok := stmt.(*influxql.SelectStatement); ok {
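The streaming carve-out above is a plain type assertion ahead of the usual type switch. A toy dispatcher with stand-in handlers (not the executor's real methods) showing the same shape:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/influxdata/influxdb/influxql"
)

// dispatch peels off SELECT statements first so their results can
// stream; everything else falls through to a type switch.
func dispatch(stmt influxql.Statement) error {
	if sel, ok := stmt.(*influxql.SelectStatement); ok {
		fmt.Println("streaming:", sel.String())
		return nil
	}
	switch stmt.(type) {
	case *influxql.ShowUsersStatement:
		fmt.Println("rendering rows for SHOW USERS")
		return nil
	default:
		return errors.New("unhandled statement type")
	}
}

func main() {
	stmt, err := influxql.ParseStatement(`SELECT mean(value) INTO "out" FROM "cpu"`)
	if err != nil {
		panic(err)
	}
	if err := dispatch(stmt); err != nil {
		panic(err)
	}
}
```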
@@ -906,6 +909,8 @@ func (e *StatementExecutor) executeShowUsersStatement(q *influxql.ShowUsersState
return []*models.Row{row}, nil
}
+// BufferedPointsWriter adds buffering to a pointsWriter so that SELECT INTO queries
+// write their points to the destination in batches.
type BufferedPointsWriter struct {
w pointsWriter
buf []models.Point
@@ -913,6 +918,7 @@ type BufferedPointsWriter struct {
retentionPolicy string
}
+// NewBufferedPointsWriter returns a new BufferedPointsWriter.
func NewBufferedPointsWriter(w pointsWriter, database, retentionPolicy string, capacity int) *BufferedPointsWriter {
return &BufferedPointsWriter{
w: w,
@@ -922,6 +928,7 @@ func NewBufferedPointsWriter(w pointsWriter, database, retentionPolicy string, c
}
}
+// WritePointsInto implements pointsWriter for BufferedPointsWriter.
func (w *BufferedPointsWriter) WritePointsInto(req *IntoWriteRequest) error {
// Make sure we're buffering points only for the expected destination.
if req.Database != w.database || req.RetentionPolicy != w.retentionPolicy {
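A sketch of the batching flow this enables for SELECT INTO: write through the buffer, then flush at the end. Flush draining the residue is an assumption from the buffering design, and the capacity and names here are invented:

```go
package example

import (
	"github.com/influxdata/influxdb/coordinator"
	"github.com/influxdata/influxdb/models"
)

// copyBatches pushes batches through a BufferedPointsWriter so the
// destination sees a few large writes instead of many small ones.
func copyBatches(pw *coordinator.PointsWriter, batches [][]models.Point) error {
	bw := coordinator.NewBufferedPointsWriter(pw, "db0", "autogen", 10000)
	for _, batch := range batches {
		if err := bw.WritePointsInto(&coordinator.IntoWriteRequest{
			Database:        "db0",
			RetentionPolicy: "autogen",
			Points:          batch,
		}); err != nil {
			return err
		}
	}
	// Write out anything still sitting in the buffer.
	return bw.Flush()
}
```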
@@ -1147,10 +1154,15 @@ type TSDBStore interface {
TagValues(database string, cond influxql.Expr) ([]tsdb.TagValues, error)
}
var _ TSDBStore = LocalTSDBStore{}
+// LocalTSDBStore embeds a tsdb.Store and implements IteratorCreator
+// to satisfy the TSDBStore interface.
type LocalTSDBStore struct {
*tsdb.Store
}
+// IteratorCreator returns an influxql.IteratorCreator for the given shards, with the given select options.
func (s LocalTSDBStore) IteratorCreator(shards []meta.ShardInfo, opt *influxql.SelectOptions) (influxql.IteratorCreator, error) {
shardIDs := make([]uint64, len(shards))
for i, sh := range shards {
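The embedding above is the standard Go trick for satisfying a wide interface: the promoted methods of *tsdb.Store cover most of TSDBStore, and only IteratorCreator needs a hand-written method. A toy, self-contained rendition of the pattern with invented types:

```go
package main

import "fmt"

// Store stands in for *tsdb.Store: it already has most of the methods.
type Store struct{}

func (*Store) Measurements() []string { return []string{"cpu"} }

// TSDBStore stands in for the coordinator interface: it needs one more
// method than Store provides.
type TSDBStore interface {
	Measurements() []string
	IteratorCreator() string
}

// LocalStore embeds *Store, promoting Measurements, and supplies the
// missing IteratorCreator method itself.
type LocalStore struct {
	*Store
}

func (LocalStore) IteratorCreator() string { return "iterator creator" }

var _ TSDBStore = LocalStore{} // compile-time interface check, as in the diff

func main() {
	var s TSDBStore = LocalStore{Store: &Store{}}
	fmt.Println(s.Measurements(), s.IteratorCreator())
}
```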