From 3cd24f754800f4363c7d4f7d26c7cf4caea4e77e Mon Sep 17 00:00:00 2001 From: jaime Date: Mon, 22 Jul 2024 21:11:48 +0800 Subject: [PATCH] fix: collection meta is not removed after gc in DataCoord (#34883) issue: #34847 Signed-off-by: jaime --- internal/datacoord/server.go | 8 +++++ internal/datacoord/server_test.go | 51 +++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index df6783313f..6f7f16bbe4 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -1077,6 +1077,14 @@ func (s *Server) stopServerLoop() { // loadCollectionFromRootCoord communicates with RootCoord and asks for collection information. // collection information will be added to server meta info. func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error { + has, err := s.broker.HasCollection(ctx, collectionID) + if err != nil { + return err + } + if !has { + return merr.WrapErrCollectionNotFound(collectionID) + } + resp, err := s.broker.DescribeCollectionInternal(ctx, collectionID) if err != nil { return err diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 543d812b0d..17f4497b5e 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -3322,6 +3322,57 @@ func TestDataCoord_EnableActiveStandby(t *testing.T) { }, time.Second*5, time.Millisecond*100) } +func TestLoadCollectionFromRootCoord(t *testing.T) { + broker := broker.NewMockBroker(t) + s := &Server{ + broker: broker, + meta: &meta{collections: make(map[UniqueID]*collectionInfo)}, + } + + t.Run("has collection fail with error", func(t *testing.T) { + broker.EXPECT().HasCollection(mock.Anything, mock.Anything). + Return(false, errors.New("has collection error")).Once() + err := s.loadCollectionFromRootCoord(context.TODO(), 0) + assert.Error(t, err, "has collection error") + }) + + t.Run("has collection with not found", func(t *testing.T) { + broker.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(false, nil).Once() + err := s.loadCollectionFromRootCoord(context.TODO(), 0) + assert.Error(t, err) + assert.True(t, errors.Is(err, merr.ErrCollectionNotFound)) + }) + + broker.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(true, nil) + + t.Run("describeCollectionInternal fail", func(t *testing.T) { + broker.EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything). + Return(nil, errors.New("describeCollectionInternal error")).Once() + err := s.loadCollectionFromRootCoord(context.TODO(), 0) + assert.Error(t, err, "describeCollectionInternal error") + }) + + broker.EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{ + CollectionID: 1, + }, nil).Twice() + + t.Run("ShowPartitionsInternal fail", func(t *testing.T) { + broker.EXPECT().ShowPartitionsInternal(mock.Anything, mock.Anything). + Return(nil, errors.New("ShowPartitionsInternal error")).Once() + err := s.loadCollectionFromRootCoord(context.TODO(), 0) + assert.Error(t, err, "ShowPartitionsInternal error") + }) + + broker.EXPECT().ShowPartitionsInternal(mock.Anything, mock.Anything).Return([]int64{2000}, nil).Once() + t.Run("ok", func(t *testing.T) { + err := s.loadCollectionFromRootCoord(context.TODO(), 0) + assert.NoError(t, err) + assert.Equal(t, 1, len(s.meta.collections)) + _, ok := s.meta.collections[1] + assert.True(t, ok) + }) +} + func TestUpdateAutoBalanceConfigLoop(t *testing.T) { Params.Save(Params.DataCoordCfg.CheckAutoBalanceConfigInterval.Key, "1") defer Params.Reset(Params.DataCoordCfg.CheckAutoBalanceConfigInterval.Key)