Send database and retention policy with remote writes
There was a race where a remote write could arrive before a meta client cache update arrived. When this happened, the receiving node would drop the write because it could not determine what database and retention policy the shard it was supposed to create belonged to. This change sends the db and rp along with the write so that the receiving node does not need to consult the meta store. It also allows us to not send writes for shards that no longer exist instead of always sending them and having the receiving node's logs fill up with dropped write requests. This second situation can occur when shards are deleted and some nodes still have writes queued in hinted handoff for those shards. Fixes #5610
pull/5879/head
parent
cd4ef1856b
commit
43118ce78e
|
@ -23,15 +23,19 @@ It has these top-level messages:
|
|||
package internal
|
||||
|
||||
import proto "github.com/gogo/protobuf/proto"
|
||||
import fmt "fmt"
|
||||
import math "math"
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
|
||||
type WriteShardRequest struct {
|
||||
ShardID *uint64 `protobuf:"varint,1,req" json:"ShardID,omitempty"`
|
||||
Points [][]byte `protobuf:"bytes,2,rep" json:"Points,omitempty"`
|
||||
ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"`
|
||||
Points [][]byte `protobuf:"bytes,2,rep,name=Points" json:"Points,omitempty"`
|
||||
Database *string `protobuf:"bytes,3,opt,name=Database" json:"Database,omitempty"`
|
||||
RetentionPolicy *string `protobuf:"bytes,4,opt,name=RetentionPolicy" json:"RetentionPolicy,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
|
@ -53,9 +57,23 @@ func (m *WriteShardRequest) GetPoints() [][]byte {
|
|||
return nil
|
||||
}
|
||||
|
||||
// GetDatabase returns the Database field, or "" if the receiver or the
// field is nil.
func (m *WriteShardRequest) GetDatabase() string {
|
||||
if m != nil && m.Database != nil {
|
||||
return *m.Database
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// GetRetentionPolicy returns the RetentionPolicy field, or "" if the
// receiver or the field is nil.
func (m *WriteShardRequest) GetRetentionPolicy() string {
|
||||
if m != nil && m.RetentionPolicy != nil {
|
||||
return *m.RetentionPolicy
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type WriteShardResponse struct {
|
||||
Code *int32 `protobuf:"varint,1,req" json:"Code,omitempty"`
|
||||
Message *string `protobuf:"bytes,2,opt" json:"Message,omitempty"`
|
||||
Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"`
|
||||
Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
|
@ -78,8 +96,8 @@ func (m *WriteShardResponse) GetMessage() string {
|
|||
}
|
||||
|
||||
type ExecuteStatementRequest struct {
|
||||
Statement *string `protobuf:"bytes,1,req" json:"Statement,omitempty"`
|
||||
Database *string `protobuf:"bytes,2,req" json:"Database,omitempty"`
|
||||
Statement *string `protobuf:"bytes,1,req,name=Statement" json:"Statement,omitempty"`
|
||||
Database *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
|
@ -102,8 +120,8 @@ func (m *ExecuteStatementRequest) GetDatabase() string {
|
|||
}
|
||||
|
||||
type ExecuteStatementResponse struct {
|
||||
Code *int32 `protobuf:"varint,1,req" json:"Code,omitempty"`
|
||||
Message *string `protobuf:"bytes,2,opt" json:"Message,omitempty"`
|
||||
Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"`
|
||||
Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
|
@ -126,8 +144,8 @@ func (m *ExecuteStatementResponse) GetMessage() string {
|
|||
}
|
||||
|
||||
type CreateIteratorRequest struct {
|
||||
ShardIDs []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"`
|
||||
Opt []byte `protobuf:"bytes,2,req" json:"Opt,omitempty"`
|
||||
ShardIDs []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"`
|
||||
Opt []byte `protobuf:"bytes,2,req,name=Opt" json:"Opt,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
|
@ -150,7 +168,7 @@ func (m *CreateIteratorRequest) GetOpt() []byte {
|
|||
}
|
||||
|
||||
type CreateIteratorResponse struct {
|
||||
Err *string `protobuf:"bytes,1,opt" json:"Err,omitempty"`
|
||||
Err *string `protobuf:"bytes,1,opt,name=Err" json:"Err,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
|
@ -166,8 +184,8 @@ func (m *CreateIteratorResponse) GetErr() string {
|
|||
}
|
||||
|
||||
type FieldDimensionsRequest struct {
|
||||
ShardIDs []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"`
|
||||
Sources []byte `protobuf:"bytes,2,req" json:"Sources,omitempty"`
|
||||
ShardIDs []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"`
|
||||
Sources []byte `protobuf:"bytes,2,req,name=Sources" json:"Sources,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
|
@ -190,9 +208,9 @@ func (m *FieldDimensionsRequest) GetSources() []byte {
|
|||
}
|
||||
|
||||
type FieldDimensionsResponse struct {
|
||||
Fields []string `protobuf:"bytes,1,rep" json:"Fields,omitempty"`
|
||||
Dimensions []string `protobuf:"bytes,2,rep" json:"Dimensions,omitempty"`
|
||||
Err *string `protobuf:"bytes,3,opt" json:"Err,omitempty"`
|
||||
Fields []string `protobuf:"bytes,1,rep,name=Fields" json:"Fields,omitempty"`
|
||||
Dimensions []string `protobuf:"bytes,2,rep,name=Dimensions" json:"Dimensions,omitempty"`
|
||||
Err *string `protobuf:"bytes,3,opt,name=Err" json:"Err,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
|
@ -222,8 +240,8 @@ func (m *FieldDimensionsResponse) GetErr() string {
|
|||
}
|
||||
|
||||
type SeriesKeysRequest struct {
|
||||
ShardIDs []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"`
|
||||
Opt []byte `protobuf:"bytes,2,req" json:"Opt,omitempty"`
|
||||
ShardIDs []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"`
|
||||
Opt []byte `protobuf:"bytes,2,req,name=Opt" json:"Opt,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
|
@ -246,8 +264,8 @@ func (m *SeriesKeysRequest) GetOpt() []byte {
|
|||
}
|
||||
|
||||
type SeriesKeysResponse struct {
|
||||
SeriesList []byte `protobuf:"bytes,1,opt" json:"SeriesList,omitempty"`
|
||||
Err *string `protobuf:"bytes,2,opt" json:"Err,omitempty"`
|
||||
SeriesList []byte `protobuf:"bytes,1,opt,name=SeriesList" json:"SeriesList,omitempty"`
|
||||
Err *string `protobuf:"bytes,2,opt,name=Err" json:"Err,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
|
@ -270,4 +288,14 @@ func (m *SeriesKeysResponse) GetErr() string {
|
|||
}
|
||||
|
||||
// init registers every message type declared in this file with the proto
// package under its "internal."-qualified name.
func init() {
|
||||
proto.RegisterType((*WriteShardRequest)(nil), "internal.WriteShardRequest")
|
||||
proto.RegisterType((*WriteShardResponse)(nil), "internal.WriteShardResponse")
|
||||
proto.RegisterType((*ExecuteStatementRequest)(nil), "internal.ExecuteStatementRequest")
|
||||
proto.RegisterType((*ExecuteStatementResponse)(nil), "internal.ExecuteStatementResponse")
|
||||
proto.RegisterType((*CreateIteratorRequest)(nil), "internal.CreateIteratorRequest")
|
||||
proto.RegisterType((*CreateIteratorResponse)(nil), "internal.CreateIteratorResponse")
|
||||
proto.RegisterType((*FieldDimensionsRequest)(nil), "internal.FieldDimensionsRequest")
|
||||
proto.RegisterType((*FieldDimensionsResponse)(nil), "internal.FieldDimensionsResponse")
|
||||
proto.RegisterType((*SeriesKeysRequest)(nil), "internal.SeriesKeysRequest")
|
||||
proto.RegisterType((*SeriesKeysResponse)(nil), "internal.SeriesKeysResponse")
|
||||
}
|
||||
|
|
|
@ -3,6 +3,8 @@ package internal;
|
|||
// WriteShardRequest is the remote-write message for a single shard.
// Database and RetentionPolicy are optional; when present they tell the
// receiver which database and retention policy the shard belongs to.
message WriteShardRequest {
|
||||
required uint64 ShardID = 1;
|
||||
repeated bytes Points = 2;
|
||||
optional string Database = 3;
|
||||
optional string RetentionPolicy = 4;
|
||||
}
|
||||
|
||||
message WriteShardResponse {
|
||||
|
|
|
@ -202,6 +202,9 @@ type TSDBStore struct {
|
|||
}
|
||||
|
||||
// CreateShard forwards to CreateShardFn when one is set; with no function
// set it does nothing and returns nil.
func (s *TSDBStore) CreateShard(database, policy string, shardID uint64) error {
|
||||
if s.CreateShardFn == nil {
|
||||
return nil
|
||||
}
|
||||
return s.CreateShardFn(database, policy, shardID)
|
||||
}
|
||||
|
||||
|
|
|
@ -48,6 +48,14 @@ func (w *WriteShardRequest) SetShardID(id uint64) { w.pb.ShardID = &id }
|
|||
// ShardID gets the ShardID
|
||||
func (w *WriteShardRequest) ShardID() uint64 { return w.pb.GetShardID() }
|
||||
|
||||
// SetDatabase sets the database name on the underlying protobuf message.
func (w *WriteShardRequest) SetDatabase(db string) { w.pb.Database = &db }
|
||||
|
||||
// SetRetentionPolicy sets the retention policy name on the underlying protobuf message.
func (w *WriteShardRequest) SetRetentionPolicy(rp string) { w.pb.RetentionPolicy = &rp }
|
||||
|
||||
// Database gets the database name from the underlying protobuf message.
func (w *WriteShardRequest) Database() string { return w.pb.GetDatabase() }
|
||||
|
||||
// RetentionPolicy gets the retention policy name from the underlying protobuf message.
func (w *WriteShardRequest) RetentionPolicy() string { return w.pb.GetRetentionPolicy() }
|
||||
|
||||
// Points returns the time series Points
|
||||
func (w *WriteShardRequest) Points() []models.Point { return w.unmarshalPoints() }
|
||||
|
||||
|
|
|
@ -15,7 +15,6 @@ import (
|
|||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/services/meta"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
)
|
||||
|
||||
// MaxMessageSize defines how large a message can be before we reject it
|
||||
|
@ -244,30 +243,19 @@ func (s *Service) processWriteShardRequest(buf []byte) error {
|
|||
s.statMap.Add(writeShardPointsReq, int64(len(points)))
|
||||
err := s.TSDBStore.WriteToShard(req.ShardID(), points)
|
||||
|
||||
// We may have received a write for a shard that we don't have locally because the
|
||||
// sending node may have just created the shard (via the metastore) and the write
|
||||
// arrived before the local store could create the shard. In this case, we need
|
||||
// to check the metastore to determine what database and retention policy this
|
||||
// shard should reside within.
|
||||
if err == tsdb.ErrShardNotFound {
|
||||
|
||||
// Query the metastore for the owner of this shard
|
||||
database, retentionPolicy, sgi := s.MetaClient.ShardOwner(req.ShardID())
|
||||
if sgi == nil {
|
||||
// If we can't find it, then we need to drop this request
|
||||
// as it is no longer valid. This could happen if writes were queued via
|
||||
// hinted handoff and delivered after a shard group was deleted.
|
||||
s.Logger.Printf("drop write request: shard=%d. shard group does not exist or was deleted", req.ShardID())
|
||||
return nil
|
||||
}
|
||||
|
||||
err = s.TSDBStore.CreateShard(database, retentionPolicy, req.ShardID())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.TSDBStore.WriteToShard(req.ShardID(), points)
|
||||
db, rp := req.Database(), req.RetentionPolicy()
|
||||
if db == "" || rp == "" {
|
||||
s.Logger.Printf("drop write request: shard=%d. no database or rentention policy received", req.ShardID())
|
||||
return nil
|
||||
}
|
||||
|
||||
err = s.TSDBStore.CreateShard(req.Database(), req.RetentionPolicy(), req.ShardID())
|
||||
if err != nil {
|
||||
s.statMap.Add(writeShardFail, 1)
|
||||
return fmt.Errorf("create shard %d: %s", req.ShardID(), err)
|
||||
}
|
||||
|
||||
err = s.TSDBStore.WriteToShard(req.ShardID(), points)
|
||||
if err != nil {
|
||||
s.statMap.Add(writeShardFail, 1)
|
||||
return fmt.Errorf("write shard %d: %s", req.ShardID(), err)
|
||||
|
|
|
@ -23,10 +23,15 @@ func (m *metaClient) DataNode(nodeID uint64) (*meta.NodeInfo, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
// ShardOwner is a stub: it ignores shardID and always returns "db", "rp"
// and an empty, non-nil ShardGroupInfo.
func (m *metaClient) ShardOwner(shardID uint64) (db, rp string, sgi *meta.ShardGroupInfo) {
|
||||
return "db", "rp", &meta.ShardGroupInfo{}
|
||||
}
|
||||
|
||||
type testService struct {
|
||||
nodeID uint64
|
||||
ln net.Listener
|
||||
muxln net.Listener
|
||||
nodeID uint64
|
||||
ln net.Listener
|
||||
muxln net.Listener
|
||||
responses chan *serviceResponse
|
||||
|
||||
TSDBStore TSDBStore
|
||||
}
|
||||
|
@ -46,6 +51,7 @@ func newTestWriteService(f func(shardID uint64, points []models.Point) error) te
|
|||
muxln: muxln,
|
||||
}
|
||||
s.TSDBStore.WriteToShardFn = f
|
||||
s.responses = make(chan *serviceResponse, 1024)
|
||||
return s
|
||||
}
|
||||
|
||||
|
@ -62,8 +68,8 @@ type serviceResponse struct {
|
|||
points []models.Point
|
||||
}
|
||||
|
||||
func writeShardSuccess(shardID uint64, points []models.Point) error {
|
||||
responses <- &serviceResponse{
|
||||
func (ts *testService) writeShardSuccess(shardID uint64, points []models.Point) error {
|
||||
ts.responses <- &serviceResponse{
|
||||
shardID: shardID,
|
||||
points: points,
|
||||
}
|
||||
|
@ -79,13 +85,11 @@ func writeShardSlow(shardID uint64, points []models.Point) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
var responses = make(chan *serviceResponse, 1024)
|
||||
|
||||
func (testService) ResponseN(n int) ([]*serviceResponse, error) {
|
||||
func (ts *testService) ResponseN(n int) ([]*serviceResponse, error) {
|
||||
var a []*serviceResponse
|
||||
for {
|
||||
select {
|
||||
case r := <-responses:
|
||||
case r := <-ts.responses:
|
||||
a = append(a, r)
|
||||
if len(a) == n {
|
||||
return a, nil
|
||||
|
|
|
@ -34,6 +34,7 @@ type ShardWriter struct {
|
|||
|
||||
MetaClient interface {
|
||||
DataNode(id uint64) (ni *meta.NodeInfo, err error)
|
||||
ShardOwner(shardID uint64) (database, policy string, sgi *meta.ShardGroupInfo)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -61,9 +62,20 @@ func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point)
|
|||
conn.Close() // return to pool
|
||||
}(conn)
|
||||
|
||||
// Determine the location of this shard and whether it still exists
|
||||
db, rp, sgi := w.MetaClient.ShardOwner(shardID)
|
||||
if sgi == nil {
|
||||
// If we can't find the shard group for this shard, then we need to drop this request
|
||||
// as it is no longer valid. This could happen if writes were queued via
|
||||
// hinted handoff and we're processing the queue after a shard group was deleted.
|
||||
return nil
|
||||
}
|
||||
|
||||
// Build write request.
|
||||
var request WriteShardRequest
|
||||
request.SetShardID(shardID)
|
||||
request.SetDatabase(db)
|
||||
request.SetRetentionPolicy(rp)
|
||||
request.AddPoints(points)
|
||||
|
||||
// Marshal into protocol buffers.
|
||||
|
|
|
@ -13,7 +13,8 @@ import (
|
|||
|
||||
// Ensure the shard writer can successfully write a single request.
|
||||
func TestShardWriter_WriteShard_Success(t *testing.T) {
|
||||
ts := newTestWriteService(writeShardSuccess)
|
||||
ts := newTestWriteService(nil)
|
||||
ts.TSDBStore.WriteToShardFn = ts.writeShardSuccess
|
||||
s := cluster.NewService(cluster.Config{})
|
||||
s.Listener = ts.muxln
|
||||
s.TSDBStore = &ts.TSDBStore
|
||||
|
@ -60,7 +61,8 @@ func TestShardWriter_WriteShard_Success(t *testing.T) {
|
|||
|
||||
// Ensure the shard writer can successfully write multiple requests.
|
||||
func TestShardWriter_WriteShard_Multiple(t *testing.T) {
|
||||
ts := newTestWriteService(writeShardSuccess)
|
||||
ts := newTestWriteService(nil)
|
||||
ts.TSDBStore.WriteToShardFn = ts.writeShardSuccess
|
||||
s := cluster.NewService(cluster.Config{})
|
||||
s.Listener = ts.muxln
|
||||
s.TSDBStore = &ts.TSDBStore
|
||||
|
@ -137,7 +139,8 @@ func TestShardWriter_WriteShard_Error(t *testing.T) {
|
|||
|
||||
// Ensure the shard writer returns an error when dialing times out.
|
||||
func TestShardWriter_Write_ErrDialTimeout(t *testing.T) {
|
||||
ts := newTestWriteService(writeShardSuccess)
|
||||
ts := newTestWriteService(nil)
|
||||
ts.TSDBStore.WriteToShardFn = ts.writeShardSuccess
|
||||
s := cluster.NewService(cluster.Config{})
|
||||
s.Listener = ts.muxln
|
||||
s.TSDBStore = &ts.TSDBStore
|
||||
|
|
Loading…
Reference in New Issue