fix: solve incompitable problem for none-encoding index(#40838) (#41369)

related: #40838

Signed-off-by: MrPresent-Han <chun.han@gmail.com>
Co-authored-by: MrPresent-Han <chun.han@gmail.com>
pull/41405/head
Chun Han 2025-04-20 22:56:44 +08:00 committed by GitHub
parent c4a41cc32b
commit 016920b023
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 167 additions and 56 deletions

View File

@ -50,6 +50,7 @@ constexpr const char* SCALAR_INDEX_ENGINE_VERSION =
"scalar_index_engine_version";
constexpr const char* TANTIVY_INDEX_VERSION = "tantivy_index_version";
constexpr uint32_t TANTIVY_INDEX_LATEST_VERSION = 7;
constexpr const char* INDEX_NON_ENCODING = "index.nonEncoding";
// index meta
constexpr const char* COLLECTION_ID = "collection_id";

View File

@ -180,6 +180,11 @@ CreateIndex(CIndex* res_index,
config[milvus::index::SCALAR_INDEX_ENGINE_VERSION] =
scalar_index_engine_version;
// check index encoding config
auto index_non_encoding_str =
config.value(milvus::index::INDEX_NON_ENCODING, "false");
bool index_non_encoding = index_non_encoding_str == "true";
// init file manager
milvus::storage::FieldDataMeta field_meta{
build_index_info->collectionid(),
@ -197,7 +202,7 @@ CreateIndex(CIndex* res_index,
build_index_info->field_schema().name(),
field_type,
build_index_info->dim(),
};
index_non_encoding};
auto chunk_manager =
milvus::storage::CreateChunkManager(storage_config);

View File

@ -48,7 +48,7 @@ IndexData::Serialize(StorageType medium) {
std::vector<uint8_t>
IndexData::serialize_to_remote_file() {
AssertInfo(field_data_meta_.has_value(), "field data not exist");
AssertInfo(field_data_meta_.has_value(), "field data meta not exist");
AssertInfo(index_meta_.has_value(), "index meta not exist");
// create descriptor event
DescriptorEvent descriptor_event;
@ -60,8 +60,11 @@ IndexData::serialize_to_remote_file() {
des_fix_part.field_id = field_data_meta_->field_id;
des_fix_part.start_timestamp = time_range_.first;
des_fix_part.end_timestamp = time_range_.second;
des_fix_part.data_type =
milvus::proto::schema::DataType(milvus::proto::schema::DataType::None);
des_fix_part.data_type = index_meta_->index_non_encoding
? milvus::proto::schema::DataType(
milvus::proto::schema::DataType::None)
: milvus::proto::schema::DataType(
payload_reader_->get_payload_datatype());
for (auto i = int8_t(EventType::DescriptorEvent);
i < int8_t(EventType::EventTypeEnd);
i++) {

View File

@ -67,6 +67,9 @@ class PayloadReader {
if (payload_buf_) {
return payload_buf_->Size();
}
if (field_data_) {
return field_data_->Size();
}
return 0;
}

View File

@ -83,6 +83,7 @@ struct IndexMeta {
std::string field_name;
DataType field_type;
int64_t dim;
bool index_non_encoding;
};
struct StorageConfig {

View File

@ -628,11 +628,21 @@ EncodeAndUploadIndexSlice(ChunkManager* chunk_manager,
IndexMeta index_meta,
FieldDataMeta field_meta,
std::string object_key) {
std::shared_ptr<IndexData> index_data = nullptr;
if (index_meta.index_non_encoding) {
index_data = std::make_shared<IndexData>(buf, batch_size);
// index-build tasks assigned from new milvus-coord nodes to none-encoding
} else {
auto field_data = CreateFieldData(DataType::INT8, false);
field_data->FillFieldData(buf, batch_size);
auto payload_reader = std::make_shared<PayloadReader>(field_data);
index_data = std::make_shared<IndexData>(payload_reader);
// index-build tasks assigned from old milvus-coord nodes, fallback to int8 encoding
}
// index not use valid_data, so no need to set nullable==true
auto indexData = std::make_shared<IndexData>(buf, batch_size);
indexData->set_index_meta(index_meta);
indexData->SetFieldDataMeta(field_meta);
auto serialized_index_data = indexData->serialize_to_remote_file();
index_data->set_index_meta(index_meta);
index_data->SetFieldDataMeta(field_meta);
auto serialized_index_data = index_data->serialize_to_remote_file();
auto serialized_index_size = serialized_index_data.size();
chunk_manager->Write(
object_key, serialized_index_data.data(), serialized_index_size);

View File

@ -21,18 +21,21 @@ type IndexEngineVersionManager interface {
GetCurrentScalarIndexEngineVersion() int32
GetMinimalScalarIndexEngineVersion() int32
GetIndexNonEncoding() bool
}
type versionManagerImpl struct {
mu lock.Mutex
versions map[int64]sessionutil.IndexEngineVersion
scalarIndexVersions map[int64]sessionutil.IndexEngineVersion
indexNonEncoding map[int64]bool
}
func newIndexEngineVersionManager() IndexEngineVersionManager {
return &versionManagerImpl{
versions: map[int64]sessionutil.IndexEngineVersion{},
scalarIndexVersions: map[int64]sessionutil.IndexEngineVersion{},
indexNonEncoding: map[int64]bool{},
}
}
@ -58,6 +61,7 @@ func (m *versionManagerImpl) RemoveNode(session *sessionutil.Session) {
delete(m.versions, session.ServerID)
delete(m.scalarIndexVersions, session.ServerID)
delete(m.indexNonEncoding, session.ServerID)
}
func (m *versionManagerImpl) Update(session *sessionutil.Session) {
@ -71,6 +75,7 @@ func (m *versionManagerImpl) addOrUpdate(session *sessionutil.Session) {
log.Info("addOrUpdate version", zap.Int64("nodeId", session.ServerID), zap.Int32("minimal", session.IndexEngineVersion.MinimalIndexVersion), zap.Int32("current", session.IndexEngineVersion.CurrentIndexVersion))
m.versions[session.ServerID] = session.IndexEngineVersion
m.scalarIndexVersions[session.ServerID] = session.ScalarIndexEngineVersion
m.indexNonEncoding[session.ServerID] = session.IndexNonEncoding
}
func (m *versionManagerImpl) GetCurrentIndexEngineVersion() int32 {
@ -148,3 +153,18 @@ func (m *versionManagerImpl) GetMinimalScalarIndexEngineVersion() int32 {
log.Info("Merged minimal scalar index version", zap.Int32("minimal", minimal))
return minimal
}
func (m *versionManagerImpl) GetIndexNonEncoding() bool {
m.mu.Lock()
defer m.mu.Unlock()
if len(m.indexNonEncoding) == 0 {
log.Info("indexNonEncoding map is empty")
// by default, we fall back to old index format for safety
return false
}
noneEncoding := true
for _, encoding := range m.indexNonEncoding {
noneEncoding = noneEncoding && encoding
}
return noneEncoding
}

View File

@ -105,3 +105,53 @@ func Test_IndexEngineVersionManager_GetMergedScalarIndexVersion(t *testing.T) {
assert.Equal(t, int32(20), m.GetCurrentScalarIndexEngineVersion())
assert.Equal(t, int32(0), m.GetMinimalScalarIndexEngineVersion())
}
func Test_IndexEngineVersionManager_GetIndexNoneEncoding(t *testing.T) {
m := newIndexEngineVersionManager()
// empty
assert.False(t, m.GetIndexNonEncoding())
// startup
m.Startup(map[string]*sessionutil.Session{
"1": {
SessionRaw: sessionutil.SessionRaw{
ServerID: 1,
ScalarIndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 20, MinimalIndexVersion: 0},
IndexNonEncoding: false,
},
},
})
assert.False(t, m.GetIndexNonEncoding())
// add node
m.AddNode(&sessionutil.Session{
SessionRaw: sessionutil.SessionRaw{
ServerID: 2,
ScalarIndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 10, MinimalIndexVersion: 5},
IndexNonEncoding: true,
},
})
// server1 is still use int8 encoding, the global index encoding must be int8
assert.False(t, m.GetIndexNonEncoding())
// update
m.Update(&sessionutil.Session{
SessionRaw: sessionutil.SessionRaw{
ServerID: 2,
ScalarIndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 5, MinimalIndexVersion: 2},
IndexNonEncoding: true,
},
})
assert.False(t, m.GetIndexNonEncoding())
// remove
m.RemoveNode(&sessionutil.Session{
SessionRaw: sessionutil.SessionRaw{
ServerID: 1,
IndexEngineVersion: sessionutil.IndexEngineVersion{CurrentIndexVersion: 5, MinimalIndexVersion: 3},
},
})
// after removing server1, then global none encoding should be true
assert.True(t, m.GetIndexNonEncoding())
}

View File

@ -143,6 +143,51 @@ func (_c *MockVersionManager_GetCurrentScalarIndexEngineVersion_Call) RunAndRetu
return _c
}
// GetIndexNonEncoding provides a mock function with given fields:
func (_m *MockVersionManager) GetIndexNonEncoding() bool {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for GetIndexNonEncoding")
}
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockVersionManager_GetIndexNonEncoding_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetIndexNonEncoding'
type MockVersionManager_GetIndexNonEncoding_Call struct {
*mock.Call
}
// GetIndexNonEncoding is a helper method to define mock.On call
func (_e *MockVersionManager_Expecter) GetIndexNonEncoding() *MockVersionManager_GetIndexNonEncoding_Call {
return &MockVersionManager_GetIndexNonEncoding_Call{Call: _e.mock.On("GetIndexNonEncoding")}
}
func (_c *MockVersionManager_GetIndexNonEncoding_Call) Run(run func()) *MockVersionManager_GetIndexNonEncoding_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockVersionManager_GetIndexNonEncoding_Call) Return(_a0 bool) *MockVersionManager_GetIndexNonEncoding_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockVersionManager_GetIndexNonEncoding_Call) RunAndReturn(run func() bool) *MockVersionManager_GetIndexNonEncoding_Call {
_c.Call.Return(run)
return _c
}
// GetMinimalIndexEngineVersion provides a mock function with given fields:
func (_m *MockVersionManager) GetMinimalIndexEngineVersion() int32 {
ret := _m.Called()

View File

@ -816,7 +816,8 @@ func (s *Server) handleSessionEvent(ctx context.Context, role string, event *ses
case sessionutil.SessionAddEvent:
log.Info("received querynode register",
zap.String("address", event.Session.Address),
zap.Int64("serverID", event.Session.ServerID))
zap.Int64("serverID", event.Session.ServerID),
zap.Bool("indexNonEncoding", event.Session.IndexNonEncoding))
s.indexEngineVersionManager.AddNode(event.Session)
case sessionutil.SessionDelEvent:
log.Info("received querynode unregister",

View File

@ -67,51 +67,6 @@ func (_c *MockWorkerManager_AddNode_Call) RunAndReturn(run func(int64, string) e
return _c
}
// ClientSupportDisk provides a mock function with given fields:
func (_m *MockWorkerManager) ClientSupportDisk() bool {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for ClientSupportDisk")
}
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockWorkerManager_ClientSupportDisk_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ClientSupportDisk'
type MockWorkerManager_ClientSupportDisk_Call struct {
*mock.Call
}
// ClientSupportDisk is a helper method to define mock.On call
func (_e *MockWorkerManager_Expecter) ClientSupportDisk() *MockWorkerManager_ClientSupportDisk_Call {
return &MockWorkerManager_ClientSupportDisk_Call{Call: _e.mock.On("ClientSupportDisk")}
}
func (_c *MockWorkerManager_ClientSupportDisk_Call) Run(run func()) *MockWorkerManager_ClientSupportDisk_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockWorkerManager_ClientSupportDisk_Call) Return(_a0 bool) *MockWorkerManager_ClientSupportDisk_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockWorkerManager_ClientSupportDisk_Call) RunAndReturn(run func() bool) *MockWorkerManager_ClientSupportDisk_Call {
_c.Call.Return(run)
return _c
}
// GetAllClients provides a mock function with given fields:
func (_m *MockWorkerManager) GetAllClients() map[int64]types.DataNodeClient {
ret := _m.Called()

View File

@ -245,7 +245,14 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
}
}
}
indexNonEncoding := "false"
if dependency.indexEngineVersionManager.GetIndexNonEncoding() {
indexNonEncoding = "true"
}
indexParams = append(indexParams, &commonpb.KeyValuePair{
Key: common.IndexNonEncoding,
Value: indexNonEncoding,
})
it.req = &workerpb.CreateJobRequest{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
IndexFilePrefix: path.Join(dependency.chunkManager.RootPath(), common.SegmentIndexPath),
@ -278,6 +285,7 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
zap.Int64("segID", segment.GetID()),
zap.Int32("CurrentIndexVersion", it.req.GetCurrentIndexVersion()),
zap.Int32("CurrentScalarIndexVersion", it.req.GetCurrentScalarIndexVersion()),
zap.String("IndexNonEncoding", indexNonEncoding),
zap.Int64("segID", segment.GetID()))
return true
}

View File

@ -160,7 +160,8 @@ func (node *QueryNode) initSession() error {
minimalIndexVersion, currentIndexVersion := getIndexEngineVersion()
node.session = sessionutil.NewSession(node.ctx,
sessionutil.WithIndexEngineVersion(minimalIndexVersion, currentIndexVersion),
sessionutil.WithScalarIndexEngineVersion(common.MinimalScalarIndexEngineVersion, common.CurrentScalarIndexEngineVersion))
sessionutil.WithScalarIndexEngineVersion(common.MinimalScalarIndexEngineVersion, common.CurrentScalarIndexEngineVersion),
sessionutil.WithIndexNonEncoding())
if node.session == nil {
return fmt.Errorf("session is nil, the etcd client connection may have failed")
}

View File

@ -103,6 +103,7 @@ type SessionRaw struct {
Version string `json:"Version"`
IndexEngineVersion IndexEngineVersion `json:"IndexEngineVersion,omitempty"`
ScalarIndexEngineVersion IndexEngineVersion `json:"ScalarIndexEngineVersion,omitempty"`
IndexNonEncoding bool `json:"IndexNonEncoding,omitempty"`
LeaseID *clientv3.LeaseID `json:"LeaseID,omitempty"`
HostName string `json:"HostName,omitempty"`
@ -194,6 +195,12 @@ func WithScalarIndexEngineVersion(minimal, current int32) SessionOption {
}
}
func WithIndexNonEncoding() SessionOption {
return func(session *Session) {
session.IndexNonEncoding = true
}
}
func (s *Session) apply(opts ...SessionOption) {
for _, opt := range opts {
opt(s)

View File

@ -213,6 +213,7 @@ const (
IndexOffsetCacheEnabledKey = "indexoffsetcache.enabled"
ReplicateIDKey = "replicate.id"
ReplicateEndTSKey = "replicate.endTS"
IndexNonEncoding = "index.nonEncoding"
)
const (