fix: [Cherry-pick] Use correct pools for all CGO methods in segments pkg (#30275)

Cherry-pick from master
pr: #30274
See also #30273

This PR:
- Rename confusing `LoadIndexInfo` to `UpdateIndexInfo` for LocalSegment
- Use `DynamicPool` instead of `LoadPool` for `UpdateSealedSegmentIndex`
- Fix cgo call missing pool control

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/30293/head
congqixia 2024-01-25 19:49:01 +08:00 committed by GitHub
parent 1a54571c10
commit d182a51653
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 102 additions and 49 deletions

View File

@ -34,9 +34,12 @@ type LoadFieldDataInfo struct {
} }
func newLoadFieldDataInfo(ctx context.Context) (*LoadFieldDataInfo, error) { func newLoadFieldDataInfo(ctx context.Context) (*LoadFieldDataInfo, error) {
var status C.CStatus
var cLoadFieldDataInfo C.CLoadFieldDataInfo var cLoadFieldDataInfo C.CLoadFieldDataInfo
GetDynamicPool().Submit(func() (any, error) {
status := C.NewLoadFieldDataInfo(&cLoadFieldDataInfo) status = C.NewLoadFieldDataInfo(&cLoadFieldDataInfo)
return nil, nil
}).Await()
if err := HandleCStatus(ctx, &status, "newLoadFieldDataInfo failed"); err != nil { if err := HandleCStatus(ctx, &status, "newLoadFieldDataInfo failed"); err != nil {
return nil, err return nil, err
} }
@ -44,30 +47,46 @@ func newLoadFieldDataInfo(ctx context.Context) (*LoadFieldDataInfo, error) {
} }
func deleteFieldDataInfo(info *LoadFieldDataInfo) { func deleteFieldDataInfo(info *LoadFieldDataInfo) {
C.DeleteLoadFieldDataInfo(info.cLoadFieldDataInfo) GetDynamicPool().Submit(func() (any, error) {
C.DeleteLoadFieldDataInfo(info.cLoadFieldDataInfo)
return nil, nil
}).Await()
} }
func (ld *LoadFieldDataInfo) appendLoadFieldInfo(ctx context.Context, fieldID int64, rowCount int64) error { func (ld *LoadFieldDataInfo) appendLoadFieldInfo(ctx context.Context, fieldID int64, rowCount int64) error {
cFieldID := C.int64_t(fieldID) var status C.CStatus
cRowCount := C.int64_t(rowCount) GetDynamicPool().Submit(func() (any, error) {
cFieldID := C.int64_t(fieldID)
cRowCount := C.int64_t(rowCount)
status = C.AppendLoadFieldInfo(ld.cLoadFieldDataInfo, cFieldID, cRowCount)
return nil, nil
}).Await()
status := C.AppendLoadFieldInfo(ld.cLoadFieldDataInfo, cFieldID, cRowCount)
return HandleCStatus(ctx, &status, "appendLoadFieldInfo failed") return HandleCStatus(ctx, &status, "appendLoadFieldInfo failed")
} }
func (ld *LoadFieldDataInfo) appendLoadFieldDataPath(ctx context.Context, fieldID int64, binlog *datapb.Binlog) error { func (ld *LoadFieldDataInfo) appendLoadFieldDataPath(ctx context.Context, fieldID int64, binlog *datapb.Binlog) error {
cFieldID := C.int64_t(fieldID) var status C.CStatus
cEntriesNum := C.int64_t(binlog.GetEntriesNum()) GetDynamicPool().Submit(func() (any, error) {
cFile := C.CString(binlog.GetLogPath()) cFieldID := C.int64_t(fieldID)
defer C.free(unsafe.Pointer(cFile)) cEntriesNum := C.int64_t(binlog.GetEntriesNum())
cFile := C.CString(binlog.GetLogPath())
defer C.free(unsafe.Pointer(cFile))
status = C.AppendLoadFieldDataPath(ld.cLoadFieldDataInfo, cFieldID, cEntriesNum, cFile)
return nil, nil
}).Await()
status := C.AppendLoadFieldDataPath(ld.cLoadFieldDataInfo, cFieldID, cEntriesNum, cFile)
return HandleCStatus(ctx, &status, "appendLoadFieldDataPath failed") return HandleCStatus(ctx, &status, "appendLoadFieldDataPath failed")
} }
func (ld *LoadFieldDataInfo) appendMMapDirPath(dir string) { func (ld *LoadFieldDataInfo) appendMMapDirPath(dir string) {
cDir := C.CString(dir) GetDynamicPool().Submit(func() (any, error) {
defer C.free(unsafe.Pointer(cDir)) cDir := C.CString(dir)
defer C.free(unsafe.Pointer(cDir))
C.AppendMMapDirPath(ld.cLoadFieldDataInfo, cDir) C.AppendMMapDirPath(ld.cLoadFieldDataInfo, cDir)
return nil, nil
}).Await()
} }

View File

@ -56,7 +56,10 @@ func newLoadIndexInfo(ctx context.Context) (*LoadIndexInfo, error) {
// deleteLoadIndexInfo would delete C.CLoadIndexInfo // deleteLoadIndexInfo would delete C.CLoadIndexInfo
func deleteLoadIndexInfo(info *LoadIndexInfo) { func deleteLoadIndexInfo(info *LoadIndexInfo) {
C.DeleteLoadIndexInfo(info.cLoadIndexInfo) GetDynamicPool().Submit(func() (any, error) {
C.DeleteLoadIndexInfo(info.cLoadIndexInfo)
return nil, nil
}).Await()
} }
func (li *LoadIndexInfo) appendLoadIndexInfo(ctx context.Context, indexInfo *querypb.FieldIndexInfo, collectionID int64, partitionID int64, segmentID int64, fieldType schemapb.DataType) error { func (li *LoadIndexInfo) appendLoadIndexInfo(ctx context.Context, indexInfo *querypb.FieldIndexInfo, collectionID int64, partitionID int64, segmentID int64, fieldType schemapb.DataType) error {
@ -105,46 +108,66 @@ func (li *LoadIndexInfo) appendLoadIndexInfo(ctx context.Context, indexInfo *que
// appendIndexParam append indexParam to index // appendIndexParam append indexParam to index
func (li *LoadIndexInfo) appendIndexParam(ctx context.Context, indexKey string, indexValue string) error { func (li *LoadIndexInfo) appendIndexParam(ctx context.Context, indexKey string, indexValue string) error {
cIndexKey := C.CString(indexKey) var status C.CStatus
defer C.free(unsafe.Pointer(cIndexKey)) GetDynamicPool().Submit(func() (any, error) {
cIndexValue := C.CString(indexValue) cIndexKey := C.CString(indexKey)
defer C.free(unsafe.Pointer(cIndexValue)) defer C.free(unsafe.Pointer(cIndexKey))
status := C.AppendIndexParam(li.cLoadIndexInfo, cIndexKey, cIndexValue) cIndexValue := C.CString(indexValue)
defer C.free(unsafe.Pointer(cIndexValue))
status = C.AppendIndexParam(li.cLoadIndexInfo, cIndexKey, cIndexValue)
return nil, nil
}).Await()
return HandleCStatus(ctx, &status, "AppendIndexParam failed") return HandleCStatus(ctx, &status, "AppendIndexParam failed")
} }
func (li *LoadIndexInfo) appendIndexInfo(ctx context.Context, indexID int64, buildID int64, indexVersion int64) error { func (li *LoadIndexInfo) appendIndexInfo(ctx context.Context, indexID int64, buildID int64, indexVersion int64) error {
cIndexID := C.int64_t(indexID) var status C.CStatus
cBuildID := C.int64_t(buildID) GetDynamicPool().Submit(func() (any, error) {
cIndexVersion := C.int64_t(indexVersion) cIndexID := C.int64_t(indexID)
cBuildID := C.int64_t(buildID)
cIndexVersion := C.int64_t(indexVersion)
status := C.AppendIndexInfo(li.cLoadIndexInfo, cIndexID, cBuildID, cIndexVersion) status = C.AppendIndexInfo(li.cLoadIndexInfo, cIndexID, cBuildID, cIndexVersion)
return nil, nil
}).Await()
return HandleCStatus(ctx, &status, "AppendIndexInfo failed") return HandleCStatus(ctx, &status, "AppendIndexInfo failed")
} }
func (li *LoadIndexInfo) cleanLocalData(ctx context.Context) error { func (li *LoadIndexInfo) cleanLocalData(ctx context.Context) error {
status := C.CleanLoadedIndex(li.cLoadIndexInfo) var status C.CStatus
GetDynamicPool().Submit(func() (any, error) {
status = C.CleanLoadedIndex(li.cLoadIndexInfo)
return nil, nil
}).Await()
return HandleCStatus(ctx, &status, "failed to clean cached data on disk") return HandleCStatus(ctx, &status, "failed to clean cached data on disk")
} }
func (li *LoadIndexInfo) appendIndexFile(ctx context.Context, filePath string) error { func (li *LoadIndexInfo) appendIndexFile(ctx context.Context, filePath string) error {
cIndexFilePath := C.CString(filePath) var status C.CStatus
defer C.free(unsafe.Pointer(cIndexFilePath)) GetDynamicPool().Submit(func() (any, error) {
cIndexFilePath := C.CString(filePath)
defer C.free(unsafe.Pointer(cIndexFilePath))
status := C.AppendIndexFilePath(li.cLoadIndexInfo, cIndexFilePath) status = C.AppendIndexFilePath(li.cLoadIndexInfo, cIndexFilePath)
return nil, nil
}).Await()
return HandleCStatus(ctx, &status, "AppendIndexIFile failed") return HandleCStatus(ctx, &status, "AppendIndexIFile failed")
} }
// appendFieldInfo appends fieldID & fieldType to index // appendFieldInfo appends fieldID & fieldType to index
func (li *LoadIndexInfo) appendFieldInfo(ctx context.Context, collectionID int64, partitionID int64, segmentID int64, fieldID int64, fieldType schemapb.DataType, mmapDirPath string) error { func (li *LoadIndexInfo) appendFieldInfo(ctx context.Context, collectionID int64, partitionID int64, segmentID int64, fieldID int64, fieldType schemapb.DataType, mmapDirPath string) error {
cColID := C.int64_t(collectionID) var status C.CStatus
cParID := C.int64_t(partitionID) GetDynamicPool().Submit(func() (any, error) {
cSegID := C.int64_t(segmentID) cColID := C.int64_t(collectionID)
cFieldID := C.int64_t(fieldID) cParID := C.int64_t(partitionID)
cintDType := uint32(fieldType) cSegID := C.int64_t(segmentID)
cMmapDirPath := C.CString(mmapDirPath) cFieldID := C.int64_t(fieldID)
defer C.free(unsafe.Pointer(cMmapDirPath)) cintDType := uint32(fieldType)
status := C.AppendFieldInfo(li.cLoadIndexInfo, cColID, cParID, cSegID, cFieldID, cintDType, cMmapDirPath) cMmapDirPath := C.CString(mmapDirPath)
defer C.free(unsafe.Pointer(cMmapDirPath))
status = C.AppendFieldInfo(li.cLoadIndexInfo, cColID, cParID, cSegID, cFieldID, cintDType, cMmapDirPath)
return nil, nil
}).Await()
return HandleCStatus(ctx, &status, "AppendFieldInfo failed") return HandleCStatus(ctx, &status, "AppendFieldInfo failed")
} }
@ -157,23 +180,34 @@ func (li *LoadIndexInfo) appendIndexData(ctx context.Context, indexKeys []string
} }
} }
span := trace.SpanFromContext(ctx) var status C.CStatus
GetLoadPool().Submit(func() (any, error) {
span := trace.SpanFromContext(ctx)
traceID := span.SpanContext().TraceID() traceID := span.SpanContext().TraceID()
spanID := span.SpanContext().SpanID() spanID := span.SpanContext().SpanID()
traceCtx := C.CTraceContext{ traceCtx := C.CTraceContext{
traceID: (*C.uint8_t)(unsafe.Pointer(&traceID[0])), traceID: (*C.uint8_t)(unsafe.Pointer(&traceID[0])),
spanID: (*C.uint8_t)(unsafe.Pointer(&spanID[0])), spanID: (*C.uint8_t)(unsafe.Pointer(&spanID[0])),
flag: C.uchar(span.SpanContext().TraceFlags()), flag: C.uchar(span.SpanContext().TraceFlags()),
} }
status = C.AppendIndexV2(traceCtx, li.cLoadIndexInfo)
return nil, nil
}).Await()
status := C.AppendIndexV2(traceCtx, li.cLoadIndexInfo)
return HandleCStatus(ctx, &status, "AppendIndex failed") return HandleCStatus(ctx, &status, "AppendIndex failed")
} }
func (li *LoadIndexInfo) appendIndexEngineVersion(ctx context.Context, indexEngineVersion int32) error { func (li *LoadIndexInfo) appendIndexEngineVersion(ctx context.Context, indexEngineVersion int32) error {
cIndexEngineVersion := C.int32_t(indexEngineVersion) cIndexEngineVersion := C.int32_t(indexEngineVersion)
status := C.AppendIndexEngineVersionToLoadInfo(li.cLoadIndexInfo, cIndexEngineVersion) var status C.CStatus
GetDynamicPool().Submit(func() (any, error) {
status = C.AppendIndexEngineVersionToLoadInfo(li.cLoadIndexInfo, cIndexEngineVersion)
return nil, nil
}).Await()
return HandleCStatus(ctx, &status, "AppendIndexEngineVersion failed") return HandleCStatus(ctx, &status, "AppendIndexEngineVersion failed")
} }

View File

@ -902,10 +902,10 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn
return errors.New(errMsg) return errors.New(errMsg)
} }
return s.LoadIndexInfo(ctx, indexInfo, loadIndexInfo) return s.UpdateIndexInfo(ctx, indexInfo, loadIndexInfo)
} }
func (s *LocalSegment) LoadIndexInfo(ctx context.Context, indexInfo *querypb.FieldIndexInfo, info *LoadIndexInfo) error { func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.FieldIndexInfo, info *LoadIndexInfo) error {
log := log.Ctx(ctx).With( log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()), zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()), zap.Int64("partitionID", s.Partition()),
@ -920,7 +920,7 @@ func (s *LocalSegment) LoadIndexInfo(ctx context.Context, indexInfo *querypb.Fie
} }
var status C.CStatus var status C.CStatus
GetLoadPool().Submit(func() (any, error) { GetDynamicPool().Submit(func() (any, error) {
status = C.UpdateSealedSegmentIndex(s.ptr, info.cLoadIndexInfo) status = C.UpdateSealedSegmentIndex(s.ptr, info.cLoadIndexInfo)
return nil, nil return nil, nil
}).Await() }).Await()