Update segment

Signed-off-by: neza2017 <yefu.chen@zilliz.com>
pull/4973/head^2
neza2017 2020-11-14 14:39:52 +08:00 committed by yefu.chen
parent 27032cc126
commit ce969e9568
8 changed files with 142 additions and 13 deletions

View File

@ -1140,6 +1140,7 @@ func (meta *metaTable) HasPartition(collId UniqueId, tag string) bool
func (meta *metaTable) DeletePartition(collId UniqueId, tag string) error
func (meta *metaTable) AddSegment(seg *SegmentMeta) error
func (meta *metaTable) UpdateSegment(seg *SegmentMeta) error
func (meta *metaTable) GetSegmentById(segId UniqueId)(*SegmentMeta, error)
func (meta *metaTable) DeleteSegment(segId UniqueId) error
func (meta *metaTable) CloseSegment(segId UniqueId, closeTs Timestamp, num_rows int64) error

View File

@ -59,6 +59,7 @@ func (ta *TimestampAllocator) syncTs() {
Count: ta.countPerRPC,
}
resp, err := ta.masterClient.AllocTimestamp(ctx, req)
log.Printf("resp: %v", resp)
cancel()
if err != nil {

View File

@ -360,7 +360,7 @@ func (s *Master) AllocTimestamp(ctx context.Context, request *internalpb.TsoRequ
if err != nil {
return &internalpb.TsoResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR},
}, err
}, nil
}
response := &internalpb.TsoResponse{
@ -379,7 +379,7 @@ func (s *Master) AllocID(ctx context.Context, request *internalpb.IDRequest) (*i
if err != nil {
return &internalpb.IDResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR},
}, err
}, nil
}
response := &internalpb.IDResponse{

View File

@ -76,7 +76,6 @@ func Init() {
// CreateServer creates the UNINITIALIZED pd server with given configuration.
func CreateServer(ctx context.Context, kvRootPath string, metaRootPath, tsoRootPath string, etcdAddr []string) (*Master, error) {
rand.Seed(time.Now().UnixNano())
Init()
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: etcdAddr})
@ -142,6 +141,10 @@ func (s *Master) IsClosed() bool {
return atomic.LoadInt64(&s.isServing) == 0
}
func (s *Master) IsServing() bool {
return !s.IsClosed()
}
// Run runs the pd server.
func (s *Master) Run(grpcPort int64) error {

View File

@ -393,6 +393,34 @@ func (mt *metaTable) AddSegment(seg *pb.SegmentMeta) error {
return nil
}
func (mt *metaTable) UpdateSegment(seg *pb.SegmentMeta) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
collID := seg.CollectionID
collMeta := mt.collID2Meta[collID]
isNewSegID := true
for _, segID := range collMeta.SegmentIDs {
if segID == seg.SegmentID {
isNewSegID = false
break
}
}
if isNewSegID {
collMeta.SegmentIDs = append(collMeta.SegmentIDs, seg.SegmentID)
if err := mt.saveCollectionsAndSegmentsMeta(&collMeta, seg); err != nil {
_ = mt.reloadFromKV()
return err
}
} else {
if err := mt.saveSegmentMeta(seg); err != nil {
_ = mt.reloadFromKV()
return err
}
}
return nil
}
func (mt *metaTable) GetSegmentByID(segID UniqueID) (*pb.SegmentMeta, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()

View File

@ -316,3 +316,54 @@ func TestMetaTable_Segment(t *testing.T) {
assert.Equal(t, 0, len(meta.segID2Meta))
}
func TestMetaTable_UpdateSegment(t *testing.T) {
err := gparams.GParams.LoadYaml("config.yaml")
if err != nil {
panic(err)
}
etcdPort, err := gparams.GParams.Load("etcd.port")
if err != nil {
panic(err)
}
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcdPort}})
assert.Nil(t, err)
etcdKV := kv.NewEtcdKV(cli, "/etcd/test/root")
_, err = cli.Delete(context.TODO(), "/etcd/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
meta, err := NewMetaTable(etcdKV)
assert.Nil(t, err)
defer meta.client.Close()
colMeta := pb.CollectionMeta{
ID: 100,
Schema: &schemapb.CollectionSchema{
Name: "coll1",
},
CreateTime: 0,
SegmentIDs: []UniqueID{},
PartitionTags: []string{},
}
segMeta := pb.SegmentMeta{
SegmentID: 200,
CollectionID: 100,
PartitionTag: "p1",
NumRows: 110,
}
err = meta.AddCollection(&colMeta)
assert.Nil(t, err)
err = meta.UpdateSegment(&segMeta)
assert.Nil(t, err)
seg, err := meta.GetSegmentByID(200)
assert.Nil(t, err)
assert.Equal(t, seg.NumRows, int64(110))
segMeta.NumRows = 210
err = meta.UpdateSegment(&segMeta)
assert.Nil(t, err)
seg, err = meta.GetSegmentByID(200)
assert.Nil(t, err)
assert.Equal(t, seg.NumRows, int64(210))
}

View File

@ -5,10 +5,19 @@ import (
"fmt"
"log"
"os"
"path"
"strconv"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil"
"github.com/zilliztech/milvus-distributed/internal/conf"
mockMaster "github.com/zilliztech/milvus-distributed/internal/master/mock"
"github.com/zilliztech/milvus-distributed/internal/master"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -22,18 +31,24 @@ var proxyClient servicepb.MilvusServiceClient
var proxyServer *Proxy
var masterServer *mockMaster.Master
var masterServer *master.Master
func startMaster(ctx context.Context) {
//conf.LoadConfig("config.yaml")
fmt.Println("THIS is test before.")
svr, err := mockMaster.CreateServer(ctx)
etcdAddr := conf.Config.Etcd.Address
etcdAddr += ":"
etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10)
rootPath := conf.Config.Etcd.Rootpath
kvRootPath := path.Join(rootPath, "kv")
metaRootPath := path.Join(rootPath, "meta")
tsoRootPath := path.Join(rootPath, "timestamp")
svr, err := master.CreateServer(ctx, kvRootPath, metaRootPath, tsoRootPath, []string{etcdAddr})
masterServer = svr
if err != nil {
log.Print("create server failed", zap.Error(err))
}
if err := svr.Run(); err != nil {
if err := svr.Run(int64(conf.Config.Master.Port)); err != nil {
log.Fatal("run server failed", zap.Error(err))
}
@ -57,6 +72,10 @@ func startProxy(ctx context.Context) {
func setup() {
conf.LoadConfig("config.yaml")
err := gparams.GParams.LoadYaml("config.yaml")
if err != nil {
panic(err)
}
ctx, cancel = context.WithCancel(context.Background())
startMaster(ctx)
@ -78,7 +97,33 @@ func shutdown() {
}
func TestProxy_CreateCollection(t *testing.T) {
fmt.Println(proxyClient)
var wg sync.WaitGroup
for i := 0; i < 1; i++ {
i := i
cs := &schemapb.CollectionSchema{
Name: "CreateCollection" + strconv.FormatInt(int64(i), 10),
Description: "no description",
AutoID: true,
Fields: make([]*schemapb.FieldSchema, 1),
}
cs.Fields[0] = &schemapb.FieldSchema{
Name: "Field" + strconv.FormatInt(int64(i), 10),
Description: "no description",
DataType: schemapb.DataType_INT32,
}
wg.Add(1)
go func(group *sync.WaitGroup) {
defer group.Done()
resp, err := proxyClient.CreateCollection(ctx, cs)
if err != nil {
t.Error(err)
}
msg := "Create Collection " + strconv.Itoa(i) + " should succeed!"
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_SUCCESS, msg)
}(&wg)
}
wg.Wait()
}
func TestMain(m *testing.M) {

View File

@ -89,7 +89,7 @@ func (queue *BaseTaskQueue) PopUnissuedTask() task {
func (queue *BaseTaskQueue) AddActiveTask(t task) {
queue.atLock.Lock()
defer queue.atLock.Lock()
defer queue.atLock.Unlock()
ts := t.EndTs()
_, ok := queue.activeTasks[ts]
@ -102,7 +102,7 @@ func (queue *BaseTaskQueue) AddActiveTask(t task) {
func (queue *BaseTaskQueue) PopActiveTask(ts Timestamp) task {
queue.atLock.Lock()
defer queue.atLock.Lock()
defer queue.atLock.Unlock()
t, ok := queue.activeTasks[ts]
if ok {
@ -116,7 +116,7 @@ func (queue *BaseTaskQueue) PopActiveTask(ts Timestamp) task {
func (queue *BaseTaskQueue) getTaskByReqID(reqID UniqueID) task {
queue.utLock.Lock()
defer queue.utLock.Lock()
defer queue.utLock.Unlock()
for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() {
if e.Value.(task).ID() == reqID {
return e.Value.(task)