Add writer client and metatable for writer

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
pull/4973/head^2
zhenshan.cao 2020-12-21 10:06:46 +08:00 committed by yefu.chen
parent f360ed7004
commit 9694203f8b
9 changed files with 595 additions and 11 deletions

View File

@ -0,0 +1,19 @@
syntax = "proto3";
package milvus.proto.service;
option go_package="github.com/zilliztech/milvus-distributed/internal/proto/writerpb";
message FieldFlushMeta {
int32 fieldID = 1;
repeated string binlog_paths = 2;
}
//etcd meta
message SegmentFlushMeta{
int64 segmentID = 1;
bool is_closed = 2;
uint64 open_time =3;
uint64 close_time = 4;
repeated FieldFlushMeta fields = 5;
}

View File

@ -0,0 +1,169 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: write_node.proto
package writerpb
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type FieldFlushMeta struct {
FieldID int32 `protobuf:"varint,1,opt,name=fieldID,proto3" json:"fieldID,omitempty"`
BinlogPaths []string `protobuf:"bytes,2,rep,name=binlog_paths,json=binlogPaths,proto3" json:"binlog_paths,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *FieldFlushMeta) Reset() { *m = FieldFlushMeta{} }
func (m *FieldFlushMeta) String() string { return proto.CompactTextString(m) }
func (*FieldFlushMeta) ProtoMessage() {}
func (*FieldFlushMeta) Descriptor() ([]byte, []int) {
return fileDescriptor_8ec4781d562c3e8f, []int{0}
}
func (m *FieldFlushMeta) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FieldFlushMeta.Unmarshal(m, b)
}
func (m *FieldFlushMeta) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_FieldFlushMeta.Marshal(b, m, deterministic)
}
func (m *FieldFlushMeta) XXX_Merge(src proto.Message) {
xxx_messageInfo_FieldFlushMeta.Merge(m, src)
}
func (m *FieldFlushMeta) XXX_Size() int {
return xxx_messageInfo_FieldFlushMeta.Size(m)
}
func (m *FieldFlushMeta) XXX_DiscardUnknown() {
xxx_messageInfo_FieldFlushMeta.DiscardUnknown(m)
}
var xxx_messageInfo_FieldFlushMeta proto.InternalMessageInfo
func (m *FieldFlushMeta) GetFieldID() int32 {
if m != nil {
return m.FieldID
}
return 0
}
func (m *FieldFlushMeta) GetBinlogPaths() []string {
if m != nil {
return m.BinlogPaths
}
return nil
}
//etcd meta
type SegmentFlushMeta struct {
SegmentID int64 `protobuf:"varint,1,opt,name=segmentID,proto3" json:"segmentID,omitempty"`
IsClosed bool `protobuf:"varint,2,opt,name=is_closed,json=isClosed,proto3" json:"is_closed,omitempty"`
OpenTime uint64 `protobuf:"varint,3,opt,name=open_time,json=openTime,proto3" json:"open_time,omitempty"`
CloseTime uint64 `protobuf:"varint,4,opt,name=close_time,json=closeTime,proto3" json:"close_time,omitempty"`
Fields []*FieldFlushMeta `protobuf:"bytes,5,rep,name=fields,proto3" json:"fields,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *SegmentFlushMeta) Reset() { *m = SegmentFlushMeta{} }
func (m *SegmentFlushMeta) String() string { return proto.CompactTextString(m) }
func (*SegmentFlushMeta) ProtoMessage() {}
func (*SegmentFlushMeta) Descriptor() ([]byte, []int) {
return fileDescriptor_8ec4781d562c3e8f, []int{1}
}
func (m *SegmentFlushMeta) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SegmentFlushMeta.Unmarshal(m, b)
}
func (m *SegmentFlushMeta) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_SegmentFlushMeta.Marshal(b, m, deterministic)
}
func (m *SegmentFlushMeta) XXX_Merge(src proto.Message) {
xxx_messageInfo_SegmentFlushMeta.Merge(m, src)
}
func (m *SegmentFlushMeta) XXX_Size() int {
return xxx_messageInfo_SegmentFlushMeta.Size(m)
}
func (m *SegmentFlushMeta) XXX_DiscardUnknown() {
xxx_messageInfo_SegmentFlushMeta.DiscardUnknown(m)
}
var xxx_messageInfo_SegmentFlushMeta proto.InternalMessageInfo
func (m *SegmentFlushMeta) GetSegmentID() int64 {
if m != nil {
return m.SegmentID
}
return 0
}
func (m *SegmentFlushMeta) GetIsClosed() bool {
if m != nil {
return m.IsClosed
}
return false
}
func (m *SegmentFlushMeta) GetOpenTime() uint64 {
if m != nil {
return m.OpenTime
}
return 0
}
func (m *SegmentFlushMeta) GetCloseTime() uint64 {
if m != nil {
return m.CloseTime
}
return 0
}
func (m *SegmentFlushMeta) GetFields() []*FieldFlushMeta {
if m != nil {
return m.Fields
}
return nil
}
func init() {
proto.RegisterType((*FieldFlushMeta)(nil), "milvus.proto.service.FieldFlushMeta")
proto.RegisterType((*SegmentFlushMeta)(nil), "milvus.proto.service.SegmentFlushMeta")
}
func init() { proto.RegisterFile("write_node.proto", fileDescriptor_8ec4781d562c3e8f) }
var fileDescriptor_8ec4781d562c3e8f = []byte{
// 287 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x90, 0xc1, 0x4b, 0xc3, 0x30,
0x14, 0xc6, 0xe9, 0xba, 0xcd, 0x35, 0x13, 0x19, 0xc5, 0x43, 0x40, 0x85, 0x3a, 0x3c, 0xf4, 0x62,
0x0b, 0x7a, 0xf5, 0x20, 0x53, 0x06, 0x1e, 0x06, 0x12, 0x3d, 0x79, 0x29, 0x6b, 0xfb, 0x6c, 0x1f,
0xa4, 0x49, 0x49, 0x5e, 0x27, 0xec, 0x1f, 0xf4, 0xdf, 0x92, 0xa6, 0x93, 0x21, 0x78, 0xfc, 0x7e,
0x1f, 0xef, 0x47, 0xf2, 0xb1, 0xc5, 0x97, 0x41, 0x82, 0x4c, 0xe9, 0x12, 0x92, 0xd6, 0x68, 0xd2,
0xe1, 0x79, 0x83, 0x72, 0xd7, 0xd9, 0x21, 0x25, 0x16, 0xcc, 0x0e, 0x0b, 0x58, 0x6e, 0xd8, 0xd9,
0x1a, 0x41, 0x96, 0x6b, 0xd9, 0xd9, 0x7a, 0x03, 0xb4, 0x0d, 0x39, 0x3b, 0xf9, 0xec, 0xc9, 0xcb,
0x33, 0xf7, 0x22, 0x2f, 0x9e, 0x88, 0xdf, 0x18, 0x5e, 0xb3, 0xd3, 0x1c, 0x95, 0xd4, 0x55, 0xd6,
0x6e, 0xa9, 0xb6, 0x7c, 0x14, 0xf9, 0x71, 0x20, 0xe6, 0x03, 0x7b, 0xed, 0xd1, 0xf2, 0xdb, 0x63,
0x8b, 0x37, 0xa8, 0x1a, 0x50, 0x74, 0x34, 0x5e, 0xb2, 0xc0, 0x0e, 0xec, 0xe0, 0xf4, 0xc5, 0x11,
0x84, 0x17, 0x2c, 0x40, 0x9b, 0x15, 0x52, 0x5b, 0x28, 0xf9, 0x28, 0xf2, 0xe2, 0x99, 0x98, 0xa1,
0x7d, 0x72, 0xb9, 0x2f, 0x75, 0x0b, 0x2a, 0x23, 0x6c, 0x80, 0xfb, 0x91, 0x17, 0x8f, 0xc5, 0xac,
0x07, 0xef, 0xd8, 0x40, 0x78, 0xc5, 0x98, 0x3b, 0x1b, 0xda, 0xb1, 0x6b, 0x03, 0x47, 0x5c, 0xfd,
0xc0, 0xa6, 0xee, 0xe5, 0x96, 0x4f, 0x22, 0x3f, 0x9e, 0xdf, 0xdd, 0x24, 0xff, 0x2d, 0x90, 0xfc,
0xfd, 0xbe, 0x38, 0xdc, 0xac, 0x56, 0x1f, 0x8f, 0x15, 0x52, 0xdd, 0xe5, 0x49, 0xa1, 0x9b, 0x74,
0x8f, 0x52, 0xe2, 0x9e, 0xa0, 0xa8, 0xd3, 0x41, 0x72, 0x5b, 0xa2, 0x25, 0x83, 0x79, 0x47, 0x50,
0xa6, 0xa8, 0x08, 0x8c, 0xda, 0xca, 0xd4, 0x99, 0x53, 0x37, 0xbd, 0x69, 0xf3, 0x7c, 0xea, 0xf2,
0xfd, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x9b, 0x50, 0x78, 0xd1, 0x8d, 0x01, 0x00, 0x00,
}

View File

@ -1,12 +1,36 @@
package writerclient
import "github.com/zilliztech/milvus-distributed/internal/util/typeutil"
import (
"strconv"
"github.com/golang/protobuf/proto"
"go.etcd.io/etcd/clientv3"
"github.com/zilliztech/milvus-distributed/internal/kv"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
pb "github.com/zilliztech/milvus-distributed/internal/proto/writerpb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
type Client struct {
kvClient kv.TxnBase // client of a reliable kv service, i.e. etcd client
kvPrefix string
}
func NewWriterClient(etcdAddress string, kvRootPath string) (*Client, error) {
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
if err != nil {
return nil, err
}
kvClient := etcdkv.NewEtcdKV(etcdClient, kvRootPath)
return &Client{
kvClient: kvClient,
kvPrefix: "writer/segment/",
}, nil
}
type SegmentDescription struct {
@ -16,17 +40,51 @@ type SegmentDescription struct {
CloseTime Timestamp
}
func (c *Client) FlushSegment(semgentID UniqueID) error {
func (c *Client) FlushSegment(segmentID UniqueID) error {
// push msg to pulsar channel
return nil
}
func (c *Client) DescribeSegment(semgentID UniqueID) (*SegmentDescription, error) {
func (c *Client) DescribeSegment(segmentID UniqueID) (*SegmentDescription, error) {
// query etcd
return &SegmentDescription{}, nil
ret := &SegmentDescription{
SegmentID: segmentID,
IsClosed: false,
}
key := c.kvPrefix + strconv.FormatInt(segmentID, 10)
value, err := c.kvClient.Load(key)
if err != nil {
return ret, err
}
flushMeta := pb.SegmentFlushMeta{}
err = proto.UnmarshalText(value, &flushMeta)
if err != nil {
return ret, err
}
ret.IsClosed = flushMeta.IsClosed
ret.OpenTime = flushMeta.OpenTime
ret.CloseTime = flushMeta.CloseTime
return ret, nil
}
func (c *Client) GetInsertBinlogPaths(semgentID UniqueID) (map[int32][]string, error) {
// query etcd
return nil, nil
func (c *Client) GetInsertBinlogPaths(segmentID UniqueID) (map[int32][]string, error) {
key := c.kvPrefix + strconv.FormatInt(segmentID, 10)
value, err := c.kvClient.Load(key)
if err != nil {
return nil, err
}
flushMeta := pb.SegmentFlushMeta{}
err = proto.UnmarshalText(value, &flushMeta)
if err != nil {
return nil, err
}
ret := make(map[int32][]string)
for _, field := range flushMeta.Fields {
ret[field.FieldID] = field.BinlogPaths
}
return ret, nil
}

View File

@ -31,7 +31,8 @@ func TestDataSyncService_Start(t *testing.T) {
// init write node
pulsarURL := Params.PulsarAddress
node := NewWriteNode(ctx, 0)
node, err := NewWriteNode(ctx, 0)
assert.Nil(t, err)
// test data generate
const DIM = 16
@ -132,7 +133,7 @@ func TestDataSyncService_Start(t *testing.T) {
var ddMsgStream msgstream.MsgStream = ddStream
ddMsgStream.Start()
err := insertMsgStream.Produce(&msgPack)
err = insertMsgStream.Produce(&msgPack)
assert.NoError(t, err)
err = insertMsgStream.Broadcast(&timeTickMsgPack)

View File

@ -0,0 +1,179 @@
package writenode
import (
"strconv"
"sync"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
pb "github.com/zilliztech/milvus-distributed/internal/proto/writerpb"
)
type metaTable struct {
client kv.TxnBase // client of a reliable kv service, i.e. etcd client
segID2FlushMeta map[UniqueID]pb.SegmentFlushMeta // index id to index meta
lock sync.RWMutex
}
func NewMetaTable(kv kv.TxnBase) (*metaTable, error) {
mt := &metaTable{
client: kv,
lock: sync.RWMutex{},
}
err := mt.reloadFromKV()
if err != nil {
return nil, err
}
return mt, nil
}
func (mt *metaTable) reloadFromKV() error {
mt.segID2FlushMeta = make(map[UniqueID]pb.SegmentFlushMeta)
_, values, err := mt.client.LoadWithPrefix("writer/segment")
if err != nil {
return err
}
for _, value := range values {
flushMeta := pb.SegmentFlushMeta{}
err = proto.UnmarshalText(value, &flushMeta)
if err != nil {
return err
}
mt.segID2FlushMeta[flushMeta.SegmentID] = flushMeta
}
return nil
}
// metaTable.lock.Lock() before call this function
func (mt *metaTable) saveFlushMeta(meta *pb.SegmentFlushMeta) error {
value := proto.MarshalTextString(meta)
mt.segID2FlushMeta[meta.SegmentID] = *meta
return mt.client.Save("/writer/segment/"+strconv.FormatInt(meta.SegmentID, 10), value)
}
func (mt *metaTable) AddSegmentFlush(segmentID UniqueID) error {
mt.lock.Lock()
defer mt.lock.Unlock()
_, ok := mt.segID2FlushMeta[segmentID]
if ok {
return errors.Errorf("segment already exists with ID = " + strconv.FormatInt(segmentID, 10))
}
meta := pb.SegmentFlushMeta{
IsClosed: false,
SegmentID: segmentID,
}
return mt.saveFlushMeta(&meta)
}
func (mt *metaTable) getFlushCloseTime(segmentID UniqueID) (Timestamp, error) {
mt.lock.Lock()
defer mt.lock.Unlock()
meta, ok := mt.segID2FlushMeta[segmentID]
if !ok {
return typeutil.ZeroTimestamp, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10))
}
return meta.CloseTime, nil
}
func (mt *metaTable) SetFlushCloseTime(segmentID UniqueID, t Timestamp) error {
mt.lock.Lock()
defer mt.lock.Unlock()
meta, ok := mt.segID2FlushMeta[segmentID]
if !ok {
return errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10))
}
meta.CloseTime = t
return mt.saveFlushMeta(&meta)
}
func (mt *metaTable) SetFlushOpenTime(segmentID UniqueID, t Timestamp) error {
mt.lock.Lock()
defer mt.lock.Unlock()
meta, ok := mt.segID2FlushMeta[segmentID]
if !ok {
return errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10))
}
meta.OpenTime = t
return mt.saveFlushMeta(&meta)
}
func (mt *metaTable) getFlushOpenTime(segmentID UniqueID) (Timestamp, error) {
mt.lock.Lock()
defer mt.lock.Unlock()
meta, ok := mt.segID2FlushMeta[segmentID]
if !ok {
return typeutil.ZeroTimestamp, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10))
}
return meta.OpenTime, nil
}
func (mt *metaTable) CompleteFlush(segmentID UniqueID) error {
mt.lock.Lock()
defer mt.lock.Unlock()
meta, ok := mt.segID2FlushMeta[segmentID]
if !ok {
return errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10))
}
meta.IsClosed = true
return mt.saveFlushMeta(&meta)
}
func (mt *metaTable) checkFlushComplete(segmentID UniqueID) (bool, error) {
mt.lock.Lock()
defer mt.lock.Unlock()
meta, ok := mt.segID2FlushMeta[segmentID]
if !ok {
return false, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10))
}
return meta.IsClosed, nil
}
func (mt *metaTable) AppendBinlogPaths(segmentID UniqueID, fieldID int32, dataPaths []string) error {
mt.lock.Lock()
defer mt.lock.Unlock()
meta, ok := mt.segID2FlushMeta[segmentID]
if !ok {
return errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10))
}
found := false
for _, field := range meta.Fields {
if field.FieldID == fieldID {
field.BinlogPaths = append(field.BinlogPaths, dataPaths...)
found = true
break
}
}
if !found {
newField := &pb.FieldFlushMeta{
FieldID: fieldID,
BinlogPaths: dataPaths,
}
meta.Fields = append(meta.Fields, newField)
}
return mt.saveFlushMeta(&meta)
}
func (mt *metaTable) getBinlogPaths(segmentID UniqueID) (map[int32][]string, error) {
mt.lock.Lock()
defer mt.lock.Unlock()
meta, ok := mt.segID2FlushMeta[segmentID]
if !ok {
return nil, errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10))
}
ret := make(map[int32][]string)
for _, field := range meta.Fields {
ret[field.FieldID] = field.BinlogPaths
}
return ret, nil
}

View File

@ -0,0 +1,112 @@
package writenode
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"go.etcd.io/etcd/clientv3"
)
func createMetaTable(t *testing.T) *metaTable {
etcdAddr := Params.EtcdAddress
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
_, err = cli.Delete(context.TODO(), "/etcd/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
meta, err := NewMetaTable(etcdKV)
assert.Nil(t, err)
return meta
}
func TestMetaTable_AddSegmentFlush(t *testing.T) {
meta := createMetaTable(t)
defer meta.client.Close()
err := meta.AddSegmentFlush(1)
assert.Nil(t, err)
err = meta.AddSegmentFlush(2)
assert.Nil(t, err)
err = meta.AddSegmentFlush(2)
assert.NotNil(t, err)
err = meta.reloadFromKV()
assert.Nil(t, err)
}
func TestMetaTable_SetFlushTime(t *testing.T) {
meta := createMetaTable(t)
defer meta.client.Close()
var segmentID UniqueID = 1
err := meta.AddSegmentFlush(segmentID)
assert.Nil(t, err)
tsOpen := Timestamp(1000)
err = meta.SetFlushOpenTime(segmentID, tsOpen)
assert.Nil(t, err)
exp, err := meta.getFlushOpenTime(segmentID)
assert.Nil(t, err)
assert.Equal(t, tsOpen, exp)
tsClose := Timestamp(10001)
err = meta.SetFlushCloseTime(segmentID, tsClose)
assert.Nil(t, err)
exp, err = meta.getFlushCloseTime(segmentID)
assert.Nil(t, err)
assert.Equal(t, tsClose, exp)
}
func TestMetaTable_AppendBinlogPaths(t *testing.T) {
meta := createMetaTable(t)
defer meta.client.Close()
var segmentID UniqueID = 1
err := meta.AddSegmentFlush(segmentID)
assert.Nil(t, err)
exp := map[int32][]string{
1: {"a", "b", "c"},
2: {"b", "a", "c"},
}
for fieldID, dataPaths := range exp {
for _, dp := range dataPaths {
err = meta.AppendBinlogPaths(segmentID, fieldID, []string{dp})
assert.Nil(t, err)
}
}
ret, err := meta.getBinlogPaths(segmentID)
assert.Nil(t, err)
assert.Equal(t, exp, ret)
}
func TestMetaTable_CompleteFlush(t *testing.T) {
meta := createMetaTable(t)
defer meta.client.Close()
var segmentID UniqueID = 1
err := meta.AddSegmentFlush(segmentID)
assert.Nil(t, err)
ret, err := meta.checkFlushComplete(segmentID)
assert.Nil(t, err)
assert.Equal(t, false, ret)
meta.CompleteFlush(segmentID)
ret, err = meta.checkFlushComplete(segmentID)
assert.Nil(t, err)
assert.Equal(t, true, ret)
}

View File

@ -34,6 +34,9 @@ type ParamTable struct {
MsgChannelSubName string
DefaultPartitionTag string
SliceIndex int
EtcdAddress string
MetaRootPath string
}
var Params ParamTable
@ -60,6 +63,8 @@ func (p *ParamTable) Init() {
}
p.initPulsarAddress()
p.initEtcdAddress()
p.initMetaRootPath()
p.initWriteNodeID()
p.initWriteNodeNum()
@ -241,3 +246,23 @@ func (p *ParamTable) initSliceIndex() {
func (p *ParamTable) initWriteNodeNum() {
p.WriteNodeNum = len(p.WriteNodeIDList())
}
func (p *ParamTable) initEtcdAddress() {
addr, err := p.Load("_EtcdAddress")
if err != nil {
panic(err)
}
p.EtcdAddress = addr
}
func (p *ParamTable) initMetaRootPath() {
rootPath, err := p.Load("etcd.rootPath")
if err != nil {
panic(err)
}
subPath, err := p.Load("etcd.metaSubPath")
if err != nil {
panic(err)
}
p.MetaRootPath = rootPath + "/" + subPath
}

View File

@ -2,21 +2,40 @@ package writenode
import (
"context"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"go.etcd.io/etcd/clientv3"
)
type WriteNode struct {
ctx context.Context
WriteNodeID uint64
dataSyncService *dataSyncService
metaTable *metaTable
}
func NewWriteNode(ctx context.Context, writeNodeID uint64) *WriteNode {
func NewWriteNode(ctx context.Context, writeNodeID uint64) (*WriteNode, error) {
return &WriteNode{
node := &WriteNode{
ctx: ctx,
WriteNodeID: writeNodeID,
dataSyncService: nil,
}
etcdAddress := Params.EtcdAddress
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
if err != nil {
return nil, err
}
etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
metaKV, err2 := NewMetaTable(etcdKV)
if err2 != nil {
return nil, err
}
node.metaTable = metaKV
return node, nil
}
func (node *WriteNode) Start() {

View File

@ -24,6 +24,7 @@ mkdir -p internalpb
mkdir -p servicepb
mkdir -p masterpb
mkdir -p indexbuilderpb
mkdir -p writerpb
${protoc} --go_out=plugins=grpc,paths=source_relative:./commonpb common.proto
${protoc} --go_out=plugins=grpc,paths=source_relative:./schemapb schema.proto
@ -33,5 +34,6 @@ ${protoc} --go_out=plugins=grpc,paths=source_relative:./servicepb service_msg.pr
${protoc} --go_out=plugins=grpc,paths=source_relative:./servicepb service.proto
${protoc} --go_out=plugins=grpc,paths=source_relative:./masterpb master.proto
${protoc} --go_out=plugins=grpc,paths=source_relative:./indexbuilderpb index_builder.proto
${protoc} --go_out=plugins=grpc,paths=source_relative:./writerpb write_node.proto
popd