2016-05-11 16:32:56 +00:00
|
|
|
package coordinator
|
2015-05-08 04:47:52 +00:00
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
2016-04-20 20:07:08 +00:00
|
|
|
"io"
|
2015-06-09 18:49:58 +00:00
|
|
|
"log"
|
|
|
|
"os"
|
2016-09-27 16:05:52 +00:00
|
|
|
"sort"
|
2015-05-19 13:40:20 +00:00
|
|
|
"sync"
|
2016-07-07 16:13:56 +00:00
|
|
|
"sync/atomic"
|
2015-05-08 04:47:52 +00:00
|
|
|
"time"
|
2015-05-08 20:27:31 +00:00
|
|
|
|
2016-02-10 17:26:18 +00:00
|
|
|
"github.com/influxdata/influxdb"
|
|
|
|
"github.com/influxdata/influxdb/models"
|
|
|
|
"github.com/influxdata/influxdb/services/meta"
|
|
|
|
"github.com/influxdata/influxdb/tsdb"
|
2015-05-08 04:47:52 +00:00
|
|
|
)
|
|
|
|
|
2015-09-08 23:56:03 +00:00
|
|
|
// The statistics generated by the "write" module.
const (
	statWriteReq           = "req"           // total write requests
	statPointWriteReq      = "pointReq"      // total points requested to be written
	statPointWriteReqLocal = "pointReqLocal" // points written to shards on this node
	statWriteOK            = "writeOk"       // successful shard writes
	statWriteDrop          = "writeDrop"     // points dropped (outside retention policy scope)
	statWriteTimeout       = "writeTimeout"  // writes that hit the write timeout
	statWriteErr           = "writeError"    // shard writes that returned an error
	statSubWriteOK         = "subWriteOk"    // writes forwarded to the subscriber channel
	statSubWriteDrop       = "subWriteDrop"  // writes the subscriber channel could not accept
)
|
|
|
|
|
2015-05-19 13:40:20 +00:00
|
|
|
var (
	// ErrTimeout is returned when a write times out.
	ErrTimeout = errors.New("timeout")

	// ErrPartialWrite is returned when a write partially succeeds but does
	// not meet the requested consistency level.
	ErrPartialWrite = errors.New("partial write")

	// ErrWriteFailed is returned when no writes succeeded.
	ErrWriteFailed = errors.New("write failed")
)
|
2015-05-08 04:47:52 +00:00
|
|
|
|
2015-05-30 16:21:35 +00:00
|
|
|
// PointsWriter handles writes across multiple local and remote data nodes.
type PointsWriter struct {
	mu           sync.RWMutex  // guards closing and subPoints
	closing      chan struct{} // created in Open, closed in Close to abort pending writes
	WriteTimeout time.Duration // maximum time to wait for all shard writes to complete
	Logger       *log.Logger

	Node *influxdb.Node

	// MetaClient supplies the cluster metadata needed to map points to shards.
	MetaClient interface {
		Database(name string) (di *meta.DatabaseInfo)
		RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
		CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
		ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo)
	}

	// TSDBStore is the local shard store: writes go here, and missing
	// shards are created here on demand (see writeToShard).
	TSDBStore interface {
		CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error
		WriteToShard(shardID uint64, points []models.Point) error
	}

	// ShardWriter writes points to a shard owned by another node.
	// NOTE(review): not referenced by the methods visible in this file.
	ShardWriter interface {
		WriteShard(shardID, ownerID uint64, points []models.Point) error
	}

	// Subscriber provides the channel used to fan out writes to the
	// subscription service.
	Subscriber interface {
		Points() chan<- *WritePointsRequest
	}
	subPoints chan<- *WritePointsRequest // obtained from Subscriber in Open; nil'ed in Close

	stats *WriteStatistics // counters updated atomically; exposed via Statistics
}
|
|
|
|
|
2016-05-11 15:15:38 +00:00
|
|
|
// WritePointsRequest represents a request to write point data to the cluster.
type WritePointsRequest struct {
	Database        string         // target database
	RetentionPolicy string         // target retention policy within Database
	Points          []models.Point // the points to write
}
|
|
|
|
|
|
|
|
// AddPoint adds a point to the WritePointRequest with field key 'value'
|
|
|
|
func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) {
|
|
|
|
pt, err := models.NewPoint(
|
2016-06-30 16:49:53 +00:00
|
|
|
name, models.NewTags(tags), map[string]interface{}{"value": value}, timestamp,
|
2016-05-11 15:15:38 +00:00
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
w.Points = append(w.Points, pt)
|
|
|
|
}
|
|
|
|
|
2015-05-30 16:21:35 +00:00
|
|
|
// NewPointsWriter returns a new instance of PointsWriter for a node.
|
2015-06-01 17:20:57 +00:00
|
|
|
func NewPointsWriter() *PointsWriter {
|
2015-05-30 16:21:35 +00:00
|
|
|
return &PointsWriter{
|
2015-07-02 16:32:28 +00:00
|
|
|
closing: make(chan struct{}),
|
|
|
|
WriteTimeout: DefaultWriteTimeout,
|
|
|
|
Logger: log.New(os.Stderr, "[write] ", log.LstdFlags),
|
2016-07-07 16:13:56 +00:00
|
|
|
stats: &WriteStatistics{},
|
2015-05-19 20:56:52 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-05-20 15:24:10 +00:00
|
|
|
// ShardMapping contains a mapping of a shards to a points.
type ShardMapping struct {
	Points map[uint64][]models.Point  // The points associated with a shard ID
	Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID
}
|
|
|
|
|
2015-05-19 13:40:20 +00:00
|
|
|
// NewShardMapping creates an empty ShardMapping
|
|
|
|
func NewShardMapping() *ShardMapping {
|
|
|
|
return &ShardMapping{
|
2015-09-16 20:33:08 +00:00
|
|
|
Points: map[uint64][]models.Point{},
|
2015-05-26 15:41:15 +00:00
|
|
|
Shards: map[uint64]*meta.ShardInfo{},
|
2015-05-19 13:40:20 +00:00
|
|
|
}
|
2015-05-08 20:27:31 +00:00
|
|
|
}
|
|
|
|
|
2015-05-19 13:40:20 +00:00
|
|
|
// MapPoint maps a point to shard
|
2015-09-16 20:33:08 +00:00
|
|
|
func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point) {
|
2016-09-21 17:12:09 +00:00
|
|
|
s.Points[shardInfo.ID] = append(s.Points[shardInfo.ID], p)
|
2015-05-19 13:40:20 +00:00
|
|
|
s.Shards[shardInfo.ID] = shardInfo
|
|
|
|
}
|
2015-05-08 20:27:31 +00:00
|
|
|
|
2015-09-29 02:40:58 +00:00
|
|
|
// Open opens the communication channel with the point writer
|
2015-05-30 16:21:35 +00:00
|
|
|
func (w *PointsWriter) Open() error {
|
|
|
|
w.mu.Lock()
|
|
|
|
defer w.mu.Unlock()
|
2015-11-05 22:27:00 +00:00
|
|
|
w.closing = make(chan struct{})
|
|
|
|
if w.Subscriber != nil {
|
|
|
|
w.subPoints = w.Subscriber.Points()
|
2015-05-19 20:56:52 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-09-29 02:40:58 +00:00
|
|
|
// Close closes the communication channel with the point writer
|
2015-05-30 16:21:35 +00:00
|
|
|
func (w *PointsWriter) Close() error {
|
|
|
|
w.mu.Lock()
|
|
|
|
defer w.mu.Unlock()
|
|
|
|
if w.closing != nil {
|
|
|
|
close(w.closing)
|
2015-11-05 22:27:00 +00:00
|
|
|
}
|
|
|
|
if w.subPoints != nil {
|
|
|
|
// 'nil' channels always block so this makes the
|
|
|
|
// select statement in WritePoints hit its default case
|
|
|
|
// dropping any in-flight writes.
|
|
|
|
w.subPoints = nil
|
2015-05-19 20:56:52 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-04-20 20:07:08 +00:00
|
|
|
// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
// (The Logger field is replaced without holding mu, so calling this
// concurrently with writes would race.)
func (w *PointsWriter) SetLogOutput(lw io.Writer) {
	w.Logger = log.New(lw, "[write] ", log.LstdFlags)
}
|
|
|
|
|
2016-07-07 16:13:56 +00:00
|
|
|
// WriteStatistics keeps statistics related to the PointsWriter.
// All fields are updated with sync/atomic and must only be accessed
// through atomic operations.
type WriteStatistics struct {
	WriteReq           int64 // number of WritePoints calls
	PointWriteReq      int64 // total points requested
	PointWriteReqLocal int64 // points handed to the local TSDBStore
	WriteOK            int64 // successful shard writes
	WriteDropped       int64 // points dropped during shard mapping
	WriteTimeout       int64 // writes aborted by the write timeout
	WriteErr           int64 // shard writes that failed
	SubWriteOK         int64 // writes accepted by the subscriber channel
	SubWriteDrop       int64 // writes the subscriber channel rejected
}
|
|
|
|
|
|
|
|
// Statistics returns statistics for periodic monitoring.
func (w *PointsWriter) Statistics(tags map[string]string) []models.Statistic {
	return []models.Statistic{{
		Name: "write",
		Tags: tags,
		Values: map[string]interface{}{
			// Counters are loaded atomically because they are updated
			// concurrently from the write path.
			statWriteReq:           atomic.LoadInt64(&w.stats.WriteReq),
			statPointWriteReq:      atomic.LoadInt64(&w.stats.PointWriteReq),
			statPointWriteReqLocal: atomic.LoadInt64(&w.stats.PointWriteReqLocal),
			statWriteOK:            atomic.LoadInt64(&w.stats.WriteOK),
			statWriteDrop:          atomic.LoadInt64(&w.stats.WriteDropped),
			statWriteTimeout:       atomic.LoadInt64(&w.stats.WriteTimeout),
			statWriteErr:           atomic.LoadInt64(&w.stats.WriteErr),
			statSubWriteOK:         atomic.LoadInt64(&w.stats.SubWriteOK),
			statSubWriteDrop:       atomic.LoadInt64(&w.stats.SubWriteDrop),
		},
	}}
}
|
|
|
|
|
2015-05-19 13:40:20 +00:00
|
|
|
// MapShards maps the points contained in wp to a ShardMapping. If a point
// maps to a shard group or shard that does not currently exist, it will be
// created before returning the mapping. Points older than the retention
// policy's duration are dropped (counted in WriteDropped) rather than mapped.
func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {
	rp, err := w.MetaClient.RetentionPolicy(wp.Database, wp.RetentionPolicy)
	if err != nil {
		return nil, err
	} else if rp == nil {
		return nil, influxdb.ErrRetentionPolicyNotFound(wp.RetentionPolicy)
	}

	// Holds all the shard groups and shards that are required for writes.
	list := make(sgList, 0, 8)
	// min is the earliest timestamp still inside the retention policy;
	// an infinite (zero) duration keeps everything.
	min := time.Unix(0, models.MinNanoTime)
	if rp.Duration > 0 {
		min = time.Now().Add(-rp.Duration)
	}

	// First pass: ensure a shard group exists for every point's timestamp.
	for _, p := range wp.Points {
		// Either the point is outside the scope of the RP, or we already have
		// a suitable shard group for the point.
		if p.Time().Before(min) || list.Covers(p.Time()) {
			continue
		}

		// No shard groups overlap with the point's time, so we will create
		// a new shard group for this point.
		sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, p.Time())
		if err != nil {
			return nil, err
		}

		if sg == nil {
			return nil, errors.New("nil shard group")
		}
		list = list.Append(*sg)
	}

	// Second pass: assign each point to a shard within its group.
	mapping := NewShardMapping()
	for _, p := range wp.Points {
		sg := list.ShardGroupAt(p.Time())
		if sg == nil {
			// We didn't create a shard group because the point was outside the
			// scope of the RP.
			atomic.AddInt64(&w.stats.WriteDropped, 1)
			continue
		}

		sh := sg.ShardFor(p.HashID())
		mapping.MapPoint(&sh, p)
	}
	return mapping, nil
}
|
|
|
|
|
2016-09-27 16:05:52 +00:00
|
|
|
// sgList is a wrapper around a meta.ShardGroupInfos where we can also check
// if a given time is covered by any of the shard groups in the list.
// The list is kept sorted via Append so that ShardGroupAt can binary-search.
type sgList meta.ShardGroupInfos
|
|
|
|
|
|
|
|
func (l sgList) Covers(t time.Time) bool {
|
|
|
|
if len(l) == 0 {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
return l.ShardGroupAt(t) != nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (l sgList) ShardGroupAt(t time.Time) *meta.ShardGroupInfo {
|
|
|
|
// Attempt to find a shard group that could contain this point.
|
|
|
|
// Shard groups are sorted first according to end time, and then according
|
|
|
|
// to start time. Therefore, if there are multiple shard groups that match
|
|
|
|
// this point's time they will be preferred in this order:
|
|
|
|
//
|
|
|
|
// - a shard group with the earliest end time;
|
|
|
|
// - (assuming identical end times) the shard group with the earliest start
|
|
|
|
// time.
|
|
|
|
idx := sort.Search(len(l), func(i int) bool { return l[i].EndTime.After(t) })
|
|
|
|
if idx == len(l) {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return &l[idx]
|
|
|
|
}
|
|
|
|
|
|
|
|
// Append appends a shard group to the list, and returns a sorted list.
|
|
|
|
func (l sgList) Append(sgi meta.ShardGroupInfo) sgList {
|
|
|
|
next := append(l, sgi)
|
|
|
|
sort.Sort(meta.ShardGroupInfos(next))
|
|
|
|
return next
|
|
|
|
}
|
|
|
|
|
2015-09-25 18:26:05 +00:00
|
|
|
// WritePointsInto is a copy of WritePoints that uses a tsdb structure instead of
// a cluster structure for information. This is to avoid a circular dependency.
// It always writes at consistency level ONE.
func (w *PointsWriter) WritePointsInto(p *IntoWriteRequest) error {
	return w.WritePoints(p.Database, p.RetentionPolicy, models.ConsistencyLevelOne, p.Points)
}
|
|
|
|
|
2015-05-30 16:21:35 +00:00
|
|
|
// WritePoints writes across multiple local and remote data nodes according the consistency level.
// If retentionPolicy is empty, the database's default retention policy is used.
// It returns ErrTimeout if the shard writes do not all complete within
// WriteTimeout, and ErrWriteFailed if the writer is closed mid-write.
func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {
	atomic.AddInt64(&w.stats.WriteReq, 1)
	atomic.AddInt64(&w.stats.PointWriteReq, int64(len(points)))

	// Resolve an empty retention policy to the database's default.
	if retentionPolicy == "" {
		db := w.MetaClient.Database(database)
		if db == nil {
			return influxdb.ErrDatabaseNotFound(database)
		}
		retentionPolicy = db.DefaultRetentionPolicy
	}

	shardMappings, err := w.MapShards(&WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points})
	if err != nil {
		return err
	}

	// Write each shard in its own goroutine and return as soon as one fails.
	// The channel is buffered so late goroutines never block after an early
	// return below.
	ch := make(chan error, len(shardMappings.Points))
	for shardID, points := range shardMappings.Points {
		go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
			ch <- w.writeToShard(shard, database, retentionPolicy, points)
		}(shardMappings.Shards[shardID], database, retentionPolicy, points)
	}

	// Send points to subscriptions if possible (non-blocking: a full or nil
	// channel drops the subscriber write rather than stalling the client).
	ok := false
	// We need to lock just in case the channel is about to be nil'ed
	// by Close.
	w.mu.RLock()
	select {
	case w.subPoints <- &WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points}:
		ok = true
	default:
	}
	w.mu.RUnlock()
	if ok {
		atomic.AddInt64(&w.stats.SubWriteOK, 1)
	} else {
		atomic.AddInt64(&w.stats.SubWriteDrop, 1)
	}

	// Wait for one result per shard write, or bail out on close/timeout.
	timeout := time.NewTimer(w.WriteTimeout)
	defer timeout.Stop()
	for range shardMappings.Points {
		select {
		case <-w.closing:
			return ErrWriteFailed
		case <-timeout.C:
			atomic.AddInt64(&w.stats.WriteTimeout, 1)
			// return timeout error to caller
			return ErrTimeout
		case err := <-ch:
			if err != nil {
				return err
			}
		}
	}
	return nil
}
|
|
|
|
|
2016-03-08 19:59:33 +00:00
|
|
|
// writeToShard writes points to the local store for a single shard, creating
// the shard if the store reports it does not yet exist. Note that any first
// write error — not only ErrShardNotFound — results in exactly one retry.
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) error {
	atomic.AddInt64(&w.stats.PointWriteReqLocal, int64(len(points)))

	err := w.TSDBStore.WriteToShard(shard.ID, points)
	if err == nil {
		atomic.AddInt64(&w.stats.WriteOK, 1)
		return nil
	}

	// If we've written to shard that should exist on the current node, but the store has
	// not actually created this shard, tell it to create it and retry the write
	if err == tsdb.ErrShardNotFound {
		err = w.TSDBStore.CreateShard(database, retentionPolicy, shard.ID, true)
		if err != nil {
			w.Logger.Printf("write failed for shard %d: %v", shard.ID, err)

			atomic.AddInt64(&w.stats.WriteErr, 1)
			return err
		}
	}
	// Retry the write once (after shard creation, or for any other error).
	err = w.TSDBStore.WriteToShard(shard.ID, points)
	if err != nil {
		w.Logger.Printf("write failed for shard %d: %v", shard.ID, err)
		atomic.AddInt64(&w.stats.WriteErr, 1)
		return err
	}

	atomic.AddInt64(&w.stats.WriteOK, 1)
	return nil
}
|