added baseline backup/restore plumbing

pull/6502/head
Cory LaNou 2016-04-28 17:29:09 -07:00
parent 4484182e86
commit a3bf3e2ef1
11 changed files with 626 additions and 101 deletions

View File

@ -24,6 +24,10 @@ It has these top-level messages:
ExpandSourcesResponse
RemoteMonitorRequest
RemoteMonitorResponse
BackupShardRequest
BackupShardResponse
CopyShardRequest
CopyShardResponse
*/
package cluster
@ -36,10 +40,6 @@ var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
const _ = proto.GoGoProtoPackageIsVersion1
type WriteShardRequest struct {
ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"`
Points [][]byte `protobuf:"bytes,2,rep,name=Points" json:"Points,omitempty"`
@ -48,10 +48,9 @@ type WriteShardRequest struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *WriteShardRequest) Reset() { *m = WriteShardRequest{} }
func (m *WriteShardRequest) String() string { return proto.CompactTextString(m) }
func (*WriteShardRequest) ProtoMessage() {}
func (*WriteShardRequest) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{0} }
func (m *WriteShardRequest) Reset() { *m = WriteShardRequest{} }
func (m *WriteShardRequest) String() string { return proto.CompactTextString(m) }
func (*WriteShardRequest) ProtoMessage() {}
func (m *WriteShardRequest) GetShardID() uint64 {
if m != nil && m.ShardID != nil {
@ -87,10 +86,9 @@ type WriteShardResponse struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *WriteShardResponse) Reset() { *m = WriteShardResponse{} }
func (m *WriteShardResponse) String() string { return proto.CompactTextString(m) }
func (*WriteShardResponse) ProtoMessage() {}
func (*WriteShardResponse) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{1} }
func (m *WriteShardResponse) Reset() { *m = WriteShardResponse{} }
func (m *WriteShardResponse) String() string { return proto.CompactTextString(m) }
func (*WriteShardResponse) ProtoMessage() {}
func (m *WriteShardResponse) GetCode() int32 {
if m != nil && m.Code != nil {
@ -112,10 +110,9 @@ type ExecuteStatementRequest struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *ExecuteStatementRequest) Reset() { *m = ExecuteStatementRequest{} }
func (m *ExecuteStatementRequest) String() string { return proto.CompactTextString(m) }
func (*ExecuteStatementRequest) ProtoMessage() {}
func (*ExecuteStatementRequest) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{2} }
func (m *ExecuteStatementRequest) Reset() { *m = ExecuteStatementRequest{} }
func (m *ExecuteStatementRequest) String() string { return proto.CompactTextString(m) }
func (*ExecuteStatementRequest) ProtoMessage() {}
func (m *ExecuteStatementRequest) GetStatement() string {
if m != nil && m.Statement != nil {
@ -137,10 +134,9 @@ type ExecuteStatementResponse struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *ExecuteStatementResponse) Reset() { *m = ExecuteStatementResponse{} }
func (m *ExecuteStatementResponse) String() string { return proto.CompactTextString(m) }
func (*ExecuteStatementResponse) ProtoMessage() {}
func (*ExecuteStatementResponse) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{3} }
func (m *ExecuteStatementResponse) Reset() { *m = ExecuteStatementResponse{} }
func (m *ExecuteStatementResponse) String() string { return proto.CompactTextString(m) }
func (*ExecuteStatementResponse) ProtoMessage() {}
func (m *ExecuteStatementResponse) GetCode() int32 {
if m != nil && m.Code != nil {
@ -162,10 +158,9 @@ type CreateIteratorRequest struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *CreateIteratorRequest) Reset() { *m = CreateIteratorRequest{} }
func (m *CreateIteratorRequest) String() string { return proto.CompactTextString(m) }
func (*CreateIteratorRequest) ProtoMessage() {}
func (*CreateIteratorRequest) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{4} }
func (m *CreateIteratorRequest) Reset() { *m = CreateIteratorRequest{} }
func (m *CreateIteratorRequest) String() string { return proto.CompactTextString(m) }
func (*CreateIteratorRequest) ProtoMessage() {}
func (m *CreateIteratorRequest) GetShardIDs() []uint64 {
if m != nil {
@ -188,10 +183,9 @@ type CreateIteratorResponse struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *CreateIteratorResponse) Reset() { *m = CreateIteratorResponse{} }
func (m *CreateIteratorResponse) String() string { return proto.CompactTextString(m) }
func (*CreateIteratorResponse) ProtoMessage() {}
func (*CreateIteratorResponse) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{5} }
func (m *CreateIteratorResponse) Reset() { *m = CreateIteratorResponse{} }
func (m *CreateIteratorResponse) String() string { return proto.CompactTextString(m) }
func (*CreateIteratorResponse) ProtoMessage() {}
func (m *CreateIteratorResponse) GetErr() string {
if m != nil && m.Err != nil {
@ -220,10 +214,9 @@ type IteratorStats struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *IteratorStats) Reset() { *m = IteratorStats{} }
func (m *IteratorStats) String() string { return proto.CompactTextString(m) }
func (*IteratorStats) ProtoMessage() {}
func (*IteratorStats) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{6} }
func (m *IteratorStats) Reset() { *m = IteratorStats{} }
func (m *IteratorStats) String() string { return proto.CompactTextString(m) }
func (*IteratorStats) ProtoMessage() {}
func (m *IteratorStats) GetSeriesN() int64 {
if m != nil && m.SeriesN != nil {
@ -245,10 +238,9 @@ type FieldDimensionsRequest struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *FieldDimensionsRequest) Reset() { *m = FieldDimensionsRequest{} }
func (m *FieldDimensionsRequest) String() string { return proto.CompactTextString(m) }
func (*FieldDimensionsRequest) ProtoMessage() {}
func (*FieldDimensionsRequest) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{7} }
func (m *FieldDimensionsRequest) Reset() { *m = FieldDimensionsRequest{} }
func (m *FieldDimensionsRequest) String() string { return proto.CompactTextString(m) }
func (*FieldDimensionsRequest) ProtoMessage() {}
func (m *FieldDimensionsRequest) GetShardIDs() []uint64 {
if m != nil {
@ -271,10 +263,9 @@ type FieldDimensionsResponse struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *FieldDimensionsResponse) Reset() { *m = FieldDimensionsResponse{} }
func (m *FieldDimensionsResponse) String() string { return proto.CompactTextString(m) }
func (*FieldDimensionsResponse) ProtoMessage() {}
func (*FieldDimensionsResponse) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{8} }
func (m *FieldDimensionsResponse) Reset() { *m = FieldDimensionsResponse{} }
func (m *FieldDimensionsResponse) String() string { return proto.CompactTextString(m) }
func (*FieldDimensionsResponse) ProtoMessage() {}
func (m *FieldDimensionsResponse) GetFields() []string {
if m != nil {
@ -303,10 +294,9 @@ type SeriesKeysRequest struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *SeriesKeysRequest) Reset() { *m = SeriesKeysRequest{} }
func (m *SeriesKeysRequest) String() string { return proto.CompactTextString(m) }
func (*SeriesKeysRequest) ProtoMessage() {}
func (*SeriesKeysRequest) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{9} }
func (m *SeriesKeysRequest) Reset() { *m = SeriesKeysRequest{} }
func (m *SeriesKeysRequest) String() string { return proto.CompactTextString(m) }
func (*SeriesKeysRequest) ProtoMessage() {}
func (m *SeriesKeysRequest) GetShardIDs() []uint64 {
if m != nil {
@ -328,10 +318,9 @@ type SeriesKeysResponse struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *SeriesKeysResponse) Reset() { *m = SeriesKeysResponse{} }
func (m *SeriesKeysResponse) String() string { return proto.CompactTextString(m) }
func (*SeriesKeysResponse) ProtoMessage() {}
func (*SeriesKeysResponse) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{10} }
func (m *SeriesKeysResponse) Reset() { *m = SeriesKeysResponse{} }
func (m *SeriesKeysResponse) String() string { return proto.CompactTextString(m) }
func (*SeriesKeysResponse) ProtoMessage() {}
func (m *SeriesKeysResponse) GetSeriesList() []byte {
if m != nil {
@ -353,10 +342,9 @@ type ExpandSourcesRequest struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *ExpandSourcesRequest) Reset() { *m = ExpandSourcesRequest{} }
func (m *ExpandSourcesRequest) String() string { return proto.CompactTextString(m) }
func (*ExpandSourcesRequest) ProtoMessage() {}
func (*ExpandSourcesRequest) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{11} }
func (m *ExpandSourcesRequest) Reset() { *m = ExpandSourcesRequest{} }
func (m *ExpandSourcesRequest) String() string { return proto.CompactTextString(m) }
func (*ExpandSourcesRequest) ProtoMessage() {}
func (m *ExpandSourcesRequest) GetShardIDs() []uint64 {
if m != nil {
@ -378,10 +366,9 @@ type ExpandSourcesResponse struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *ExpandSourcesResponse) Reset() { *m = ExpandSourcesResponse{} }
func (m *ExpandSourcesResponse) String() string { return proto.CompactTextString(m) }
func (*ExpandSourcesResponse) ProtoMessage() {}
func (*ExpandSourcesResponse) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{12} }
func (m *ExpandSourcesResponse) Reset() { *m = ExpandSourcesResponse{} }
func (m *ExpandSourcesResponse) String() string { return proto.CompactTextString(m) }
func (*ExpandSourcesResponse) ProtoMessage() {}
func (m *ExpandSourcesResponse) GetSources() []byte {
if m != nil {
@ -406,10 +393,9 @@ type RemoteMonitorRequest struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *RemoteMonitorRequest) Reset() { *m = RemoteMonitorRequest{} }
func (m *RemoteMonitorRequest) String() string { return proto.CompactTextString(m) }
func (*RemoteMonitorRequest) ProtoMessage() {}
func (*RemoteMonitorRequest) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{13} }
func (m *RemoteMonitorRequest) Reset() { *m = RemoteMonitorRequest{} }
func (m *RemoteMonitorRequest) String() string { return proto.CompactTextString(m) }
func (*RemoteMonitorRequest) ProtoMessage() {}
func (m *RemoteMonitorRequest) GetRemoteAddrs() []string {
if m != nil {
@ -451,10 +437,9 @@ type RemoteMonitorResponse struct {
XXX_unrecognized []byte `json:"-"`
}
func (m *RemoteMonitorResponse) Reset() { *m = RemoteMonitorResponse{} }
func (m *RemoteMonitorResponse) String() string { return proto.CompactTextString(m) }
func (*RemoteMonitorResponse) ProtoMessage() {}
func (*RemoteMonitorResponse) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{14} }
func (m *RemoteMonitorResponse) Reset() { *m = RemoteMonitorResponse{} }
func (m *RemoteMonitorResponse) String() string { return proto.CompactTextString(m) }
func (*RemoteMonitorResponse) ProtoMessage() {}
func (m *RemoteMonitorResponse) GetErr() string {
if m != nil && m.Err != nil {
@ -463,6 +448,110 @@ func (m *RemoteMonitorResponse) GetErr() string {
return ""
}
type BackupShardRequest struct {
ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"`
Since *int64 `protobuf:"varint,2,opt,name=Since" json:"Since,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *BackupShardRequest) Reset() { *m = BackupShardRequest{} }
func (m *BackupShardRequest) String() string { return proto.CompactTextString(m) }
func (*BackupShardRequest) ProtoMessage() {}
func (m *BackupShardRequest) GetShardID() uint64 {
if m != nil && m.ShardID != nil {
return *m.ShardID
}
return 0
}
func (m *BackupShardRequest) GetSince() int64 {
if m != nil && m.Since != nil {
return *m.Since
}
return 0
}
type BackupShardResponse struct {
Err *string `protobuf:"bytes,2,opt,name=Err" json:"Err,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *BackupShardResponse) Reset() { *m = BackupShardResponse{} }
func (m *BackupShardResponse) String() string { return proto.CompactTextString(m) }
func (*BackupShardResponse) ProtoMessage() {}
func (m *BackupShardResponse) GetErr() string {
if m != nil && m.Err != nil {
return *m.Err
}
return ""
}
type CopyShardRequest struct {
Host *string `protobuf:"bytes,1,req,name=Host" json:"Host,omitempty"`
Database *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"`
Policy *string `protobuf:"bytes,3,req,name=Policy" json:"Policy,omitempty"`
ShardID *uint64 `protobuf:"varint,4,req,name=ShardID" json:"ShardID,omitempty"`
Since *int64 `protobuf:"varint,5,opt,name=Since" json:"Since,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *CopyShardRequest) Reset() { *m = CopyShardRequest{} }
func (m *CopyShardRequest) String() string { return proto.CompactTextString(m) }
func (*CopyShardRequest) ProtoMessage() {}
func (m *CopyShardRequest) GetHost() string {
if m != nil && m.Host != nil {
return *m.Host
}
return ""
}
func (m *CopyShardRequest) GetDatabase() string {
if m != nil && m.Database != nil {
return *m.Database
}
return ""
}
func (m *CopyShardRequest) GetPolicy() string {
if m != nil && m.Policy != nil {
return *m.Policy
}
return ""
}
func (m *CopyShardRequest) GetShardID() uint64 {
if m != nil && m.ShardID != nil {
return *m.ShardID
}
return 0
}
func (m *CopyShardRequest) GetSince() int64 {
if m != nil && m.Since != nil {
return *m.Since
}
return 0
}
type CopyShardResponse struct {
Err *string `protobuf:"bytes,2,opt,name=Err" json:"Err,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *CopyShardResponse) Reset() { *m = CopyShardResponse{} }
func (m *CopyShardResponse) String() string { return proto.CompactTextString(m) }
func (*CopyShardResponse) ProtoMessage() {}
func (m *CopyShardResponse) GetErr() string {
if m != nil && m.Err != nil {
return *m.Err
}
return ""
}
func init() {
proto.RegisterType((*WriteShardRequest)(nil), "cluster.WriteShardRequest")
proto.RegisterType((*WriteShardResponse)(nil), "cluster.WriteShardResponse")
@ -479,39 +568,8 @@ func init() {
proto.RegisterType((*ExpandSourcesResponse)(nil), "cluster.ExpandSourcesResponse")
proto.RegisterType((*RemoteMonitorRequest)(nil), "cluster.RemoteMonitorRequest")
proto.RegisterType((*RemoteMonitorResponse)(nil), "cluster.RemoteMonitorResponse")
}
var fileDescriptorData = []byte{
// 486 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x53, 0x61, 0x6b, 0x13, 0x41,
0x10, 0xa5, 0xb9, 0xc4, 0x7a, 0x93, 0x68, 0xcd, 0xb6, 0x49, 0xee, 0x63, 0x38, 0x14, 0xf2, 0x29,
0x4a, 0xa5, 0x42, 0x11, 0x04, 0x49, 0x22, 0x54, 0x6d, 0x0c, 0x51, 0x11, 0xfc, 0x22, 0x6b, 0x6e,
0xd0, 0x85, 0xe4, 0xf6, 0xdc, 0xd9, 0x60, 0xf3, 0xef, 0x9d, 0xdd, 0xdb, 0xab, 0x69, 0xaa, 0x90,
0x7e, 0xbc, 0xb9, 0x7d, 0xf3, 0xe6, 0xbd, 0x79, 0x03, 0xc7, 0x2a, 0xb7, 0x68, 0x72, 0xb9, 0x7c,
0x9a, 0x49, 0x2b, 0x87, 0x85, 0xd1, 0x56, 0x8b, 0xc3, 0xc5, 0x72, 0x4d, 0x5c, 0x4e, 0xbf, 0x41,
0xfb, 0x8b, 0x51, 0x16, 0x3f, 0xfe, 0x94, 0x26, 0x9b, 0xe3, 0xaf, 0x35, 0x92, 0x15, 0x47, 0x70,
0xe8, 0xbf, 0x2f, 0xc6, 0xc9, 0x41, 0xbf, 0x36, 0xa8, 0x8b, 0x87, 0x70, 0x6f, 0xa6, 0xb9, 0x0f,
0x25, 0xb5, 0x7e, 0x34, 0x68, 0x89, 0x47, 0x70, 0x7f, 0xcc, 0xcd, 0xbe, 0x4b, 0xc2, 0x24, 0xea,
0x1f, 0x0c, 0x62, 0xd1, 0x83, 0xa3, 0x39, 0x5a, 0xcc, 0xad, 0xd2, 0xf9, 0x4c, 0x2f, 0xd5, 0x62,
0x93, 0xd4, 0xdd, 0x8f, 0xf4, 0x39, 0x88, 0x6d, 0x02, 0x2a, 0x74, 0x4e, 0x28, 0x5a, 0x50, 0x1f,
0xe9, 0x0c, 0x7d, 0xfb, 0x86, 0xe3, 0xbb, 0x44, 0x22, 0xf9, 0x03, 0xb9, 0xbf, 0x03, 0xbd, 0x82,
0xde, 0xe4, 0x0a, 0x17, 0x6b, 0x86, 0x59, 0x69, 0x71, 0xc5, 0x7d, 0xab, 0xd9, 0xda, 0x10, 0x5f,
0xd7, 0x3c, 0x3c, 0xbe, 0x31, 0x4d, 0xcd, 0x55, 0xd2, 0x73, 0x48, 0x6e, 0xe3, 0xf7, 0xa3, 0x7e,
0x01, 0x9d, 0x91, 0x41, 0x06, 0x5d, 0xb0, 0x3d, 0xd2, 0x6a, 0x53, 0x11, 0x33, 0x4b, 0x30, 0x85,
0x18, 0x1b, 0xb1, 0x2b, 0x4d, 0x88, 0x3e, 0x14, 0xd6, 0x53, 0xb6, 0xd2, 0xaf, 0xd0, 0xdd, 0xc5,
0x05, 0x42, 0x7e, 0x36, 0x31, 0x86, 0x31, 0xce, 0x27, 0x66, 0xff, 0xb4, 0x29, 0xca, 0x39, 0x1b,
0xe2, 0x09, 0x34, 0xdc, 0x80, 0xe4, 0x4d, 0x6c, 0x9e, 0x76, 0x87, 0x61, 0x2d, 0xc3, 0xaa, 0x89,
0xff, 0x9b, 0x3e, 0x83, 0x07, 0x37, 0x0a, 0x7e, 0x41, 0x68, 0x14, 0xd2, 0xd4, 0xb7, 0x8d, 0xae,
0x17, 0x34, 0xf5, 0x2a, 0xa2, 0xf4, 0x25, 0x74, 0xdf, 0x28, 0x5c, 0x66, 0x63, 0xc5, 0xda, 0x89,
0x77, 0x42, 0xff, 0x97, 0xe1, 0x9a, 0xe9, 0xb5, 0x59, 0x20, 0x05, 0x29, 0x6f, 0xa1, 0x77, 0x0b,
0x1c, 0xb4, 0x30, 0x8f, 0xff, 0x55, 0x62, 0x63, 0x21, 0x00, 0xfe, 0xbe, 0xf2, 0xe1, 0x88, 0x2b,
0xbd, 0x3e, 0x17, 0xe9, 0x29, 0xb4, 0xcb, 0x49, 0xdf, 0xe1, 0x86, 0xf6, 0xb4, 0xf2, 0x0c, 0xc4,
0x36, 0x26, 0x50, 0x33, 0x55, 0x59, 0x7d, 0xaf, 0xc8, 0x7a, 0xd9, 0xad, 0x8a, 0xaa, 0xdc, 0xdc,
0x39, 0x9c, 0x4c, 0xae, 0x0a, 0x99, 0x67, 0x41, 0xcd, 0x1d, 0x14, 0x9f, 0x41, 0x67, 0x07, 0x1a,
0x48, 0xb7, 0x5e, 0xba, 0xbc, 0xec, 0x30, 0x1a, 0x38, 0x99, 0xe3, 0x4a, 0x5b, 0xbc, 0xd4, 0xb9,
0xda, 0x8a, 0xca, 0x31, 0x34, 0xcb, 0xfa, 0xeb, 0x2c, 0x33, 0x95, 0x55, 0x6c, 0xdd, 0x94, 0x73,
0xc7, 0x37, 0x55, 0xab, 0x52, 0xfb, 0x99, 0xdc, 0x61, 0xae, 0xdc, 0x0d, 0x85, 0xca, 0x4c, 0x12,
0xfd, 0xd6, 0x26, 0xe3, 0xe3, 0x71, 0x15, 0x0e, 0xfb, 0xa8, 0x4c, 0x04, 0xc3, 0x1a, 0xee, 0x14,
0xd3, 0xc7, 0xd0, 0xd9, 0xe1, 0xfc, 0x47, 0xcc, 0xfe, 0x04, 0x00, 0x00, 0xff, 0xff, 0x76, 0x81,
0x9b, 0x7c, 0xf5, 0x03, 0x00, 0x00,
proto.RegisterType((*BackupShardRequest)(nil), "cluster.BackupShardRequest")
proto.RegisterType((*BackupShardResponse)(nil), "cluster.BackupShardResponse")
proto.RegisterType((*CopyShardRequest)(nil), "cluster.CopyShardRequest")
proto.RegisterType((*CopyShardResponse)(nil), "cluster.CopyShardResponse")
}

View File

@ -81,3 +81,24 @@ message RemoteMonitorRequest {
message RemoteMonitorResponse {
optional string Err = 1;
}
message BackupShardRequest {
required uint64 ShardID = 1;
optional int64 Since = 2;
}
message BackupShardResponse {
optional string Err = 2;
}
message CopyShardRequest {
required string Host = 1;
required string Database = 2;
required string Policy = 3;
required uint64 ShardID = 4;
optional int64 Since = 5;
}
message CopyShardResponse {
optional string Err = 2;
}

View File

@ -556,3 +556,89 @@ func (r *RemoteMonitorResponse) UnmarshalBinary(data []byte) error {
}
return nil
}
// BackupShardRequest represents a request to stream a backup of a single shard.
type BackupShardRequest struct {
ShardID uint64
Since time.Time
}
// MarshalBinary encodes r to a binary format.
func (r *BackupShardRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(&internal.BackupShardRequest{
ShardID: proto.Uint64(r.ShardID),
Since: proto.Int64(r.Since.UnixNano()),
})
}
// UnmarshalBinary decodes data into r.
func (r *BackupShardRequest) UnmarshalBinary(data []byte) error {
var pb internal.BackupShardRequest
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
r.ShardID = pb.GetShardID()
r.Since = time.Unix(0, pb.GetSince())
return nil
}
// CopyShardRequest represents a request to copy a shard from another host.
type CopyShardRequest struct {
Host string
Database string
Policy string
ShardID uint64
Since time.Time
}
// MarshalBinary encodes r to a binary format.
func (r *CopyShardRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(&internal.CopyShardRequest{
Host: proto.String(r.Host),
Database: proto.String(r.Database),
Policy: proto.String(r.Policy),
ShardID: proto.Uint64(r.ShardID),
Since: proto.Int64(r.Since.UnixNano()),
})
}
// UnmarshalBinary decodes data into r.
func (r *CopyShardRequest) UnmarshalBinary(data []byte) error {
var pb internal.CopyShardRequest
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
r.Host = pb.GetHost()
r.Database = pb.GetDatabase()
r.Policy = pb.GetPolicy()
r.ShardID = pb.GetShardID()
r.Since = time.Unix(0, pb.GetSince())
return nil
}
// CopyShardResponse represents a response from a shard Copy.
type CopyShardResponse struct {
Err error
}
func (r *CopyShardResponse) MarshalBinary() ([]byte, error) {
var pb internal.CopyShardResponse
if r.Err != nil {
pb.Err = proto.String(r.Err.Error())
}
return proto.Marshal(&pb)
}
func (r *CopyShardResponse) UnmarshalBinary(data []byte) error {
var pb internal.CopyShardResponse
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
if pb.Err != nil {
r.Err = errors.New(pb.GetErr())
}
return nil
}

View File

@ -12,6 +12,7 @@ import (
"os"
"strings"
"sync"
"time"
"github.com/influxdata/influxdb/monitor"
@ -40,6 +41,15 @@ const (
seriesKeysReq = "seriesKeysReq"
seriesKeysResp = "seriesKeysResp"
expandSourcesReq = "expandSourcesReq"
expandSourcesReqexpandSourcesResp = "expandSourcesResp"
backupShardReq = "backupShardReq"
backupShardReqbackupShardResp = "backupShardResp"
copyShardReq = "copyShardReq"
copyShardResp = "copyShardResp"
)
const (
@ -60,8 +70,20 @@ const (
remoteMonitorRequestMessage
remoteMonitorResponseMessage
expandSourcesRequestMessage
expandSourcesResponseMessage
backupShardRequestMessage
backupShardResponseMessage
copyShardRequestMessage
copyShardResponseMessage
)
// BackupTimeout is the time before a connection times out when performing a backup.
const BackupTimeout = 30 * time.Second
// Service processes data received over raw TCP connections.
type Service struct {
mu sync.RWMutex
@ -227,6 +249,18 @@ func (s *Service) handleConn(conn net.Conn) {
s.Logger.Printf("process write shard error: %s", err)
}
s.writeRemoteMonitorResponse(conn, err)
case expandSourcesRequestMessage:
s.statMap.Add(expandSourcesReq, 1)
s.processExpandSourcesRequest(conn)
return
case backupShardRequestMessage:
s.statMap.Add(backupShardReq, 1)
s.processBackupShardRequest(conn)
return
case copyShardRequestMessage:
s.statMap.Add(copyShardReq, 1)
s.processCopyShardRequest(conn)
return
default:
s.Logger.Printf("cluster service message type not found: %d", typ)
}
@ -549,6 +583,139 @@ func (s *Service) writeRemoteMonitorResponse(w io.Writer, e error) {
}
}
func (s *Service) processExpandSourcesRequest(conn net.Conn) {
var sources influxql.Sources
if err := func() error {
// Parse request.
var req ExpandSourcesRequest
if err := DecodeLV(conn, &req); err != nil {
return err
}
// Collect iterator creators for each shard.
ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs))
for _, shardID := range req.ShardIDs {
ic := s.TSDBStore.ShardIteratorCreator(shardID)
if ic == nil {
return nil
}
ics = append(ics, ic)
}
// Expand sources from all shards.
a, err := influxql.IteratorCreators(ics).ExpandSources(req.Sources)
if err != nil {
return err
}
sources = a
return nil
}(); err != nil {
s.Logger.Printf("error reading ExpandSources request: %s", err)
EncodeTLV(conn, expandSourcesResponseMessage, &ExpandSourcesResponse{Err: err})
return
}
// Encode success response.
if err := EncodeTLV(conn, expandSourcesResponseMessage, &ExpandSourcesResponse{
Sources: sources,
}); err != nil {
s.Logger.Printf("error writing ExpandSources response: %s", err)
return
}
}
func (s *Service) processBackupShardRequest(conn net.Conn) {
if err := func() error {
// Parse request.
var req BackupShardRequest
if err := DecodeLV(conn, &req); err != nil {
return err
}
// Backup from local shard to the connection.
if err := s.TSDBStore.BackupShard(req.ShardID, req.Since, conn); err != nil {
return err
}
return nil
}(); err != nil {
s.Logger.Printf("error processing BackupShardRequest: %s", err)
return
}
}
func (s *Service) processCopyShardRequest(conn net.Conn) {
if err := func() error {
// Parse request.
var req CopyShardRequest
if err := DecodeLV(conn, &req); err != nil {
return err
}
// Begin streaming backup from remote server.
r, err := s.backupRemoteShard(req.Host, req.ShardID, req.Since)
if err != nil {
return err
}
defer r.Close()
// Create shard if it doesn't exist.
if err := s.TSDBStore.CreateShard(req.Database, req.Policy, req.ShardID); err != nil {
return err
}
// Restore to local shard.
if err := s.TSDBStore.RestoreShard(req.ShardID, r); err != nil {
return err
}
return nil
}(); err != nil {
s.Logger.Printf("error reading CopyShard request: %s", err)
EncodeTLV(conn, copyShardResponseMessage, &CopyShardResponse{Err: err})
return
}
// Encode success response.
if err := EncodeTLV(conn, copyShardResponseMessage, &CopyShardResponse{}); err != nil {
s.Logger.Printf("error writing CopyShard response: %s", err)
return
}
}
// backupRemoteShard connects to a cluster service on a remote host and streams a shard.
func (s *Service) backupRemoteShard(host string, shardID uint64, since time.Time) (io.ReadCloser, error) {
conn, err := net.Dial("tcp", host)
if err != nil {
return nil, err
}
conn.SetDeadline(time.Now().Add(BackupTimeout))
if err := func() error {
// Write the cluster multiplexing header byte
if _, err := conn.Write([]byte{MuxHeader}); err != nil {
return err
}
// Write backup request.
if err := EncodeTLV(conn, backupShardResponseMessage, &BackupShardRequest{
ShardID: shardID,
Since: since,
}); err != nil {
return fmt.Errorf("error writing BackupShardRequest: %s", err)
}
return nil
}(); err != nil {
conn.Close()
return nil, err
}
// Return the connection which will stream the rest of the backup.
return conn, nil
}
// ReadTLV reads a type-length-value record from r.
func ReadTLV(r io.Reader) (byte, []byte, error) {
typ, err := ReadType(r)

View File

@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"io"
"sort"
"strconv"
"time"
@ -872,12 +873,16 @@ type TSDBStore interface {
CreateShard(database, policy string, shardID uint64) error
WriteToShard(shardID uint64, points []models.Point) error
RestoreShard(id uint64, r io.Reader) error
BackupShard(id uint64, since time.Time, w io.Writer) error
DeleteDatabase(name string) error
DeleteMeasurement(database, name string) error
DeleteRetentionPolicy(database, name string) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteShard(id uint64) error
IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error)
ShardIteratorCreator(id uint64) influxql.IteratorCreator
}
type LocalTSDBStore struct {

View File

@ -207,6 +207,9 @@ type TSDBStore struct {
CreateShardFn func(database, policy string, shardID uint64) error
WriteToShardFn func(shardID uint64, points []models.Point) error
RestoreShardFn func(id uint64, r io.Reader) error
BackupShardFn func(id uint64, since time.Time, w io.Writer) error
DeleteDatabaseFn func(name string) error
DeleteMeasurementFn func(database, name string) error
DeleteRetentionPolicyFn func(database, name string) error
@ -226,6 +229,14 @@ func (s *TSDBStore) WriteToShard(shardID uint64, points []models.Point) error {
return s.WriteToShardFn(shardID, points)
}
func (s *TSDBStore) RestoreShard(id uint64, r io.Reader) error {
return s.RestoreShardFn(id, r)
}
func (s *TSDBStore) BackupShard(id uint64, since time.Time, w io.Writer) error {
return s.BackupShardFn(id, since, w)
}
func (s *TSDBStore) DeleteDatabase(name string) error {
return s.DeleteDatabaseFn(name)
}

View File

@ -31,6 +31,9 @@ type Engine interface {
SetLogOutput(io.Writer)
LoadMetadataIndex(shardID uint64, index *DatabaseIndex) error
Backup(w io.Writer, basePath string, since time.Time) error
Restore(r io.Reader, basePath string) error
CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error)
SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error)
WritePoints(points []models.Point) error
@ -45,8 +48,6 @@ type Engine interface {
Format() EngineFormat
io.WriterTo
Backup(w io.Writer, basePath string, since time.Time) error
}
// EngineFormat represents the format for an engine.

View File

@ -308,6 +308,75 @@ func (e *Engine) writeFileToBackup(f FileStat, shardRelativePath string, tw *tar
return err
}
// Restore will read a tar archive generated by Backup().
// Only files that match basePath will be copied into the directory. This obtains
// a write lock so no operations can be performed while restoring.
func (e *Engine) Restore(r io.Reader, basePath string) error {
// Copy files from archive while under lock to prevent reopening.
if err := func() error {
e.mu.Lock()
defer e.mu.Unlock()
tr := tar.NewReader(r)
for {
if err := e.readFileFromBackup(tr, basePath); err == io.EOF {
break
} else if err != nil {
return err
}
}
return nil
}(); err != nil {
if err := e.Open(); err != nil {
log.Printf("error reopening engine after restore: %s", err)
}
return err
}
return nil
}
// readFileFromBackup copies the next file from the archive into the shard.
// The file is skipped if it does not have a matching shardRelativePath prefix.
func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string) error {
// Read next archive file.
hdr, err := tr.Next()
if err != nil {
return err
}
// Skip file if it does not have a matching prefix.
if !filepath.HasPrefix(hdr.Name, shardRelativePath) {
return nil
}
path, err := filepath.Rel(shardRelativePath, hdr.Name)
if err != nil {
return err
}
// Create new file on disk.
f, err := os.OpenFile(filepath.Join(e.path, path), os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return err
}
defer f.Close()
// Copy from archive to the file.
if _, err := io.CopyN(f, tr, hdr.Size); err != nil {
return err
}
// Sync to disk & close.
if err := f.Sync(); err != nil {
return err
} else if err := f.Close(); err != nil {
return err
}
return nil
}
// addToIndexFromKey will pull the measurement name, series key, and field name from a composite key and add it to the
// database index and measurement fields
func (e *Engine) addToIndexFromKey(shardID uint64, key string, fieldType influxql.DataType, index *tsdb.DatabaseIndex) error {

View File

@ -539,6 +539,32 @@ func (s *Shard) ExpandSources(sources influxql.Sources) (influxql.Sources, error
return expanded, nil
}
// Restore restores data to the underlying engine for the shard.
// The shard is reopened after restore.
func (s *Shard) Restore(r io.Reader, basePath string) error {
s.mu.Lock()
// Restore to engine.
if err := s.engine.Restore(r, basePath); err != nil {
s.mu.Unlock()
return err
}
s.mu.Unlock()
// Close shard.
if err := s.Close(); err != nil {
return err
}
// Reopen engine.
if err := s.Open(); err != nil {
return err
}
return nil
}
// Shards represents a sortable list of shards.
type Shards []*Shard

View File

@ -521,6 +521,22 @@ func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error {
return shard.engine.Backup(w, path, since)
}
// RestoreShard restores a backup from r to a given shard.
// This will only overwrite files included in the backup.
func (s *Store) RestoreShard(id uint64, r io.Reader) error {
shard := s.Shard(id)
if shard == nil {
return fmt.Errorf("shard %d doesn't exist on this server", id)
}
path, err := relativePath(s.path, shard.path)
if err != nil {
return err
}
return shard.Restore(r, path)
}
// ShardRelativePath will return the relative path to the shard. i.e. <database>/<retention>/<id>
func (s *Store) ShardRelativePath(id uint64) (string, error) {
shard := s.Shard(id)

View File

@ -1,6 +1,7 @@
package tsdb_test
import (
"bytes"
"fmt"
"io/ioutil"
"os"
@ -276,6 +277,70 @@ func TestShards_CreateIterator(t *testing.T) {
}
}
// Ensure the store can backup a shard and another store can restore it.
func TestStore_BackupRestoreShard(t *testing.T) {
s0, s1 := MustOpenStore(), MustOpenStore()
defer s0.Close()
defer s1.Close()
// Create shard with data.
s0.MustCreateShardWithData("db0", "rp0", 100,
`cpu value=1 0`,
`cpu value=2 10`,
`cpu value=3 20`,
)
// Backup shard to a buffer.
var buf bytes.Buffer
if err := s0.BackupShard(100, time.Time{}, &buf); err != nil {
t.Fatal(err)
}
// Create the shard on the other store and restore from buffer.
if err := s1.CreateShard("db0", "rp0", 100); err != nil {
t.Fatal(err)
}
if err := s1.RestoreShard(100, &buf); err != nil {
t.Fatal(err)
}
// Read data from
itr, err := s1.Shard(100).CreateIterator(influxql.IteratorOptions{
Expr: influxql.MustParseExpr(`value`),
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Ascending: true,
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
})
if err != nil {
t.Fatal(err)
}
fitr := itr.(influxql.FloatIterator)
// Read values from iterator. The host=serverA points should come first.
p, e := fitr.Next()
if e != nil {
t.Fatal(e)
}
if !deep.Equal(p, &influxql.FloatPoint{Name: "cpu", Time: time.Unix(0, 0).UnixNano(), Value: 1}) {
t.Fatalf("unexpected point(0): %s", spew.Sdump(p))
}
p, e = fitr.Next()
if e != nil {
t.Fatal(e)
}
if !deep.Equal(p, &influxql.FloatPoint{Name: "cpu", Time: time.Unix(10, 0).UnixNano(), Value: 2}) {
t.Fatalf("unexpected point(1): %s", spew.Sdump(p))
}
p, e = fitr.Next()
if e != nil {
t.Fatal(e)
}
if !deep.Equal(p, &influxql.FloatPoint{Name: "cpu", Time: time.Unix(20, 0).UnixNano(), Value: 3}) {
t.Fatalf("unexpected point(2): %s", spew.Sdump(p))
}
}
func BenchmarkStoreOpen_200KSeries_100Shards(b *testing.B) { benchmarkStoreOpen(b, 64, 5, 5, 1, 100) }
func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) {