From a3bf3e2ef1db938290f50360775433dcce1995da Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Thu, 28 Apr 2016 17:29:09 -0700 Subject: [PATCH] added baseline backup/restore plumbing --- cluster/internal/data.pb.go | 256 ++++++++++++++++++----------- cluster/internal/data.proto | 21 +++ cluster/rpc.go | 86 ++++++++++ cluster/service.go | 167 +++++++++++++++++++ cluster/statement_executor.go | 5 + cluster/statement_executor_test.go | 11 ++ tsdb/engine.go | 5 +- tsdb/engine/tsm1/engine.go | 69 ++++++++ tsdb/shard.go | 26 +++ tsdb/store.go | 16 ++ tsdb/store_test.go | 65 ++++++++ 11 files changed, 626 insertions(+), 101 deletions(-) diff --git a/cluster/internal/data.pb.go b/cluster/internal/data.pb.go index 21a34c3193..4fd41e029a 100644 --- a/cluster/internal/data.pb.go +++ b/cluster/internal/data.pb.go @@ -24,6 +24,10 @@ It has these top-level messages: ExpandSourcesResponse RemoteMonitorRequest RemoteMonitorResponse + BackupShardRequest + BackupShardResponse + CopyShardRequest + CopyShardResponse */ package cluster @@ -36,10 +40,6 @@ var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -const _ = proto.GoGoProtoPackageIsVersion1 - type WriteShardRequest struct { ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"` Points [][]byte `protobuf:"bytes,2,rep,name=Points" json:"Points,omitempty"` @@ -48,10 +48,9 @@ type WriteShardRequest struct { XXX_unrecognized []byte `json:"-"` } -func (m *WriteShardRequest) Reset() { *m = WriteShardRequest{} } -func (m *WriteShardRequest) String() string { return proto.CompactTextString(m) } -func (*WriteShardRequest) ProtoMessage() {} -func (*WriteShardRequest) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{0} } +func (m *WriteShardRequest) Reset() { *m = WriteShardRequest{} } +func (m *WriteShardRequest) String() string { return proto.CompactTextString(m) } +func (*WriteShardRequest) ProtoMessage() {} func (m *WriteShardRequest) GetShardID() uint64 { if m != nil && m.ShardID != nil { @@ -87,10 +86,9 @@ type WriteShardResponse struct { XXX_unrecognized []byte `json:"-"` } -func (m *WriteShardResponse) Reset() { *m = WriteShardResponse{} } -func (m *WriteShardResponse) String() string { return proto.CompactTextString(m) } -func (*WriteShardResponse) ProtoMessage() {} -func (*WriteShardResponse) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{1} } +func (m *WriteShardResponse) Reset() { *m = WriteShardResponse{} } +func (m *WriteShardResponse) String() string { return proto.CompactTextString(m) } +func (*WriteShardResponse) ProtoMessage() {} func (m *WriteShardResponse) GetCode() int32 { if m != nil && m.Code != nil { @@ -112,10 +110,9 @@ type ExecuteStatementRequest struct { XXX_unrecognized []byte `json:"-"` } -func (m *ExecuteStatementRequest) Reset() { *m = ExecuteStatementRequest{} } -func (m *ExecuteStatementRequest) String() string { return proto.CompactTextString(m) } -func (*ExecuteStatementRequest) ProtoMessage() {} -func (*ExecuteStatementRequest) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{2} } +func (m *ExecuteStatementRequest) Reset() { *m = ExecuteStatementRequest{} } +func (m *ExecuteStatementRequest) String() string { return proto.CompactTextString(m) } +func (*ExecuteStatementRequest) ProtoMessage() {} func (m *ExecuteStatementRequest) GetStatement() string { if m != nil && m.Statement != nil { @@ -137,10 +134,9 @@ type ExecuteStatementResponse struct { XXX_unrecognized []byte `json:"-"` } -func (m *ExecuteStatementResponse) Reset() { *m = ExecuteStatementResponse{} } -func (m *ExecuteStatementResponse) String() string { return proto.CompactTextString(m) } -func (*ExecuteStatementResponse) ProtoMessage() {} -func (*ExecuteStatementResponse) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{3} } +func (m *ExecuteStatementResponse) Reset() { *m = ExecuteStatementResponse{} } +func (m *ExecuteStatementResponse) String() string { return proto.CompactTextString(m) } +func (*ExecuteStatementResponse) ProtoMessage() {} func (m *ExecuteStatementResponse) GetCode() int32 { if m != nil && m.Code != nil { @@ -162,10 +158,9 @@ type CreateIteratorRequest struct { XXX_unrecognized []byte `json:"-"` } -func (m *CreateIteratorRequest) Reset() { *m = CreateIteratorRequest{} } -func (m *CreateIteratorRequest) String() string { return proto.CompactTextString(m) } -func (*CreateIteratorRequest) ProtoMessage() {} -func (*CreateIteratorRequest) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{4} } +func (m *CreateIteratorRequest) Reset() { *m = CreateIteratorRequest{} } +func (m *CreateIteratorRequest) String() string { return proto.CompactTextString(m) } +func (*CreateIteratorRequest) ProtoMessage() {} func (m *CreateIteratorRequest) GetShardIDs() []uint64 { if m != nil { @@ -188,10 +183,9 @@ type CreateIteratorResponse struct { XXX_unrecognized []byte `json:"-"` } -func (m *CreateIteratorResponse) Reset() { *m = CreateIteratorResponse{} } -func (m *CreateIteratorResponse) String() string { return proto.CompactTextString(m) } -func (*CreateIteratorResponse) ProtoMessage() {} -func (*CreateIteratorResponse) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{5} } +func (m *CreateIteratorResponse) Reset() { *m = CreateIteratorResponse{} } +func (m *CreateIteratorResponse) String() string { return proto.CompactTextString(m) } +func (*CreateIteratorResponse) ProtoMessage() {} func (m *CreateIteratorResponse) GetErr() string { if m != nil && m.Err != nil { @@ -220,10 +214,9 @@ type IteratorStats struct { XXX_unrecognized []byte `json:"-"` } -func (m *IteratorStats) Reset() { *m = IteratorStats{} } -func (m *IteratorStats) String() string { return proto.CompactTextString(m) } -func (*IteratorStats) ProtoMessage() {} -func (*IteratorStats) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{6} } +func (m *IteratorStats) Reset() { *m = IteratorStats{} } +func (m *IteratorStats) String() string { return proto.CompactTextString(m) } +func (*IteratorStats) ProtoMessage() {} func (m *IteratorStats) GetSeriesN() int64 { if m != nil && m.SeriesN != nil { @@ -245,10 +238,9 @@ type FieldDimensionsRequest struct { XXX_unrecognized []byte `json:"-"` } -func (m *FieldDimensionsRequest) Reset() { *m = FieldDimensionsRequest{} } -func (m *FieldDimensionsRequest) String() string { return proto.CompactTextString(m) } -func (*FieldDimensionsRequest) ProtoMessage() {} -func (*FieldDimensionsRequest) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{7} } +func (m *FieldDimensionsRequest) Reset() { *m = FieldDimensionsRequest{} } +func (m *FieldDimensionsRequest) String() string { return proto.CompactTextString(m) } +func (*FieldDimensionsRequest) ProtoMessage() {} func (m *FieldDimensionsRequest) GetShardIDs() []uint64 { if m != nil { @@ -271,10 +263,9 @@ type FieldDimensionsResponse struct { XXX_unrecognized []byte `json:"-"` } -func (m *FieldDimensionsResponse) Reset() { *m = FieldDimensionsResponse{} } -func (m *FieldDimensionsResponse) String() string { return proto.CompactTextString(m) } -func (*FieldDimensionsResponse) ProtoMessage() {} -func (*FieldDimensionsResponse) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{8} } +func (m *FieldDimensionsResponse) Reset() { *m = FieldDimensionsResponse{} } +func (m *FieldDimensionsResponse) String() string { return proto.CompactTextString(m) } +func (*FieldDimensionsResponse) ProtoMessage() {} func (m *FieldDimensionsResponse) GetFields() []string { if m != nil { @@ -303,10 +294,9 @@ type SeriesKeysRequest struct { XXX_unrecognized []byte `json:"-"` } -func (m *SeriesKeysRequest) Reset() { *m = SeriesKeysRequest{} } -func (m *SeriesKeysRequest) String() string { return proto.CompactTextString(m) } -func (*SeriesKeysRequest) ProtoMessage() {} -func (*SeriesKeysRequest) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{9} } +func (m *SeriesKeysRequest) Reset() { *m = SeriesKeysRequest{} } +func (m *SeriesKeysRequest) String() string { return proto.CompactTextString(m) } +func (*SeriesKeysRequest) ProtoMessage() {} func (m *SeriesKeysRequest) GetShardIDs() []uint64 { if m != nil { @@ -328,10 +318,9 @@ type SeriesKeysResponse struct { XXX_unrecognized []byte `json:"-"` } -func (m *SeriesKeysResponse) Reset() { *m = SeriesKeysResponse{} } -func (m *SeriesKeysResponse) String() string { return proto.CompactTextString(m) } -func (*SeriesKeysResponse) ProtoMessage() {} -func (*SeriesKeysResponse) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{10} } +func (m *SeriesKeysResponse) Reset() { *m = SeriesKeysResponse{} } +func (m *SeriesKeysResponse) String() string { return proto.CompactTextString(m) } +func (*SeriesKeysResponse) ProtoMessage() {} func (m *SeriesKeysResponse) GetSeriesList() []byte { if m != nil { @@ -353,10 +342,9 @@ type ExpandSourcesRequest struct { XXX_unrecognized []byte `json:"-"` } -func (m *ExpandSourcesRequest) Reset() { *m = ExpandSourcesRequest{} } -func (m *ExpandSourcesRequest) String() string { return proto.CompactTextString(m) } -func (*ExpandSourcesRequest) ProtoMessage() {} -func (*ExpandSourcesRequest) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{11} } +func (m *ExpandSourcesRequest) Reset() { *m = ExpandSourcesRequest{} } +func (m *ExpandSourcesRequest) String() string { return proto.CompactTextString(m) } +func (*ExpandSourcesRequest) ProtoMessage() {} func (m *ExpandSourcesRequest) GetShardIDs() []uint64 { if m != nil { @@ -378,10 +366,9 @@ type ExpandSourcesResponse struct { XXX_unrecognized []byte `json:"-"` } -func (m *ExpandSourcesResponse) Reset() { *m = ExpandSourcesResponse{} } -func (m *ExpandSourcesResponse) String() string { return proto.CompactTextString(m) } -func (*ExpandSourcesResponse) ProtoMessage() {} -func (*ExpandSourcesResponse) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{12} } +func (m *ExpandSourcesResponse) Reset() { *m = ExpandSourcesResponse{} } +func (m *ExpandSourcesResponse) String() string { return proto.CompactTextString(m) } +func (*ExpandSourcesResponse) ProtoMessage() {} func (m *ExpandSourcesResponse) GetSources() []byte { if m != nil { @@ -406,10 +393,9 @@ type RemoteMonitorRequest struct { XXX_unrecognized []byte `json:"-"` } -func (m *RemoteMonitorRequest) Reset() { *m = RemoteMonitorRequest{} } -func (m *RemoteMonitorRequest) String() string { return proto.CompactTextString(m) } -func (*RemoteMonitorRequest) ProtoMessage() {} -func (*RemoteMonitorRequest) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{13} } +func (m *RemoteMonitorRequest) Reset() { *m = RemoteMonitorRequest{} } +func (m *RemoteMonitorRequest) String() string { return proto.CompactTextString(m) } +func (*RemoteMonitorRequest) ProtoMessage() {} func (m *RemoteMonitorRequest) GetRemoteAddrs() []string { if m != nil { @@ -451,10 +437,9 @@ type RemoteMonitorResponse struct { XXX_unrecognized []byte `json:"-"` } -func (m *RemoteMonitorResponse) Reset() { *m = RemoteMonitorResponse{} } -func (m *RemoteMonitorResponse) String() string { return proto.CompactTextString(m) } -func (*RemoteMonitorResponse) ProtoMessage() {} -func (*RemoteMonitorResponse) Descriptor() ([]byte, []int) { return fileDescriptorData, []int{14} } +func (m *RemoteMonitorResponse) Reset() { *m = RemoteMonitorResponse{} } +func (m *RemoteMonitorResponse) String() string { return proto.CompactTextString(m) } +func (*RemoteMonitorResponse) ProtoMessage() {} func (m *RemoteMonitorResponse) GetErr() string { if m != nil && m.Err != nil { @@ -463,6 +448,110 @@ func (m *RemoteMonitorResponse) GetErr() string { return "" } +type BackupShardRequest struct { + ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"` + Since *int64 `protobuf:"varint,2,opt,name=Since" json:"Since,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *BackupShardRequest) Reset() { *m = BackupShardRequest{} } +func (m *BackupShardRequest) String() string { return proto.CompactTextString(m) } +func (*BackupShardRequest) ProtoMessage() {} + +func (m *BackupShardRequest) GetShardID() uint64 { + if m != nil && m.ShardID != nil { + return *m.ShardID + } + return 0 +} + +func (m *BackupShardRequest) GetSince() int64 { + if m != nil && m.Since != nil { + return *m.Since + } + return 0 +} + +type BackupShardResponse struct { + Err *string `protobuf:"bytes,2,opt,name=Err" json:"Err,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *BackupShardResponse) Reset() { *m = BackupShardResponse{} } +func (m *BackupShardResponse) String() string { return proto.CompactTextString(m) } +func (*BackupShardResponse) ProtoMessage() {} + +func (m *BackupShardResponse) GetErr() string { + if m != nil && m.Err != nil { + return *m.Err + } + return "" +} + +type CopyShardRequest struct { + Host *string `protobuf:"bytes,1,req,name=Host" json:"Host,omitempty"` + Database *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"` + Policy *string `protobuf:"bytes,3,req,name=Policy" json:"Policy,omitempty"` + ShardID *uint64 `protobuf:"varint,4,req,name=ShardID" json:"ShardID,omitempty"` + Since *int64 `protobuf:"varint,5,opt,name=Since" json:"Since,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *CopyShardRequest) Reset() { *m = CopyShardRequest{} } +func (m *CopyShardRequest) String() string { return proto.CompactTextString(m) } +func (*CopyShardRequest) ProtoMessage() {} + +func (m *CopyShardRequest) GetHost() string { + if m != nil && m.Host != nil { + return *m.Host + } + return "" +} + +func (m *CopyShardRequest) GetDatabase() string { + if m != nil && m.Database != nil { + return *m.Database + } + return "" +} + +func (m *CopyShardRequest) GetPolicy() string { + if m != nil && m.Policy != nil { + return *m.Policy + } + return "" +} + +func (m *CopyShardRequest) GetShardID() uint64 { + if m != nil && m.ShardID != nil { + return *m.ShardID + } + return 0 +} + +func (m *CopyShardRequest) GetSince() int64 { + if m != nil && m.Since != nil { + return *m.Since + } + return 0 +} + +type CopyShardResponse struct { + Err *string `protobuf:"bytes,2,opt,name=Err" json:"Err,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *CopyShardResponse) Reset() { *m = CopyShardResponse{} } +func (m *CopyShardResponse) String() string { return proto.CompactTextString(m) } +func (*CopyShardResponse) ProtoMessage() {} + +func (m *CopyShardResponse) GetErr() string { + if m != nil && m.Err != nil { + return *m.Err + } + return "" +} + func init() { proto.RegisterType((*WriteShardRequest)(nil), "cluster.WriteShardRequest") proto.RegisterType((*WriteShardResponse)(nil), "cluster.WriteShardResponse") @@ -479,39 +568,8 @@ func init() { proto.RegisterType((*ExpandSourcesResponse)(nil), "cluster.ExpandSourcesResponse") proto.RegisterType((*RemoteMonitorRequest)(nil), "cluster.RemoteMonitorRequest") proto.RegisterType((*RemoteMonitorResponse)(nil), "cluster.RemoteMonitorResponse") -} - -var fileDescriptorData = []byte{ - // 486 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x53, 0x61, 0x6b, 0x13, 0x41, - 0x10, 0xa5, 0xb9, 0xc4, 0x7a, 0x93, 0x68, 0xcd, 0xb6, 0x49, 0xee, 0x63, 0x38, 0x14, 0xf2, 0x29, - 0x4a, 0xa5, 0x42, 0x11, 0x04, 0x49, 0x22, 0x54, 0x6d, 0x0c, 0x51, 0x11, 0xfc, 0x22, 0x6b, 0x6e, - 0xd0, 0x85, 0xe4, 0xf6, 0xdc, 0xd9, 0x60, 0xf3, 0xef, 0x9d, 0xdd, 0xdb, 0xab, 0x69, 0xaa, 0x90, - 0x7e, 0xbc, 0xb9, 0x7d, 0xf3, 0xe6, 0xbd, 0x79, 0x03, 0xc7, 0x2a, 0xb7, 0x68, 0x72, 0xb9, 0x7c, - 0x9a, 0x49, 0x2b, 0x87, 0x85, 0xd1, 0x56, 0x8b, 0xc3, 0xc5, 0x72, 0x4d, 0x5c, 0x4e, 0xbf, 0x41, - 0xfb, 0x8b, 0x51, 0x16, 0x3f, 0xfe, 0x94, 0x26, 0x9b, 0xe3, 0xaf, 0x35, 0x92, 0x15, 0x47, 0x70, - 0xe8, 0xbf, 0x2f, 0xc6, 0xc9, 0x41, 0xbf, 0x36, 0xa8, 0x8b, 0x87, 0x70, 0x6f, 0xa6, 0xb9, 0x0f, - 0x25, 0xb5, 0x7e, 0x34, 0x68, 0x89, 0x47, 0x70, 0x7f, 0xcc, 0xcd, 0xbe, 0x4b, 0xc2, 0x24, 0xea, - 0x1f, 0x0c, 0x62, 0xd1, 0x83, 0xa3, 0x39, 0x5a, 0xcc, 0xad, 0xd2, 0xf9, 0x4c, 0x2f, 0xd5, 0x62, - 0x93, 0xd4, 0xdd, 0x8f, 0xf4, 0x39, 0x88, 0x6d, 0x02, 0x2a, 0x74, 0x4e, 0x28, 0x5a, 0x50, 0x1f, - 0xe9, 0x0c, 0x7d, 0xfb, 0x86, 0xe3, 0xbb, 0x44, 0x22, 0xf9, 0x03, 0xb9, 0xbf, 0x03, 0xbd, 0x82, - 0xde, 0xe4, 0x0a, 0x17, 0x6b, 0x86, 0x59, 0x69, 0x71, 0xc5, 0x7d, 0xab, 0xd9, 0xda, 0x10, 0x5f, - 0xd7, 0x3c, 0x3c, 0xbe, 0x31, 0x4d, 0xcd, 0x55, 0xd2, 0x73, 0x48, 0x6e, 0xe3, 0xf7, 0xa3, 0x7e, - 0x01, 0x9d, 0x91, 0x41, 0x06, 0x5d, 0xb0, 0x3d, 0xd2, 0x6a, 0x53, 0x11, 0x33, 0x4b, 0x30, 0x85, - 0x18, 0x1b, 0xb1, 0x2b, 0x4d, 0x88, 0x3e, 0x14, 0xd6, 0x53, 0xb6, 0xd2, 0xaf, 0xd0, 0xdd, 0xc5, - 0x05, 0x42, 0x7e, 0x36, 0x31, 0x86, 0x31, 0xce, 0x27, 0x66, 0xff, 0xb4, 0x29, 0xca, 0x39, 0x1b, - 0xe2, 0x09, 0x34, 0xdc, 0x80, 0xe4, 0x4d, 0x6c, 0x9e, 0x76, 0x87, 0x61, 0x2d, 0xc3, 0xaa, 0x89, - 0xff, 0x9b, 0x3e, 0x83, 0x07, 0x37, 0x0a, 0x7e, 0x41, 0x68, 0x14, 0xd2, 0xd4, 0xb7, 0x8d, 0xae, - 0x17, 0x34, 0xf5, 0x2a, 0xa2, 0xf4, 0x25, 0x74, 0xdf, 0x28, 0x5c, 0x66, 0x63, 0xc5, 0xda, 0x89, - 0x77, 0x42, 0xff, 0x97, 0xe1, 0x9a, 0xe9, 0xb5, 0x59, 0x20, 0x05, 0x29, 0x6f, 0xa1, 0x77, 0x0b, - 0x1c, 0xb4, 0x30, 0x8f, 0xff, 0x55, 0x62, 0x63, 0x21, 0x00, 0xfe, 0xbe, 0xf2, 0xe1, 0x88, 0x2b, - 0xbd, 0x3e, 0x17, 0xe9, 0x29, 0xb4, 0xcb, 0x49, 0xdf, 0xe1, 0x86, 0xf6, 0xb4, 0xf2, 0x0c, 0xc4, - 0x36, 0x26, 0x50, 0x33, 0x55, 0x59, 0x7d, 0xaf, 0xc8, 0x7a, 0xd9, 0xad, 0x8a, 0xaa, 0xdc, 0xdc, - 0x39, 0x9c, 0x4c, 0xae, 0x0a, 0x99, 0x67, 0x41, 0xcd, 0x1d, 0x14, 0x9f, 0x41, 0x67, 0x07, 0x1a, - 0x48, 0xb7, 0x5e, 0xba, 0xbc, 0xec, 0x30, 0x1a, 0x38, 0x99, 0xe3, 0x4a, 0x5b, 0xbc, 0xd4, 0xb9, - 0xda, 0x8a, 0xca, 0x31, 0x34, 0xcb, 0xfa, 0xeb, 0x2c, 0x33, 0x95, 0x55, 0x6c, 0xdd, 0x94, 0x73, - 0xc7, 0x37, 0x55, 0xab, 0x52, 0xfb, 0x99, 0xdc, 0x61, 0xae, 0xdc, 0x0d, 0x85, 0xca, 0x4c, 0x12, - 0xfd, 0xd6, 0x26, 0xe3, 0xe3, 0x71, 0x15, 0x0e, 0xfb, 0xa8, 0x4c, 0x04, 0xc3, 0x1a, 0xee, 0x14, - 0xd3, 0xc7, 0xd0, 0xd9, 0xe1, 0xfc, 0x47, 0xcc, 0xfe, 0x04, 0x00, 0x00, 0xff, 0xff, 0x76, 0x81, - 0x9b, 0x7c, 0xf5, 0x03, 0x00, 0x00, + proto.RegisterType((*BackupShardRequest)(nil), "cluster.BackupShardRequest") + proto.RegisterType((*BackupShardResponse)(nil), "cluster.BackupShardResponse") + proto.RegisterType((*CopyShardRequest)(nil), "cluster.CopyShardRequest") + proto.RegisterType((*CopyShardResponse)(nil), "cluster.CopyShardResponse") } diff --git a/cluster/internal/data.proto b/cluster/internal/data.proto index c98f7f43ed..656ff70e3a 100644 --- a/cluster/internal/data.proto +++ b/cluster/internal/data.proto @@ -81,3 +81,24 @@ message RemoteMonitorRequest { message RemoteMonitorResponse { optional string Err = 1; } + +message BackupShardRequest { + required uint64 ShardID = 1; + optional int64 Since = 2; +} + +message BackupShardResponse { + optional string Err = 2; +} + +message CopyShardRequest { + required string Host = 1; + required string Database = 2; + required string Policy = 3; + required uint64 ShardID = 4; + optional int64 Since = 5; +} + +message CopyShardResponse { + optional string Err = 2; +} diff --git a/cluster/rpc.go b/cluster/rpc.go index 092559163a..6be04b09c0 100644 --- a/cluster/rpc.go +++ b/cluster/rpc.go @@ -556,3 +556,89 @@ func (r *RemoteMonitorResponse) UnmarshalBinary(data []byte) error { } return nil } + +// BackupShardRequest represents a request to stream a backup of a single shard. +type BackupShardRequest struct { + ShardID uint64 + Since time.Time +} + +// MarshalBinary encodes r to a binary format. +func (r *BackupShardRequest) MarshalBinary() ([]byte, error) { + return proto.Marshal(&internal.BackupShardRequest{ + ShardID: proto.Uint64(r.ShardID), + Since: proto.Int64(r.Since.UnixNano()), + }) +} + +// UnmarshalBinary decodes data into r. +func (r *BackupShardRequest) UnmarshalBinary(data []byte) error { + var pb internal.BackupShardRequest + if err := proto.Unmarshal(data, &pb); err != nil { + return err + } + + r.ShardID = pb.GetShardID() + r.Since = time.Unix(0, pb.GetSince()) + return nil +} + +// CopyShardRequest represents a request to copy a shard from another host. +type CopyShardRequest struct { + Host string + Database string + Policy string + ShardID uint64 + Since time.Time +} + +// MarshalBinary encodes r to a binary format. +func (r *CopyShardRequest) MarshalBinary() ([]byte, error) { + return proto.Marshal(&internal.CopyShardRequest{ + Host: proto.String(r.Host), + Database: proto.String(r.Database), + Policy: proto.String(r.Policy), + ShardID: proto.Uint64(r.ShardID), + Since: proto.Int64(r.Since.UnixNano()), + }) +} + +// UnmarshalBinary decodes data into r. +func (r *CopyShardRequest) UnmarshalBinary(data []byte) error { + var pb internal.CopyShardRequest + if err := proto.Unmarshal(data, &pb); err != nil { + return err + } + + r.Host = pb.GetHost() + r.Database = pb.GetDatabase() + r.Policy = pb.GetPolicy() + r.ShardID = pb.GetShardID() + r.Since = time.Unix(0, pb.GetSince()) + return nil +} + +// CopyShardResponse represents a response from a shard Copy. +type CopyShardResponse struct { + Err error +} + +func (r *CopyShardResponse) MarshalBinary() ([]byte, error) { + var pb internal.CopyShardResponse + if r.Err != nil { + pb.Err = proto.String(r.Err.Error()) + } + return proto.Marshal(&pb) +} + +func (r *CopyShardResponse) UnmarshalBinary(data []byte) error { + var pb internal.CopyShardResponse + + if err := proto.Unmarshal(data, &pb); err != nil { + return err + } + if pb.Err != nil { + r.Err = errors.New(pb.GetErr()) + } + return nil +} diff --git a/cluster/service.go b/cluster/service.go index b8873ebea2..8cdc5e6c7a 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -12,6 +12,7 @@ import ( "os" "strings" "sync" + "time" "github.com/influxdata/influxdb/monitor" @@ -40,6 +41,15 @@ const ( seriesKeysReq = "seriesKeysReq" seriesKeysResp = "seriesKeysResp" + + expandSourcesReq = "expandSourcesReq" + expandSourcesReqexpandSourcesResp = "expandSourcesResp" + + backupShardReq = "backupShardReq" + backupShardReqbackupShardResp = "backupShardResp" + + copyShardReq = "copyShardReq" + copyShardResp = "copyShardResp" ) const ( @@ -60,8 +70,20 @@ const ( remoteMonitorRequestMessage remoteMonitorResponseMessage + + expandSourcesRequestMessage + expandSourcesResponseMessage + + backupShardRequestMessage + backupShardResponseMessage + + copyShardRequestMessage + copyShardResponseMessage ) +// BackupTimeout is the time before a connection times out when performing a backup. +const BackupTimeout = 30 * time.Second + // Service processes data received over raw TCP connections. type Service struct { mu sync.RWMutex @@ -227,6 +249,18 @@ func (s *Service) handleConn(conn net.Conn) { s.Logger.Printf("process write shard error: %s", err) } s.writeRemoteMonitorResponse(conn, err) + case expandSourcesRequestMessage: + s.statMap.Add(expandSourcesReq, 1) + s.processExpandSourcesRequest(conn) + return + case backupShardRequestMessage: + s.statMap.Add(backupShardReq, 1) + s.processBackupShardRequest(conn) + return + case copyShardRequestMessage: + s.statMap.Add(copyShardReq, 1) + s.processCopyShardRequest(conn) + return default: s.Logger.Printf("cluster service message type not found: %d", typ) } @@ -549,6 +583,139 @@ func (s *Service) writeRemoteMonitorResponse(w io.Writer, e error) { } } +func (s *Service) processExpandSourcesRequest(conn net.Conn) { + var sources influxql.Sources + if err := func() error { + // Parse request. + var req ExpandSourcesRequest + if err := DecodeLV(conn, &req); err != nil { + return err + } + + // Collect iterator creators for each shard. + ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs)) + for _, shardID := range req.ShardIDs { + ic := s.TSDBStore.ShardIteratorCreator(shardID) + if ic == nil { + return nil + } + ics = append(ics, ic) + } + + // Expand sources from all shards. + a, err := influxql.IteratorCreators(ics).ExpandSources(req.Sources) + if err != nil { + return err + } + sources = a + + return nil + }(); err != nil { + s.Logger.Printf("error reading ExpandSources request: %s", err) + EncodeTLV(conn, expandSourcesResponseMessage, &ExpandSourcesResponse{Err: err}) + return + } + + // Encode success response. + if err := EncodeTLV(conn, expandSourcesResponseMessage, &ExpandSourcesResponse{ + Sources: sources, + }); err != nil { + s.Logger.Printf("error writing ExpandSources response: %s", err) + return + } +} + +func (s *Service) processBackupShardRequest(conn net.Conn) { + if err := func() error { + // Parse request. + var req BackupShardRequest + if err := DecodeLV(conn, &req); err != nil { + return err + } + + // Backup from local shard to the connection. + if err := s.TSDBStore.BackupShard(req.ShardID, req.Since, conn); err != nil { + return err + } + + return nil + }(); err != nil { + s.Logger.Printf("error processing BackupShardRequest: %s", err) + return + } +} + +func (s *Service) processCopyShardRequest(conn net.Conn) { + if err := func() error { + // Parse request. + var req CopyShardRequest + if err := DecodeLV(conn, &req); err != nil { + return err + } + + // Begin streaming backup from remote server. + r, err := s.backupRemoteShard(req.Host, req.ShardID, req.Since) + if err != nil { + return err + } + defer r.Close() + + // Create shard if it doesn't exist. + if err := s.TSDBStore.CreateShard(req.Database, req.Policy, req.ShardID); err != nil { + return err + } + + // Restore to local shard. + if err := s.TSDBStore.RestoreShard(req.ShardID, r); err != nil { + return err + } + + return nil + }(); err != nil { + s.Logger.Printf("error reading CopyShard request: %s", err) + EncodeTLV(conn, copyShardResponseMessage, &CopyShardResponse{Err: err}) + return + } + + // Encode success response. + if err := EncodeTLV(conn, copyShardResponseMessage, &CopyShardResponse{}); err != nil { + s.Logger.Printf("error writing CopyShard response: %s", err) + return + } +} + +// backupRemoteShard connects to a cluster service on a remote host and streams a shard. +func (s *Service) backupRemoteShard(host string, shardID uint64, since time.Time) (io.ReadCloser, error) { + conn, err := net.Dial("tcp", host) + if err != nil { + return nil, err + } + conn.SetDeadline(time.Now().Add(BackupTimeout)) + + if err := func() error { + // Write the cluster multiplexing header byte + if _, err := conn.Write([]byte{MuxHeader}); err != nil { + return err + } + + // Write backup request. + if err := EncodeTLV(conn, backupShardResponseMessage, &BackupShardRequest{ + ShardID: shardID, + Since: since, + }); err != nil { + return fmt.Errorf("error writing BackupShardRequest: %s", err) + } + + return nil + }(); err != nil { + conn.Close() + return nil, err + } + + // Return the connection which will stream the rest of the backup. + return conn, nil +} + // ReadTLV reads a type-length-value record from r. func ReadTLV(r io.Reader) (byte, []byte, error) { typ, err := ReadType(r) diff --git a/cluster/statement_executor.go b/cluster/statement_executor.go index 00c668a656..56f777bbe6 100644 --- a/cluster/statement_executor.go +++ b/cluster/statement_executor.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "fmt" + "io" "sort" "strconv" "time" @@ -872,12 +873,16 @@ type TSDBStore interface { CreateShard(database, policy string, shardID uint64) error WriteToShard(shardID uint64, points []models.Point) error + RestoreShard(id uint64, r io.Reader) error + BackupShard(id uint64, since time.Time, w io.Writer) error + DeleteDatabase(name string) error DeleteMeasurement(database, name string) error DeleteRetentionPolicy(database, name string) error DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error DeleteShard(id uint64) error IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error) + ShardIteratorCreator(id uint64) influxql.IteratorCreator } type LocalTSDBStore struct { diff --git a/cluster/statement_executor_test.go b/cluster/statement_executor_test.go index b1bebeb7e6..b2c2bdd0d5 100644 --- a/cluster/statement_executor_test.go +++ b/cluster/statement_executor_test.go @@ -207,6 +207,9 @@ type TSDBStore struct { CreateShardFn func(database, policy string, shardID uint64) error WriteToShardFn func(shardID uint64, points []models.Point) error + RestoreShardFn func(id uint64, r io.Reader) error + BackupShardFn func(id uint64, since time.Time, w io.Writer) error + DeleteDatabaseFn func(name string) error DeleteMeasurementFn func(database, name string) error DeleteRetentionPolicyFn func(database, name string) error @@ -226,6 +229,14 @@ func (s *TSDBStore) WriteToShard(shardID uint64, points []models.Point) error { return s.WriteToShardFn(shardID, points) } +func (s *TSDBStore) RestoreShard(id uint64, r io.Reader) error { + return s.RestoreShardFn(id, r) +} + +func (s *TSDBStore) BackupShard(id uint64, since time.Time, w io.Writer) error { + return s.BackupShardFn(id, since, w) +} + func (s *TSDBStore) DeleteDatabase(name string) error { return s.DeleteDatabaseFn(name) } diff --git a/tsdb/engine.go b/tsdb/engine.go index 3c2df38a74..8805b5f19a 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -31,6 +31,9 @@ type Engine interface { SetLogOutput(io.Writer) LoadMetadataIndex(shardID uint64, index *DatabaseIndex) error + Backup(w io.Writer, basePath string, since time.Time) error + Restore(r io.Reader, basePath string) error + CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error) WritePoints(points []models.Point) error @@ -45,8 +48,6 @@ type Engine interface { Format() EngineFormat io.WriterTo - - Backup(w io.Writer, basePath string, since time.Time) error } // EngineFormat represents the format for an engine. diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index c79e93e0e2..f53188ec1f 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -308,6 +308,75 @@ func (e *Engine) writeFileToBackup(f FileStat, shardRelativePath string, tw *tar return err } +// Restore will read a tar archive generated by Backup(). +// Only files that match basePath will be copied into the directory. This obtains +// a write lock so no operations can be performed while restoring. +func (e *Engine) Restore(r io.Reader, basePath string) error { + // Copy files from archive while under lock to prevent reopening. + if err := func() error { + e.mu.Lock() + defer e.mu.Unlock() + + tr := tar.NewReader(r) + for { + if err := e.readFileFromBackup(tr, basePath); err == io.EOF { + break + } else if err != nil { + return err + } + } + + return nil + }(); err != nil { + if err := e.Open(); err != nil { + log.Printf("error reopening engine after restore: %s", err) + } + return err + } + + return nil +} + +// readFileFromBackup copies the next file from the archive into the shard. +// The file is skipped if it does not have a matching shardRelativePath prefix. +func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string) error { + // Read next archive file. + hdr, err := tr.Next() + if err != nil { + return err + } + + // Skip file if it does not have a matching prefix. + if !filepath.HasPrefix(hdr.Name, shardRelativePath) { + return nil + } + path, err := filepath.Rel(shardRelativePath, hdr.Name) + if err != nil { + return err + } + + // Create new file on disk. + f, err := os.OpenFile(filepath.Join(e.path, path), os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return err + } + defer f.Close() + + // Copy from archive to the file. + if _, err := io.CopyN(f, tr, hdr.Size); err != nil { + return err + } + + // Sync to disk & close. + if err := f.Sync(); err != nil { + return err + } else if err := f.Close(); err != nil { + return err + } + + return nil +} + // addToIndexFromKey will pull the measurement name, series key, and field name from a composite key and add it to the // database index and measurement fields func (e *Engine) addToIndexFromKey(shardID uint64, key string, fieldType influxql.DataType, index *tsdb.DatabaseIndex) error { diff --git a/tsdb/shard.go b/tsdb/shard.go index 944a3f412f..0ef37ec470 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -539,6 +539,32 @@ func (s *Shard) ExpandSources(sources influxql.Sources) (influxql.Sources, error return expanded, nil } +// Restore restores data to the underlying engine for the shard. +// The shard is reopened after restore. +func (s *Shard) Restore(r io.Reader, basePath string) error { + s.mu.Lock() + + // Restore to engine. + if err := s.engine.Restore(r, basePath); err != nil { + s.mu.Unlock() + return err + } + + s.mu.Unlock() + + // Close shard. + if err := s.Close(); err != nil { + return err + } + + // Reopen engine. + if err := s.Open(); err != nil { + return err + } + + return nil +} + // Shards represents a sortable list of shards. type Shards []*Shard diff --git a/tsdb/store.go b/tsdb/store.go index 30c42edc78..1095684a52 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -521,6 +521,22 @@ func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error { return shard.engine.Backup(w, path, since) } +// RestoreShard restores a backup from r to a given shard. +// This will only overwrite files included in the backup. +func (s *Store) RestoreShard(id uint64, r io.Reader) error { + shard := s.Shard(id) + if shard == nil { + return fmt.Errorf("shard %d doesn't exist on this server", id) + } + + path, err := relativePath(s.path, shard.path) + if err != nil { + return err + } + + return shard.Restore(r, path) +} + // ShardRelativePath will return the relative path to the shard. i.e. // func (s *Store) ShardRelativePath(id uint64) (string, error) { shard := s.Shard(id) diff --git a/tsdb/store_test.go b/tsdb/store_test.go index cdb129e75d..31afd2be20 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -1,6 +1,7 @@ package tsdb_test import ( + "bytes" "fmt" "io/ioutil" "os" @@ -276,6 +277,70 @@ func TestShards_CreateIterator(t *testing.T) { } } +// Ensure the store can backup a shard and another store can restore it. +func TestStore_BackupRestoreShard(t *testing.T) { + s0, s1 := MustOpenStore(), MustOpenStore() + defer s0.Close() + defer s1.Close() + + // Create shard with data. + s0.MustCreateShardWithData("db0", "rp0", 100, + `cpu value=1 0`, + `cpu value=2 10`, + `cpu value=3 20`, + ) + + // Backup shard to a buffer. + var buf bytes.Buffer + if err := s0.BackupShard(100, time.Time{}, &buf); err != nil { + t.Fatal(err) + } + + // Create the shard on the other store and restore from buffer. + if err := s1.CreateShard("db0", "rp0", 100); err != nil { + t.Fatal(err) + } + if err := s1.RestoreShard(100, &buf); err != nil { + t.Fatal(err) + } + + // Read data from + itr, err := s1.Shard(100).CreateIterator(influxql.IteratorOptions{ + Expr: influxql.MustParseExpr(`value`), + Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}}, + Ascending: true, + StartTime: influxql.MinTime, + EndTime: influxql.MaxTime, + }) + if err != nil { + t.Fatal(err) + } + fitr := itr.(influxql.FloatIterator) + + // Read values from iterator. The host=serverA points should come first. + p, e := fitr.Next() + if e != nil { + t.Fatal(e) + } + if !deep.Equal(p, &influxql.FloatPoint{Name: "cpu", Time: time.Unix(0, 0).UnixNano(), Value: 1}) { + t.Fatalf("unexpected point(0): %s", spew.Sdump(p)) + } + p, e = fitr.Next() + if e != nil { + t.Fatal(e) + } + if !deep.Equal(p, &influxql.FloatPoint{Name: "cpu", Time: time.Unix(10, 0).UnixNano(), Value: 2}) { + t.Fatalf("unexpected point(1): %s", spew.Sdump(p)) + } + p, e = fitr.Next() + if e != nil { + t.Fatal(e) + } + if !deep.Equal(p, &influxql.FloatPoint{Name: "cpu", Time: time.Unix(20, 0).UnixNano(), Value: 3}) { + t.Fatalf("unexpected point(2): %s", spew.Sdump(p)) + } +} + func BenchmarkStoreOpen_200KSeries_100Shards(b *testing.B) { benchmarkStoreOpen(b, 64, 5, 5, 1, 100) } func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int) {