From d182a51653818375a332f3ab1846b76d0374929d Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 25 Jan 2024 19:49:01 +0800 Subject: [PATCH] fix: [Cherry-pick] Use correct pools for all CGO methods in segments pkg (#30275) Cherry-pick from master pr: #30274 See also #30273 This PR: - Rename confusing `LoadIndexInfo` to `UpdateIndexInfo` for LocalSegment - Use `DynamicPool` instead of `LoadPool` for `UpdateSealedSegmentIndex` - Fix cgo call missing pool control Signed-off-by: Congqi Xia --- .../segments/load_field_data_info.go | 47 ++++++--- .../querynodev2/segments/load_index_info.go | 98 +++++++++++++------ internal/querynodev2/segments/segment.go | 6 +- 3 files changed, 102 insertions(+), 49 deletions(-) diff --git a/internal/querynodev2/segments/load_field_data_info.go b/internal/querynodev2/segments/load_field_data_info.go index ce722c7f16..340642efef 100644 --- a/internal/querynodev2/segments/load_field_data_info.go +++ b/internal/querynodev2/segments/load_field_data_info.go @@ -34,9 +34,12 @@ type LoadFieldDataInfo struct { } func newLoadFieldDataInfo(ctx context.Context) (*LoadFieldDataInfo, error) { + var status C.CStatus var cLoadFieldDataInfo C.CLoadFieldDataInfo - - status := C.NewLoadFieldDataInfo(&cLoadFieldDataInfo) + GetDynamicPool().Submit(func() (any, error) { + status = C.NewLoadFieldDataInfo(&cLoadFieldDataInfo) + return nil, nil + }).Await() if err := HandleCStatus(ctx, &status, "newLoadFieldDataInfo failed"); err != nil { return nil, err } @@ -44,30 +47,46 @@ func newLoadFieldDataInfo(ctx context.Context) (*LoadFieldDataInfo, error) { } func deleteFieldDataInfo(info *LoadFieldDataInfo) { - C.DeleteLoadFieldDataInfo(info.cLoadFieldDataInfo) + GetDynamicPool().Submit(func() (any, error) { + C.DeleteLoadFieldDataInfo(info.cLoadFieldDataInfo) + return nil, nil + }).Await() } func (ld *LoadFieldDataInfo) appendLoadFieldInfo(ctx context.Context, fieldID int64, rowCount int64) error { - cFieldID := C.int64_t(fieldID) - cRowCount := C.int64_t(rowCount) + var status C.CStatus + GetDynamicPool().Submit(func() (any, error) { + cFieldID := C.int64_t(fieldID) + cRowCount := C.int64_t(rowCount) + + status = C.AppendLoadFieldInfo(ld.cLoadFieldDataInfo, cFieldID, cRowCount) + return nil, nil + }).Await() - status := C.AppendLoadFieldInfo(ld.cLoadFieldDataInfo, cFieldID, cRowCount) return HandleCStatus(ctx, &status, "appendLoadFieldInfo failed") } func (ld *LoadFieldDataInfo) appendLoadFieldDataPath(ctx context.Context, fieldID int64, binlog *datapb.Binlog) error { - cFieldID := C.int64_t(fieldID) - cEntriesNum := C.int64_t(binlog.GetEntriesNum()) - cFile := C.CString(binlog.GetLogPath()) - defer C.free(unsafe.Pointer(cFile)) + var status C.CStatus + GetDynamicPool().Submit(func() (any, error) { + cFieldID := C.int64_t(fieldID) + cEntriesNum := C.int64_t(binlog.GetEntriesNum()) + cFile := C.CString(binlog.GetLogPath()) + defer C.free(unsafe.Pointer(cFile)) + + status = C.AppendLoadFieldDataPath(ld.cLoadFieldDataInfo, cFieldID, cEntriesNum, cFile) + return nil, nil + }).Await() - status := C.AppendLoadFieldDataPath(ld.cLoadFieldDataInfo, cFieldID, cEntriesNum, cFile) return HandleCStatus(ctx, &status, "appendLoadFieldDataPath failed") } func (ld *LoadFieldDataInfo) appendMMapDirPath(dir string) { - cDir := C.CString(dir) - defer C.free(unsafe.Pointer(cDir)) + GetDynamicPool().Submit(func() (any, error) { + cDir := C.CString(dir) + defer C.free(unsafe.Pointer(cDir)) - C.AppendMMapDirPath(ld.cLoadFieldDataInfo, cDir) + C.AppendMMapDirPath(ld.cLoadFieldDataInfo, cDir) + return nil, nil + }).Await() } diff --git a/internal/querynodev2/segments/load_index_info.go b/internal/querynodev2/segments/load_index_info.go index 4dda6cc585..5c2aa057b1 100644 --- a/internal/querynodev2/segments/load_index_info.go +++ b/internal/querynodev2/segments/load_index_info.go @@ -56,7 +56,10 @@ func newLoadIndexInfo(ctx context.Context) (*LoadIndexInfo, error) { // deleteLoadIndexInfo would delete C.CLoadIndexInfo func deleteLoadIndexInfo(info *LoadIndexInfo) { - C.DeleteLoadIndexInfo(info.cLoadIndexInfo) + GetDynamicPool().Submit(func() (any, error) { + C.DeleteLoadIndexInfo(info.cLoadIndexInfo) + return nil, nil + }).Await() } func (li *LoadIndexInfo) appendLoadIndexInfo(ctx context.Context, indexInfo *querypb.FieldIndexInfo, collectionID int64, partitionID int64, segmentID int64, fieldType schemapb.DataType) error { @@ -105,46 +108,66 @@ func (li *LoadIndexInfo) appendLoadIndexInfo(ctx context.Context, indexInfo *que // appendIndexParam append indexParam to index func (li *LoadIndexInfo) appendIndexParam(ctx context.Context, indexKey string, indexValue string) error { - cIndexKey := C.CString(indexKey) - defer C.free(unsafe.Pointer(cIndexKey)) - cIndexValue := C.CString(indexValue) - defer C.free(unsafe.Pointer(cIndexValue)) - status := C.AppendIndexParam(li.cLoadIndexInfo, cIndexKey, cIndexValue) + var status C.CStatus + GetDynamicPool().Submit(func() (any, error) { + cIndexKey := C.CString(indexKey) + defer C.free(unsafe.Pointer(cIndexKey)) + cIndexValue := C.CString(indexValue) + defer C.free(unsafe.Pointer(cIndexValue)) + status = C.AppendIndexParam(li.cLoadIndexInfo, cIndexKey, cIndexValue) + return nil, nil + }).Await() return HandleCStatus(ctx, &status, "AppendIndexParam failed") } func (li *LoadIndexInfo) appendIndexInfo(ctx context.Context, indexID int64, buildID int64, indexVersion int64) error { - cIndexID := C.int64_t(indexID) - cBuildID := C.int64_t(buildID) - cIndexVersion := C.int64_t(indexVersion) + var status C.CStatus + GetDynamicPool().Submit(func() (any, error) { + cIndexID := C.int64_t(indexID) + cBuildID := C.int64_t(buildID) + cIndexVersion := C.int64_t(indexVersion) - status := C.AppendIndexInfo(li.cLoadIndexInfo, cIndexID, cBuildID, cIndexVersion) + status = C.AppendIndexInfo(li.cLoadIndexInfo, cIndexID, cBuildID, cIndexVersion) + return nil, nil + }).Await() return HandleCStatus(ctx, &status, "AppendIndexInfo failed") } func (li *LoadIndexInfo) cleanLocalData(ctx context.Context) error { - status := C.CleanLoadedIndex(li.cLoadIndexInfo) + var status C.CStatus + GetDynamicPool().Submit(func() (any, error) { + status = C.CleanLoadedIndex(li.cLoadIndexInfo) + return nil, nil + }).Await() return HandleCStatus(ctx, &status, "failed to clean cached data on disk") } func (li *LoadIndexInfo) appendIndexFile(ctx context.Context, filePath string) error { - cIndexFilePath := C.CString(filePath) - defer C.free(unsafe.Pointer(cIndexFilePath)) + var status C.CStatus + GetDynamicPool().Submit(func() (any, error) { + cIndexFilePath := C.CString(filePath) + defer C.free(unsafe.Pointer(cIndexFilePath)) - status := C.AppendIndexFilePath(li.cLoadIndexInfo, cIndexFilePath) + status = C.AppendIndexFilePath(li.cLoadIndexInfo, cIndexFilePath) + return nil, nil + }).Await() return HandleCStatus(ctx, &status, "AppendIndexIFile failed") } // appendFieldInfo appends fieldID & fieldType to index func (li *LoadIndexInfo) appendFieldInfo(ctx context.Context, collectionID int64, partitionID int64, segmentID int64, fieldID int64, fieldType schemapb.DataType, mmapDirPath string) error { - cColID := C.int64_t(collectionID) - cParID := C.int64_t(partitionID) - cSegID := C.int64_t(segmentID) - cFieldID := C.int64_t(fieldID) - cintDType := uint32(fieldType) - cMmapDirPath := C.CString(mmapDirPath) - defer C.free(unsafe.Pointer(cMmapDirPath)) - status := C.AppendFieldInfo(li.cLoadIndexInfo, cColID, cParID, cSegID, cFieldID, cintDType, cMmapDirPath) + var status C.CStatus + GetDynamicPool().Submit(func() (any, error) { + cColID := C.int64_t(collectionID) + cParID := C.int64_t(partitionID) + cSegID := C.int64_t(segmentID) + cFieldID := C.int64_t(fieldID) + cintDType := uint32(fieldType) + cMmapDirPath := C.CString(mmapDirPath) + defer C.free(unsafe.Pointer(cMmapDirPath)) + status = C.AppendFieldInfo(li.cLoadIndexInfo, cColID, cParID, cSegID, cFieldID, cintDType, cMmapDirPath) + return nil, nil + }).Await() return HandleCStatus(ctx, &status, "AppendFieldInfo failed") } @@ -157,23 +180,34 @@ func (li *LoadIndexInfo) appendIndexData(ctx context.Context, indexKeys []string } } - span := trace.SpanFromContext(ctx) + var status C.CStatus + GetLoadPool().Submit(func() (any, error) { + span := trace.SpanFromContext(ctx) - traceID := span.SpanContext().TraceID() - spanID := span.SpanContext().SpanID() - traceCtx := C.CTraceContext{ - traceID: (*C.uint8_t)(unsafe.Pointer(&traceID[0])), - spanID: (*C.uint8_t)(unsafe.Pointer(&spanID[0])), - flag: C.uchar(span.SpanContext().TraceFlags()), - } + traceID := span.SpanContext().TraceID() + spanID := span.SpanContext().SpanID() + traceCtx := C.CTraceContext{ + traceID: (*C.uint8_t)(unsafe.Pointer(&traceID[0])), + spanID: (*C.uint8_t)(unsafe.Pointer(&spanID[0])), + flag: C.uchar(span.SpanContext().TraceFlags()), + } + + status = C.AppendIndexV2(traceCtx, li.cLoadIndexInfo) + return nil, nil + }).Await() - status := C.AppendIndexV2(traceCtx, li.cLoadIndexInfo) return HandleCStatus(ctx, &status, "AppendIndex failed") } func (li *LoadIndexInfo) appendIndexEngineVersion(ctx context.Context, indexEngineVersion int32) error { cIndexEngineVersion := C.int32_t(indexEngineVersion) - status := C.AppendIndexEngineVersionToLoadInfo(li.cLoadIndexInfo, cIndexEngineVersion) + var status C.CStatus + + GetDynamicPool().Submit(func() (any, error) { + status = C.AppendIndexEngineVersionToLoadInfo(li.cLoadIndexInfo, cIndexEngineVersion) + return nil, nil + }).Await() + return HandleCStatus(ctx, &status, "AppendIndexEngineVersion failed") } diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index e947611480..86637af153 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -902,10 +902,10 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn return errors.New(errMsg) } - return s.LoadIndexInfo(ctx, indexInfo, loadIndexInfo) + return s.UpdateIndexInfo(ctx, indexInfo, loadIndexInfo) } -func (s *LocalSegment) LoadIndexInfo(ctx context.Context, indexInfo *querypb.FieldIndexInfo, info *LoadIndexInfo) error { +func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.FieldIndexInfo, info *LoadIndexInfo) error { log := log.Ctx(ctx).With( zap.Int64("collectionID", s.Collection()), zap.Int64("partitionID", s.Partition()), @@ -920,7 +920,7 @@ func (s *LocalSegment) LoadIndexInfo(ctx context.Context, indexInfo *querypb.Fie } var status C.CStatus - GetLoadPool().Submit(func() (any, error) { + GetDynamicPool().Submit(func() (any, error) { status = C.UpdateSealedSegmentIndex(s.ptr, info.cLoadIndexInfo) return nil, nil }).Await()