2022-10-08 07:38:58 +00:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"context"
"fmt"
2022-10-17 10:01:25 +00:00
"strconv"
2022-10-08 07:38:58 +00:00
2023-02-26 03:31:49 +00:00
"github.com/cockroachdb/errors"
2022-10-08 07:38:58 +00:00
"go.uber.org/zap"
2022-10-16 12:49:27 +00:00
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
2023-04-06 11:14:32 +00:00
"github.com/milvus-io/milvus/internal/proto/indexpb"
2022-10-14 09:51:24 +00:00
"github.com/milvus-io/milvus/internal/types"
2023-04-06 11:14:32 +00:00
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/indexparams"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
2022-10-08 07:38:58 +00:00
)
const (
CreateIndexTaskName = "CreateIndexTask"
DescribeIndexTaskName = "DescribeIndexTask"
DropIndexTaskName = "DropIndexTask"
GetIndexStateTaskName = "GetIndexStateTask"
GetIndexBuildProgressTaskName = "GetIndexBuildProgressTask"
AutoIndexName = "AUTOINDEX"
DimKey = common . DimKey
)
type createIndexTask struct {
Condition
req * milvuspb . CreateIndexRequest
ctx context . Context
rootCoord types . RootCoord
2023-01-04 11:37:36 +00:00
datacoord types . DataCoord
2022-10-27 05:05:31 +00:00
queryCoord types . QueryCoord
2022-10-08 07:38:58 +00:00
result * commonpb . Status
isAutoIndex bool
newIndexParams [ ] * commonpb . KeyValuePair
2023-03-26 14:15:59 +00:00
newTypeParams [ ] * commonpb . KeyValuePair
2022-10-08 07:38:58 +00:00
collectionID UniqueID
fieldSchema * schemapb . FieldSchema
}
func ( cit * createIndexTask ) TraceCtx ( ) context . Context {
return cit . ctx
}
func ( cit * createIndexTask ) ID ( ) UniqueID {
return cit . req . GetBase ( ) . GetMsgID ( )
}
func ( cit * createIndexTask ) SetID ( uid UniqueID ) {
cit . req . GetBase ( ) . MsgID = uid
}
func ( cit * createIndexTask ) Name ( ) string {
return CreateIndexTaskName
}
func ( cit * createIndexTask ) Type ( ) commonpb . MsgType {
return cit . req . GetBase ( ) . GetMsgType ( )
}
func ( cit * createIndexTask ) BeginTs ( ) Timestamp {
return cit . req . GetBase ( ) . GetTimestamp ( )
}
func ( cit * createIndexTask ) EndTs ( ) Timestamp {
return cit . req . GetBase ( ) . GetTimestamp ( )
}
func ( cit * createIndexTask ) SetTs ( ts Timestamp ) {
cit . req . Base . Timestamp = ts
}
func ( cit * createIndexTask ) OnEnqueue ( ) error {
2022-10-19 02:01:26 +00:00
cit . req . Base = commonpbutil . NewMsgBase ( )
2022-10-08 07:38:58 +00:00
return nil
}
func ( cit * createIndexTask ) parseIndexParams ( ) error {
isVecIndex := typeutil . IsVectorType ( cit . fieldSchema . DataType )
indexParamsMap := make ( map [ string ] string )
if ! isVecIndex {
if cit . fieldSchema . DataType == schemapb . DataType_VarChar {
indexParamsMap [ common . IndexTypeKey ] = DefaultStringIndexType
} else {
indexParamsMap [ common . IndexTypeKey ] = DefaultIndexType
}
}
for _ , kv := range cit . req . GetExtraParams ( ) {
if kv . Key == common . IndexParamsKey {
2022-12-07 10:01:19 +00:00
params , err := funcutil . JSONToMap ( kv . Value )
2022-10-08 07:38:58 +00:00
if err != nil {
return err
}
for k , v := range params {
indexParamsMap [ k ] = v
}
} else {
indexParamsMap [ kv . Key ] = kv . Value
}
}
if isVecIndex {
specifyIndexType , exist := indexParamsMap [ common . IndexTypeKey ]
2022-12-07 10:01:19 +00:00
if Params . AutoIndexConfig . Enable . GetAsBool ( ) {
2023-04-30 03:50:40 +00:00
log . Info ( "create index trigger AutoIndex" ,
zap . String ( "original type" , specifyIndexType ) ,
zap . String ( "final type" , Params . AutoIndexConfig . AutoIndexTypeName . GetValue ( ) ) )
// override params by autoindex
2022-12-07 10:01:19 +00:00
for k , v := range Params . AutoIndexConfig . IndexParams . GetAsJSONMap ( ) {
2022-10-08 07:38:58 +00:00
indexParamsMap [ k ] = v
}
} else {
if ! exist {
return fmt . Errorf ( "IndexType not specified" )
}
}
2022-10-14 09:51:24 +00:00
indexType , exist := indexParamsMap [ common . IndexTypeKey ]
if ! exist {
return fmt . Errorf ( "IndexType not specified" )
}
if indexType == indexparamcheck . IndexDISKANN {
2022-11-04 06:25:38 +00:00
err := indexparams . FillDiskIndexParams ( Params , indexParamsMap )
2022-10-14 09:51:24 +00:00
if err != nil {
return err
}
}
2022-10-08 07:38:58 +00:00
err := checkTrain ( cit . fieldSchema , indexParamsMap )
if err != nil {
return err
}
}
typeParams := cit . fieldSchema . GetTypeParams ( )
2023-03-26 14:15:59 +00:00
typeParamsMap := make ( map [ string ] string )
2022-10-08 07:38:58 +00:00
for _ , pair := range typeParams {
2023-03-26 14:15:59 +00:00
typeParamsMap [ pair . Key ] = pair . Value
2022-10-08 07:38:58 +00:00
}
for k , v := range indexParamsMap {
//Currently, it is required that type_params and index_params do not have same keys.
2023-03-26 14:15:59 +00:00
if k == DimKey || k == maxVarCharLengthKey {
delete ( indexParamsMap , k )
2022-10-08 07:38:58 +00:00
continue
}
cit . newIndexParams = append ( cit . newIndexParams , & commonpb . KeyValuePair { Key : k , Value : v } )
}
2023-03-26 14:15:59 +00:00
for k , v := range typeParamsMap {
if _ , ok := indexParamsMap [ k ] ; ok {
continue
}
cit . newTypeParams = append ( cit . newTypeParams , & commonpb . KeyValuePair { Key : k , Value : v } )
}
2022-10-08 07:38:58 +00:00
return nil
}
func ( cit * createIndexTask ) getIndexedField ( ctx context . Context ) ( * schemapb . FieldSchema , error ) {
schema , err := globalMetaCache . GetCollectionSchema ( ctx , cit . req . GetCollectionName ( ) )
if err != nil {
log . Error ( "failed to get collection schema" , zap . Error ( err ) )
return nil , fmt . Errorf ( "failed to get collection schema: %s" , err )
}
schemaHelper , err := typeutil . CreateSchemaHelper ( schema )
if err != nil {
log . Error ( "failed to parse collection schema" , zap . Error ( err ) )
return nil , fmt . Errorf ( "failed to parse collection schema: %s" , err )
}
field , err := schemaHelper . GetFieldFromName ( cit . req . GetFieldName ( ) )
if err != nil {
log . Error ( "create index on non-exist field" , zap . Error ( err ) )
return nil , fmt . Errorf ( "cannot create index on non-exist field: %s" , cit . req . GetFieldName ( ) )
}
return field , nil
}
func fillDimension ( field * schemapb . FieldSchema , indexParams map [ string ] string ) error {
vecDataTypes := [ ] schemapb . DataType {
schemapb . DataType_FloatVector ,
schemapb . DataType_BinaryVector ,
}
if ! funcutil . SliceContain ( vecDataTypes , field . GetDataType ( ) ) {
return nil
}
params := make ( [ ] * commonpb . KeyValuePair , 0 , len ( field . GetTypeParams ( ) ) + len ( field . GetIndexParams ( ) ) )
params = append ( params , field . GetTypeParams ( ) ... )
params = append ( params , field . GetIndexParams ( ) ... )
dimensionInSchema , err := funcutil . GetAttrByKeyFromRepeatedKV ( DimKey , params )
if err != nil {
return fmt . Errorf ( "dimension not found in schema" )
}
dimension , exist := indexParams [ DimKey ]
if exist {
if dimensionInSchema != dimension {
return fmt . Errorf ( "dimension mismatch, dimension in schema: %s, dimension: %s" , dimensionInSchema , dimension )
}
} else {
indexParams [ DimKey ] = dimensionInSchema
}
return nil
}
func checkTrain ( field * schemapb . FieldSchema , indexParams map [ string ] string ) error {
indexType := indexParams [ common . IndexTypeKey ]
// skip params check of non-vector field.
vecDataTypes := [ ] schemapb . DataType {
schemapb . DataType_FloatVector ,
schemapb . DataType_BinaryVector ,
}
if ! funcutil . SliceContain ( vecDataTypes , field . GetDataType ( ) ) {
return indexparamcheck . CheckIndexValid ( field . GetDataType ( ) , indexType , indexParams )
}
adapter , err := indexparamcheck . GetConfAdapterMgrInstance ( ) . GetAdapter ( indexType )
if err != nil {
log . Warn ( "Failed to get conf adapter" , zap . String ( common . IndexTypeKey , indexType ) )
return fmt . Errorf ( "invalid index type: %s" , indexType )
}
if err := fillDimension ( field , indexParams ) ; err != nil {
return err
}
ok := adapter . CheckValidDataType ( field . GetDataType ( ) )
if ! ok {
log . Warn ( "Field data type don't support the index build type" , zap . String ( "fieldDataType" , field . GetDataType ( ) . String ( ) ) , zap . String ( "indexType" , indexType ) )
return fmt . Errorf ( "field data type %s don't support the index build type %s" , field . GetDataType ( ) . String ( ) , indexType )
}
ok = adapter . CheckTrain ( indexParams )
if ! ok {
log . Warn ( "Create index with invalid params" , zap . Any ( "index_params" , indexParams ) )
return fmt . Errorf ( "invalid index params: %v" , indexParams )
}
return nil
}
func ( cit * createIndexTask ) PreExecute ( ctx context . Context ) error {
cit . req . Base . MsgType = commonpb . MsgType_CreateIndex
2022-11-04 06:25:38 +00:00
cit . req . Base . SourceID = paramtable . GetNodeID ( )
2022-10-08 07:38:58 +00:00
collName := cit . req . GetCollectionName ( )
collID , err := globalMetaCache . GetCollectionID ( ctx , collName )
if err != nil {
return err
}
cit . collectionID = collID
2022-10-16 13:05:25 +00:00
if err = validateIndexName ( cit . req . GetIndexName ( ) ) ; err != nil {
return err
}
2022-10-08 07:38:58 +00:00
field , err := cit . getIndexedField ( ctx )
if err != nil {
return err
}
cit . fieldSchema = field
// check index param, not accurate, only some static rules
2022-10-27 05:05:31 +00:00
err = cit . parseIndexParams ( )
if err != nil {
return err
}
return nil
2022-10-08 07:38:58 +00:00
}
func ( cit * createIndexTask ) Execute ( ctx context . Context ) error {
log . Debug ( "proxy create index" , zap . Int64 ( "collID" , cit . collectionID ) , zap . Int64 ( "fieldID" , cit . fieldSchema . GetFieldID ( ) ) ,
zap . String ( "indexName" , cit . req . GetIndexName ( ) ) , zap . Any ( "typeParams" , cit . fieldSchema . GetTypeParams ( ) ) ,
zap . Any ( "indexParams" , cit . req . GetExtraParams ( ) ) )
if cit . req . GetIndexName ( ) == "" {
2022-12-07 10:01:19 +00:00
cit . req . IndexName = Params . CommonCfg . DefaultIndexName . GetValue ( ) + "_" + strconv . FormatInt ( cit . fieldSchema . GetFieldID ( ) , 10 )
2022-10-08 07:38:58 +00:00
}
var err error
2023-01-11 06:35:40 +00:00
req := & indexpb . CreateIndexRequest {
2022-10-08 07:38:58 +00:00
CollectionID : cit . collectionID ,
FieldID : cit . fieldSchema . GetFieldID ( ) ,
IndexName : cit . req . GetIndexName ( ) ,
2023-04-09 08:18:29 +00:00
TypeParams : cit . newTypeParams ,
2022-10-08 07:38:58 +00:00
IndexParams : cit . newIndexParams ,
IsAutoIndex : cit . isAutoIndex ,
UserIndexParams : cit . req . GetExtraParams ( ) ,
Timestamp : cit . BeginTs ( ) ,
}
2023-01-04 11:37:36 +00:00
cit . result , err = cit . datacoord . CreateIndex ( ctx , req )
2022-10-08 07:38:58 +00:00
if err != nil {
return err
}
if cit . result . ErrorCode != commonpb . ErrorCode_Success {
return errors . New ( cit . result . Reason )
}
return err
}
func ( cit * createIndexTask ) PostExecute ( ctx context . Context ) error {
return nil
}
type describeIndexTask struct {
Condition
* milvuspb . DescribeIndexRequest
2023-01-04 11:37:36 +00:00
ctx context . Context
datacoord types . DataCoord
result * milvuspb . DescribeIndexResponse
2022-10-08 07:38:58 +00:00
collectionID UniqueID
}
func ( dit * describeIndexTask ) TraceCtx ( ) context . Context {
return dit . ctx
}
func ( dit * describeIndexTask ) ID ( ) UniqueID {
return dit . Base . MsgID
}
func ( dit * describeIndexTask ) SetID ( uid UniqueID ) {
dit . Base . MsgID = uid
}
func ( dit * describeIndexTask ) Name ( ) string {
return DescribeIndexTaskName
}
func ( dit * describeIndexTask ) Type ( ) commonpb . MsgType {
return dit . Base . MsgType
}
func ( dit * describeIndexTask ) BeginTs ( ) Timestamp {
return dit . Base . Timestamp
}
func ( dit * describeIndexTask ) EndTs ( ) Timestamp {
return dit . Base . Timestamp
}
func ( dit * describeIndexTask ) SetTs ( ts Timestamp ) {
dit . Base . Timestamp = ts
}
func ( dit * describeIndexTask ) OnEnqueue ( ) error {
2022-10-19 02:01:26 +00:00
dit . Base = commonpbutil . NewMsgBase ( )
2022-10-08 07:38:58 +00:00
return nil
}
func ( dit * describeIndexTask ) PreExecute ( ctx context . Context ) error {
dit . Base . MsgType = commonpb . MsgType_DescribeIndex
2022-11-04 06:25:38 +00:00
dit . Base . SourceID = paramtable . GetNodeID ( )
2022-10-08 07:38:58 +00:00
if err := validateCollectionName ( dit . CollectionName ) ; err != nil {
return err
}
2022-10-25 03:29:30 +00:00
collID , err := globalMetaCache . GetCollectionID ( ctx , dit . CollectionName )
if err != nil {
return err
}
2022-10-08 07:38:58 +00:00
dit . collectionID = collID
return nil
}
func ( dit * describeIndexTask ) Execute ( ctx context . Context ) error {
schema , err := globalMetaCache . GetCollectionSchema ( ctx , dit . GetCollectionName ( ) )
if err != nil {
log . Error ( "failed to get collection schema" , zap . Error ( err ) )
return fmt . Errorf ( "failed to get collection schema: %s" , err )
}
schemaHelper , err := typeutil . CreateSchemaHelper ( schema )
if err != nil {
log . Error ( "failed to parse collection schema" , zap . Error ( err ) )
return fmt . Errorf ( "failed to parse collection schema: %s" , err )
}
2023-01-11 06:35:40 +00:00
resp , err := dit . datacoord . DescribeIndex ( ctx , & indexpb . DescribeIndexRequest { CollectionID : dit . collectionID , IndexName : dit . IndexName } )
2022-10-08 07:38:58 +00:00
if err != nil || resp == nil {
return err
}
dit . result = & milvuspb . DescribeIndexResponse { }
dit . result . Status = resp . GetStatus ( )
if dit . result . Status . ErrorCode != commonpb . ErrorCode_Success {
return errors . New ( dit . result . Status . Reason )
}
for _ , indexInfo := range resp . IndexInfos {
field , err := schemaHelper . GetFieldFromID ( indexInfo . FieldID )
if err != nil {
log . Error ( "failed to get collection field" , zap . Error ( err ) )
return fmt . Errorf ( "failed to get collection field: %d" , indexInfo . FieldID )
}
params := indexInfo . GetUserIndexParams ( )
if params == nil {
params = indexInfo . GetIndexParams ( )
}
desc := & milvuspb . IndexDescription {
IndexName : indexInfo . GetIndexName ( ) ,
IndexID : indexInfo . GetIndexID ( ) ,
FieldName : field . Name ,
Params : params ,
IndexedRows : indexInfo . GetIndexedRows ( ) ,
TotalRows : indexInfo . GetTotalRows ( ) ,
State : indexInfo . GetState ( ) ,
IndexStateFailReason : indexInfo . GetIndexStateFailReason ( ) ,
}
dit . result . IndexDescriptions = append ( dit . result . IndexDescriptions , desc )
}
return err
}
func ( dit * describeIndexTask ) PostExecute ( ctx context . Context ) error {
return nil
}
type dropIndexTask struct {
Condition
ctx context . Context
* milvuspb . DropIndexRequest
2023-01-04 11:37:36 +00:00
dataCoord types . DataCoord
2022-10-19 15:17:44 +00:00
queryCoord types . QueryCoord
2022-10-08 07:38:58 +00:00
result * commonpb . Status
collectionID UniqueID
}
func ( dit * dropIndexTask ) TraceCtx ( ) context . Context {
return dit . ctx
}
func ( dit * dropIndexTask ) ID ( ) UniqueID {
return dit . Base . MsgID
}
func ( dit * dropIndexTask ) SetID ( uid UniqueID ) {
dit . Base . MsgID = uid
}
func ( dit * dropIndexTask ) Name ( ) string {
return DropIndexTaskName
}
func ( dit * dropIndexTask ) Type ( ) commonpb . MsgType {
return dit . Base . MsgType
}
func ( dit * dropIndexTask ) BeginTs ( ) Timestamp {
return dit . Base . Timestamp
}
func ( dit * dropIndexTask ) EndTs ( ) Timestamp {
return dit . Base . Timestamp
}
func ( dit * dropIndexTask ) SetTs ( ts Timestamp ) {
dit . Base . Timestamp = ts
}
func ( dit * dropIndexTask ) OnEnqueue ( ) error {
2022-10-19 02:01:26 +00:00
dit . Base = commonpbutil . NewMsgBase ( )
2022-10-08 07:38:58 +00:00
return nil
}
func ( dit * dropIndexTask ) PreExecute ( ctx context . Context ) error {
dit . Base . MsgType = commonpb . MsgType_DropIndex
2022-11-04 06:25:38 +00:00
dit . Base . SourceID = paramtable . GetNodeID ( )
2022-10-08 07:38:58 +00:00
collName , fieldName := dit . CollectionName , dit . FieldName
if err := validateCollectionName ( collName ) ; err != nil {
return err
}
2022-10-28 08:41:32 +00:00
if fieldName != "" {
if err := validateFieldName ( fieldName ) ; err != nil {
return err
}
2022-10-08 07:38:58 +00:00
}
2022-10-25 03:29:30 +00:00
collID , err := globalMetaCache . GetCollectionID ( ctx , dit . CollectionName )
if err != nil {
return err
}
2022-10-08 07:38:58 +00:00
dit . collectionID = collID
2022-10-27 05:05:31 +00:00
loaded , err := isCollectionLoaded ( ctx , dit . queryCoord , collID )
2022-10-19 15:17:44 +00:00
if err != nil {
return err
}
if loaded {
return errors . New ( "index cannot be dropped, collection is loaded, please release it first" )
}
2022-10-08 07:38:58 +00:00
return nil
}
func ( dit * dropIndexTask ) Execute ( ctx context . Context ) error {
2022-10-31 03:39:33 +00:00
var err error
2023-01-11 06:35:40 +00:00
dit . result , err = dit . dataCoord . DropIndex ( ctx , & indexpb . DropIndexRequest {
2022-10-08 07:38:58 +00:00
CollectionID : dit . collectionID ,
PartitionIDs : nil ,
IndexName : dit . IndexName ,
2022-10-31 03:39:33 +00:00
DropAll : false ,
2022-10-08 07:38:58 +00:00
} )
if dit . result == nil {
return errors . New ( "drop index resp is nil" )
}
if dit . result . ErrorCode != commonpb . ErrorCode_Success {
return errors . New ( dit . result . Reason )
}
return err
}
func ( dit * dropIndexTask ) PostExecute ( ctx context . Context ) error {
return nil
}
// Deprecated: use describeIndexTask instead
type getIndexBuildProgressTask struct {
Condition
* milvuspb . GetIndexBuildProgressRequest
2023-01-04 11:37:36 +00:00
ctx context . Context
rootCoord types . RootCoord
dataCoord types . DataCoord
result * milvuspb . GetIndexBuildProgressResponse
2022-10-08 07:38:58 +00:00
collectionID UniqueID
}
func ( gibpt * getIndexBuildProgressTask ) TraceCtx ( ) context . Context {
return gibpt . ctx
}
func ( gibpt * getIndexBuildProgressTask ) ID ( ) UniqueID {
return gibpt . Base . MsgID
}
func ( gibpt * getIndexBuildProgressTask ) SetID ( uid UniqueID ) {
gibpt . Base . MsgID = uid
}
func ( gibpt * getIndexBuildProgressTask ) Name ( ) string {
return GetIndexBuildProgressTaskName
}
func ( gibpt * getIndexBuildProgressTask ) Type ( ) commonpb . MsgType {
return gibpt . Base . MsgType
}
func ( gibpt * getIndexBuildProgressTask ) BeginTs ( ) Timestamp {
return gibpt . Base . Timestamp
}
func ( gibpt * getIndexBuildProgressTask ) EndTs ( ) Timestamp {
return gibpt . Base . Timestamp
}
func ( gibpt * getIndexBuildProgressTask ) SetTs ( ts Timestamp ) {
gibpt . Base . Timestamp = ts
}
func ( gibpt * getIndexBuildProgressTask ) OnEnqueue ( ) error {
2022-10-19 02:01:26 +00:00
gibpt . Base = commonpbutil . NewMsgBase ( )
2022-10-08 07:38:58 +00:00
return nil
}
func ( gibpt * getIndexBuildProgressTask ) PreExecute ( ctx context . Context ) error {
gibpt . Base . MsgType = commonpb . MsgType_GetIndexBuildProgress
2022-11-04 06:25:38 +00:00
gibpt . Base . SourceID = paramtable . GetNodeID ( )
2022-10-08 07:38:58 +00:00
if err := validateCollectionName ( gibpt . CollectionName ) ; err != nil {
return err
}
return nil
}
func ( gibpt * getIndexBuildProgressTask ) Execute ( ctx context . Context ) error {
collectionName := gibpt . CollectionName
collectionID , err := globalMetaCache . GetCollectionID ( ctx , collectionName )
if err != nil { // err is not nil if collection not exists
return err
}
gibpt . collectionID = collectionID
if gibpt . IndexName == "" {
2022-12-07 10:01:19 +00:00
gibpt . IndexName = Params . CommonCfg . DefaultIndexName . GetValue ( )
2022-10-08 07:38:58 +00:00
}
2023-01-11 06:35:40 +00:00
resp , err := gibpt . dataCoord . GetIndexBuildProgress ( ctx , & indexpb . GetIndexBuildProgressRequest {
2022-10-08 07:38:58 +00:00
CollectionID : collectionID ,
IndexName : gibpt . IndexName ,
} )
if err != nil {
return err
}
gibpt . result = & milvuspb . GetIndexBuildProgressResponse {
Status : resp . Status ,
TotalRows : resp . GetTotalRows ( ) ,
IndexedRows : resp . GetIndexedRows ( ) ,
}
return nil
}
func ( gibpt * getIndexBuildProgressTask ) PostExecute ( ctx context . Context ) error {
return nil
}
// Deprecated: use describeIndexTask instead
type getIndexStateTask struct {
Condition
* milvuspb . GetIndexStateRequest
2023-01-04 11:37:36 +00:00
ctx context . Context
dataCoord types . DataCoord
rootCoord types . RootCoord
result * milvuspb . GetIndexStateResponse
2022-10-08 07:38:58 +00:00
collectionID UniqueID
}
func ( gist * getIndexStateTask ) TraceCtx ( ) context . Context {
return gist . ctx
}
func ( gist * getIndexStateTask ) ID ( ) UniqueID {
return gist . Base . MsgID
}
func ( gist * getIndexStateTask ) SetID ( uid UniqueID ) {
gist . Base . MsgID = uid
}
func ( gist * getIndexStateTask ) Name ( ) string {
return GetIndexStateTaskName
}
func ( gist * getIndexStateTask ) Type ( ) commonpb . MsgType {
return gist . Base . MsgType
}
func ( gist * getIndexStateTask ) BeginTs ( ) Timestamp {
return gist . Base . Timestamp
}
func ( gist * getIndexStateTask ) EndTs ( ) Timestamp {
return gist . Base . Timestamp
}
func ( gist * getIndexStateTask ) SetTs ( ts Timestamp ) {
gist . Base . Timestamp = ts
}
func ( gist * getIndexStateTask ) OnEnqueue ( ) error {
2022-10-19 02:01:26 +00:00
gist . Base = commonpbutil . NewMsgBase ( )
2022-10-08 07:38:58 +00:00
return nil
}
func ( gist * getIndexStateTask ) PreExecute ( ctx context . Context ) error {
gist . Base . MsgType = commonpb . MsgType_GetIndexState
2022-11-04 06:25:38 +00:00
gist . Base . SourceID = paramtable . GetNodeID ( )
2022-10-08 07:38:58 +00:00
if err := validateCollectionName ( gist . CollectionName ) ; err != nil {
return err
}
return nil
}
func ( gist * getIndexStateTask ) Execute ( ctx context . Context ) error {
if gist . IndexName == "" {
2022-12-07 10:01:19 +00:00
gist . IndexName = Params . CommonCfg . DefaultIndexName . GetValue ( )
2022-10-08 07:38:58 +00:00
}
collectionID , err := globalMetaCache . GetCollectionID ( ctx , gist . CollectionName )
if err != nil {
return err
}
2023-01-11 06:35:40 +00:00
state , err := gist . dataCoord . GetIndexState ( ctx , & indexpb . GetIndexStateRequest {
2022-10-08 07:38:58 +00:00
CollectionID : collectionID ,
IndexName : gist . IndexName ,
} )
if err != nil {
return err
}
gist . result = & milvuspb . GetIndexStateResponse {
Status : & commonpb . Status {
ErrorCode : commonpb . ErrorCode_Success ,
Reason : "" ,
} ,
State : state . GetState ( ) ,
FailReason : state . GetFailReason ( ) ,
}
return nil
}
func ( gist * getIndexStateTask ) PostExecute ( ctx context . Context ) error {
return nil
}