diff --git a/go.mod b/go.mod index 3779f7bfaa..4bc4191509 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/antonmedv/expr v1.8.9 github.com/apache/arrow/go/v8 v8.0.0-20220322092137-778b1772fd20 github.com/apache/pulsar-client-go v0.6.1-0.20210728062540-29414db801a7 - github.com/apache/thrift v0.15.0 + github.com/apache/thrift v0.15.0 // indirect github.com/bits-and-blooms/bloom/v3 v3.0.1 github.com/casbin/casbin/v2 v2.44.2 github.com/casbin/json-adapter/v2 v2.0.0 @@ -19,7 +19,6 @@ require ( github.com/containerd/cgroups v1.0.2 github.com/gin-gonic/gin v1.7.7 github.com/gofrs/flock v0.8.1 - github.com/golang/mock v1.5.0 github.com/golang/protobuf v1.5.2 github.com/google/btree v1.0.1 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 @@ -28,7 +27,7 @@ require ( github.com/klauspost/compress v1.14.2 github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api v0.0.0-20221213131318-537b49f7c0aa + github.com/milvus-io/milvus-proto/go-api v0.0.0-20221214030318-aadb4b6b9651 github.com/minio/minio-go/v7 v7.0.17 github.com/opentracing/opentracing-go v1.2.0 github.com/panjf2000/ants/v2 v2.4.8 diff --git a/go.sum b/go.sum index ad91d133c4..aecfbe6851 100644 --- a/go.sum +++ b/go.sum @@ -488,8 +488,8 @@ github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyex github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api v0.0.0-20221213131318-537b49f7c0aa h1:ok2ZT20iWlDqXWBzgVpbYev4tsOKvqUXPIJ1EUaQdEg= -github.com/milvus-io/milvus-proto/go-api v0.0.0-20221213131318-537b49f7c0aa/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk= +github.com/milvus-io/milvus-proto/go-api v0.0.0-20221214030318-aadb4b6b9651 h1:lXwp7St1mNKatOnl2mt6TU3QRpMTf75liXqTGmTkjis= +github.com/milvus-io/milvus-proto/go-api v0.0.0-20221214030318-aadb4b6b9651/go.mod h1:148qnlmZ0Fdm1Fq+Mj/OW2uDoEP25g3mjh0vMGtkgmk= github.com/milvus-io/pulsar-client-go v0.6.8 h1:fZdZH73aPRszu2fazyeeahQEz34tyn1Pt9EkqJmV100= github.com/milvus-io/pulsar-client-go v0.6.8/go.mod h1:oFIlYIk23tamkSLttw849qphmMIpHY8ztEBWDWJW+sc= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= diff --git a/internal/core/src/pb/common.pb.cc b/internal/core/src/pb/common.pb.cc index ad394c8d4e..a2125d7fde 100644 --- a/internal/core/src/pb/common.pb.cc +++ b/internal/core/src/pb/common.pb.cc @@ -447,37 +447,38 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE( "\r\n\tExecuting\020\001\022\r\n\tCompleted\020\002*X\n\020Consist" "encyLevel\022\n\n\006Strong\020\000\022\013\n\007Session\020\001\022\013\n\007Bo" "unded\020\002\022\016\n\nEventually\020\003\022\016\n\nCustomized\020\004*" - "\213\001\n\013ImportState\022\021\n\rImportPending\020\000\022\020\n\014Im" + "\236\001\n\013ImportState\022\021\n\rImportPending\020\000\022\020\n\014Im" "portFailed\020\001\022\021\n\rImportStarted\020\002\022\023\n\017Impor" - "tPersisted\020\005\022\023\n\017ImportCompleted\020\006\022\032\n\026Imp" - "ortFailedAndCleaned\020\007*2\n\nObjectType\022\016\n\nC" - "ollection\020\000\022\n\n\006Global\020\001\022\010\n\004User\020\002*\206\005\n\017Ob" - 
"jectPrivilege\022\020\n\014PrivilegeAll\020\000\022\035\n\031Privi" - "legeCreateCollection\020\001\022\033\n\027PrivilegeDropC" - "ollection\020\002\022\037\n\033PrivilegeDescribeCollecti" - "on\020\003\022\034\n\030PrivilegeShowCollections\020\004\022\021\n\rPr" - "ivilegeLoad\020\005\022\024\n\020PrivilegeRelease\020\006\022\027\n\023P" - "rivilegeCompaction\020\007\022\023\n\017PrivilegeInsert\020" - "\010\022\023\n\017PrivilegeDelete\020\t\022\032\n\026PrivilegeGetSt" - "atistics\020\n\022\030\n\024PrivilegeCreateIndex\020\013\022\030\n\024" - "PrivilegeIndexDetail\020\014\022\026\n\022PrivilegeDropI" - "ndex\020\r\022\023\n\017PrivilegeSearch\020\016\022\022\n\016Privilege" - "Flush\020\017\022\022\n\016PrivilegeQuery\020\020\022\030\n\024Privilege" - "LoadBalance\020\021\022\023\n\017PrivilegeImport\020\022\022\034\n\030Pr" - "ivilegeCreateOwnership\020\023\022\027\n\023PrivilegeUpd" - "ateUser\020\024\022\032\n\026PrivilegeDropOwnership\020\025\022\034\n" - "\030PrivilegeSelectOwnership\020\026\022\034\n\030Privilege" - "ManageOwnership\020\027\022\027\n\023PrivilegeSelectUser" - "\020\030*S\n\tStateCode\022\020\n\014Initializing\020\000\022\013\n\007Hea" - "lthy\020\001\022\014\n\010Abnormal\020\002\022\013\n\007StandBy\020\003\022\014\n\010Sto" - "pping\020\004*c\n\tLoadState\022\025\n\021LoadStateNotExis" - "t\020\000\022\024\n\020LoadStateNotLoad\020\001\022\024\n\020LoadStateLo" - "ading\020\002\022\023\n\017LoadStateLoaded\020\003:^\n\021privileg" - "e_ext_obj\022\037.google.protobuf.MessageOptio" - "ns\030\351\007 \001(\0132!.milvus.proto.common.Privileg" - "eExtBf\n\016io.milvus.grpcB\013CommonProtoP\001Z1g" - "ithub.com/milvus-io/milvus-proto/go-api/" - "commonpb\240\001\001\252\002\016IO.Milvus.Grpcb\006proto3" + "tPersisted\020\005\022\021\n\rImportFlushed\020\010\022\023\n\017Impor" + "tCompleted\020\006\022\032\n\026ImportFailedAndCleaned\020\007" + "*2\n\nObjectType\022\016\n\nCollection\020\000\022\n\n\006Global" + "\020\001\022\010\n\004User\020\002*\206\005\n\017ObjectPrivilege\022\020\n\014Priv" + "ilegeAll\020\000\022\035\n\031PrivilegeCreateCollection\020" + "\001\022\033\n\027PrivilegeDropCollection\020\002\022\037\n\033Privil" + "egeDescribeCollection\020\003\022\034\n\030PrivilegeShow" + "Collections\020\004\022\021\n\rPrivilegeLoad\020\005\022\024\n\020Priv" + "ilegeRelease\020\006\022\027\n\023PrivilegeCompaction\020\007\022" + "\023\n\017PrivilegeInsert\020\010\022\023\n\017PrivilegeDelete\020" + "\t\022\032\n\026PrivilegeGetStatistics\020\n\022\030\n\024Privile" + "geCreateIndex\020\013\022\030\n\024PrivilegeIndexDetail\020" + "\014\022\026\n\022PrivilegeDropIndex\020\r\022\023\n\017PrivilegeSe" + "arch\020\016\022\022\n\016PrivilegeFlush\020\017\022\022\n\016PrivilegeQ" + "uery\020\020\022\030\n\024PrivilegeLoadBalance\020\021\022\023\n\017Priv" + "ilegeImport\020\022\022\034\n\030PrivilegeCreateOwnershi" + "p\020\023\022\027\n\023PrivilegeUpdateUser\020\024\022\032\n\026Privileg" + "eDropOwnership\020\025\022\034\n\030PrivilegeSelectOwner" + "ship\020\026\022\034\n\030PrivilegeManageOwnership\020\027\022\027\n\023" + "PrivilegeSelectUser\020\030*S\n\tStateCode\022\020\n\014In" + "itializing\020\000\022\013\n\007Healthy\020\001\022\014\n\010Abnormal\020\002\022" + "\013\n\007StandBy\020\003\022\014\n\010Stopping\020\004*c\n\tLoadState\022" + "\025\n\021LoadStateNotExist\020\000\022\024\n\020LoadStateNotLo" + "ad\020\001\022\024\n\020LoadStateLoading\020\002\022\023\n\017LoadStateL" + "oaded\020\003:^\n\021privilege_ext_obj\022\037.google.pr" + 
"otobuf.MessageOptions\030\351\007 \001(\0132!.milvus.pr" + "oto.common.PrivilegeExtBf\n\016io.milvus.grp" + "cB\013CommonProtoP\001Z1github.com/milvus-io/m" + "ilvus-proto/go-api/commonpb\240\001\001\252\002\016IO.Milv" + "us.Grpcb\006proto3" ; static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_common_2eproto_deps[1] = { &::descriptor_table_google_2fprotobuf_2fdescriptor_2eproto, @@ -498,7 +499,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_com static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_common_2eproto_once; static bool descriptor_table_common_2eproto_initialized = false; const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_common_2eproto = { - &descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 5516, + &descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 5535, &descriptor_table_common_2eproto_once, descriptor_table_common_2eproto_sccs, descriptor_table_common_2eproto_deps, 11, 1, schemas, file_default_instances, TableStruct_common_2eproto::offsets, file_level_metadata_common_2eproto, 11, file_level_enum_descriptors_common_2eproto, file_level_service_descriptors_common_2eproto, @@ -777,6 +778,7 @@ bool ImportState_IsValid(int value) { case 5: case 6: case 7: + case 8: return true; default: return false; diff --git a/internal/core/src/pb/common.pb.h b/internal/core/src/pb/common.pb.h index 6d49c272d3..ee5b9e6684 100644 --- a/internal/core/src/pb/common.pb.h +++ b/internal/core/src/pb/common.pb.h @@ -461,6 +461,7 @@ enum ImportState : int { ImportFailed = 1, ImportStarted = 2, ImportPersisted = 5, + ImportFlushed = 8, ImportCompleted = 6, ImportFailedAndCleaned = 7, ImportState_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(), @@ -468,7 +469,7 @@ enum ImportState : int { }; bool ImportState_IsValid(int value); constexpr ImportState ImportState_MIN = ImportPending; -constexpr ImportState ImportState_MAX = ImportFailedAndCleaned; +constexpr ImportState ImportState_MAX = ImportFlushed; constexpr int ImportState_ARRAYSIZE = ImportState_MAX + 1; const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* ImportState_descriptor(); diff --git a/internal/rootcoord/broker.go b/internal/rootcoord/broker.go index a2f096e5d2..06f182904c 100644 --- a/internal/rootcoord/broker.go +++ b/internal/rootcoord/broker.go @@ -44,6 +44,7 @@ type Broker interface { Import(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) UnsetIsImportingState(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) MarkSegmentsDropped(context.Context, *datapb.MarkSegmentsDroppedRequest) (*commonpb.Status, error) + GetSegmentStates(context.Context, *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) DropCollectionIndex(ctx context.Context, collID UniqueID, partIDs []UniqueID) error GetSegmentIndexState(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) @@ -203,6 +204,10 @@ func (b *ServerBroker) MarkSegmentsDropped(ctx context.Context, req *datapb.Mark return b.s.dataCoord.MarkSegmentsDropped(ctx, req) } +func (b *ServerBroker) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return b.s.dataCoord.GetSegmentStates(ctx, req) +} + func (b *ServerBroker) DropCollectionIndex(ctx 
context.Context, collID UniqueID, partIDs []UniqueID) error { rsp, err := b.s.indexCoord.DropIndex(ctx, &indexpb.DropIndexRequest{ CollectionID: collID, diff --git a/internal/rootcoord/import_helper.go b/internal/rootcoord/import_helper.go index 0f43b260c3..6349865793 100644 --- a/internal/rootcoord/import_helper.go +++ b/internal/rootcoord/import_helper.go @@ -15,6 +15,7 @@ type GetCollectionNameFunc func(collID, partitionID UniqueID) (string, string, e type IDAllocator func(count uint32) (UniqueID, UniqueID, error) type ImportFunc func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) type MarkSegmentsDroppedFunc func(ctx context.Context, segIDs []int64) (*commonpb.Status, error) +type GetSegmentStatesFunc func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) type DescribeIndexFunc func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) type GetSegmentIndexStateFunc func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) type UnsetIsImportingStateFunc func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) @@ -24,6 +25,7 @@ type ImportFactory interface { NewIDAllocator() IDAllocator NewImportFunc() ImportFunc NewMarkSegmentsDroppedFunc() MarkSegmentsDroppedFunc + NewGetSegmentStatesFunc() GetSegmentStatesFunc NewDescribeIndexFunc() DescribeIndexFunc NewGetSegmentIndexStateFunc() GetSegmentIndexStateFunc NewUnsetIsImportingStateFunc() UnsetIsImportingStateFunc @@ -49,6 +51,10 @@ func (f ImportFactoryImpl) NewMarkSegmentsDroppedFunc() MarkSegmentsDroppedFunc return MarkSegmentsDroppedWithCore(f.c) } +func (f ImportFactoryImpl) NewGetSegmentStatesFunc() GetSegmentStatesFunc { + return GetSegmentStatesWithCore(f.c) +} + func (f ImportFactoryImpl) NewDescribeIndexFunc() DescribeIndexFunc { return DescribeIndexWithCore(f.c) } @@ -102,6 +108,12 @@ func MarkSegmentsDroppedWithCore(c *Core) MarkSegmentsDroppedFunc { } } +func GetSegmentStatesWithCore(c *Core) GetSegmentStatesFunc { + return func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return c.broker.GetSegmentStates(ctx, req) + } +} + func DescribeIndexWithCore(c *Core) DescribeIndexFunc { return func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) { return c.broker.DescribeIndex(ctx, colID) diff --git a/internal/rootcoord/import_manager.go b/internal/rootcoord/import_manager.go index 47b5e4728c..4b1a93e788 100644 --- a/internal/rootcoord/import_manager.go +++ b/internal/rootcoord/import_manager.go @@ -36,12 +36,12 @@ import ( "github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/internal/util/importutil" "github.com/milvus-io/milvus/internal/util/typeutil" - + "github.com/samber/lo" "go.uber.org/zap" ) const ( - MaxPendingCount = 32 + MaxPendingCount = 5000 // TODO: Make this configurable. delimiter = "/" ) @@ -54,10 +54,16 @@ var checkPendingTasksInterval = 60 * 1000 // default 5*60*1000 milliseconds (5 minutes) var cleanUpLoopInterval = 5 * 60 * 1000 -// flipTaskStateInterval is the default interval to loop through tasks and check if their states needs to be -// flipped/updated, for example, from `ImportPersisted` to `ImportCompleted`. 
-// default 15 * 1000 milliseconds (15 seconds)
-var flipTaskStateInterval = 15 * 1000
+// flipPersistedTaskInterval is the default interval to loop through tasks and check if their states need to be
+// flipped/updated from `ImportPersisted` to `ImportFlushed`.
+// default 2 * 1000 milliseconds (2 seconds)
+// TODO: Make this configurable.
+var flipPersistedTaskInterval = 2 * 1000
+
+// flipFlushedTaskInterval is the default interval to loop through tasks and check if their states need to be
+// flipped/updated from `ImportFlushed` to `ImportCompleted`.
+// default 5 * 1000 milliseconds (5 seconds)
+var flipFlushedTaskInterval = 5 * 1000
 
 // importManager manager for import tasks
 type importManager struct {
@@ -79,6 +85,7 @@ type importManager struct {
 	callImportService         func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error)
 	getCollectionName         func(collID, partitionID typeutil.UniqueID) (string, string, error)
 	callMarkSegmentsDropped   func(ctx context.Context, segIDs []typeutil.UniqueID) (*commonpb.Status, error)
+	callGetSegmentStates      func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error)
 	callDescribeIndex         func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error)
 	callGetSegmentIndexState  func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error)
 	callUnsetIsImportingState func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error)
@@ -89,6 +96,7 @@ func newImportManager(ctx context.Context, client kv.TxnKV,
 	idAlloc func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error),
 	importService func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error),
 	markSegmentsDropped func(ctx context.Context, segIDs []typeutil.UniqueID) (*commonpb.Status, error),
+	getSegmentStates func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error),
 	getCollectionName func(collID, partitionID typeutil.UniqueID) (string, string, error),
 	describeIndex func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error),
 	getSegmentIndexState func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error),
@@ -106,6 +114,7 @@ func newImportManager(ctx context.Context, client kv.TxnKV,
 		idAllocator:              idAlloc,
 		callImportService:        importService,
 		callMarkSegmentsDropped:  markSegmentsDropped,
+		callGetSegmentStates:     getSegmentStates,
 		getCollectionName:        getCollectionName,
 		callDescribeIndex:        describeIndex,
 		callGetSegmentIndexState: getSegmentIndexState,
@@ -149,17 +158,24 @@ func (m *importManager) sendOutTasksLoop(wg *sync.WaitGroup) {
 // flipTaskStateLoop periodically calls `flipTaskState` to check if states of the tasks need to be updated.
 func (m *importManager) flipTaskStateLoop(wg *sync.WaitGroup) {
 	defer wg.Done()
-	ticker := time.NewTicker(time.Duration(flipTaskStateInterval) * time.Millisecond)
-	defer ticker.Stop()
+	flipPersistedTicker := time.NewTicker(time.Duration(flipPersistedTaskInterval) * time.Millisecond)
+	flipFlushedTicker := time.NewTicker(time.Duration(flipFlushedTaskInterval) * time.Millisecond)
+	defer flipPersistedTicker.Stop()
+	defer flipFlushedTicker.Stop()
 	for {
 		select {
 		case <-m.ctx.Done():
 			log.Debug("import manager context done, exit check flipTaskStateLoop")
 			return
-		case <-ticker.C:
-			log.Debug("start trying to flip task state")
-			if err := m.flipTaskState(m.ctx); err != nil {
-				log.Error("failed to flip task state", zap.Error(err))
+		case <-flipPersistedTicker.C:
+			log.Debug("start trying to flip ImportPersisted task")
+			if err := m.loadAndFlipPersistedTasks(m.ctx); err != nil {
+				log.Error("failed to flip ImportPersisted task", zap.Error(err))
+			}
+		case <-flipFlushedTicker.C:
+			log.Debug("start trying to flip ImportFlushed task")
+			if err := m.loadAndFlipFlushedTasks(m.ctx); err != nil {
+				log.Error("failed to flip ImportFlushed task", zap.Error(err))
 			}
 		}
 	}
@@ -269,61 +285,127 @@ func (m *importManager) sendOutTasks(ctx context.Context) error {
 	return nil
 }
 
-// flipTaskState checks every import task and flips their import state if eligible.
-func (m *importManager) flipTaskState(ctx context.Context) error {
+// loadAndFlipPersistedTasks checks every import task in `ImportPersisted` state and flips their import state to
+// `ImportFlushed` if eligible.
+func (m *importManager) loadAndFlipPersistedTasks(ctx context.Context) error {
 	var importTasks []*datapb.ImportTaskInfo
 	var err error
 	if importTasks, err = m.loadFromTaskStore(false); err != nil {
 		log.Error("failed to load from task store", zap.Error(err))
 		return err
 	}
+
 	for _, task := range importTasks {
+		// Checking if ImportPersisted --> ImportFlushed ready.
 		if task.GetState().GetStateCode() == commonpb.ImportState_ImportPersisted {
-			log.Info("ImportPersisted task found, checking if it is eligible to become ImportCompleted",
+			log.Info("ImportPersisted task found, checking if it is eligible to become ImportFlushed",
 				zap.Int64("task ID", task.GetId()))
+			importTask := m.getTaskState(task.GetId())
+
+			// if this method failed, skip this task, try again in next round
+			if err = m.flipTaskFlushedState(ctx, importTask, task.GetDatanodeId()); err != nil {
+				log.Error("failed to flip task flushed state",
+					zap.Int64("task ID", task.GetId()),
+					zap.Error(err))
+			}
+		}
+	}
+	return nil
+}
+
+// loadAndFlipFlushedTasks checks every import task in `ImportFlushed` state and flips their import state to
+// `ImportCompleted` if eligible.
+func (m *importManager) loadAndFlipFlushedTasks(ctx context.Context) error {
+	var importTasks []*datapb.ImportTaskInfo
+	var err error
+	if importTasks, err = m.loadFromTaskStore(false); err != nil {
+		log.Error("failed to load from task store", zap.Error(err))
+		return err
+	}
+
+	for _, task := range importTasks {
+		if task.GetState().GetStateCode() == commonpb.ImportState_ImportFlushed {
+			log.Info("ImportFlushed task found, checking if it is eligible to become ImportCompleted",
+				zap.Int64("task ID", task.GetId()))
+			importTask := m.getTaskState(task.GetId())
 			// TODO: if collection or partition has been dropped before the task complete,
 			// we need to set the task to failed, because the checkIndexingDone() cannot know
 			// whether the collection has been dropped.
// if this method failed, skip this task, try again in next round - m.flipTaskIndexState(ctx, task.GetId()) + if err = m.flipTaskIndexState(ctx, importTask); err != nil { + log.Error("failed to flip task index state", + zap.Int64("task ID", task.GetId()), + zap.Error(err)) + } } } return nil } -func (m *importManager) flipTaskIndexState(ctx context.Context, taskID int64) error { - resp := m.getTaskState(taskID) - ok, err := m.checkIndexingDone(ctx, resp.GetCollectionId(), resp.GetSegmentIds()) +func (m *importManager) flipTaskFlushedState(ctx context.Context, importTask *milvuspb.GetImportStateResponse, dataNodeID int64) error { + ok, err := m.checkFlushDone(ctx, importTask.GetSegmentIds()) if err != nil { - log.Error("an error occurred while checking index state of segments", - zap.Int64("task ID", taskID), + log.Error("an error occurred while checking flush state of segments", + zap.Int64("task ID", importTask.GetId()), zap.Error(err)) - // Failed to check indexing state of segments return err } if ok { - if err := m.setImportTaskState(resp.GetId(), commonpb.ImportState_ImportCompleted); err != nil { + // All segments are flushed. DataNode becomes available. + func() { + m.busyNodesLock.Lock() + defer m.busyNodesLock.Unlock() + delete(m.busyNodes, dataNodeID) + log.Info("a DataNode is no longer busy after processing task", + zap.Int64("dataNode ID", dataNodeID), + zap.Int64("task ID", importTask.GetId())) + + }() + if err := m.setImportTaskState(importTask.GetId(), commonpb.ImportState_ImportFlushed); err != nil { log.Error("failed to set import task state", - zap.Int64("task ID", resp.GetId()), + zap.Int64("task ID", importTask.GetId()), + zap.Any("target state", commonpb.ImportState_ImportFlushed), + zap.Error(err)) + return err + } + if err = m.sendOutTasks(m.ctx); err != nil { + log.Error("fail to send out import task to DataNodes", + zap.Int64("task ID", importTask.GetId())) + } + } + return nil +} + +func (m *importManager) flipTaskIndexState(ctx context.Context, importTask *milvuspb.GetImportStateResponse) error { + ok, err := m.checkIndexingDone(ctx, importTask.GetCollectionId(), importTask.GetSegmentIds()) + if err != nil { + log.Error("an error occurred while checking index state of segments", + zap.Int64("task ID", importTask.GetId()), + zap.Error(err)) + return err + } + if ok { + if err := m.setImportTaskState(importTask.GetId(), commonpb.ImportState_ImportCompleted); err != nil { + log.Error("failed to set import task state", + zap.Int64("task ID", importTask.GetId()), zap.Any("target state", commonpb.ImportState_ImportCompleted), zap.Error(err)) - // Failed to update task's state return err } log.Info("indexes are successfully built and the import task has complete!", - zap.Int64("task ID", resp.GetId())) + zap.Int64("task ID", importTask.GetId())) log.Info("now start unsetting isImporting state of segments", - zap.Int64("task ID", resp.GetId()), - zap.Int64s("segment IDs", resp.GetSegmentIds())) + zap.Int64("task ID", importTask.GetId()), + zap.Int64s("segment IDs", importTask.GetSegmentIds())) // Remove the `isImport` states of these segments only when the import task reaches `ImportState_ImportCompleted` state. 
if m.callUnsetIsImportingState == nil { log.Error("callUnsetIsImportingState function of importManager is nil") return fmt.Errorf("failed to describe index: segment state method of import manager is nil") } status, err := m.callUnsetIsImportingState(ctx, &datapb.UnsetIsImportingStateRequest{ - SegmentIds: resp.GetSegmentIds(), + SegmentIds: importTask.GetSegmentIds(), }) if err != nil { log.Error("failed to unset importing state of all segments (could be partial failure)", @@ -340,6 +422,31 @@ func (m *importManager) flipTaskIndexState(ctx context.Context, taskID int64) er return nil } +// checkFlushDone checks if flush is done on given segments. +func (m *importManager) checkFlushDone(ctx context.Context, segIDs []UniqueID) (bool, error) { + resp, err := m.callGetSegmentStates(ctx, &datapb.GetSegmentStatesRequest{ + SegmentIDs: segIDs, + }) + if err != nil { + log.Error("failed to get import task segment states", + zap.Int64s("segment IDs", segIDs)) + return false, err + } + getSegmentStates := func(segment *datapb.SegmentStateInfo, _ int) string { + return segment.GetState().String() + } + log.Debug("checking import segment states", + zap.Strings("segment states", lo.Map(resp.GetStates(), getSegmentStates))) + for _, states := range resp.GetStates() { + // Flushed segment could get compacted, so only returns false if there are still importing segments. + if states.GetState() == commonpb.SegmentState_Importing || + states.GetState() == commonpb.SegmentState_Sealed { + return false, nil + } + } + return true, nil +} + // checkIndexingDone checks if indexes are successfully built on segments in `allSegmentIDs`. // It returns error on errors. It returns true if indexes are successfully built on all segments and returns false otherwise. func (m *importManager) checkIndexingDone(ctx context.Context, collID UniqueID, allSegmentIDs []UniqueID) (bool, error) { @@ -627,17 +734,6 @@ func (m *importManager) updateTaskInfo(ir *rootcoordpb.ImportResult) (*datapb.Im if err != nil { return nil, err } - - // if is ImportState_ImportPersisted, and index is FLAT, set the task to be complated immediately - // this method is called from importWrapper.reportPersisted() to rootCoord.ReportImport(), - // if flipTaskIndexState failed, the outer caller(importWrapper) will retry 3 times - if ir.GetState() == commonpb.ImportState_ImportPersisted { - err = m.flipTaskIndexState(m.ctx, updatedInfo.GetId()) - if err != nil { - return nil, err - } - } - return updatedInfo, nil } diff --git a/internal/rootcoord/import_manager_test.go b/internal/rootcoord/import_manager_test.go index cb20c0ee11..1e91dfda9b 100644 --- a/internal/rootcoord/import_manager_test.go +++ b/internal/rootcoord/import_manager_test.go @@ -69,13 +69,23 @@ func TestImportManager_NewImportManager(t *testing.T) { }, CreateTs: time.Now().Unix() - 100, } + ti3 := &datapb.ImportTaskInfo{ + Id: 300, + State: &datapb.ImportTaskState{ + StateCode: commonpb.ImportState_ImportFlushed, + }, + CreateTs: time.Now().Unix() - 100, + } taskInfo1, err := proto.Marshal(ti1) assert.NoError(t, err) taskInfo2, err := proto.Marshal(ti2) assert.NoError(t, err) + taskInfo3, err := proto.Marshal(ti3) + assert.NoError(t, err) mockKv.Save(BuildImportTaskKey(1), "value") mockKv.Save(BuildImportTaskKey(100), string(taskInfo1)) mockKv.Save(BuildImportTaskKey(200), string(taskInfo2)) + mockKv.Save(BuildImportTaskKey(300), string(taskInfo3)) mockCallImportServiceErr := false callImportServiceFn := func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, 
error) { @@ -97,13 +107,20 @@ func TestImportManager_NewImportManager(t *testing.T) { ErrorCode: commonpb.ErrorCode_Success, }, nil } + callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil + } var wg sync.WaitGroup wg.Add(1) t.Run("working task expired", func(t *testing.T) { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) assert.NotNil(t, mgr) // there are 2 tasks read from store, one is pending, the other is persisted. @@ -135,7 +152,7 @@ func TestImportManager_NewImportManager(t *testing.T) { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) defer cancel() - mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) assert.NotNil(t, mgr) mgr.init(context.TODO()) var wgLoop sync.WaitGroup @@ -154,7 +171,7 @@ func TestImportManager_NewImportManager(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) defer cancel() - mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) assert.NotNil(t, mgr) assert.Panics(t, func() { mgr.init(context.TODO()) @@ -171,7 +188,7 @@ func TestImportManager_NewImportManager(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) defer cancel() - mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) assert.NotNil(t, mgr) mgr.init(context.TODO()) }) @@ -185,7 +202,7 @@ func TestImportManager_NewImportManager(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) defer cancel() - mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockTxnKV, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) assert.NotNil(t, mgr) mgr.init(context.TODO()) func() { @@ -205,7 +222,7 @@ func TestImportManager_NewImportManager(t *testing.T) { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) assert.NotNil(t, mgr) mgr.init(ctx) var wgLoop sync.WaitGroup @@ -263,7 +280,7 @@ func TestImportManager_TestSetImportTaskState(t *testing.T) { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer 
cancel() - mgr := newImportManager(ctx, mockKv, idAlloc, nil, nil, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockKv, idAlloc, nil, nil, nil, nil, nil, nil, nil) assert.NotNil(t, mgr) _, err := mgr.loadFromTaskStore(true) assert.NoError(t, err) @@ -356,9 +373,16 @@ func TestImportManager_TestEtcdCleanUp(t *testing.T) { ErrorCode: commonpb.ErrorCode_Success, }, nil } + callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil + } ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) assert.NotNil(t, mgr) _, err = mgr.loadFromTaskStore(true) assert.NoError(t, err) @@ -407,13 +431,23 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) { }, CreateTs: time.Now().Unix() - 100, } + ti3 := &datapb.ImportTaskInfo{ + Id: 300, + State: &datapb.ImportTaskState{ + StateCode: commonpb.ImportState_ImportFlushed, + Segments: []int64{204, 205, 206}, + }, + CreateTs: time.Now().Unix() - 100, + } taskInfo1, err := proto.Marshal(ti1) assert.NoError(t, err) taskInfo2, err := proto.Marshal(ti2) assert.NoError(t, err) - mockKv.Save(BuildImportTaskKey(1), "value") + taskInfo3, err := proto.Marshal(ti3) + assert.NoError(t, err) mockKv.Save(BuildImportTaskKey(100), string(taskInfo1)) mockKv.Save(BuildImportTaskKey(200), string(taskInfo2)) + mockKv.Save(BuildImportTaskKey(300), string(taskInfo3)) mockCallImportServiceErr := false callImportServiceFn := func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) { @@ -435,6 +469,13 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) { ErrorCode: commonpb.ErrorCode_Success, }, nil } + callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil + } callDescribeIndex := func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) { return &indexpb.DescribeIndexResponse{ Status: &commonpb.Status{ @@ -448,10 +489,6 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) { callGetSegmentIndexState := func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) { return []*indexpb.SegmentIndexState{ - { - SegmentID: 200, - State: commonpb.IndexState_Finished, - }, { SegmentID: 201, State: commonpb.IndexState_Finished, @@ -460,6 +497,22 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) { SegmentID: 202, State: commonpb.IndexState_Finished, }, + { + SegmentID: 203, + State: commonpb.IndexState_Finished, + }, + { + SegmentID: 204, + State: commonpb.IndexState_Finished, + }, + { + SegmentID: 205, + State: commonpb.IndexState_Finished, + }, + { + SegmentID: 206, + State: commonpb.IndexState_Finished, + }, }, nil } callUnsetIsImportingState := func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) { @@ -468,7 +521,8 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) { }, nil } - flipTaskStateInterval = 50 + 
flipPersistedTaskInterval = 20 + flipFlushedTaskInterval = 50 var wg sync.WaitGroup wg.Add(1) t.Run("normal case", func(t *testing.T) { @@ -476,13 +530,13 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, - nil, callDescribeIndex, callGetSegmentIndexState, callUnsetIsImportingState) + callGetSegmentStates, nil, callDescribeIndex, callGetSegmentIndexState, callUnsetIsImportingState) assert.NotNil(t, mgr) var wgLoop sync.WaitGroup wgLoop.Add(1) mgr.flipTaskStateLoop(&wgLoop) wgLoop.Wait() - time.Sleep(100 * time.Millisecond) + time.Sleep(200 * time.Millisecond) }) wg.Add(1) @@ -498,7 +552,7 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) { }, nil } mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, - nil, callDescribeIndex, callGetSegmentIndexState, callUnsetIsImportingState) + callGetSegmentStates, nil, callDescribeIndex, callGetSegmentIndexState, callUnsetIsImportingState) assert.NotNil(t, mgr) var wgLoop sync.WaitGroup wgLoop.Add(1) @@ -520,7 +574,7 @@ func TestImportManager_TestFlipTaskStateLoop(t *testing.T) { }, nil } mgr := newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, - nil, callDescribeIndex, callGetSegmentIndexState, callUnsetIsImportingState) + callGetSegmentStates, nil, callDescribeIndex, callGetSegmentIndexState, callUnsetIsImportingState) assert.NotNil(t, mgr) var wgLoop sync.WaitGroup wgLoop.Add(1) @@ -549,9 +603,15 @@ func TestImportManager_ImportJob(t *testing.T) { ErrorCode: commonpb.ErrorCode_Success, }, nil } - + callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil + } // nil request - mgr := newImportManager(context.TODO(), mockKv, idAlloc, nil, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(context.TODO(), mockKv, idAlloc, nil, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) resp := mgr.importJob(context.TODO(), nil, colID, 0) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) @@ -580,7 +640,7 @@ func TestImportManager_ImportJob(t *testing.T) { // row-based case, task count equal to file count // since the importServiceFunc return error, tasks will be kept in pending list rowReq.Files = []string{"f1.json"} - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) resp = mgr.importJob(context.TODO(), rowReq, colID, 0) assert.Equal(t, len(rowReq.Files), len(mgr.pendingTasks)) assert.Equal(t, 0, len(mgr.workingTasks)) @@ -593,7 +653,7 @@ func TestImportManager_ImportJob(t *testing.T) { // column-based case, one quest one task // since the importServiceFunc return error, tasks will be kept in pending list - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) resp = mgr.importJob(context.TODO(), colReq, colID, 0) 
assert.Equal(t, 1, len(mgr.pendingTasks)) assert.Equal(t, 0, len(mgr.workingTasks)) @@ -607,13 +667,13 @@ func TestImportManager_ImportJob(t *testing.T) { } // row-based case, since the importServiceFunc return success, tasks will be sent to working list - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) resp = mgr.importJob(context.TODO(), rowReq, colID, 0) assert.Equal(t, 0, len(mgr.pendingTasks)) assert.Equal(t, len(rowReq.Files), len(mgr.workingTasks)) // column-based case, since the importServiceFunc return success, tasks will be sent to working list - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) resp = mgr.importJob(context.TODO(), colReq, colID, 0) assert.Equal(t, 0, len(mgr.pendingTasks)) assert.Equal(t, 1, len(mgr.workingTasks)) @@ -637,7 +697,7 @@ func TestImportManager_ImportJob(t *testing.T) { // row-based case, since the importServiceFunc return success for 1 task // the first task is sent to working list, and 1 task left in pending list - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) resp = mgr.importJob(context.TODO(), rowReq, colID, 0) assert.Equal(t, 0, len(mgr.pendingTasks)) assert.Equal(t, 1, len(mgr.workingTasks)) @@ -711,9 +771,16 @@ func TestImportManager_AllDataNodesBusy(t *testing.T) { ErrorCode: commonpb.ErrorCode_Success, }, nil } + callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil + } // each data node owns one task - mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) for i := 0; i < len(dnList); i++ { resp := mgr.importJob(context.TODO(), rowReq, colID, 0) assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) @@ -722,7 +789,7 @@ func TestImportManager_AllDataNodesBusy(t *testing.T) { } // all data nodes are busy, new task waiting in pending list - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) resp := mgr.importJob(context.TODO(), rowReq, colID, 0) assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) assert.Equal(t, len(rowReq.Files), len(mgr.pendingTasks)) @@ -730,7 +797,7 @@ func TestImportManager_AllDataNodesBusy(t *testing.T) { // now all data nodes are free again, new task is executed instantly count = 0 - mgr = newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr = newImportManager(context.TODO(), 
mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) resp = mgr.importJob(context.TODO(), colReq, colID, 0) assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) assert.Equal(t, 0, len(mgr.pendingTasks)) @@ -785,9 +852,16 @@ func TestImportManager_TaskState(t *testing.T) { ErrorCode: commonpb.ErrorCode_Success, }, nil } + callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil + } // add 3 tasks, their ID is 10000, 10001, 10002, make sure updateTaskInfo() works correctly - mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) mgr.importJob(context.TODO(), rowReq, colID, 0) rowReq.Files = []string{"f2.json"} mgr.importJob(context.TODO(), rowReq, colID, 0) @@ -817,51 +891,6 @@ func TestImportManager_TaskState(t *testing.T) { }, } - // callDescribeIndex method is nil - _, err = mgr.updateTaskInfo(info) - assert.Error(t, err) - - mgr.callDescribeIndex = func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) { - return &indexpb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, - }, nil - } - - // describe index failed, return error - _, err = mgr.updateTaskInfo(info) - assert.Error(t, err) - - mgr.callDescribeIndex = func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) { - return &indexpb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IndexNotExist, - }, - }, nil - } - // index doesn't exist, but callUnsetIsImportingState is nil, return error - _, err = mgr.updateTaskInfo(info) - assert.Error(t, err) - - mgr.callUnsetIsImportingState = func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, nil - } - // index doesn't exist, but failed to unset importing state, return error - _, err = mgr.updateTaskInfo(info) - assert.Error(t, err) - - mgr.callUnsetIsImportingState = func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, errors.New("error to unset importing state") - } - // index doesn't exist, but failed to unset importing state, return error - _, err = mgr.updateTaskInfo(info) - assert.Error(t, err) - mgr.callUnsetIsImportingState = func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) { return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -875,7 +904,7 @@ func TestImportManager_TaskState(t *testing.T) { assert.Equal(t, int64(100), ti.GetCollectionId()) assert.Equal(t, int64(0), ti.GetPartitionId()) assert.Equal(t, []string{"f2.json"}, ti.GetFiles()) - assert.Equal(t, commonpb.ImportState_ImportCompleted, ti.GetState().GetStateCode()) + assert.Equal(t, commonpb.ImportState_ImportPersisted, ti.GetState().GetStateCode()) assert.Equal(t, int64(1000), ti.GetState().GetRowCount()) resp := mgr.getTaskState(10000) @@ -883,7 +912,7 @@ func TestImportManager_TaskState(t *testing.T) { resp = mgr.getTaskState(2) assert.Equal(t, 
commonpb.ErrorCode_Success, resp.Status.ErrorCode) - assert.Equal(t, commonpb.ImportState_ImportCompleted, resp.State) + assert.Equal(t, commonpb.ImportState_ImportPersisted, resp.State) resp = mgr.getTaskState(1) assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) @@ -939,7 +968,14 @@ func TestImportManager_AllocFail(t *testing.T) { ErrorCode: commonpb.ErrorCode_Success, }, nil } - mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, nil, nil, nil, nil) + callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil + } + mgr := newImportManager(context.TODO(), mockKv, idAlloc, importServiceFunc, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) resp := mgr.importJob(context.TODO(), rowReq, colID, 0) assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) assert.Equal(t, 0, len(mgr.pendingTasks)) @@ -972,6 +1008,13 @@ func TestImportManager_ListAllTasks(t *testing.T) { ErrorCode: commonpb.ErrorCode_Success, }, nil } + callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil + } colID1 := int64(100) colID2 := int64(101) @@ -1000,7 +1043,7 @@ func TestImportManager_ListAllTasks(t *testing.T) { } mockKv := memkv.NewMemoryKV() - mgr := newImportManager(context.TODO(), mockKv, idAlloc, fn, callMarkSegmentsDropped, getCollectionName, nil, nil, nil) + mgr := newImportManager(context.TODO(), mockKv, idAlloc, fn, callMarkSegmentsDropped, callGetSegmentStates, getCollectionName, nil, nil, nil) // add 10 tasks for collection1, id from 1 to 10 file1 := "f1.json" @@ -1196,13 +1239,19 @@ func TestImportManager_checkIndexingDone(t *testing.T) { callGetSegmentIndexState: func(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error) { return nil, errors.New("error") }, + callGetSegmentStates: func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return nil, errors.New("error") + }, } segmentsID := []typeutil.UniqueID{1, 2, 3} + done, err := mgr.checkFlushDone(ctx, segmentsID) + assert.False(t, done) + assert.Error(t, err) // check index of 3 segments // callDescribeIndex() failed - done, err := mgr.checkIndexingDone(ctx, 1, segmentsID) + done, err = mgr.checkIndexingDone(ctx, 1, segmentsID) assert.False(t, done) assert.Error(t, err) @@ -1214,6 +1263,9 @@ func TestImportManager_checkIndexingDone(t *testing.T) { }, nil } + done, err = mgr.checkFlushDone(ctx, segmentsID) + assert.False(t, done) + assert.Error(t, err) // callDescribeIndex() unexpected error done, err = mgr.checkIndexingDone(ctx, 1, segmentsID) assert.False(t, done) @@ -1226,7 +1278,31 @@ func TestImportManager_checkIndexingDone(t *testing.T) { }, }, nil } + mgr.callGetSegmentStates = func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ + States: []*datapb.SegmentStateInfo{ + { + SegmentID: 1, + State: commonpb.SegmentState_Flushed, + }, + { + SegmentID: 1, + State: commonpb.SegmentState_Flushed, + }, + { + SegmentID: 1, + State: commonpb.SegmentState_Flushed, 
+ }, + }, + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil + } + done, err = mgr.checkFlushDone(ctx, segmentsID) + assert.True(t, done) + assert.NoError(t, err) // callDescribeIndex() index not exist done, err = mgr.checkIndexingDone(ctx, 1, segmentsID) assert.True(t, done) @@ -1245,6 +1321,9 @@ func TestImportManager_checkIndexingDone(t *testing.T) { }, nil } + done, err = mgr.checkFlushDone(ctx, segmentsID) + assert.True(t, done) + assert.NoError(t, err) // callGetSegmentIndexState() failed done, err = mgr.checkIndexingDone(ctx, 1, segmentsID) assert.False(t, done) @@ -1258,6 +1337,9 @@ func TestImportManager_checkIndexingDone(t *testing.T) { }, nil } + done, err = mgr.checkFlushDone(ctx, segmentsID) + assert.True(t, done) + assert.NoError(t, err) // only 1 segment indexed done, err = mgr.checkIndexingDone(ctx, 1, segmentsID) assert.False(t, done) @@ -1277,8 +1359,36 @@ func TestImportManager_checkIndexingDone(t *testing.T) { }, nil } + done, err = mgr.checkFlushDone(ctx, segmentsID) + assert.True(t, done) + assert.NoError(t, err) // all segments indexed done, err = mgr.checkIndexingDone(ctx, 1, segmentsID) assert.True(t, done) assert.Nil(t, err) + + mgr.callGetSegmentStates = func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ + States: []*datapb.SegmentStateInfo{ + { + SegmentID: 1, + State: commonpb.SegmentState_Flushed, + }, + { + SegmentID: 1, + State: commonpb.SegmentState_Flushed, + }, + { + SegmentID: 1, + State: commonpb.SegmentState_Importing, + }, + }, + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil + } + done, err = mgr.checkFlushDone(ctx, segmentsID) + assert.False(t, done) + assert.NoError(t, err) } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 65ca19ac13..0259d2efb6 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -422,6 +422,7 @@ func (c *Core) initImportManager() error { f.NewIDAllocator(), f.NewImportFunc(), f.NewMarkSegmentsDroppedFunc(), + f.NewGetSegmentStatesFunc(), f.NewGetCollectionNameFunc(), f.NewDescribeIndexFunc(), f.NewGetSegmentIndexStateFunc(), @@ -1763,28 +1764,6 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) ( if code, ok := c.checkHealthy(); !ok { return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil } - // If setting ImportState_ImportCompleted, simply update the state and return directly. - if ir.GetState() == commonpb.ImportState_ImportCompleted { - if err := c.importManager.setImportTaskState(ir.GetTaskId(), commonpb.ImportState_ImportCompleted); err != nil { - errMsg := "failed to set import task as ImportState_ImportCompleted" - log.Error(errMsg, zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("%s %s", errMsg, err.Error()), - }, nil - } - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, nil - } - // Upon receiving ReportImport request, update the related task's state in task store. 
- ti, err := c.importManager.updateTaskInfo(ir) - if err != nil { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UpdateImportTaskFailure, - Reason: err.Error(), - }, nil - } // This method update a busy node to idle node, and send import task to idle node resendTaskFunc := func() { @@ -1803,6 +1782,19 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) ( } } + // If setting ImportState_ImportCompleted, simply update the state and return directly. + if ir.GetState() == commonpb.ImportState_ImportCompleted { + log.Warn("this should not be called!") + } + // Upon receiving ReportImport request, update the related task's state in task store. + ti, err := c.importManager.updateTaskInfo(ir) + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UpdateImportTaskFailure, + Reason: err.Error(), + }, nil + } + // If task failed, send task to idle datanode if ir.GetState() == commonpb.ImportState_ImportFailed { // When a DataNode failed importing, remove this DataNode from the busy node list and send out import tasks again. @@ -1816,9 +1808,7 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) ( resendTaskFunc() } else { // Here ir.GetState() == commonpb.ImportState_ImportPersisted - // When a DataNode finishes importing, remove this DataNode from the busy node list and send out import tasks again. - resendTaskFunc() - // Flush all import data segments. + // Seal these import segments, so they can be auto-flushed later. if err := c.broker.Flush(ctx, ti.GetCollectionId(), ir.GetSegments()); err != nil { log.Error("failed to call Flush on bulk insert segments", zap.Int64("task ID", ir.GetTaskId())) diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index d96172a802..a4ff27a683 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -911,7 +911,7 @@ func TestCore_GetImportState(t *testing.T) { t.Run("normal case", func(t *testing.T) { ctx := context.Background() c := newTestCore(withHealthyCode()) - c.importManager = newImportManager(ctx, mockKv, nil, nil, nil, nil, nil, nil, nil) + c.importManager = newImportManager(ctx, mockKv, nil, nil, nil, nil, nil, nil, nil, nil) resp, err := c.GetImportState(ctx, &milvuspb.GetImportStateRequest{ Task: 100, }) @@ -995,7 +995,7 @@ func TestCore_ListImportTasks(t *testing.T) { ctx := context.Background() c := newTestCore(withHealthyCode(), withMeta(meta)) - c.importManager = newImportManager(ctx, mockKv, nil, nil, nil, nil, nil, nil, nil) + c.importManager = newImportManager(ctx, mockKv, nil, nil, nil, nil, nil, nil, nil, nil) // list all tasks resp, err := c.ListImportTasks(ctx, &milvuspb.ListImportTasksRequest{}) @@ -1135,6 +1135,14 @@ func TestCore_ReportImport(t *testing.T) { }, nil } + callGetSegmentStates := func(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { + return &datapb.GetSegmentStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + }, nil + } + callDescribeIndex := func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) { return &indexpb.DescribeIndexResponse{ Status: &commonpb.Status{ @@ -1157,25 +1165,10 @@ func TestCore_ReportImport(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) }) - t.Run("report complete import", func(t *testing.T) { - ctx := context.Background() - c := newTestCore(withHealthyCode()) - c.importManager = 
newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) - resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{ - TaskId: 100, - State: commonpb.ImportState_ImportCompleted, - }) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - // Change the state back. - err = c.importManager.setImportTaskState(100, commonpb.ImportState_ImportPending) - assert.NoError(t, err) - }) - t.Run("report complete import with task not found", func(t *testing.T) { ctx := context.Background() c := newTestCore(withHealthyCode()) - c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) + c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{ TaskId: 101, State: commonpb.ImportState_ImportCompleted, @@ -1187,7 +1180,7 @@ func TestCore_ReportImport(t *testing.T) { t.Run("report import started state", func(t *testing.T) { ctx := context.Background() c := newTestCore(withHealthyCode()) - c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil) + c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil, nil, nil) c.importManager.loadFromTaskStore(true) c.importManager.sendOutTasks(ctx) resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{ @@ -1210,7 +1203,7 @@ func TestCore_ReportImport(t *testing.T) { withTtSynchronizer(ticker), withDataCoord(dc)) c.broker = newServerBroker(c) - c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, + c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, callDescribeIndex, nil, callUnsetIsImportingState) c.importManager.loadFromTaskStore(true) c.importManager.sendOutTasks(ctx)