feat: integrate storagev2 into index build process (#28995)

issue: https://github.com/milvus-io/milvus/issues/28994

---------

Signed-off-by: sunby <sunbingyi1992@gmail.com>
pull/28949/head
Bingyi Sun 2023-12-13 17:24:38 +08:00 committed by GitHub
parent ed79505d31
commit ad866d2889
22 changed files with 1107 additions and 436 deletions

go.mod

@ -59,9 +59,12 @@ require (
require github.com/apache/arrow/go/v12 v12.0.1
require github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092
require (
github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/quasilyte/go-ruleguard/dsl v0.3.22
golang.org/x/net v0.17.0
)
require (
@ -169,7 +172,6 @@ require (
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 // indirect
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect
github.com/pingcap/kvproto v0.0.0-20221129023506-621ec37aac7a // indirect
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
@ -218,7 +220,6 @@ require (
go.uber.org/automaxprocs v1.5.2 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect


@ -18,6 +18,7 @@ package datacoord
import (
"context"
"fmt"
"path"
"sync"
"time"
@ -25,6 +26,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
@ -81,6 +83,7 @@ type indexBuilder struct {
nodeManager *IndexNodeManager
chunkManager storage.ChunkManager
indexEngineVersionManager IndexEngineVersionManager
handler Handler
}
func newIndexBuilder(
@ -88,6 +91,7 @@ func newIndexBuilder(
metaTable *meta, nodeManager *IndexNodeManager,
chunkManager storage.ChunkManager,
indexEngineVersionManager IndexEngineVersionManager,
handler Handler,
) *indexBuilder {
ctx, cancel := context.WithCancel(ctx)
@ -101,6 +105,7 @@ func newIndexBuilder(
policy: defaultBuildIndexPolicy,
nodeManager: nodeManager,
chunkManager: chunkManager,
handler: handler,
indexEngineVersionManager: indexEngineVersionManager,
}
ib.reloadFromKV()
@ -299,18 +304,70 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
RequestTimeoutMs: Params.MinioCfg.RequestTimeoutMs.GetAsInt64(),
}
}
req := &indexpb.CreateJobRequest{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
IndexFilePrefix: path.Join(ib.chunkManager.RootPath(), common.SegmentIndexPath),
BuildID: buildID,
DataPaths: binLogs,
IndexVersion: meta.IndexVersion + 1,
StorageConfig: storageConfig,
IndexParams: indexParams,
TypeParams: typeParams,
NumRows: meta.NumRows,
CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
var req *indexpb.CreateJobRequest
if Params.CommonCfg.EnableStorageV2.GetAsBool() {
collectionInfo, err := ib.handler.GetCollection(ib.ctx, segment.GetCollectionID())
if err != nil {
log.Info("index builder get collection info failed", zap.Int64("collectionID", segment.GetCollectionID()), zap.Error(err))
return false
}
schema := collectionInfo.Schema
var field *schemapb.FieldSchema
for _, f := range schema.Fields {
if f.FieldID == fieldID {
field = f
break
}
}
dim, _ := storage.GetDimFromParams(field.TypeParams)
var scheme string
if Params.MinioCfg.UseSSL.GetAsBool() {
scheme = "https"
} else {
scheme = "http"
}
req = &indexpb.CreateJobRequest{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
IndexFilePrefix: path.Join(ib.chunkManager.RootPath(), common.SegmentIndexPath),
BuildID: buildID,
DataPaths: binLogs,
IndexVersion: meta.IndexVersion + 1,
StorageConfig: storageConfig,
IndexParams: indexParams,
TypeParams: typeParams,
NumRows: meta.NumRows,
CollectionID: segment.GetCollectionID(),
PartitionID: segment.GetPartitionID(),
SegmentID: segment.GetID(),
FieldID: fieldID,
FieldName: field.Name,
FieldType: field.DataType,
StorePath: fmt.Sprintf("s3://%s:%s@%s/%d?scheme=%s&endpoint_override=%s&allow_bucket_creation=true", Params.MinioCfg.AccessKeyID.GetValue(), Params.MinioCfg.SecretAccessKey.GetValue(), Params.MinioCfg.BucketName.GetValue(), segment.GetID(), scheme, Params.MinioCfg.Address.GetValue()),
StoreVersion: segment.GetStorageVersion(),
IndexStorePath: fmt.Sprintf("s3://%s:%s@%s/index/%d?scheme=%s&endpoint_override=%s&allow_bucket_creation=true", Params.MinioCfg.AccessKeyID.GetValue(), Params.MinioCfg.SecretAccessKey.GetValue(), Params.MinioCfg.BucketName.GetValue(), segment.GetID(), scheme, Params.MinioCfg.Address.GetValue()),
Dim: int64(dim),
CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
}
} else {
req = &indexpb.CreateJobRequest{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
IndexFilePrefix: path.Join(ib.chunkManager.RootPath(), common.SegmentIndexPath),
BuildID: buildID,
DataPaths: binLogs,
IndexVersion: meta.IndexVersion + 1,
StorageConfig: storageConfig,
IndexParams: indexParams,
TypeParams: typeParams,
NumRows: meta.NumRows,
CurrentIndexVersion: ib.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
}
}
if err := ib.assignTask(client, req); err != nil {
// need to release lock then reassign, so set task state to retry
log.Ctx(ib.ctx).Warn("index builder assign task to IndexNode failed", zap.Int64("buildID", buildID),


@ -27,6 +27,7 @@ import (
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore"
catalogmocks "github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/metastore/model"
@ -666,7 +667,7 @@ func TestIndexBuilder(t *testing.T) {
chunkManager := &mocks.ChunkManager{}
chunkManager.EXPECT().RootPath().Return("root")
ib := newIndexBuilder(ctx, mt, nodeManager, chunkManager, newIndexEngineVersionManager())
ib := newIndexBuilder(ctx, mt, nodeManager, chunkManager, newIndexEngineVersionManager(), nil)
assert.Equal(t, 6, len(ib.tasks))
assert.Equal(t, indexTaskInit, ib.tasks[buildID])
@ -1061,3 +1062,137 @@ func TestIndexBuilder_Error(t *testing.T) {
assert.Equal(t, indexTaskRetry, state)
})
}
func TestIndexBuilderV2(t *testing.T) {
var (
collID = UniqueID(100)
partID = UniqueID(200)
indexID = UniqueID(300)
segID = UniqueID(500)
buildID = UniqueID(600)
nodeID = UniqueID(700)
)
paramtable.Init()
paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("false")
ctx := context.Background()
catalog := catalogmocks.NewDataCoordCatalog(t)
catalog.On("CreateSegmentIndex",
mock.Anything,
mock.Anything,
).Return(nil)
catalog.On("AlterSegmentIndexes",
mock.Anything,
mock.Anything,
).Return(nil)
ic := mocks.NewMockIndexNodeClient(t)
ic.EXPECT().GetJobStats(mock.Anything, mock.Anything, mock.Anything).
Return(&indexpb.GetJobStatsResponse{
Status: merr.Success(),
TotalJobNum: 1,
EnqueueJobNum: 0,
InProgressJobNum: 1,
TaskSlots: 1,
JobInfos: []*indexpb.JobInfo{
{
NumRows: 1024,
Dim: 128,
StartTime: 1,
EndTime: 10,
PodID: 1,
},
},
}, nil)
ic.EXPECT().QueryJobs(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *indexpb.QueryJobsRequest, option ...grpc.CallOption) (*indexpb.QueryJobsResponse, error) {
indexInfos := make([]*indexpb.IndexTaskInfo, 0)
for _, buildID := range in.BuildIDs {
indexInfos = append(indexInfos, &indexpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Finished,
IndexFileKeys: []string{"file1", "file2"},
})
}
return &indexpb.QueryJobsResponse{
Status: merr.Success(),
ClusterID: in.ClusterID,
IndexInfos: indexInfos,
}, nil
})
ic.EXPECT().CreateJob(mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(merr.Success(), nil)
ic.EXPECT().DropJobs(mock.Anything, mock.Anything, mock.Anything).
Return(merr.Success(), nil)
mt := createMetaTable(catalog)
nodeManager := &IndexNodeManager{
ctx: ctx,
nodeClients: map[UniqueID]types.IndexNodeClient{
4: ic,
},
}
chunkManager := &mocks.ChunkManager{}
chunkManager.EXPECT().RootPath().Return("root")
handler := NewNMockHandler(t)
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{FieldID: fieldID, Name: "vec", TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "10"}}},
},
},
}, nil)
ib := newIndexBuilder(ctx, mt, nodeManager, chunkManager, newIndexEngineVersionManager(), handler)
assert.Equal(t, 6, len(ib.tasks))
assert.Equal(t, indexTaskInit, ib.tasks[buildID])
assert.Equal(t, indexTaskInProgress, ib.tasks[buildID+1])
// buildID+2 will be filtered by isDeleted
assert.Equal(t, indexTaskInit, ib.tasks[buildID+3])
assert.Equal(t, indexTaskInProgress, ib.tasks[buildID+8])
assert.Equal(t, indexTaskInit, ib.tasks[buildID+9])
assert.Equal(t, indexTaskInit, ib.tasks[buildID+10])
ib.scheduleDuration = time.Millisecond * 500
ib.Start()
t.Run("enqueue", func(t *testing.T) {
segIdx := &model.SegmentIndex{
SegmentID: segID + 10,
CollectionID: collID,
PartitionID: partID,
NumRows: 1026,
IndexID: indexID,
BuildID: buildID + 10,
NodeID: 0,
IndexVersion: 0,
IndexState: 0,
FailReason: "",
IsDeleted: false,
CreateTime: 0,
IndexFileKeys: nil,
IndexSize: 0,
}
err := ib.meta.AddSegmentIndex(segIdx)
assert.NoError(t, err)
ib.enqueue(buildID + 10)
})
t.Run("node down", func(t *testing.T) {
ib.nodeDown(nodeID)
})
for {
ib.taskMutex.RLock()
if len(ib.tasks) == 0 {
break
}
ib.taskMutex.RUnlock()
}
ib.Stop()
}


@ -598,7 +598,7 @@ func (s *Server) initMeta(chunkManager storage.ChunkManager) error {
func (s *Server) initIndexBuilder(manager storage.ChunkManager) {
if s.indexBuilder == nil {
s.indexBuilder = newIndexBuilder(s.ctx, s.meta, s.indexNodeManager, manager, s.indexEngineVersionManager)
s.indexBuilder = newIndexBuilder(s.ctx, s.meta, s.indexNodeManager, manager, s.indexEngineVersionManager, s.handler)
}
}


@ -828,6 +828,17 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
continue
}
if Params.CommonCfg.EnableStorageV2.GetAsBool() {
segmentInfos = append(segmentInfos, &datapb.SegmentInfo{
ID: segment.ID,
PartitionID: segment.PartitionID,
CollectionID: segment.CollectionID,
InsertChannel: segment.InsertChannel,
NumOfRows: segment.NumOfRows,
})
continue
}
binlogs := segment.GetBinlogs()
if len(binlogs) == 0 && segment.GetLevel() != datapb.SegmentLevel_L0 {
continue


@ -23,8 +23,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
milvus_storage "github.com/milvus-io/milvus-storage/go/storage"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
type StorageV2Cache struct {
@ -60,7 +59,7 @@ func (s *StorageV2Cache) SetSpace(segmentID int64, space *milvus_storage.Space)
}
func NewStorageV2Cache(schema *schemapb.CollectionSchema) (*StorageV2Cache, error) {
arrowSchema, err := ConvertToArrowSchema(schema.Fields)
arrowSchema, err := typeutil.ConvertToArrowSchema(schema.Fields)
if err != nil {
return nil, err
}
@ -69,119 +68,3 @@ func NewStorageV2Cache(schema *schemapb.CollectionSchema) (*StorageV2Cache, erro
spaces: make(map[int64]*milvus_storage.Space),
}, nil
}
func ConvertToArrowSchema(fields []*schemapb.FieldSchema) (*arrow.Schema, error) {
arrowFields := make([]arrow.Field, 0, len(fields))
for _, field := range fields {
switch field.DataType {
case schemapb.DataType_Bool:
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.FixedWidthTypes.Boolean,
})
case schemapb.DataType_Int8:
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.PrimitiveTypes.Int8,
})
case schemapb.DataType_Int16:
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.PrimitiveTypes.Int16,
})
case schemapb.DataType_Int32:
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.PrimitiveTypes.Int32,
})
case schemapb.DataType_Int64:
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.PrimitiveTypes.Int64,
})
case schemapb.DataType_Float:
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.PrimitiveTypes.Float32,
})
case schemapb.DataType_Double:
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.PrimitiveTypes.Float64,
})
case schemapb.DataType_String, schemapb.DataType_VarChar:
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.BinaryTypes.String,
})
case schemapb.DataType_Array:
elemType, err := convertToArrowType(field.ElementType)
if err != nil {
return nil, err
}
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.ListOf(elemType),
})
case schemapb.DataType_JSON:
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.BinaryTypes.Binary,
})
case schemapb.DataType_BinaryVector:
dim, err := storage.GetDimFromParams(field.TypeParams)
if err != nil {
return nil, err
}
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: &arrow.FixedSizeBinaryType{ByteWidth: dim / 8},
})
case schemapb.DataType_FloatVector:
dim, err := storage.GetDimFromParams(field.TypeParams)
if err != nil {
return nil, err
}
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: &arrow.FixedSizeBinaryType{ByteWidth: dim * 4},
})
case schemapb.DataType_Float16Vector:
dim, err := storage.GetDimFromParams(field.TypeParams)
if err != nil {
return nil, err
}
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: &arrow.FixedSizeBinaryType{ByteWidth: dim * 2},
})
default:
return nil, merr.WrapErrParameterInvalidMsg("unknown type %v", field.DataType.String())
}
}
return arrow.NewSchema(arrowFields, nil), nil
}
func convertToArrowType(dataType schemapb.DataType) (arrow.DataType, error) {
switch dataType {
case schemapb.DataType_Bool:
return arrow.FixedWidthTypes.Boolean, nil
case schemapb.DataType_Int8:
return arrow.PrimitiveTypes.Int8, nil
case schemapb.DataType_Int16:
return arrow.PrimitiveTypes.Int16, nil
case schemapb.DataType_Int32:
return arrow.PrimitiveTypes.Int32, nil
case schemapb.DataType_Int64:
return arrow.PrimitiveTypes.Int64, nil
case schemapb.DataType_Float:
return arrow.PrimitiveTypes.Float32, nil
case schemapb.DataType_Double:
return arrow.PrimitiveTypes.Float64, nil
case schemapb.DataType_String, schemapb.DataType_VarChar:
return arrow.BinaryTypes.String, nil
default:
return nil, merr.WrapErrParameterInvalidMsg("unknown type %v", dataType.String())
}
}
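With the converter now living in internal/util/typeutil, callers use it as shown in the hunks below. A small usage sketch, assuming the relocated function keeps the signature shown above; note how a FloatVector of dim 8 maps to a fixed-size binary column of 8*4 = 32 bytes:

```go
package main

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"

	"github.com/milvus-io/milvus/internal/util/typeutil"
)

func main() {
	fields := []*schemapb.FieldSchema{
		{FieldID: 1, Name: "pk", DataType: schemapb.DataType_Int64},
		{FieldID: 2, Name: "vec", DataType: schemapb.DataType_FloatVector,
			TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
	}
	schema, err := typeutil.ConvertToArrowSchema(fields)
	if err != nil {
		panic(err)
	}
	fmt.Println(schema) // pk: int64, vec: fixed_size_binary[32]
}
```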


@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
typeutil2 "github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -238,7 +239,7 @@ func (t *SyncTaskV2) serializeDeleteData() error {
}
fields = append(fields, tsField)
schema, err := metacache.ConvertToArrowSchema(fields)
schema, err := typeutil2.ConvertToArrowSchema(fields)
if err != nil {
return err
}


@ -41,6 +41,7 @@ import (
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -94,7 +95,7 @@ func (s *SyncTaskSuiteV2) SetupSuite() {
},
}
arrowSchema, err := metacache.ConvertToArrowSchema(s.schema.Fields)
arrowSchema, err := typeutil.ConvertToArrowSchema(s.schema.Fields)
s.NoError(err)
s.arrowSchema = arrowSchema
}
@ -268,7 +269,7 @@ func (s *SyncTaskSuiteV2) TestBuildRecord() {
{FieldID: 14, Name: "field12", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "4"}}},
}
schema, err := metacache.ConvertToArrowSchema(fieldSchemas)
schema, err := typeutil.ConvertToArrowSchema(fieldSchemas)
s.NoError(err)
b := array.NewRecordBuilder(memory.NewGoAllocator(), schema)


@ -22,6 +22,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -206,7 +207,7 @@ func (s *BFWriteBufferSuite) TestBufferDataWithStorageV2() {
params.Params.CommonCfg.EnableStorageV2.SwapTempValue("true")
params.Params.CommonCfg.StorageScheme.SwapTempValue("file")
tmpDir := s.T().TempDir()
arrowSchema, err := metacache.ConvertToArrowSchema(s.collSchema.Fields)
arrowSchema, err := typeutil.ConvertToArrowSchema(s.collSchema.Fields)
s.Require().NoError(err)
space, err := milvus_storage.Open(fmt.Sprintf("file:///%s", tmpDir), options.NewSpaceOptionBuilder().
SetSchema(schema.NewSchema(arrowSchema, &schema.SchemaOptions{
@ -235,7 +236,7 @@ func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() {
params.Params.CommonCfg.EnableStorageV2.SwapTempValue("true")
paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushInsertBufferSize.Key, "1")
tmpDir := s.T().TempDir()
arrowSchema, err := metacache.ConvertToArrowSchema(s.collSchema.Fields)
arrowSchema, err := typeutil.ConvertToArrowSchema(s.collSchema.Fields)
s.Require().NoError(err)
space, err := milvus_storage.Open(fmt.Sprintf("file:///%s", tmpDir), options.NewSpaceOptionBuilder().


@ -91,18 +91,37 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel).Inc()
return merr.Status(err), nil
}
task := &indexBuildTask{
ident: fmt.Sprintf("%s/%d", req.ClusterID, req.BuildID),
ctx: taskCtx,
cancel: taskCancel,
BuildID: req.GetBuildID(),
ClusterID: req.GetClusterID(),
node: i,
req: req,
cm: cm,
nodeID: i.GetNodeID(),
tr: timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildID: %d, ClusterID: %s", req.BuildID, req.ClusterID)),
serializedSize: 0,
var task task
if Params.CommonCfg.EnableStorageV2.GetAsBool() {
task = &indexBuildTaskV2{
indexBuildTask: &indexBuildTask{
ident: fmt.Sprintf("%s/%d", req.ClusterID, req.BuildID),
ctx: taskCtx,
cancel: taskCancel,
BuildID: req.GetBuildID(),
ClusterID: req.GetClusterID(),
node: i,
req: req,
cm: cm,
nodeID: i.GetNodeID(),
tr: timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildID: %d, ClusterID: %s", req.BuildID, req.ClusterID)),
serializedSize: 0,
},
}
} else {
task = &indexBuildTask{
ident: fmt.Sprintf("%s/%d", req.ClusterID, req.BuildID),
ctx: taskCtx,
cancel: taskCancel,
BuildID: req.GetBuildID(),
ClusterID: req.GetClusterID(),
node: i,
req: req,
cm: cm,
nodeID: i.GetNodeID(),
tr: timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildID: %d, ClusterID: %s", req.BuildID, req.ClusterID)),
serializedSize: 0,
}
}
ret := merr.Success()
if err := i.sched.IndexBuildQueue.Enqueue(task); err != nil {
@ -138,6 +157,7 @@ func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest
serializedSize: info.serializedSize,
failReason: info.failReason,
currentIndexVersion: info.currentIndexVersion,
indexStoreVersion: info.indexStoreVersion,
}
}
})
@ -159,6 +179,7 @@ func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest
ret.IndexInfos[i].SerializedSize = info.serializedSize
ret.IndexInfos[i].FailReason = info.failReason
ret.IndexInfos[i].CurrentIndexVersion = info.currentIndexVersion
ret.IndexInfos[i].IndexStoreVersion = info.indexStoreVersion
log.RatedDebug(5, "querying index build task",
zap.Int64("indexBuildID", buildID),
zap.String("state", info.state.String()),


@ -59,6 +59,7 @@ type taskInfo struct {
serializedSize uint64
failReason string
currentIndexVersion int32
indexStoreVersion int64
// task statistics
statistic *indexpb.JobInfo
@ -77,6 +78,164 @@ type task interface {
Reset()
}
type indexBuildTaskV2 struct {
*indexBuildTask
}
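The v2 task reuses the v1 pipeline through struct embedding and overrides only the stages that differ (parseParams, BuildIndex, and SaveIndexFiles below); everything else is promoted from indexBuildTask. A minimal sketch of the pattern with illustrative names, not code from the commit:

```go
package main

import "fmt"

// taskIface stands in for the scheduler-facing task interface.
type taskIface interface{ BuildIndex() }

type v1Task struct{}

func (t *v1Task) BuildIndex() { fmt.Println("v1: build from binlogs") }
func (t *v1Task) Name() string { return "task" } // promoted, not overridden

type v2Task struct{ *v1Task }

// Overrides only the stage that differs; dispatch through the interface picks
// the outer method (Go does no virtual dispatch inside v1Task's own methods).
func (t *v2Task) BuildIndex() { fmt.Println("v2: build from milvus-storage space") }

func main() {
	var t taskIface = &v2Task{v1Task: &v1Task{}}
	t.BuildIndex() // v2: build from milvus-storage space
}
```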
func (it *indexBuildTaskV2) parseParams(ctx context.Context) error {
it.collectionID = it.req.CollectionID
it.partitionID = it.req.PartitionID
it.segmentID = it.req.SegmentID
it.fieldType = it.req.FieldType
it.fieldID = it.req.FieldID
it.fieldName = it.req.FieldName
return nil
}
func (it *indexBuildTaskV2) BuildIndex(ctx context.Context) error {
err := it.parseParams(ctx)
if err != nil {
log.Ctx(ctx).Warn("parse field meta from binlog failed", zap.Error(err))
return err
}
indexType := it.newIndexParams[common.IndexTypeKey]
if indexType == indexparamcheck.IndexDISKANN {
// check index node support disk index
if !Params.IndexNodeCfg.EnableDisk.GetAsBool() {
log.Ctx(ctx).Warn("IndexNode don't support build disk index",
zap.String("index type", it.newIndexParams[common.IndexTypeKey]),
zap.Bool("enable disk", Params.IndexNodeCfg.EnableDisk.GetAsBool()))
return merr.WrapErrIndexNotSupported("disk index")
}
// check load size and size of field data
localUsedSize, err := indexcgowrapper.GetLocalUsedSize(paramtable.Get().LocalStorageCfg.Path.GetValue())
if err != nil {
log.Ctx(ctx).Warn("IndexNode get local used size failed")
return err
}
fieldDataSize, err := estimateFieldDataSize(it.statistic.Dim, it.req.GetNumRows(), it.fieldType)
if err != nil {
log.Ctx(ctx).Warn("IndexNode get local used size failed")
return err
}
usedLocalSizeWhenBuild := int64(float64(fieldDataSize)*diskUsageRatio) + localUsedSize
maxUsedLocalSize := int64(Params.IndexNodeCfg.DiskCapacityLimit.GetAsFloat() * Params.IndexNodeCfg.MaxDiskUsagePercentage.GetAsFloat())
if usedLocalSizeWhenBuild > maxUsedLocalSize {
log.Ctx(ctx).Warn("IndexNode don't has enough disk size to build disk ann index",
zap.Int64("usedLocalSizeWhenBuild", usedLocalSizeWhenBuild),
zap.Int64("maxUsedLocalSize", maxUsedLocalSize))
return merr.WrapErrServiceDiskLimitExceeded(float32(usedLocalSizeWhenBuild), float32(maxUsedLocalSize))
}
err = indexparams.SetDiskIndexBuildParams(it.newIndexParams, int64(fieldDataSize))
if err != nil {
log.Ctx(ctx).Warn("failed to fill disk index params", zap.Error(err))
return err
}
}
var buildIndexInfo *indexcgowrapper.BuildIndexInfo
buildIndexInfo, err = indexcgowrapper.NewBuildIndexInfo(it.req.GetStorageConfig())
defer indexcgowrapper.DeleteBuildIndexInfo(buildIndexInfo)
if err != nil {
log.Ctx(ctx).Warn("create build index info failed", zap.Error(err))
return err
}
err = buildIndexInfo.AppendFieldMetaInfoV2(it.collectionID, it.partitionID, it.segmentID, it.fieldID, it.fieldType, it.fieldName, it.req.Dim)
if err != nil {
log.Ctx(ctx).Warn("append field meta failed", zap.Error(err))
return err
}
err = buildIndexInfo.AppendIndexMetaInfo(it.req.IndexID, it.req.BuildID, it.req.IndexVersion)
if err != nil {
log.Ctx(ctx).Warn("append index meta failed", zap.Error(err))
return err
}
err = buildIndexInfo.AppendBuildIndexParam(it.newIndexParams)
if err != nil {
log.Ctx(ctx).Warn("append index params failed", zap.Error(err))
return err
}
err = buildIndexInfo.AppendIndexStorageInfo(it.req.StorePath, it.req.IndexStorePath, it.req.StoreVersion)
if err != nil {
log.Ctx(ctx).Warn("append storage info failed", zap.Error(err))
return err
}
jsonIndexParams, err := json.Marshal(it.newIndexParams)
if err != nil {
log.Ctx(ctx).Error("failed to json marshal index params", zap.Error(err))
return err
}
log.Ctx(ctx).Info("index params are ready",
zap.Int64("buildID", it.BuildID),
zap.String("index params", string(jsonIndexParams)))
err = buildIndexInfo.AppendBuildTypeParam(it.newTypeParams)
if err != nil {
log.Ctx(ctx).Warn("append type params failed", zap.Error(err))
return err
}
it.index, err = indexcgowrapper.CreateIndexV2(ctx, buildIndexInfo)
if err != nil {
if it.index != nil && it.index.CleanLocalData() != nil {
log.Ctx(ctx).Error("failed to clean cached data on disk after build index failed",
zap.Int64("buildID", it.BuildID),
zap.Int64("index version", it.req.GetIndexVersion()))
}
log.Ctx(ctx).Error("failed to build index", zap.Error(err))
return err
}
buildIndexLatency := it.tr.RecordSpan()
metrics.IndexNodeKnowhereBuildIndexLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(buildIndexLatency.Milliseconds()))
log.Ctx(ctx).Info("Successfully build index", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
return nil
}
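The DiskANN admission check above reads as: projected usage = fieldDataSize * diskUsageRatio + localUsedSize, which must not exceed DiskCapacityLimit * MaxDiskUsagePercentage. A standalone sketch of that check with made-up numbers (helper name and figures are illustrative):

```go
package main

import "fmt"

// canBuildDiskIndex applies the capacity rule from BuildIndex above.
func canBuildDiskIndex(fieldDataSize, localUsedSize, diskCapacity int64, usageRatio, maxUsagePct float64) bool {
	usedWhenBuild := int64(float64(fieldDataSize)*usageRatio) + localUsedSize
	maxUsed := int64(float64(diskCapacity) * maxUsagePct)
	return usedWhenBuild <= maxUsed
}

func main() {
	// 8 GiB of raw vectors, 2 GiB already used, 100 GiB disk, 4x build ratio, 95% cap:
	// projected 8*4+2 = 34 GiB <= 95 GiB, so the build is admitted.
	fmt.Println(canBuildDiskIndex(8<<30, 2<<30, 100<<30, 4.0, 0.95)) // true
}
```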
func (it *indexBuildTaskV2) SaveIndexFiles(ctx context.Context) error {
gcIndex := func() {
if err := it.index.Delete(); err != nil {
log.Ctx(ctx).Error("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err))
}
}
version, err := it.index.UpLoadV2()
if err != nil {
log.Ctx(ctx).Error("failed to upload index", zap.Error(err))
gcIndex()
return err
}
encodeIndexFileDur := it.tr.Record("index serialize and upload done")
metrics.IndexNodeEncodeIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(encodeIndexFileDur.Seconds())
// early release index for gc, and we can ensure that Delete is idempotent.
gcIndex()
// use serialized size before encoding
it.serializedSize = 0
saveFileKeys := make([]string, 0)
it.statistic.EndTime = time.Now().UnixMicro()
it.node.storeIndexFilesAndStatisticV2(it.ClusterID, it.BuildID, saveFileKeys, it.serializedSize, &it.statistic, it.currentIndexVersion, version)
log.Ctx(ctx).Debug("save index files done", zap.Strings("IndexFiles", saveFileKeys))
saveIndexFileDur := it.tr.RecordSpan()
metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(saveIndexFileDur.Seconds())
it.tr.Elapse("index building all done")
log.Ctx(ctx).Info("Successfully save index files", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID),
zap.Int64("partition", it.partitionID), zap.Int64("SegmentId", it.segmentID))
return nil
}
// IndexBuildTask is used to record the information of the index tasks.
type indexBuildTask struct {
ident string
@ -95,6 +254,7 @@ type indexBuildTask struct {
partitionID UniqueID
segmentID UniqueID
fieldID UniqueID
fieldName string
fieldType schemapb.DataType
fieldData storage.FieldData
indexBlobs []*storage.Blob


@ -160,8 +160,9 @@ func NewIndexBuildTaskQueue(sched *TaskScheduler) *IndexTaskQueue {
unissuedTasks: list.New(),
activeTasks: make(map[string]task),
maxTaskNum: 1024,
utBufChan: make(chan int, 1024),
sched: sched,
utBufChan: make(chan int, 1024),
sched: sched,
}
}


@ -16,6 +16,29 @@
package indexnode
import (
"context"
"testing"
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
milvus_storage "github.com/milvus-io/milvus-storage/go/storage"
"github.com/milvus-io/milvus-storage/go/storage/options"
"github.com/milvus-io/milvus-storage/go/storage/schema"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
// import (
// "context"
// "github.com/cockroachdb/errors"
@ -190,3 +213,109 @@ package indexnode
// assert.Error(t, err)
// })
// }
type IndexBuildTaskV2Suite struct {
suite.Suite
schema *schemapb.CollectionSchema
arrowSchema *arrow.Schema
space *milvus_storage.Space
}
func (suite *IndexBuildTaskV2Suite) SetupSuite() {
paramtable.Init()
}
func (suite *IndexBuildTaskV2Suite) SetupTest() {
suite.schema = &schemapb.CollectionSchema{
Name: "test",
Description: "test",
AutoID: false,
Fields: []*schemapb.FieldSchema{
{FieldID: 1, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true},
{FieldID: 2, Name: "ts", DataType: schemapb.DataType_Int64},
{FieldID: 3, Name: "vec", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "1"}}},
},
}
var err error
suite.arrowSchema, err = typeutil.ConvertToArrowSchema(suite.schema.Fields)
suite.NoError(err)
tmpDir := suite.T().TempDir()
opt := options.NewSpaceOptionBuilder().
SetSchema(schema.NewSchema(
suite.arrowSchema,
&schema.SchemaOptions{
PrimaryColumn: "pk",
VectorColumn: "vec",
VersionColumn: "ts",
})).
Build()
suite.space, err = milvus_storage.Open("file://"+tmpDir, opt)
suite.NoError(err)
b := array.NewRecordBuilder(memory.DefaultAllocator, suite.arrowSchema)
defer b.Release()
b.Field(0).(*array.Int64Builder).AppendValues([]int64{1}, nil)
b.Field(1).(*array.Int64Builder).AppendValues([]int64{1}, nil)
fb := b.Field(2).(*array.FixedSizeBinaryBuilder)
fb.Reserve(1)
fb.Append([]byte{1, 2, 3, 4})
rec := b.NewRecord()
defer rec.Release()
reader, err := array.NewRecordReader(suite.arrowSchema, []arrow.Record{rec})
suite.NoError(err)
err = suite.space.Write(reader, &options.DefaultWriteOptions)
suite.NoError(err)
}
func (suite *IndexBuildTaskV2Suite) TestBuildIndex() {
req := &indexpb.CreateJobRequest{
BuildID: 1,
IndexVersion: 1,
IndexID: 0,
IndexName: "",
IndexParams: []*commonpb.KeyValuePair{{Key: common.IndexTypeKey, Value: "FLAT"}, {Key: common.MetricTypeKey, Value: metric.L2}, {Key: common.DimKey, Value: "1"}},
TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "1"}},
NumRows: 10,
StorageConfig: &indexpb.StorageConfig{
RootPath: "/tmp/milvus/data",
StorageType: "local",
},
CollectionID: 1,
PartitionID: 1,
SegmentID: 1,
FieldID: 3,
FieldName: "vec",
FieldType: schemapb.DataType_FloatVector,
StorePath: "file://" + suite.space.Path(),
StoreVersion: suite.space.GetCurrentVersion(),
IndexStorePath: "file://" + suite.space.Path(),
Dim: 4,
}
task := &indexBuildTaskV2{
indexBuildTask: &indexBuildTask{
ident: "test",
ctx: context.Background(),
BuildID: req.GetBuildID(),
ClusterID: req.GetClusterID(),
req: req,
tr: timerecord.NewTimeRecorder("test"),
node: NewIndexNode(context.Background(), dependency.NewDefaultFactory(true)),
},
}
var err error
err = task.Prepare(context.Background())
suite.NoError(err)
err = task.BuildIndex(context.Background())
suite.NoError(err)
err = task.SaveIndexFiles(context.Background())
suite.NoError(err)
}
func TestIndexBuildTaskV2Suite(t *testing.T) {
suite.Run(t, new(IndexBuildTaskV2Suite))
}


@ -76,6 +76,28 @@ func (i *IndexNode) storeIndexFilesAndStatistic(
}
}
func (i *IndexNode) storeIndexFilesAndStatisticV2(
ClusterID string,
buildID UniqueID,
fileKeys []string,
serializedSize uint64,
statistic *indexpb.JobInfo,
currentIndexVersion int32,
indexStoreVersion int64,
) {
key := taskKey{ClusterID: ClusterID, BuildID: buildID}
i.stateLock.Lock()
defer i.stateLock.Unlock()
if info, ok := i.tasks[key]; ok {
info.fileKeys = common.CloneStringList(fileKeys)
info.serializedSize = serializedSize
info.statistic = proto.Clone(statistic).(*indexpb.JobInfo)
info.currentIndexVersion = currentIndexVersion
info.indexStoreVersion = indexStoreVersion
return
}
}
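storeIndexFilesAndStatisticV2 clones its slice and proto arguments (common.CloneStringList, proto.Clone) before storing them, so later mutations by the caller cannot leak into the shared task table. A minimal sketch of that guarded copy-on-store pattern, with illustrative types rather than the commit's:

```go
package main

import (
	"fmt"
	"sync"
)

type info struct{ fileKeys []string }

type node struct {
	mu    sync.Mutex
	tasks map[string]*info
}

// store updates the entry only if it exists, copying the input slice first.
func (n *node) store(key string, fileKeys []string) {
	n.mu.Lock()
	defer n.mu.Unlock()
	if in, ok := n.tasks[key]; ok {
		in.fileKeys = append([]string(nil), fileKeys...) // clone, like common.CloneStringList
	}
}

func main() {
	n := &node{tasks: map[string]*info{"cluster/1": {}}}
	keys := []string{"file1"}
	n.store("cluster/1", keys)
	keys[0] = "mutated"                        // caller mutates its slice...
	fmt.Println(n.tasks["cluster/1"].fileKeys) // [file1] ...stored copy is unaffected
}
```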
func (i *IndexNode) deleteTaskInfos(ctx context.Context, keys []taskKey) []*taskInfo {
i.stateLock.Lock()
defer i.stateLock.Unlock()


@ -24,6 +24,7 @@ type SegmentIndex struct {
// deprecated
WriteHandoff bool
CurrentIndexVersion int32
IndexStoreVersion int64
}
func UnmarshalSegmentIndexModel(segIndex *indexpb.SegmentIndex) *SegmentIndex {


@ -64,8 +64,8 @@ type MockDataNode_AddImportSegment_Call struct {
}
// AddImportSegment is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.AddImportSegmentRequest
// - _a0 context.Context
// - _a1 *datapb.AddImportSegmentRequest
func (_e *MockDataNode_Expecter) AddImportSegment(_a0 interface{}, _a1 interface{}) *MockDataNode_AddImportSegment_Call {
return &MockDataNode_AddImportSegment_Call{Call: _e.mock.On("AddImportSegment", _a0, _a1)}
}
@ -119,8 +119,8 @@ type MockDataNode_CheckChannelOperationProgress_Call struct {
}
// CheckChannelOperationProgress is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.ChannelWatchInfo
// - _a0 context.Context
// - _a1 *datapb.ChannelWatchInfo
func (_e *MockDataNode_Expecter) CheckChannelOperationProgress(_a0 interface{}, _a1 interface{}) *MockDataNode_CheckChannelOperationProgress_Call {
return &MockDataNode_CheckChannelOperationProgress_Call{Call: _e.mock.On("CheckChannelOperationProgress", _a0, _a1)}
}
@ -174,8 +174,8 @@ type MockDataNode_Compaction_Call struct {
}
// Compaction is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.CompactionPlan
// - _a0 context.Context
// - _a1 *datapb.CompactionPlan
func (_e *MockDataNode_Expecter) Compaction(_a0 interface{}, _a1 interface{}) *MockDataNode_Compaction_Call {
return &MockDataNode_Compaction_Call{Call: _e.mock.On("Compaction", _a0, _a1)}
}
@ -229,8 +229,8 @@ type MockDataNode_DropImport_Call struct {
}
// DropImport is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.DropImportRequest
// - _a0 context.Context
// - _a1 *datapb.DropImportRequest
func (_e *MockDataNode_Expecter) DropImport(_a0 interface{}, _a1 interface{}) *MockDataNode_DropImport_Call {
return &MockDataNode_DropImport_Call{Call: _e.mock.On("DropImport", _a0, _a1)}
}
@ -284,8 +284,8 @@ type MockDataNode_FlushChannels_Call struct {
}
// FlushChannels is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.FlushChannelsRequest
// - _a0 context.Context
// - _a1 *datapb.FlushChannelsRequest
func (_e *MockDataNode_Expecter) FlushChannels(_a0 interface{}, _a1 interface{}) *MockDataNode_FlushChannels_Call {
return &MockDataNode_FlushChannels_Call{Call: _e.mock.On("FlushChannels", _a0, _a1)}
}
@ -339,8 +339,8 @@ type MockDataNode_FlushSegments_Call struct {
}
// FlushSegments is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.FlushSegmentsRequest
// - _a0 context.Context
// - _a1 *datapb.FlushSegmentsRequest
func (_e *MockDataNode_Expecter) FlushSegments(_a0 interface{}, _a1 interface{}) *MockDataNode_FlushSegments_Call {
return &MockDataNode_FlushSegments_Call{Call: _e.mock.On("FlushSegments", _a0, _a1)}
}
@ -435,8 +435,8 @@ type MockDataNode_GetCompactionState_Call struct {
}
// GetCompactionState is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.CompactionStateRequest
// - _a0 context.Context
// - _a1 *datapb.CompactionStateRequest
func (_e *MockDataNode_Expecter) GetCompactionState(_a0 interface{}, _a1 interface{}) *MockDataNode_GetCompactionState_Call {
return &MockDataNode_GetCompactionState_Call{Call: _e.mock.On("GetCompactionState", _a0, _a1)}
}
@ -490,8 +490,8 @@ type MockDataNode_GetComponentStates_Call struct {
}
// GetComponentStates is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *milvuspb.GetComponentStatesRequest
// - _a0 context.Context
// - _a1 *milvuspb.GetComponentStatesRequest
func (_e *MockDataNode_Expecter) GetComponentStates(_a0 interface{}, _a1 interface{}) *MockDataNode_GetComponentStates_Call {
return &MockDataNode_GetComponentStates_Call{Call: _e.mock.On("GetComponentStates", _a0, _a1)}
}
@ -545,8 +545,8 @@ type MockDataNode_GetMetrics_Call struct {
}
// GetMetrics is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *milvuspb.GetMetricsRequest
// - _a0 context.Context
// - _a1 *milvuspb.GetMetricsRequest
func (_e *MockDataNode_Expecter) GetMetrics(_a0 interface{}, _a1 interface{}) *MockDataNode_GetMetrics_Call {
return &MockDataNode_GetMetrics_Call{Call: _e.mock.On("GetMetrics", _a0, _a1)}
}
@ -641,8 +641,8 @@ type MockDataNode_GetStatisticsChannel_Call struct {
}
// GetStatisticsChannel is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *internalpb.GetStatisticsChannelRequest
// - _a0 context.Context
// - _a1 *internalpb.GetStatisticsChannelRequest
func (_e *MockDataNode_Expecter) GetStatisticsChannel(_a0 interface{}, _a1 interface{}) *MockDataNode_GetStatisticsChannel_Call {
return &MockDataNode_GetStatisticsChannel_Call{Call: _e.mock.On("GetStatisticsChannel", _a0, _a1)}
}
@ -696,8 +696,8 @@ type MockDataNode_Import_Call struct {
}
// Import is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.ImportTaskRequest
// - _a0 context.Context
// - _a1 *datapb.ImportTaskRequest
func (_e *MockDataNode_Expecter) Import(_a0 interface{}, _a1 interface{}) *MockDataNode_Import_Call {
return &MockDataNode_Import_Call{Call: _e.mock.On("Import", _a0, _a1)}
}
@ -751,8 +751,8 @@ type MockDataNode_ImportV2_Call struct {
}
// ImportV2 is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.ImportRequest
// - _a0 context.Context
// - _a1 *datapb.ImportRequest
func (_e *MockDataNode_Expecter) ImportV2(_a0 interface{}, _a1 interface{}) *MockDataNode_ImportV2_Call {
return &MockDataNode_ImportV2_Call{Call: _e.mock.On("ImportV2", _a0, _a1)}
}
@ -847,8 +847,8 @@ type MockDataNode_NotifyChannelOperation_Call struct {
}
// NotifyChannelOperation is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.ChannelOperationsRequest
// - _a0 context.Context
// - _a1 *datapb.ChannelOperationsRequest
func (_e *MockDataNode_Expecter) NotifyChannelOperation(_a0 interface{}, _a1 interface{}) *MockDataNode_NotifyChannelOperation_Call {
return &MockDataNode_NotifyChannelOperation_Call{Call: _e.mock.On("NotifyChannelOperation", _a0, _a1)}
}
@ -902,8 +902,8 @@ type MockDataNode_PreImport_Call struct {
}
// PreImport is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.PreImportRequest
// - _a0 context.Context
// - _a1 *datapb.PreImportRequest
func (_e *MockDataNode_Expecter) PreImport(_a0 interface{}, _a1 interface{}) *MockDataNode_PreImport_Call {
return &MockDataNode_PreImport_Call{Call: _e.mock.On("PreImport", _a0, _a1)}
}
@ -957,8 +957,8 @@ type MockDataNode_QueryImport_Call struct {
}
// QueryImport is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.QueryImportRequest
// - _a0 context.Context
// - _a1 *datapb.QueryImportRequest
func (_e *MockDataNode_Expecter) QueryImport(_a0 interface{}, _a1 interface{}) *MockDataNode_QueryImport_Call {
return &MockDataNode_QueryImport_Call{Call: _e.mock.On("QueryImport", _a0, _a1)}
}
@ -1012,8 +1012,8 @@ type MockDataNode_QueryPreImport_Call struct {
}
// QueryPreImport is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.QueryPreImportRequest
// - _a0 context.Context
// - _a1 *datapb.QueryPreImportRequest
func (_e *MockDataNode_Expecter) QueryPreImport(_a0 interface{}, _a1 interface{}) *MockDataNode_QueryPreImport_Call {
return &MockDataNode_QueryPreImport_Call{Call: _e.mock.On("QueryPreImport", _a0, _a1)}
}
@ -1108,8 +1108,8 @@ type MockDataNode_ResendSegmentStats_Call struct {
}
// ResendSegmentStats is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.ResendSegmentStatsRequest
// - _a0 context.Context
// - _a1 *datapb.ResendSegmentStatsRequest
func (_e *MockDataNode_Expecter) ResendSegmentStats(_a0 interface{}, _a1 interface{}) *MockDataNode_ResendSegmentStats_Call {
return &MockDataNode_ResendSegmentStats_Call{Call: _e.mock.On("ResendSegmentStats", _a0, _a1)}
}
@ -1142,7 +1142,7 @@ type MockDataNode_SetAddress_Call struct {
}
// SetAddress is a helper method to define mock.On call
// - address string
// - address string
func (_e *MockDataNode_Expecter) SetAddress(address interface{}) *MockDataNode_SetAddress_Call {
return &MockDataNode_SetAddress_Call{Call: _e.mock.On("SetAddress", address)}
}
@ -1184,7 +1184,7 @@ type MockDataNode_SetDataCoordClient_Call struct {
}
// SetDataCoordClient is a helper method to define mock.On call
// - dataCoord types.DataCoordClient
// - dataCoord types.DataCoordClient
func (_e *MockDataNode_Expecter) SetDataCoordClient(dataCoord interface{}) *MockDataNode_SetDataCoordClient_Call {
return &MockDataNode_SetDataCoordClient_Call{Call: _e.mock.On("SetDataCoordClient", dataCoord)}
}
@ -1217,7 +1217,7 @@ type MockDataNode_SetEtcdClient_Call struct {
}
// SetEtcdClient is a helper method to define mock.On call
// - etcdClient *clientv3.Client
// - etcdClient *clientv3.Client
func (_e *MockDataNode_Expecter) SetEtcdClient(etcdClient interface{}) *MockDataNode_SetEtcdClient_Call {
return &MockDataNode_SetEtcdClient_Call{Call: _e.mock.On("SetEtcdClient", etcdClient)}
}
@ -1259,7 +1259,7 @@ type MockDataNode_SetRootCoordClient_Call struct {
}
// SetRootCoordClient is a helper method to define mock.On call
// - rootCoord types.RootCoordClient
// - rootCoord types.RootCoordClient
func (_e *MockDataNode_Expecter) SetRootCoordClient(rootCoord interface{}) *MockDataNode_SetRootCoordClient_Call {
return &MockDataNode_SetRootCoordClient_Call{Call: _e.mock.On("SetRootCoordClient", rootCoord)}
}
@ -1313,8 +1313,8 @@ type MockDataNode_ShowConfigurations_Call struct {
}
// ShowConfigurations is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *internalpb.ShowConfigurationsRequest
// - _a0 context.Context
// - _a1 *internalpb.ShowConfigurationsRequest
func (_e *MockDataNode_Expecter) ShowConfigurations(_a0 interface{}, _a1 interface{}) *MockDataNode_ShowConfigurations_Call {
return &MockDataNode_ShowConfigurations_Call{Call: _e.mock.On("ShowConfigurations", _a0, _a1)}
}
@ -1450,8 +1450,8 @@ type MockDataNode_SyncSegments_Call struct {
}
// SyncSegments is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.SyncSegmentsRequest
// - _a0 context.Context
// - _a1 *datapb.SyncSegmentsRequest
func (_e *MockDataNode_Expecter) SyncSegments(_a0 interface{}, _a1 interface{}) *MockDataNode_SyncSegments_Call {
return &MockDataNode_SyncSegments_Call{Call: _e.mock.On("SyncSegments", _a0, _a1)}
}
@ -1484,7 +1484,7 @@ type MockDataNode_UpdateStateCode_Call struct {
}
// UpdateStateCode is a helper method to define mock.On call
// - stateCode commonpb.StateCode
// - stateCode commonpb.StateCode
func (_e *MockDataNode_Expecter) UpdateStateCode(stateCode interface{}) *MockDataNode_UpdateStateCode_Call {
return &MockDataNode_UpdateStateCode_Call{Call: _e.mock.On("UpdateStateCode", stateCode)}
}
@ -1538,8 +1538,8 @@ type MockDataNode_WatchDmChannels_Call struct {
}
// WatchDmChannels is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *datapb.WatchDmChannelsRequest
// - _a0 context.Context
// - _a1 *datapb.WatchDmChannelsRequest
func (_e *MockDataNode_Expecter) WatchDmChannels(_a0 interface{}, _a1 interface{}) *MockDataNode_WatchDmChannels_Call {
return &MockDataNode_WatchDmChannels_Call{Call: _e.mock.On("WatchDmChannels", _a0, _a1)}
}


@ -70,9 +70,9 @@ type MockDataNodeClient_AddImportSegment_Call struct {
}
// AddImportSegment is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.AddImportSegmentRequest
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *datapb.AddImportSegmentRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) AddImportSegment(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_AddImportSegment_Call {
return &MockDataNodeClient_AddImportSegment_Call{Call: _e.mock.On("AddImportSegment",
append([]interface{}{ctx, in}, opts...)...)}
@ -140,9 +140,9 @@ type MockDataNodeClient_CheckChannelOperationProgress_Call struct {
}
// CheckChannelOperationProgress is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.ChannelWatchInfo
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *datapb.ChannelWatchInfo
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) CheckChannelOperationProgress(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_CheckChannelOperationProgress_Call {
return &MockDataNodeClient_CheckChannelOperationProgress_Call{Call: _e.mock.On("CheckChannelOperationProgress",
append([]interface{}{ctx, in}, opts...)...)}
@ -251,9 +251,9 @@ type MockDataNodeClient_Compaction_Call struct {
}
// Compaction is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.CompactionPlan
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *datapb.CompactionPlan
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) Compaction(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_Compaction_Call {
return &MockDataNodeClient_Compaction_Call{Call: _e.mock.On("Compaction",
append([]interface{}{ctx, in}, opts...)...)}
@ -321,9 +321,9 @@ type MockDataNodeClient_DropImport_Call struct {
}
// DropImport is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.DropImportRequest
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *datapb.DropImportRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) DropImport(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_DropImport_Call {
return &MockDataNodeClient_DropImport_Call{Call: _e.mock.On("DropImport",
append([]interface{}{ctx, in}, opts...)...)}
@ -391,9 +391,9 @@ type MockDataNodeClient_FlushChannels_Call struct {
}
// FlushChannels is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.FlushChannelsRequest
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *datapb.FlushChannelsRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) FlushChannels(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_FlushChannels_Call {
return &MockDataNodeClient_FlushChannels_Call{Call: _e.mock.On("FlushChannels",
append([]interface{}{ctx, in}, opts...)...)}
@ -461,9 +461,9 @@ type MockDataNodeClient_FlushSegments_Call struct {
}
// FlushSegments is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.FlushSegmentsRequest
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *datapb.FlushSegmentsRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) FlushSegments(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_FlushSegments_Call {
return &MockDataNodeClient_FlushSegments_Call{Call: _e.mock.On("FlushSegments",
append([]interface{}{ctx, in}, opts...)...)}
@ -531,9 +531,9 @@ type MockDataNodeClient_GetCompactionState_Call struct {
}
// GetCompactionState is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.CompactionStateRequest
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *datapb.CompactionStateRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) GetCompactionState(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_GetCompactionState_Call {
return &MockDataNodeClient_GetCompactionState_Call{Call: _e.mock.On("GetCompactionState",
append([]interface{}{ctx, in}, opts...)...)}
@ -601,9 +601,9 @@ type MockDataNodeClient_GetComponentStates_Call struct {
}
// GetComponentStates is a helper method to define mock.On call
// - ctx context.Context
// - in *milvuspb.GetComponentStatesRequest
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *milvuspb.GetComponentStatesRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) GetComponentStates(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_GetComponentStates_Call {
return &MockDataNodeClient_GetComponentStates_Call{Call: _e.mock.On("GetComponentStates",
append([]interface{}{ctx, in}, opts...)...)}
@ -671,9 +671,9 @@ type MockDataNodeClient_GetMetrics_Call struct {
}
// GetMetrics is a helper method to define mock.On call
// - ctx context.Context
// - in *milvuspb.GetMetricsRequest
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *milvuspb.GetMetricsRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) GetMetrics(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_GetMetrics_Call {
return &MockDataNodeClient_GetMetrics_Call{Call: _e.mock.On("GetMetrics",
append([]interface{}{ctx, in}, opts...)...)}
@ -741,9 +741,9 @@ type MockDataNodeClient_GetStatisticsChannel_Call struct {
}
// GetStatisticsChannel is a helper method to define mock.On call
// - ctx context.Context
// - in *internalpb.GetStatisticsChannelRequest
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *internalpb.GetStatisticsChannelRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) GetStatisticsChannel(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_GetStatisticsChannel_Call {
return &MockDataNodeClient_GetStatisticsChannel_Call{Call: _e.mock.On("GetStatisticsChannel",
append([]interface{}{ctx, in}, opts...)...)}
@ -811,9 +811,9 @@ type MockDataNodeClient_Import_Call struct {
}
// Import is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.ImportTaskRequest
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *datapb.ImportTaskRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) Import(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_Import_Call {
return &MockDataNodeClient_Import_Call{Call: _e.mock.On("Import",
append([]interface{}{ctx, in}, opts...)...)}
@ -881,9 +881,9 @@ type MockDataNodeClient_ImportV2_Call struct {
}
// ImportV2 is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.ImportRequest
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *datapb.ImportRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) ImportV2(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_ImportV2_Call {
return &MockDataNodeClient_ImportV2_Call{Call: _e.mock.On("ImportV2",
append([]interface{}{ctx, in}, opts...)...)}
@ -951,9 +951,9 @@ type MockDataNodeClient_NotifyChannelOperation_Call struct {
}
// NotifyChannelOperation is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.ChannelOperationsRequest
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *datapb.ChannelOperationsRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) NotifyChannelOperation(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_NotifyChannelOperation_Call {
return &MockDataNodeClient_NotifyChannelOperation_Call{Call: _e.mock.On("NotifyChannelOperation",
append([]interface{}{ctx, in}, opts...)...)}
@ -1021,9 +1021,9 @@ type MockDataNodeClient_PreImport_Call struct {
}
// PreImport is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.PreImportRequest
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *datapb.PreImportRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) PreImport(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_PreImport_Call {
return &MockDataNodeClient_PreImport_Call{Call: _e.mock.On("PreImport",
append([]interface{}{ctx, in}, opts...)...)}
@ -1091,9 +1091,9 @@ type MockDataNodeClient_QueryImport_Call struct {
}
// QueryImport is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.QueryImportRequest
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *datapb.QueryImportRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) QueryImport(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_QueryImport_Call {
return &MockDataNodeClient_QueryImport_Call{Call: _e.mock.On("QueryImport",
append([]interface{}{ctx, in}, opts...)...)}
@ -1161,9 +1161,9 @@ type MockDataNodeClient_QueryPreImport_Call struct {
}
// QueryPreImport is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.QueryPreImportRequest
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *datapb.QueryPreImportRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) QueryPreImport(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_QueryPreImport_Call {
return &MockDataNodeClient_QueryPreImport_Call{Call: _e.mock.On("QueryPreImport",
append([]interface{}{ctx, in}, opts...)...)}
@ -1231,9 +1231,9 @@ type MockDataNodeClient_ResendSegmentStats_Call struct {
}
// ResendSegmentStats is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.ResendSegmentStatsRequest
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *datapb.ResendSegmentStatsRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) ResendSegmentStats(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_ResendSegmentStats_Call {
return &MockDataNodeClient_ResendSegmentStats_Call{Call: _e.mock.On("ResendSegmentStats",
append([]interface{}{ctx, in}, opts...)...)}
@ -1301,9 +1301,9 @@ type MockDataNodeClient_ShowConfigurations_Call struct {
}
// ShowConfigurations is a helper method to define mock.On call
// - ctx context.Context
// - in *internalpb.ShowConfigurationsRequest
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *internalpb.ShowConfigurationsRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) ShowConfigurations(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_ShowConfigurations_Call {
return &MockDataNodeClient_ShowConfigurations_Call{Call: _e.mock.On("ShowConfigurations",
append([]interface{}{ctx, in}, opts...)...)}
@ -1371,9 +1371,9 @@ type MockDataNodeClient_SyncSegments_Call struct {
}
// SyncSegments is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.SyncSegmentsRequest
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *datapb.SyncSegmentsRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) SyncSegments(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_SyncSegments_Call {
return &MockDataNodeClient_SyncSegments_Call{Call: _e.mock.On("SyncSegments",
append([]interface{}{ctx, in}, opts...)...)}
@ -1441,9 +1441,9 @@ type MockDataNodeClient_WatchDmChannels_Call struct {
}
// WatchDmChannels is a helper method to define mock.On call
// - ctx context.Context
// - in *datapb.WatchDmChannelsRequest
// - opts ...grpc.CallOption
// - ctx context.Context
// - in *datapb.WatchDmChannelsRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) WatchDmChannels(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_WatchDmChannels_Call {
return &MockDataNodeClient_WatchDmChannels_Call{Call: _e.mock.On("WatchDmChannels",
append([]interface{}{ctx, in}, opts...)...)}


@ -7,281 +7,327 @@ option go_package = "github.com/milvus-io/milvus/internal/proto/indexpb";
import "common.proto";
import "internal.proto";
import "milvus.proto";
import "schema.proto";
service IndexCoord {
rpc GetComponentStates(milvus.GetComponentStatesRequest)
returns (milvus.ComponentStates) {
}
rpc GetStatisticsChannel(internal.GetStatisticsChannelRequest)
returns (milvus.StringResponse) {
}
rpc CreateIndex(CreateIndexRequest) returns (common.Status) {
}
// Deprecated: use DescribeIndex instead
rpc GetIndexState(GetIndexStateRequest) returns (GetIndexStateResponse) {
}
rpc GetSegmentIndexState(GetSegmentIndexStateRequest)
returns (GetSegmentIndexStateResponse) {
}
rpc GetIndexInfos(GetIndexInfoRequest) returns (GetIndexInfoResponse) {
}
rpc DropIndex(DropIndexRequest) returns (common.Status) {
}
rpc DescribeIndex(DescribeIndexRequest) returns (DescribeIndexResponse) {
}
rpc GetIndexStatistics(GetIndexStatisticsRequest)
returns (GetIndexStatisticsResponse) {
}
// Deprecated: use DescribeIndex instead
rpc GetIndexBuildProgress(GetIndexBuildProgressRequest)
returns (GetIndexBuildProgressResponse) {
}
rpc ShowConfigurations(internal.ShowConfigurationsRequest)
returns (internal.ShowConfigurationsResponse) {
}
// https://wiki.lfaidata.foundation/display/MIL/MEP+8+--+Add+metrics+for+proxy
rpc GetMetrics(milvus.GetMetricsRequest)
returns (milvus.GetMetricsResponse) {
}
rpc CheckHealth(milvus.CheckHealthRequest)
returns (milvus.CheckHealthResponse) {
}
}
service IndexNode {
rpc GetComponentStates(milvus.GetComponentStatesRequest)
returns (milvus.ComponentStates) {
}
rpc GetStatisticsChannel(internal.GetStatisticsChannelRequest)
returns (milvus.StringResponse) {
}
rpc CreateJob(CreateJobRequest) returns (common.Status) {
}
rpc QueryJobs(QueryJobsRequest) returns (QueryJobsResponse) {
}
rpc DropJobs(DropJobsRequest) returns (common.Status) {
}
rpc GetJobStats(GetJobStatsRequest) returns (GetJobStatsResponse) {
}
rpc ShowConfigurations(internal.ShowConfigurationsRequest)
returns (internal.ShowConfigurationsResponse) {
}
// https://wiki.lfaidata.foundation/display/MIL/MEP+8+--+Add+metrics+for+proxy
rpc GetMetrics(milvus.GetMetricsRequest)
returns (milvus.GetMetricsResponse) {
}
}
message IndexInfo {
int64 collectionID = 1;
int64 fieldID = 2;
string index_name = 3;
int64 indexID = 4;
repeated common.KeyValuePair type_params = 5;
repeated common.KeyValuePair index_params = 6;
// index build progress
// Real-time statistics may not be accurate because the compaction mechanism can rewrite segments.
int64 indexed_rows = 7;
int64 total_rows = 8;
// index state
common.IndexState state = 9;
string index_state_fail_reason = 10;
bool is_auto_index = 11;
repeated common.KeyValuePair user_index_params = 12;
int64 pending_index_rows = 13;
}
message FieldIndex {
IndexInfo index_info = 1;
bool deleted = 2;
uint64 create_time = 3;
}
message SegmentIndex {
int64 collectionID = 1;
int64 partitionID = 2;
int64 segmentID = 3;
int64 num_rows = 4;
int64 indexID = 5;
int64 buildID = 6;
int64 nodeID = 7;
int64 index_version = 8;
common.IndexState state = 9;
string fail_reason = 10;
repeated string index_file_keys = 11;
bool deleted = 12;
uint64 create_time = 13;
uint64 serialize_size = 14;
bool write_handoff = 15;
int32 current_index_version = 16;
int64 index_store_version = 17;
}
message RegisterNodeRequest {
common.MsgBase base = 1;
common.Address address = 2;
int64 nodeID = 3;
}
message RegisterNodeResponse {
common.Status status = 1;
internal.InitParams init_params = 2;
}
message GetIndexStateRequest {
int64 collectionID = 1;
string index_name = 2;
}
message GetIndexStateResponse {
common.Status status = 1;
common.IndexState state = 2;
string fail_reason = 3;
}
message GetSegmentIndexStateRequest {
int64 collectionID = 1;
string index_name = 2;
repeated int64 segmentIDs = 3;
}
message SegmentIndexState {
int64 segmentID = 1;
common.IndexState state = 2;
string fail_reason = 3;
}
message GetSegmentIndexStateResponse {
common.Status status = 1;
repeated SegmentIndexState states = 2;
}
message CreateIndexRequest {
int64 collectionID = 1;
int64 fieldID = 2;
string index_name = 3;
repeated common.KeyValuePair type_params = 4;
repeated common.KeyValuePair index_params = 5;
uint64 timestamp = 6;
bool is_auto_index = 7;
repeated common.KeyValuePair user_index_params = 8;
}
message GetIndexInfoRequest {
int64 collectionID = 1;
repeated int64 segmentIDs = 2;
string index_name = 3;
}
message IndexFilePathInfo {
int64 segmentID = 1;
int64 fieldID = 2;
int64 indexID = 3;
int64 buildID = 4;
string index_name = 5;
repeated common.KeyValuePair index_params = 6;
repeated string index_file_paths = 7;
uint64 serialized_size = 8;
int64 index_version = 9;
int64 num_rows = 10;
int32 current_index_version = 11;
}
message SegmentInfo {
int64 collectionID = 1;
int64 segmentID = 2;
bool enable_index = 3;
repeated IndexFilePathInfo index_infos = 4;
}
message GetIndexInfoResponse {
common.Status status = 1;
map<int64, SegmentInfo> segment_info = 2;
}
message DropIndexRequest {
int64 collectionID = 1;
repeated int64 partitionIDs = 2;
string index_name = 3;
bool drop_all = 4;
}
message DescribeIndexRequest {
int64 collectionID = 1;
string index_name = 2;
uint64 timestamp = 3;
}
message DescribeIndexResponse {
common.Status status = 1;
repeated IndexInfo index_infos = 2;
}
message GetIndexBuildProgressRequest {
int64 collectionID = 1;
string index_name = 2;
}
message GetIndexBuildProgressResponse {
common.Status status = 1;
int64 indexed_rows = 2;
int64 total_rows = 3;
int64 pending_index_rows = 4;
}
message StorageConfig {
string address = 1;
string access_keyID = 2;
string secret_access_key = 3;
bool useSSL = 4;
string bucket_name = 5;
string root_path = 6;
bool useIAM = 7;
string IAMEndpoint = 8;
string storage_type = 9;
bool use_virtual_host = 10;
string region = 11;
string cloud_provider = 12;
int64 request_timeout_ms = 13;
}
message CreateJobRequest {
string clusterID = 1;
string index_file_prefix = 2;
int64 buildID = 3;
repeated string data_paths = 4;
int64 index_version = 5;
int64 indexID = 6;
string index_name = 7;
StorageConfig storage_config = 8;
repeated common.KeyValuePair index_params = 9;
repeated common.KeyValuePair type_params = 10;
int64 num_rows = 11;
int32 current_index_version = 12;
int64 collectionID = 13;
int64 partitionID = 14;
int64 segmentID = 15;
int64 fieldID = 16;
string field_name = 17;
schema.DataType field_type = 18;
string store_path = 19;
int64 store_version = 20;
string index_store_path = 21;
int64 dim = 22;
}
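
For orientation, a minimal sketch of how the storage-v2 fields of this message could be filled in Go; every concrete value, and the path layout in particular, is an illustrative assumption rather than what datacoord actually produces:

package main

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/proto/indexpb"
)

func main() {
	req := &indexpb.CreateJobRequest{
		ClusterID:    "by-dev", // assumed cluster prefix
		BuildID:      1,
		IndexVersion: 1,
		IndexParams:  []*commonpb.KeyValuePair{{Key: "index_type", Value: "HNSW"}},
		NumRows:      1024,
		// fields 13-22, added for storage v2:
		CollectionID:   100,
		PartitionID:    101,
		SegmentID:      102,
		FieldID:        103,
		FieldName:      "vec",
		FieldType:      schemapb.DataType_FloatVector,
		StorePath:      "s3://bucket/space/102", // hypothetical space URI
		StoreVersion:   3,                       // data version inside that space
		IndexStorePath: "s3://bucket/index/1",   // hypothetical index space URI
		Dim:            128,
	}
	fmt.Println(req.GetSegmentID(), req.GetFieldType())
}
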
message QueryJobsRequest {
string clusterID = 1;
repeated int64 buildIDs = 2;
}
message IndexTaskInfo {
int64 buildID = 1;
common.IndexState state = 2;
repeated string index_file_keys = 3;
uint64 serialized_size = 4;
string fail_reason = 5;
int32 current_index_version = 6;
int64 index_store_version = 7;
}
message QueryJobsResponse {
common.Status status = 1;
string clusterID = 2;
repeated IndexTaskInfo index_infos = 3;
}
message DropJobsRequest {
string clusterID = 1;
repeated int64 buildIDs = 2;
}
message JobInfo {
int64 num_rows = 1;
int64 dim = 2;
int64 start_time = 3;
int64 end_time = 4;
repeated common.KeyValuePair index_params = 5;
int64 podID = 6;
}
message GetJobStatsRequest {
}
message GetJobStatsResponse {
common.Status status = 1;
int64 total_job_num = 2;
int64 in_progress_job_num = 3;
int64 enqueue_job_num = 4;
int64 task_slots = 5;
repeated JobInfo job_infos = 6;
bool enable_disk = 7;
}
message GetIndexStatisticsRequest {
int64 collectionID = 1;
string index_name = 2;
}
message GetIndexStatisticsResponse {
common.Status status = 1;
repeated IndexInfo index_infos = 2;
}

View File

@ -97,6 +97,19 @@ func (bi *BuildIndexInfo) AppendFieldMetaInfo(collectionID int64, partitionID in
return HandleCStatus(&status, "appendFieldMetaInfo failed")
}
func (bi *BuildIndexInfo) AppendFieldMetaInfoV2(collectionID int64, partitionID int64, segmentID int64, fieldID int64, fieldType schemapb.DataType, fieldName string, dim int64) error {
cColID := C.int64_t(collectionID)
cParID := C.int64_t(partitionID)
cSegID := C.int64_t(segmentID)
cFieldID := C.int64_t(fieldID)
cintDType := uint32(fieldType)
cFieldName := C.CString(fieldName)
cDim := C.int64_t(dim)
defer C.free(unsafe.Pointer(cFieldName))
status := C.AppendFieldMetaInfoV2(bi.cBuildIndexInfo, cColID, cParID, cSegID, cFieldID, cFieldName, cintDType, cDim)
return HandleCStatus(&status, "appendFieldMetaInfo failed")
}
func (bi *BuildIndexInfo) AppendIndexMetaInfo(indexID int64, buildID int64, indexVersion int64) error {
cIndexID := C.int64_t(indexID)
cBuildID := C.int64_t(buildID)
@ -106,6 +119,16 @@ func (bi *BuildIndexInfo) AppendIndexMetaInfo(indexID int64, buildID int64, inde
return HandleCStatus(&status, "appendIndexMetaInfo failed")
}
func (bi *BuildIndexInfo) AppendIndexStorageInfo(dataStorePath, indexStorePath string, dataStoreVersion int64) error {
cDataStorePath := C.CString(dataStorePath)
defer C.free(unsafe.Pointer(cDataStorePath))
cIndexStorePath := C.CString(indexStorePath)
defer C.free(unsafe.Pointer(cIndexStorePath))
cVersion := C.int64_t(dataStoreVersion)
status := C.AppendIndexStorageInfo(bi.cBuildIndexInfo, cDataStorePath, cIndexStorePath, cVersion)
return HandleCStatus(&status, "appendIndexStorageInfo failed")
}
func (bi *BuildIndexInfo) AppendBuildIndexParam(indexParams map[string]string) error {
if len(indexParams) == 0 {
return nil

View File

@ -39,6 +39,7 @@ type CodecIndex interface {
Delete() error
CleanLocalData() error
UpLoad() (map[string]int64, error)
UpLoadV2() (int64, error)
}
var _ CodecIndex = (*CgoIndex)(nil)
@ -107,6 +108,21 @@ func CreateIndex(ctx context.Context, buildIndexInfo *BuildIndexInfo) (CodecInde
return index, nil
}
func CreateIndexV2(ctx context.Context, buildIndexInfo *BuildIndexInfo) (CodecIndex, error) {
var indexPtr C.CIndex
status := C.CreateIndexV2(&indexPtr, buildIndexInfo.cBuildIndexInfo)
if err := HandleCStatus(&status, "failed to create index"); err != nil {
return nil, err
}
index := &CgoIndex{
indexPtr: indexPtr,
close: false,
}
return index, nil
}
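
CreateIndexV2 mirrors CreateIndex but consumes the v2 metadata appended through AppendFieldMetaInfoV2 and AppendIndexStorageInfo. A sketch of the assumed call order (the import path and surrounding task plumbing are assumptions; data loading is elided because, under storage v2, the core reads rows through the registered store path):

package sketch

import (
	"context"

	"github.com/milvus-io/milvus/internal/util/indexcgowrapper" // assumed package path
)

// buildIndexV2 assumes the caller already appended field meta and storage
// locations to info; it creates the v2 index, serializes it into the
// storage-v2 space, and returns the resulting index store version.
func buildIndexV2(ctx context.Context, info *indexcgowrapper.BuildIndexInfo) (int64, error) {
	index, err := indexcgowrapper.CreateIndexV2(ctx, info)
	if err != nil {
		return 0, err
	}
	defer index.Delete() // release the cgo handle on every path
	return index.UpLoadV2()
}
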
func (index *CgoIndex) Build(dataset *Dataset) error {
switch dataset.DType {
case schemapb.DataType_None:
@ -361,3 +377,40 @@ func (index *CgoIndex) UpLoad() (map[string]int64, error) {
return res, nil
}
func (index *CgoIndex) UpLoadV2() (int64, error) {
var cBinarySet C.CBinarySet
status := C.SerializeIndexAndUpLoadV2(index.indexPtr, &cBinarySet)
defer func() {
if cBinarySet != nil {
C.DeleteBinarySet(cBinarySet)
}
}()
if err := HandleCStatus(&status, "failed to serialize index and upload index"); err != nil {
return -1, err
}
buffer, err := GetBinarySetValue(cBinarySet, "index_store_version")
if err != nil {
return -1, err
}
// decode the 8-byte value as a little-endian int64: the storage-v2
// version under which the serialized index was persisted
var version int64
version = int64(buffer[7])
version = (version << 8) + int64(buffer[6])
version = (version << 8) + int64(buffer[5])
version = (version << 8) + int64(buffer[4])
version = (version << 8) + int64(buffer[3])
version = (version << 8) + int64(buffer[2])
version = (version << 8) + int64(buffer[1])
version = (version << 8) + int64(buffer[0])
runtime.SetFinalizer(index, func(index *CgoIndex) {
if index != nil && !index.close {
log.Error("there is leakage in index object, please check.")
}
})
return version, nil
}
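
The eight shift-and-add steps above are a hand-rolled little-endian decode of the value stored under the "index_store_version" key. An equivalent sketch with the standard library, assuming the buffer holds at least 8 bytes:

package main

import (
	"encoding/binary"
	"fmt"
)

func decodeStoreVersion(buffer []byte) int64 {
	// same result as the manual shift chain in UpLoadV2
	return int64(binary.LittleEndian.Uint64(buffer[:8]))
}

func main() {
	fmt.Println(decodeStoreVersion([]byte{3, 0, 0, 0, 0, 0, 0, 0})) // prints 3
}
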

View File

@ -0,0 +1,125 @@
package typeutil
import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/merr"
)
func ConvertToArrowSchema(fields []*schemapb.FieldSchema) (*arrow.Schema, error) {
arrowFields := make([]arrow.Field, 0, len(fields))
for _, field := range fields {
switch field.DataType {
case schemapb.DataType_Bool:
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.FixedWidthTypes.Boolean,
})
case schemapb.DataType_Int8:
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.PrimitiveTypes.Int8,
})
case schemapb.DataType_Int16:
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.PrimitiveTypes.Int16,
})
case schemapb.DataType_Int32:
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.PrimitiveTypes.Int32,
})
case schemapb.DataType_Int64:
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.PrimitiveTypes.Int64,
})
case schemapb.DataType_Float:
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.PrimitiveTypes.Float32,
})
case schemapb.DataType_Double:
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.PrimitiveTypes.Float64,
})
case schemapb.DataType_String, schemapb.DataType_VarChar:
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.BinaryTypes.String,
})
case schemapb.DataType_Array:
elemType, err := convertToArrowType(field.ElementType)
if err != nil {
return nil, err
}
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.ListOf(elemType),
})
case schemapb.DataType_JSON:
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: arrow.BinaryTypes.Binary,
})
case schemapb.DataType_BinaryVector:
dim, err := storage.GetDimFromParams(field.TypeParams)
if err != nil {
return nil, err
}
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: &arrow.FixedSizeBinaryType{ByteWidth: dim / 8},
})
case schemapb.DataType_FloatVector:
dim, err := storage.GetDimFromParams(field.TypeParams)
if err != nil {
return nil, err
}
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: &arrow.FixedSizeBinaryType{ByteWidth: dim * 4},
})
case schemapb.DataType_Float16Vector:
dim, err := storage.GetDimFromParams(field.TypeParams)
if err != nil {
return nil, err
}
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: &arrow.FixedSizeBinaryType{ByteWidth: dim * 2},
})
default:
return nil, merr.WrapErrParameterInvalidMsg("unknown type %v", field.DataType.String())
}
}
return arrow.NewSchema(arrowFields, nil), nil
}
func convertToArrowType(dataType schemapb.DataType) (arrow.DataType, error) {
switch dataType {
case schemapb.DataType_Bool:
return arrow.FixedWidthTypes.Boolean, nil
case schemapb.DataType_Int8:
return arrow.PrimitiveTypes.Int8, nil
case schemapb.DataType_Int16:
return arrow.PrimitiveTypes.Int16, nil
case schemapb.DataType_Int32:
return arrow.PrimitiveTypes.Int32, nil
case schemapb.DataType_Int64:
return arrow.PrimitiveTypes.Int64, nil
case schemapb.DataType_Float:
return arrow.PrimitiveTypes.Float32, nil
case schemapb.DataType_Double:
return arrow.PrimitiveTypes.Float64, nil
case schemapb.DataType_String, schemapb.DataType_VarChar:
return arrow.BinaryTypes.String, nil
default:
return nil, merr.WrapErrParameterInvalidMsg("unknown type %v", dataType.String())
}
}
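
A short usage sketch of ConvertToArrowSchema; the field definitions are illustrative assumptions. An int64 primary key maps to the Arrow int64 type, while a 128-dim float vector maps to fixed_size_binary[512] (dim * 4 bytes per float32):

package typeutil

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
)

// ExampleConvertToArrowSchema sketches typical input and output.
func ExampleConvertToArrowSchema() {
	fields := []*schemapb.FieldSchema{
		{Name: "pk", DataType: schemapb.DataType_Int64},
		{
			Name:       "vec",
			DataType:   schemapb.DataType_FloatVector,
			TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "128"}},
		},
	}
	schema, err := ConvertToArrowSchema(fields)
	if err != nil {
		panic(err)
	}
	fmt.Println(schema.Field(0).Type, schema.Field(1).Type) // int64 fixed_size_binary[512]
}
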

View File

@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package metacache
package typeutil
import (
"testing"