From 80bb418136e4db90586e731a4a5b1ebb077e11ec Mon Sep 17 00:00:00 2001
From: neza2017 <yefu.chen@zilliz.com>
Date: Tue, 22 Jun 2021 16:08:08 +0800
Subject: [PATCH] Release partition (#5971)

* add debug info

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>

* notify query coord to release partitions after drop partition

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>
---
 .../distributed/rootcoord/service_test.go     |  3 ++
 internal/rootcoord/root_coord.go              | 42 ++++++++++++++++++-
 internal/rootcoord/root_coord_test.go         | 13 ++++++
 internal/rootcoord/task.go                    |  7 ++++
 4 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/internal/distributed/rootcoord/service_test.go b/internal/distributed/rootcoord/service_test.go
index bf0a19c126..c076f147e0 100644
--- a/internal/distributed/rootcoord/service_test.go
+++ b/internal/distributed/rootcoord/service_test.go
@@ -245,6 +245,9 @@ func TestGrpcService(t *testing.T) {
 	core.CallReleaseCollectionService = func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error {
 		return nil
 	}
+	core.CallReleasePartitionService = func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID, partitionIDs []typeutil.UniqueID) error {
+		return nil
+	}
 
 	rootcoord.Params.Address = Params.Address
 	err = svr.rootCoord.Register()
diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go
index d36b5a6eed..6a2a2f5c9d 100644
--- a/internal/rootcoord/root_coord.go
+++ b/internal/rootcoord/root_coord.go
@@ -125,7 +125,8 @@ type Core struct {
 	NewProxyClient func(sess *sessionutil.Session) (types.ProxyNode, error)
 
 	//query service interface, notify query service to release collection
-	CallReleaseCollectionService func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) error
+	CallReleaseCollectionService func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID) error
+	CallReleasePartitionService  func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID, partitionIDs []typeutil.UniqueID) error
 
 	//dd request scheduler
 	ddReqQueue chan reqTask //dd request will be push into this chan
@@ -232,6 +233,9 @@ func (c *Core) checkInit() error {
 	if c.CallReleaseCollectionService == nil {
 		return fmt.Errorf("CallReleaseCollectionService is nil")
 	}
+	if c.CallReleasePartitionService == nil {
+		return fmt.Errorf("CallReleasePartitionService is nil")
+	}
 	if c.DataCoordSegmentChan == nil {
 		return fmt.Errorf("DataCoordSegmentChan is nil")
 	}
@@ -507,6 +511,7 @@ func (c *Core) startMsgStreamAndSeek(chanName string, subName string, key string
 			return nil, fmt.Errorf("decode msg positions fail, err %s", err.Error())
 		}
 		if len(msgPositions) > 0 {
+			log.Debug("msgstream seek to position", zap.String("chanName", chanName), zap.String("SubName", subName))
 			if err := stream.Seek(msgPositions); err != nil {
 				return nil, fmt.Errorf("msg stream seek fail, err %s", err.Error())
 			}
@@ -514,6 +519,7 @@ func (c *Core) startMsgStreamAndSeek(chanName string, subName string, key string
 		}
 	}
 	stream.Start()
+	log.Debug("Start Consumer", zap.String("chanName", chanName), zap.String("SubName", subName))
 	return &stream, nil
 }
 
@@ -864,6 +870,35 @@ func (c *Core) SetQueryCoord(s types.QueryService) error {
 		retErr = nil
 		return
 	}
+	c.CallReleasePartitionService = func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID, partitionIDs []typeutil.UniqueID) (retErr error) {
+		defer func() {
+			if err := recover(); err != nil {
+				retErr = fmt.Errorf("release partition from query service panic, msg = %v", err)
+			}
+		}()
+		req := &querypb.ReleasePartitionsRequest{
+			Base: &commonpb.MsgBase{
+				MsgType:   commonpb.MsgType_ReleasePartitions,
+				MsgID:     0, //TODO, msg ID
+				Timestamp: ts,
+				SourceID:  c.session.ServerID,
+			},
+			DbID:         dbID,
+			CollectionID: collectionID,
+			PartitionIDs: partitionIDs,
+		}
+		rsp, err := s.ReleasePartitions(ctx, req)
+		if err != nil {
+			retErr = err
+			return
+		}
+		if rsp.ErrorCode != commonpb.ErrorCode_Success {
+			retErr = fmt.Errorf("ReleasePartitions from query service failed, error = %s", rsp.Reason)
+			return
+		}
+		retErr = nil
+		return
+	}
 	return nil
 }
 
@@ -934,11 +969,13 @@ func (c *Core) Init() error {
 			c.kvBase = etcdkv.NewEtcdKV(c.etcdCli, Params.KvRootPath)
 			return nil
 		}
+		log.Debug("RootCoord, Connect to Etcd")
 		err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
 		if err != nil {
 			return
 		}
 
+		log.Debug("RootCoord, Set TSO and ID Allocator")
 		idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase(Params.EtcdEndpoints, Params.KvRootPath, "gid"))
 		if initError = idAllocator.Initialize(); initError != nil {
 			return
@@ -977,6 +1014,7 @@ func (c *Core) Init() error {
 		c.chanTimeTick.AddProxyNode(c.session)
 		c.proxyClientManager = newProxyClientManager(c)
 
+		log.Debug("RootCoord, set proxy manager")
 		c.proxyNodeManager, initError = newProxyNodeManager(
 			c.ctx,
 			Params.EtcdEndpoints,
@@ -991,6 +1029,8 @@ func (c *Core) Init() error {
 	})
 	if initError == nil {
 		log.Debug(typeutil.RootCoordRole, zap.String("State Code", internalpb.StateCode_name[int32(internalpb.StateCode_Initializing)]))
+	} else {
+		log.Debug("RootCoord init error", zap.Error(initError))
 	}
 	return initError
 }
diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go
index fa3f9d908c..a6913ad9b8 100644
--- a/internal/rootcoord/root_coord_test.go
+++ b/internal/rootcoord/root_coord_test.go
@@ -131,6 +131,13 @@ func (q *queryMock) ReleaseCollection(ctx context.Context, req *querypb.ReleaseC
 	}, nil
 }
 
+func (q *queryMock) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
+	return &commonpb.Status{
+		ErrorCode: commonpb.ErrorCode_Success,
+		Reason:    "",
+	}, nil
+}
+
 type indexMock struct {
 	types.IndexCoord
 	fileArray  []string
@@ -1966,6 +1973,12 @@ func TestCheckInit(t *testing.T) {
 	err = c.checkInit()
 	assert.NotNil(t, err)
 
+	c.CallReleasePartitionService = func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID, partitionIDs []typeutil.UniqueID) error {
+		return nil
+	}
+	err = c.checkInit()
+	assert.NotNil(t, err)
+
 	c.DataCoordSegmentChan = make(chan *msgstream.MsgPack)
 	err = c.checkInit()
 	assert.NotNil(t, err)
diff --git a/internal/rootcoord/task.go b/internal/rootcoord/task.go
index 504b930913..92bfd48b04 100644
--- a/internal/rootcoord/task.go
+++ b/internal/rootcoord/task.go
@@ -580,6 +580,13 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
 	// error doesn't matter here
 	t.core.proxyClientManager.InvalidateCollectionMetaCache(ctx, &req)
 
+	//notify query service to release partition
+	go func() {
+		if err = t.core.CallReleasePartitionService(t.core.ctx, ts, 0, collInfo.ID, []typeutil.UniqueID{partInfo.PartitionID}); err != nil {
+			log.Warn("CallReleaseCollectionService failed", zap.String("error", err.Error()))
+		}
+	}()
+
 	// Update DDOperation in etcd
 	return t.core.setDdMsgSendFlag(true)
 }