feat: support enable/disable mmap for index (#29005)

support enable/disable mmap for index, the user could alter the index's
mode by `AlterIndex` method
related: https://github.com/milvus-io/milvus/issues/21866

---------

Signed-off-by: yah01 <yah2er0ne@outlook.com>
Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/29302/head
yah01 2023-12-21 18:07:24 +08:00 committed by GitHub
parent 7a2374e698
commit a0e1a1eb31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 1053 additions and 67 deletions

4
go.mod
View File

@ -23,8 +23,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.16.7
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231208092431-02cbad30332f
github.com/milvus-io/milvus/pkg v0.0.1
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c
github.com/minio/minio-go/v7 v7.0.61
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
@ -62,6 +61,7 @@ require github.com/apache/arrow/go/v12 v12.0.1
require github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092
require (
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/quasilyte/go-ruleguard/dsl v0.3.22
golang.org/x/net v0.17.0

6
go.sum
View File

@ -581,10 +581,8 @@ github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/le
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e h1:IH1WAXwEF8vbwahPdupi4zzRNWViT4B7fZzIjtRLpG4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231208092431-02cbad30332f h1:0cAMN9OsgBxlEUY8i1e1ocrBZ/cpu/Kdguz4JWz9fUc=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231208092431-02cbad30332f/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c h1:Wbc2IZt/13+B5jc8JPU/dOxGYy+1jeOsChVgcza+qgw=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092 h1:UYJ7JB+QlMOoFHNdd8mUa3/lV63t9dnBX7ILXmEEWPY=
github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=

View File

@ -18,6 +18,7 @@
package datacoord
import (
"context"
"fmt"
"strconv"
@ -211,6 +212,22 @@ func (m *meta) CreateIndex(index *model.Index) error {
return nil
}
func (m *meta) AlterIndex(ctx context.Context, indexes ...*model.Index) error {
m.Lock()
defer m.Unlock()
err := m.catalog.AlterIndexes(ctx, indexes)
if err != nil {
return err
}
for _, index := range indexes {
m.updateCollectionIndex(index)
}
return nil
}
// AddSegmentIndex adds the index meta corresponding the indexBuildID to meta table.
func (m *meta) AddSegmentIndex(segIndex *model.SegmentIndex) error {
m.Lock()

View File

@ -21,6 +21,7 @@ import (
"fmt"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -236,6 +237,51 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
return merr.Success(), nil
}
func (s *Server) AlterIndex(ctx context.Context, req *indexpb.AlterIndexRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
zap.String("indexName", req.GetIndexName()),
)
log.Info("received AlterIndex request", zap.Any("params", req.GetParams()))
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err))
return merr.Status(err), nil
}
indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
params := make(map[string]string)
for _, index := range indexes {
for _, param := range index.UserIndexParams {
params[param.GetKey()] = param.GetValue()
}
// update the index params
for _, param := range req.GetParams() {
params[param.GetKey()] = param.GetValue()
}
log.Info("prepare to alter index",
zap.String("indexName", index.IndexName),
zap.Any("params", params),
)
index.UserIndexParams = lo.MapToSlice(params, func(k string, v string) *commonpb.KeyValuePair {
return &commonpb.KeyValuePair{
Key: k,
Value: v,
}
})
}
err := s.meta.AlterIndex(ctx, indexes...)
if err != nil {
log.Warn("failed to alter index", zap.Error(err))
return merr.Status(err), nil
}
return merr.Success(), nil
}
// GetIndexState gets the index state of the index name in the request from Proxy.
// Deprecated
func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest) (*indexpb.GetIndexStateResponse, error) {

View File

@ -237,6 +237,362 @@ func TestServer_CreateIndex(t *testing.T) {
})
}
func TestServer_AlterIndex(t *testing.T) {
var (
collID = UniqueID(1)
partID = UniqueID(2)
fieldID = UniqueID(10)
indexID = UniqueID(100)
segID = UniqueID(1000)
invalidSegID = UniqueID(1001)
buildID = UniqueID(10000)
indexName = "default_idx"
typeParams = []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
}
indexParams = []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: "IVF_FLAT",
},
}
createTS = uint64(1000)
ctx = context.Background()
req = &indexpb.AlterIndexRequest{
CollectionID: collID,
IndexName: "default_idx",
Params: []*commonpb.KeyValuePair{{
Key: common.MmapEnabledKey,
Value: "true",
}},
}
)
catalog := catalogmocks.NewDataCoordCatalog(t)
catalog.On("AlterIndexes",
mock.Anything,
mock.Anything,
).Return(nil)
s := &Server{
meta: &meta{
catalog: catalog,
indexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
// finished
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: createTS,
TypeParams: typeParams,
IndexParams: indexParams,
IsAutoIndex: false,
UserIndexParams: nil,
},
// deleted
indexID + 1: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID + 1,
IndexID: indexID + 1,
IndexName: indexName + "_1",
IsDeleted: true,
CreateTime: createTS,
TypeParams: typeParams,
IndexParams: indexParams,
IsAutoIndex: false,
UserIndexParams: nil,
},
// unissued
indexID + 2: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID + 2,
IndexID: indexID + 2,
IndexName: indexName + "_2",
IsDeleted: false,
CreateTime: createTS,
TypeParams: typeParams,
IndexParams: indexParams,
IsAutoIndex: false,
UserIndexParams: nil,
},
// inProgress
indexID + 3: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID + 3,
IndexID: indexID + 3,
IndexName: indexName + "_3",
IsDeleted: false,
CreateTime: createTS,
TypeParams: typeParams,
IndexParams: indexParams,
IsAutoIndex: false,
UserIndexParams: nil,
},
// failed
indexID + 4: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID + 4,
IndexID: indexID + 4,
IndexName: indexName + "_4",
IsDeleted: false,
CreateTime: createTS,
TypeParams: typeParams,
IndexParams: indexParams,
IsAutoIndex: false,
UserIndexParams: nil,
},
// unissued
indexID + 5: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID + 5,
IndexID: indexID + 5,
IndexName: indexName + "_5",
IsDeleted: false,
CreateTime: createTS,
TypeParams: typeParams,
IndexParams: indexParams,
IsAutoIndex: false,
UserIndexParams: nil,
},
},
},
segments: &SegmentsInfo{map[UniqueID]*SegmentInfo{
invalidSegID: {
SegmentInfo: &datapb.SegmentInfo{
ID: invalidSegID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
// timesamp > index start time, will be filtered out
Timestamp: createTS + 1,
},
},
},
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
CreatedByCompaction: true,
CompactionFrom: []int64{segID - 1},
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 10000,
IndexID: indexID,
BuildID: buildID,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: createTS,
IndexFileKeys: nil,
IndexSize: 0,
WriteHandoff: false,
},
indexID + 1: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 10000,
IndexID: indexID + 1,
BuildID: buildID + 1,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
CreateTime: createTS,
IndexFileKeys: nil,
IndexSize: 0,
WriteHandoff: false,
},
indexID + 3: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 10000,
IndexID: indexID + 3,
BuildID: buildID + 3,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_InProgress,
FailReason: "",
IsDeleted: false,
CreateTime: createTS,
IndexFileKeys: nil,
IndexSize: 0,
WriteHandoff: false,
},
indexID + 4: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 10000,
IndexID: indexID + 4,
BuildID: buildID + 4,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Failed,
FailReason: "mock failed",
IsDeleted: false,
CreateTime: createTS,
IndexFileKeys: nil,
IndexSize: 0,
WriteHandoff: false,
},
indexID + 5: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 10000,
IndexID: indexID + 5,
BuildID: buildID + 5,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Unissued,
FailReason: "",
IsDeleted: false,
CreateTime: createTS,
IndexFileKeys: nil,
IndexSize: 0,
WriteHandoff: false,
},
},
},
segID - 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
},
segmentIndexes: map[UniqueID]*model.SegmentIndex{
indexID: {
SegmentID: segID - 1,
CollectionID: collID,
PartitionID: partID,
NumRows: 10000,
IndexID: indexID,
BuildID: buildID,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
CreateTime: createTS,
},
indexID + 1: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 10000,
IndexID: indexID + 1,
BuildID: buildID + 1,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
CreateTime: createTS,
},
indexID + 3: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 10000,
IndexID: indexID + 3,
BuildID: buildID + 3,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_InProgress,
CreateTime: createTS,
},
indexID + 4: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 10000,
IndexID: indexID + 4,
BuildID: buildID + 4,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Failed,
FailReason: "mock failed",
CreateTime: createTS,
},
indexID + 5: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
NumRows: 10000,
IndexID: indexID + 5,
BuildID: buildID + 5,
NodeID: 0,
IndexVersion: 1,
IndexState: commonpb.IndexState_Finished,
CreateTime: createTS,
},
},
},
}},
},
allocator: newMockAllocator(),
notifyIndexChan: make(chan UniqueID, 1),
}
t.Run("server not available", func(t *testing.T) {
s.stateCode.Store(commonpb.StateCode_Initializing)
resp, err := s.AlterIndex(ctx, req)
assert.NoError(t, err)
assert.ErrorIs(t, merr.Error(resp), merr.ErrServiceNotReady)
})
s.stateCode.Store(commonpb.StateCode_Healthy)
t.Run("success", func(t *testing.T) {
resp, err := s.AlterIndex(ctx, req)
assert.NoError(t, merr.CheckRPCCall(resp, err))
describeResp, err := s.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{
CollectionID: collID,
IndexName: "default_idx",
Timestamp: createTS,
})
assert.NoError(t, merr.CheckRPCCall(describeResp, err))
assert.True(t, common.IsMmapEnabled(describeResp.IndexInfos[0].GetUserIndexParams()...), "indexInfo: %+v", describeResp.IndexInfos[0])
})
}
func TestServer_GetIndexState(t *testing.T) {
var (
collID = UniqueID(1)

View File

@ -564,6 +564,13 @@ func (c *Client) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
})
}
// AlterIndex sends the alter index request to IndexCoord.
func (c *Client) AlterIndex(ctx context.Context, req *indexpb.AlterIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
return client.AlterIndex(ctx, req)
})
}
// GetIndexState gets the index states from IndexCoord.
func (c *Client) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest, opts ...grpc.CallOption) (*indexpb.GetIndexStateResponse, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexStateResponse, error) {

View File

@ -436,6 +436,10 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
return s.dataCoord.CreateIndex(ctx, req)
}
func (s *Server) AlterIndex(ctx context.Context, req *indexpb.AlterIndexRequest) (*commonpb.Status, error) {
return s.dataCoord.AlterIndex(ctx, req)
}
// GetIndexState gets the index states from DataCoord.
// Deprecated: use DescribeIndex instead
func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest) (*indexpb.GetIndexStateResponse, error) {

View File

@ -821,9 +821,9 @@ func (s *Server) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexR
return s.proxy.CreateIndex(ctx, request)
}
// AlterIndex notifies Proxy to alter index
func (s *Server) AlterIndex(ctx context.Context, request *milvuspb.AlterIndexRequest) (*commonpb.Status, error) {
// Todo
return nil, nil
return s.proxy.AlterIndex(ctx, request)
}
// DropIndex notifies Proxy to drop index
@ -871,8 +871,7 @@ func (s *Server) Search(ctx context.Context, request *milvuspb.SearchRequest) (*
}
func (s *Server) SearchV2(ctx context.Context, request *milvuspb.SearchRequestV2) (*milvuspb.SearchResults, error) {
// Todo
return nil, nil
return s.proxy.SearchV2(ctx, request)
}
func (s *Server) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*milvuspb.FlushResponse, error) {

View File

@ -327,6 +327,11 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error {
for _, kvPair := range it.req.GetIndexParams() {
key, value := kvPair.GetKey(), kvPair.GetValue()
// knowhere would report error if encountered the unknown key,
// so skip this
if key == common.MmapEnabledKey {
continue
}
indexParams[key] = value
}
it.newTypeParams = typeParams

View File

@ -36,6 +36,61 @@ func (_m *MockDataCoord) EXPECT() *MockDataCoord_Expecter {
return &MockDataCoord_Expecter{mock: &_m.Mock}
}
// AlterIndex provides a mock function with given fields: _a0, _a1
func (_m *MockDataCoord) AlterIndex(_a0 context.Context, _a1 *indexpb.AlterIndexRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)
var r0 *commonpb.Status
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *indexpb.AlterIndexRequest) (*commonpb.Status, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *indexpb.AlterIndexRequest) *commonpb.Status); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*commonpb.Status)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *indexpb.AlterIndexRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockDataCoord_AlterIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterIndex'
type MockDataCoord_AlterIndex_Call struct {
*mock.Call
}
// AlterIndex is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *indexpb.AlterIndexRequest
func (_e *MockDataCoord_Expecter) AlterIndex(_a0 interface{}, _a1 interface{}) *MockDataCoord_AlterIndex_Call {
return &MockDataCoord_AlterIndex_Call{Call: _e.mock.On("AlterIndex", _a0, _a1)}
}
func (_c *MockDataCoord_AlterIndex_Call) Run(run func(_a0 context.Context, _a1 *indexpb.AlterIndexRequest)) *MockDataCoord_AlterIndex_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*indexpb.AlterIndexRequest))
})
return _c
}
func (_c *MockDataCoord_AlterIndex_Call) Return(_a0 *commonpb.Status, _a1 error) *MockDataCoord_AlterIndex_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockDataCoord_AlterIndex_Call) RunAndReturn(run func(context.Context, *indexpb.AlterIndexRequest) (*commonpb.Status, error)) *MockDataCoord_AlterIndex_Call {
_c.Call.Return(run)
return _c
}
// AssignSegmentID provides a mock function with given fields: _a0, _a1
func (_m *MockDataCoord) AssignSegmentID(_a0 context.Context, _a1 *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
ret := _m.Called(_a0, _a1)

View File

@ -33,6 +33,76 @@ func (_m *MockDataCoordClient) EXPECT() *MockDataCoordClient_Expecter {
return &MockDataCoordClient_Expecter{mock: &_m.Mock}
}
// AlterIndex provides a mock function with given fields: ctx, in, opts
func (_m *MockDataCoordClient) AlterIndex(ctx context.Context, in *indexpb.AlterIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *commonpb.Status
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *indexpb.AlterIndexRequest, ...grpc.CallOption) (*commonpb.Status, error)); ok {
return rf(ctx, in, opts...)
}
if rf, ok := ret.Get(0).(func(context.Context, *indexpb.AlterIndexRequest, ...grpc.CallOption) *commonpb.Status); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*commonpb.Status)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *indexpb.AlterIndexRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockDataCoordClient_AlterIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterIndex'
type MockDataCoordClient_AlterIndex_Call struct {
*mock.Call
}
// AlterIndex is a helper method to define mock.On call
// - ctx context.Context
// - in *indexpb.AlterIndexRequest
// - opts ...grpc.CallOption
func (_e *MockDataCoordClient_Expecter) AlterIndex(ctx interface{}, in interface{}, opts ...interface{}) *MockDataCoordClient_AlterIndex_Call {
return &MockDataCoordClient_AlterIndex_Call{Call: _e.mock.On("AlterIndex",
append([]interface{}{ctx, in}, opts...)...)}
}
func (_c *MockDataCoordClient_AlterIndex_Call) Run(run func(ctx context.Context, in *indexpb.AlterIndexRequest, opts ...grpc.CallOption)) *MockDataCoordClient_AlterIndex_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]grpc.CallOption, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(grpc.CallOption)
}
}
run(args[0].(context.Context), args[1].(*indexpb.AlterIndexRequest), variadicArgs...)
})
return _c
}
func (_c *MockDataCoordClient_AlterIndex_Call) Return(_a0 *commonpb.Status, _a1 error) *MockDataCoordClient_AlterIndex_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockDataCoordClient_AlterIndex_Call) RunAndReturn(run func(context.Context, *indexpb.AlterIndexRequest, ...grpc.CallOption) (*commonpb.Status, error)) *MockDataCoordClient_AlterIndex_Call {
_c.Call.Return(run)
return _c
}
// AssignSegmentID provides a mock function with given fields: ctx, in, opts
func (_m *MockDataCoordClient) AssignSegmentID(ctx context.Context, in *datapb.AssignSegmentIDRequest, opts ...grpc.CallOption) (*datapb.AssignSegmentIDResponse, error) {
_va := make([]interface{}, len(opts))

View File

@ -199,6 +199,61 @@ func (_c *MockProxy_AlterCollection_Call) RunAndReturn(run func(context.Context,
return _c
}
// AlterIndex provides a mock function with given fields: _a0, _a1
func (_m *MockProxy) AlterIndex(_a0 context.Context, _a1 *milvuspb.AlterIndexRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)
var r0 *commonpb.Status
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterIndexRequest) (*commonpb.Status, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterIndexRequest) *commonpb.Status); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*commonpb.Status)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.AlterIndexRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockProxy_AlterIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AlterIndex'
type MockProxy_AlterIndex_Call struct {
*mock.Call
}
// AlterIndex is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *milvuspb.AlterIndexRequest
func (_e *MockProxy_Expecter) AlterIndex(_a0 interface{}, _a1 interface{}) *MockProxy_AlterIndex_Call {
return &MockProxy_AlterIndex_Call{Call: _e.mock.On("AlterIndex", _a0, _a1)}
}
func (_c *MockProxy_AlterIndex_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.AlterIndexRequest)) *MockProxy_AlterIndex_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.AlterIndexRequest))
})
return _c
}
func (_c *MockProxy_AlterIndex_Call) Return(_a0 *commonpb.Status, _a1 error) *MockProxy_AlterIndex_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockProxy_AlterIndex_Call) RunAndReturn(run func(context.Context, *milvuspb.AlterIndexRequest) (*commonpb.Status, error)) *MockProxy_AlterIndex_Call {
_c.Call.Return(run)
return _c
}
// CalcDistance provides a mock function with given fields: _a0, _a1
func (_m *MockProxy) CalcDistance(_a0 context.Context, _a1 *milvuspb.CalcDistanceRequest) (*milvuspb.CalcDistanceResults, error) {
ret := _m.Called(_a0, _a1)
@ -639,11 +694,6 @@ func (_c *MockProxy_CreateIndex_Call) RunAndReturn(run func(context.Context, *mi
return _c
}
func (_m *MockProxy) AlterIndex(_a0 context.Context, _a1 *milvuspb.AlterIndexRequest) (*commonpb.Status, error) {
// Todo
return nil, nil
}
// CreatePartition provides a mock function with given fields: _a0, _a1
func (_m *MockProxy) CreatePartition(_a0 context.Context, _a1 *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)
@ -4560,9 +4610,59 @@ func (_c *MockProxy_Search_Call) RunAndReturn(run func(context.Context, *milvusp
return _c
}
// SearchV2 provides a mock function with given fields: _a0, _a1
func (_m *MockProxy) SearchV2(_a0 context.Context, _a1 *milvuspb.SearchRequestV2) (*milvuspb.SearchResults, error) {
// Todo
return nil, nil
ret := _m.Called(_a0, _a1)
var r0 *milvuspb.SearchResults
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.SearchRequestV2) (*milvuspb.SearchResults, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.SearchRequestV2) *milvuspb.SearchResults); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.SearchResults)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.SearchRequestV2) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockProxy_SearchV2_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SearchV2'
type MockProxy_SearchV2_Call struct {
*mock.Call
}
// SearchV2 is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *milvuspb.SearchRequestV2
func (_e *MockProxy_Expecter) SearchV2(_a0 interface{}, _a1 interface{}) *MockProxy_SearchV2_Call {
return &MockProxy_SearchV2_Call{Call: _e.mock.On("SearchV2", _a0, _a1)}
}
func (_c *MockProxy_SearchV2_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.SearchRequestV2)) *MockProxy_SearchV2_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.SearchRequestV2))
})
return _c
}
func (_c *MockProxy_SearchV2_Call) Return(_a0 *milvuspb.SearchResults, _a1 error) *MockProxy_SearchV2_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockProxy_SearchV2_Call) RunAndReturn(run func(context.Context, *milvuspb.SearchRequestV2) (*milvuspb.SearchResults, error)) *MockProxy_SearchV2_Call {
_c.Call.Return(run)
return _c
}
// SelectGrant provides a mock function with given fields: _a0, _a1

View File

@ -78,6 +78,7 @@ service DataCoord {
rpc CheckHealth(milvus.CheckHealthRequest) returns (milvus.CheckHealthResponse) {}
rpc CreateIndex(index.CreateIndexRequest) returns (common.Status){}
rpc AlterIndex(index.AlterIndexRequest) returns (common.Status){}
// Deprecated: use DescribeIndex instead
rpc GetIndexState(index.GetIndexStateRequest) returns (index.GetIndexStateResponse) {}
rpc GetSegmentIndexState(index.GetSegmentIndexStateRequest) returns (index.GetSegmentIndexStateResponse) {}

View File

@ -10,33 +10,19 @@ import "milvus.proto";
import "schema.proto";
service IndexCoord {
rpc GetComponentStates(milvus.GetComponentStatesRequest)
returns (milvus.ComponentStates) {
}
rpc GetStatisticsChannel(internal.GetStatisticsChannelRequest)
returns (milvus.StringResponse) {
}
rpc CreateIndex(CreateIndexRequest) returns (common.Status) {
}
// Deprecated: use DescribeIndex instead
rpc GetIndexState(GetIndexStateRequest) returns (GetIndexStateResponse) {
}
rpc GetSegmentIndexState(GetSegmentIndexStateRequest)
returns (GetSegmentIndexStateResponse) {
}
rpc GetIndexInfos(GetIndexInfoRequest) returns (GetIndexInfoResponse) {
}
rpc DropIndex(DropIndexRequest) returns (common.Status) {
}
rpc DescribeIndex(DescribeIndexRequest) returns (DescribeIndexResponse) {
}
rpc GetIndexStatistics(GetIndexStatisticsRequest)
returns (GetIndexStatisticsResponse) {
}
// Deprecated: use DescribeIndex instead
rpc GetIndexBuildProgress(GetIndexBuildProgressRequest)
returns (GetIndexBuildProgressResponse) {
}
rpc GetComponentStates(milvus.GetComponentStatesRequest) returns (milvus.ComponentStates) {}
rpc GetStatisticsChannel(internal.GetStatisticsChannelRequest) returns(milvus.StringResponse){}
rpc CreateIndex(CreateIndexRequest) returns (common.Status){}
rpc AlterIndex(AlterIndexRequest) returns (common.Status){}
// Deprecated: use DescribeIndex instead
rpc GetIndexState(GetIndexStateRequest) returns (GetIndexStateResponse) {}
rpc GetSegmentIndexState(GetSegmentIndexStateRequest) returns (GetSegmentIndexStateResponse) {}
rpc GetIndexInfos(GetIndexInfoRequest) returns (GetIndexInfoResponse){}
rpc DropIndex(DropIndexRequest) returns (common.Status) {}
rpc DescribeIndex(DescribeIndexRequest) returns (DescribeIndexResponse) {}
rpc GetIndexStatistics(GetIndexStatisticsRequest) returns (GetIndexStatisticsResponse) {}
// Deprecated: use DescribeIndex instead
rpc GetIndexBuildProgress(GetIndexBuildProgressRequest) returns (GetIndexBuildProgressResponse) {}
rpc ShowConfigurations(internal.ShowConfigurationsRequest)
returns (internal.ShowConfigurationsResponse) {
@ -171,6 +157,12 @@ message CreateIndexRequest {
repeated common.KeyValuePair user_index_params = 8;
}
message AlterIndexRequest {
int64 collectionID = 1;
string index_name = 2;
repeated common.KeyValuePair params = 3;
}
message GetIndexInfoRequest {
int64 collectionID = 1;
repeated int64 segmentIDs = 2;

View File

@ -1815,8 +1815,74 @@ func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateInde
}
func (node *Proxy) AlterIndex(ctx context.Context, request *milvuspb.AlterIndexRequest) (*commonpb.Status, error) {
// Todo
return nil, nil
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return merr.Status(err), nil
}
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-AlterIndex")
defer sp.End()
task := &alterIndexTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
req: request,
datacoord: node.dataCoord,
querycoord: node.queryCoord,
replicateMsgStream: node.replicateMsgStream,
}
method := "AlterIndex"
tr := timerecord.NewTimeRecorder(method)
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.TotalLabel).Inc()
log := log.Ctx(ctx).With(
zap.String("role", typeutil.ProxyRole),
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
zap.String("indexName", request.GetIndexName()),
zap.Any("extraParams", request.ExtraParams))
log.Info(rpcReceived(method))
if err := node.sched.ddQueue.Enqueue(task); err != nil {
log.Warn(
rpcFailedToEnqueue(method),
zap.Error(err))
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.AbandonLabel).Inc()
return merr.Status(err), nil
}
log.Info(
rpcEnqueued(method),
zap.Uint64("BeginTs", task.BeginTs()),
zap.Uint64("EndTs", task.EndTs()))
if err := task.WaitToFinish(); err != nil {
log.Warn(
rpcFailedToWaitToFinish(method),
zap.Error(err),
zap.Uint64("BeginTs", task.BeginTs()),
zap.Uint64("EndTs", task.EndTs()))
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.FailLabel).Inc()
return merr.Status(err), nil
}
log.Info(
rpcDone(method),
zap.Uint64("BeginTs", task.BeginTs()),
zap.Uint64("EndTs", task.EndTs()))
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.SuccessLabel).Inc()
metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return task.result, nil
}
// DescribeIndex get the meta information of index, such as index state, index id and etc.
@ -2655,8 +2721,9 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)
}
func (node *Proxy) SearchV2(ctx context.Context, request *milvuspb.SearchRequestV2) (*milvuspb.SearchResults, error) {
// Todo
return nil, nil
return &milvuspb.SearchResults{
Status: merr.Status(merr.WrapErrServiceInternal("unimplemented")),
}, nil
}
func (node *Proxy) getVectorPlaceholderGroupForSearchByPks(ctx context.Context, request *milvuspb.SearchRequest) ([]byte, error) {

View File

@ -1107,6 +1107,26 @@ func TestProxy(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
wg.Add(1)
t.Run("alter_index", func(t *testing.T) {
defer wg.Done()
req := &milvuspb.AlterIndexRequest{
DbName: dbName,
CollectionName: collectionName,
IndexName: indexName,
ExtraParams: []*commonpb.KeyValuePair{
{
Key: common.MmapEnabledKey,
Value: "true",
},
},
}
resp, err := proxy.AlterIndex(ctx, req)
err = merr.CheckRPCCall(resp, err)
assert.NoError(t, err)
})
wg.Add(1)
t.Run("describe index", func(t *testing.T) {
defer wg.Done()
@ -1117,9 +1137,26 @@ func TestProxy(t *testing.T) {
FieldName: floatVecField,
IndexName: "",
})
err = merr.CheckRPCCall(resp, err)
assert.NoError(t, err)
assert.Equal(t, indexName, resp.IndexDescriptions[0].IndexName)
assert.True(t, common.IsMmapEnabled(resp.IndexDescriptions[0].GetParams()...), "params: %+v", resp.IndexDescriptions[0])
// disable mmap then the tests below could continue
req := &milvuspb.AlterIndexRequest{
DbName: dbName,
CollectionName: collectionName,
IndexName: indexName,
ExtraParams: []*commonpb.KeyValuePair{
{
Key: common.MmapEnabledKey,
Value: "false",
},
},
}
status, err := proxy.AlterIndex(ctx, req)
err = merr.CheckRPCCall(status, err)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
indexName = resp.IndexDescriptions[0].IndexName
})
wg.Add(1)

View File

@ -42,6 +42,7 @@ import (
const (
CreateIndexTaskName = "CreateIndexTask"
AlterIndexTaskName = "AlterIndexTask"
DescribeIndexTaskName = "DescribeIndexTask"
DropIndexTaskName = "DropIndexTask"
GetIndexStateTaskName = "GetIndexStateTask"
@ -423,6 +424,121 @@ func (cit *createIndexTask) PostExecute(ctx context.Context) error {
return nil
}
type alterIndexTask struct {
Condition
req *milvuspb.AlterIndexRequest
ctx context.Context
datacoord types.DataCoordClient
querycoord types.QueryCoordClient
result *commonpb.Status
replicateMsgStream msgstream.MsgStream
collectionID UniqueID
}
func (t *alterIndexTask) TraceCtx() context.Context {
return t.ctx
}
func (t *alterIndexTask) ID() UniqueID {
return t.req.GetBase().GetMsgID()
}
func (t *alterIndexTask) SetID(uid UniqueID) {
t.req.GetBase().MsgID = uid
}
func (t *alterIndexTask) Name() string {
return CreateIndexTaskName
}
func (t *alterIndexTask) Type() commonpb.MsgType {
return t.req.GetBase().GetMsgType()
}
func (t *alterIndexTask) BeginTs() Timestamp {
return t.req.GetBase().GetTimestamp()
}
func (t *alterIndexTask) EndTs() Timestamp {
return t.req.GetBase().GetTimestamp()
}
func (t *alterIndexTask) SetTs(ts Timestamp) {
t.req.Base.Timestamp = ts
}
func (t *alterIndexTask) OnEnqueue() error {
if t.req.Base == nil {
t.req.Base = commonpbutil.NewMsgBase()
}
return nil
}
func (t *alterIndexTask) PreExecute(ctx context.Context) error {
t.req.Base.MsgType = commonpb.MsgType_AlterIndex
t.req.Base.SourceID = paramtable.GetNodeID()
for _, param := range t.req.GetExtraParams() {
if !indexparams.IsConfigableIndexParam(param.GetKey()) {
return merr.WrapErrParameterInvalidMsg("%s is not configable index param", param.GetKey())
}
}
collName := t.req.GetCollectionName()
collection, err := globalMetaCache.GetCollectionID(ctx, t.req.GetDbName(), collName)
if err != nil {
return err
}
t.collectionID = collection
if err = validateIndexName(t.req.GetIndexName()); err != nil {
return err
}
loaded, err := isCollectionLoaded(ctx, t.querycoord, collection)
if err != nil {
return err
}
if loaded {
return merr.WrapErrCollectionLoaded(collName, "can't alter index on loaded collection, please release the collection first")
}
return nil
}
func (t *alterIndexTask) Execute(ctx context.Context) error {
log := log.Ctx(ctx).With(
zap.String("collection", t.req.GetCollectionName()),
zap.String("indexName", t.req.GetIndexName()),
zap.Any("params", t.req.GetExtraParams()),
)
log.Info("alter index")
var err error
req := &indexpb.AlterIndexRequest{
CollectionID: t.collectionID,
IndexName: t.req.GetIndexName(),
Params: t.req.GetExtraParams(),
}
t.result, err = t.datacoord.AlterIndex(ctx, req)
if err != nil {
return err
}
if t.result.ErrorCode != commonpb.ErrorCode_Success {
return errors.New(t.result.Reason)
}
SendReplicateMessagePack(ctx, t.replicateMsgStream, t.req)
return nil
}
func (t *alterIndexTask) PostExecute(ctx context.Context) error {
return nil
}
type describeIndexTask struct {
Condition
*milvuspb.DescribeIndexRequest

View File

@ -1568,6 +1568,11 @@ func SendReplicateMessagePack(ctx context.Context, replicateMsgStream msgstream.
BaseMsg: getBaseMsg(ctx, ts),
ReleasePartitionsRequest: *r,
}
case *milvuspb.AlterIndexRequest:
tsMsg = &msgstream.AlterIndexMsg{
BaseMsg: getBaseMsg(ctx, ts),
AlterIndexRequest: *r,
}
default:
log.Warn("unknown request", zap.Any("request", request))
return

View File

@ -22,15 +22,19 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/indexparams"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -194,14 +198,32 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
if channel == nil {
channel = ex.targetMgr.GetDmChannel(task.CollectionID(), segment.GetInsertChannel(), meta.NextTarget)
}
loadInfo := utils.PackSegmentLoadInfo(resp.GetInfos()[0], channel.GetSeekPosition(), indexes)
// Get collection index info
indexInfo, err := ex.broker.DescribeIndex(ctx, task.CollectionID())
indexInfos, err := ex.broker.DescribeIndex(ctx, task.CollectionID())
if err != nil {
log.Warn("fail to get index meta of collection")
return err
}
// update the field index params
for _, segmentIndex := range indexes {
index, found := lo.Find(indexInfos, func(indexInfo *indexpb.IndexInfo) bool {
return indexInfo.IndexID == segmentIndex.IndexID
})
if !found {
log.Warn("no collection index info for the given segment index", zap.String("indexName", segmentIndex.GetIndexName()))
}
params := funcutil.KeyValuePair2Map(segmentIndex.GetIndexParams())
for _, kv := range index.GetUserIndexParams() {
if indexparams.IsConfigableIndexParam(kv.GetKey()) {
params[kv.GetKey()] = kv.GetValue()
}
}
segmentIndex.IndexParams = funcutil.Map2KeyValuePair(params)
}
loadInfo := utils.PackSegmentLoadInfo(resp.GetInfos()[0], channel.GetSeekPosition(), indexes)
req := packLoadSegmentRequest(
task,
@ -210,7 +232,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
collectionInfo.GetProperties(),
loadMeta,
loadInfo,
indexInfo,
indexInfos,
)
// Get shard leader for the given replica and segment

View File

@ -27,8 +27,12 @@ import "C"
import (
"unsafe"
"github.com/pingcap/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/indexparams"
@ -56,10 +60,15 @@ func deleteLoadIndexInfo(info *LoadIndexInfo) {
C.DeleteLoadIndexInfo(info.cLoadIndexInfo)
}
func (li *LoadIndexInfo) appendLoadIndexInfo(indexInfo *querypb.FieldIndexInfo, collectionID int64, partitionID int64, segmentID int64, fieldType schemapb.DataType, enableMmap bool) error {
func (li *LoadIndexInfo) appendLoadIndexInfo(indexInfo *querypb.FieldIndexInfo, collectionID int64, partitionID int64, segmentID int64, fieldType schemapb.DataType) error {
fieldID := indexInfo.FieldID
indexPaths := indexInfo.IndexFilePaths
indexParams := funcutil.KeyValuePair2Map(indexInfo.IndexParams)
enableMmap := indexParams[common.MmapEnabledKey] == "true"
// as Knowhere reports error if encounter a unknown param, we need to delete it
delete(indexParams, common.MmapEnabledKey)
mmapDirPath := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()
err := li.appendFieldInfo(collectionID, partitionID, segmentID, fieldID, fieldType, enableMmap, mmapDirPath)
if err != nil {
@ -72,7 +81,6 @@ func (li *LoadIndexInfo) appendLoadIndexInfo(indexInfo *querypb.FieldIndexInfo,
}
// some build params also exist in indexParams, which are useless during loading process
indexParams := funcutil.KeyValuePair2Map(indexInfo.IndexParams)
if indexParams["index_type"] == indexparamcheck.IndexDISKANN {
err = indexparams.SetDiskIndexLoadParams(paramtable.Get(), indexParams, indexInfo.GetNumRows())
if err != nil {
@ -85,6 +93,7 @@ func (li *LoadIndexInfo) appendLoadIndexInfo(indexInfo *querypb.FieldIndexInfo,
return err
}
log.Info("load with index params", zap.Any("indexParams", indexParams))
for key, value := range indexParams {
err = li.appendIndexParam(key, value)
if err != nil {

View File

@ -857,14 +857,14 @@ func (s *LocalSegment) LoadDeltaData(deltaData *storage.DeleteData) error {
return nil
}
func (s *LocalSegment) LoadIndex(indexInfo *querypb.FieldIndexInfo, fieldType schemapb.DataType, enableMmap bool) error {
func (s *LocalSegment) LoadIndex(indexInfo *querypb.FieldIndexInfo, fieldType schemapb.DataType) error {
loadIndexInfo, err := newLoadIndexInfo()
defer deleteLoadIndexInfo(loadIndexInfo)
if err != nil {
return err
}
err = loadIndexInfo.appendLoadIndexInfo(indexInfo, s.collectionID, s.partitionID, s.segmentID, fieldType, enableMmap)
err = loadIndexInfo.appendLoadIndexInfo(indexInfo, s.collectionID, s.partitionID, s.segmentID, fieldType)
if err != nil {
if loadIndexInfo.cleanLocalData() != nil {
log.Warn("failed to clean cached data on disk after append index failed",

View File

@ -727,7 +727,7 @@ func (loader *segmentLoader) loadFieldIndex(ctx context.Context, segment *LocalS
return merr.WrapErrCollectionNotLoaded(segment.Collection(), "failed to load field index")
}
return segment.LoadIndex(indexInfo, fieldType, common.IsFieldMmapEnabled(collection.Schema(), indexInfo.GetFieldID()))
return segment.LoadIndex(indexInfo, fieldType)
}
func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int64, bfs *pkoracle.BloomFilterSet,

View File

@ -13,7 +13,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.16.5
github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231208092431-02cbad30332f
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c
github.com/nats-io/nats-server/v2 v2.9.17
github.com/nats-io/nats.go v1.24.0
github.com/panjf2000/ants/v2 v2.7.2
@ -44,6 +44,7 @@ require (
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17
golang.org/x/net v0.17.0
golang.org/x/sync v0.1.0
golang.org/x/sys v0.13.0
google.golang.org/grpc v1.54.0
google.golang.org/protobuf v1.30.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
@ -156,7 +157,6 @@ require (
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
golang.org/x/oauth2 v0.6.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect

View File

@ -477,12 +477,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr
github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.2-0.20231008032233-5d64d443769d h1:K8yyzz8BCBm+wirhRgySyB8wN+sw33eB3VsLz6Slu5s=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.2-0.20231008032233-5d64d443769d/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e h1:IH1WAXwEF8vbwahPdupi4zzRNWViT4B7fZzIjtRLpG4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231114080011-9a495865219e/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231208092431-02cbad30332f h1:0cAMN9OsgBxlEUY8i1e1ocrBZ/cpu/Kdguz4JWz9fUc=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231208092431-02cbad30332f/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c h1:Wbc2IZt/13+B5jc8JPU/dOxGYy+1jeOsChVgcza+qgw=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231211073628-ce99324c276c/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=

View File

@ -86,6 +86,68 @@ func (it *CreateIndexMsg) Size() int {
return proto.Size(&it.CreateIndexRequest)
}
// AlterIndexMsg is a message pack that contains create index request
type AlterIndexMsg struct {
BaseMsg
milvuspb.AlterIndexRequest
}
// interface implementation validation
var _ TsMsg = &AlterIndexMsg{}
// ID returns the ID of this message pack
func (it *AlterIndexMsg) ID() UniqueID {
return it.Base.MsgID
}
// SetID set the ID of this message pack
func (it *AlterIndexMsg) SetID(id UniqueID) {
it.Base.MsgID = id
}
// Type returns the type of this message pack
func (it *AlterIndexMsg) Type() MsgType {
return it.Base.MsgType
}
// SourceID indicates which component generated this message
func (it *AlterIndexMsg) SourceID() int64 {
return it.Base.SourceID
}
// Marshal is used to serialize a message pack to byte array
func (it *AlterIndexMsg) Marshal(input TsMsg) (MarshalType, error) {
AlterIndexMsg := input.(*AlterIndexMsg)
AlterIndexRequest := &AlterIndexMsg.AlterIndexRequest
mb, err := proto.Marshal(AlterIndexRequest)
if err != nil {
return nil, err
}
return mb, nil
}
// Unmarshal is used to deserialize a message pack from byte array
func (it *AlterIndexMsg) Unmarshal(input MarshalType) (TsMsg, error) {
alterIndexRequest := milvuspb.AlterIndexRequest{}
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
err = proto.Unmarshal(in, &alterIndexRequest)
if err != nil {
return nil, err
}
alterIndexMsg := &AlterIndexMsg{AlterIndexRequest: alterIndexRequest}
alterIndexMsg.BeginTimestamp = alterIndexMsg.GetBase().GetTimestamp()
alterIndexMsg.EndTimestamp = alterIndexMsg.GetBase().GetTimestamp()
return alterIndexMsg, nil
}
func (it *AlterIndexMsg) Size() int {
return proto.Size(&it.AlterIndexRequest)
}
// DropIndexMsg is a message pack that contains drop index request
type DropIndexMsg struct {
BaseMsg

View File

@ -20,7 +20,7 @@ func Test_GetPrivilegeExtObj(t *testing.T) {
assert.Equal(t, commonpb.ObjectPrivilege_PrivilegeLoad, privilegeExt.ObjectPrivilege)
assert.Equal(t, int32(3), privilegeExt.ObjectNameIndex)
request2 := &milvuspb.GetPartitionStatisticsRequest{}
request2 := &milvuspb.ListAliasesRequest{}
_, err = GetPrivilegeExtObj(request2)
assert.Error(t, err)
}

View File

@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
@ -48,6 +49,16 @@ const (
MaxBeamWidth = 16
)
var configableIndexParams = typeutil.NewSet[string]()
func init() {
configableIndexParams.Insert(common.MmapEnabledKey)
}
func IsConfigableIndexParam(key string) bool {
return configableIndexParams.Contain(key)
}
func getRowDataSizeOfFloatVector(numRows int64, dim int64) int64 {
var floatValue float32
/* #nosec G103 */

View File

@ -49,6 +49,7 @@ var (
ErrCollectionNotLoaded = newMilvusError("collection not loaded", 101, false)
ErrCollectionNumLimitExceeded = newMilvusError("exceeded the limit number of collections", 102, false)
ErrCollectionNotFullyLoaded = newMilvusError("collection not fully loaded", 103, true)
ErrCollectionLoaded = newMilvusError("collection already loaded", 104, false)
// Partition related
ErrPartitionNotFound = newMilvusError("partition not found", 200, false)

View File

@ -85,6 +85,7 @@ func (s *ErrSuite) TestWrap() {
s.ErrorIs(WrapErrCollectionNotFound("test_collection", "failed to get collection"), ErrCollectionNotFound)
s.ErrorIs(WrapErrCollectionNotLoaded("test_collection", "failed to query"), ErrCollectionNotLoaded)
s.ErrorIs(WrapErrCollectionNotFullyLoaded("test_collection", "failed to query"), ErrCollectionNotFullyLoaded)
s.ErrorIs(WrapErrCollectionNotLoaded("test_collection", "failed to alter index %s", "hnsw"), ErrCollectionNotLoaded)
// Partition related
s.ErrorIs(WrapErrPartitionNotFound("test_partition", "failed to get partition"), ErrPartitionNotFound)

View File

@ -456,6 +456,15 @@ func WrapErrCollectionNotFullyLoaded(collection any, msg ...string) error {
return err
}
func WrapErrCollectionLoaded(collection string, msgAndArgs ...any) error {
err := wrapFields(ErrCollectionLoaded, value("collection", collection))
if len(msgAndArgs) > 0 {
msg := msgAndArgs[0].(string)
err = errors.Wrapf(err, msg, msgAndArgs[1:]...)
}
return err
}
func WrapErrAliasNotFound(db any, alias any, msg ...string) error {
err := wrapFields(ErrAliasNotFound,
value("database", db),