mirror of https://github.com/milvus-io/milvus.git
fix: Fix stats task wrong RootPath when upload binlog (#38539)
issue: #38336
Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
pull/38544/head
parent
1aa31e2a9b
commit
7a05b5bbea
|
@ -39,7 +39,6 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/workerpb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||
_ "github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
|
@ -201,7 +200,7 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er
|
|||
|
||||
if (i+1)%statsBatchSize == 0 && writer.IsFullWithBinlogMaxSize(st.req.GetBinlogMaxSize()) {
|
||||
serWriteStart := time.Now()
|
||||
binlogNum, kvs, partialBinlogs, err := serializeWrite(ctx, st.req.GetStartLogID()+st.logIDOffset, writer)
|
||||
binlogNum, kvs, partialBinlogs, err := serializeWrite(ctx, st.req.GetStorageConfig().GetRootPath(), st.req.GetStartLogID()+st.logIDOffset, writer)
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Warn("stats wrong, failed to serialize writer", zap.Int64("taskID", st.req.GetTaskID()), zap.Error(err))
|
||||
return nil, err
|
||||
|
@ -224,7 +223,7 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er
|
|||
|
||||
if !writer.FlushAndIsEmpty() {
|
||||
serWriteStart := time.Now()
|
||||
binlogNum, kvs, partialBinlogs, err := serializeWrite(ctx, st.req.GetStartLogID()+st.logIDOffset, writer)
|
||||
binlogNum, kvs, partialBinlogs, err := serializeWrite(ctx, st.req.GetStorageConfig().GetRootPath(), st.req.GetStartLogID()+st.logIDOffset, writer)
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Warn("stats wrong, failed to serialize writer", zap.Int64("taskID", st.req.GetTaskID()), zap.Error(err))
|
||||
return nil, err
|
||||
|
@ -244,7 +243,7 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er
|
|||
}
|
||||
|
||||
serWriteStart := time.Now()
|
||||
binlogNums, sPath, err := statSerializeWrite(ctx, st.binlogIO, st.req.GetStartLogID()+st.logIDOffset, writer, numRows)
|
||||
binlogNums, sPath, err := statSerializeWrite(ctx, st.req.GetStorageConfig().GetRootPath(), st.binlogIO, st.req.GetStartLogID()+st.logIDOffset, writer, numRows)
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Warn("stats wrong, failed to serialize write segment stats", zap.Int64("taskID", st.req.GetTaskID()),
|
||||
zap.Int64("remaining row count", numRows), zap.Error(err))
|
||||
|
@ -256,7 +255,7 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er
|
|||
|
||||
var bm25StatsLogs []*datapb.FieldBinlog
|
||||
if len(bm25FieldIds) > 0 {
|
||||
binlogNums, bm25StatsLogs, err = bm25SerializeWrite(ctx, st.binlogIO, st.req.GetStartLogID()+st.logIDOffset, writer, numRows)
|
||||
binlogNums, bm25StatsLogs, err = bm25SerializeWrite(ctx, st.req.GetStorageConfig().GetRootPath(), st.binlogIO, st.req.GetStartLogID()+st.logIDOffset, writer, numRows)
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Warn("compact wrong, failed to serialize write segment bm25 stats", zap.Error(err))
|
||||
return nil, err
|
||||
|
@ -510,7 +509,7 @@ func mergeFieldBinlogs(base, paths map[typeutil.UniqueID]*datapb.FieldBinlog) {
|
|||
}
|
||||
}
|
||||
|
||||
func serializeWrite(ctx context.Context, startID int64, writer *compaction.SegmentWriter) (binlogNum int64, kvs map[string][]byte, fieldBinlogs map[int64]*datapb.FieldBinlog, err error) {
|
||||
func serializeWrite(ctx context.Context, rootPath string, startID int64, writer *compaction.SegmentWriter) (binlogNum int64, kvs map[string][]byte, fieldBinlogs map[int64]*datapb.FieldBinlog, err error) {
|
||||
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "serializeWrite")
|
||||
defer span.End()
|
||||
|
||||
|
@ -525,7 +524,7 @@ func serializeWrite(ctx context.Context, startID int64, writer *compaction.Segme
|
|||
for i := range blobs {
|
||||
// Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt
|
||||
fID, _ := strconv.ParseInt(blobs[i].GetKey(), 10, 64)
|
||||
key, _ := binlog.BuildLogPath(storage.InsertBinlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), fID, startID+int64(i))
|
||||
key, _ := binlog.BuildLogPathWithRootPath(rootPath, storage.InsertBinlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), fID, startID+int64(i))
|
||||
|
||||
kvs[key] = blobs[i].GetValue()
|
||||
fieldBinlogs[fID] = &datapb.FieldBinlog{
|
||||
|
@ -546,7 +545,7 @@ func serializeWrite(ctx context.Context, startID int64, writer *compaction.Segme
|
|||
return
|
||||
}
|
||||
|
||||
func statSerializeWrite(ctx context.Context, io io.BinlogIO, startID int64, writer *compaction.SegmentWriter, finalRowCount int64) (int64, *datapb.FieldBinlog, error) {
|
||||
func statSerializeWrite(ctx context.Context, rootPath string, io io.BinlogIO, startID int64, writer *compaction.SegmentWriter, finalRowCount int64) (int64, *datapb.FieldBinlog, error) {
|
||||
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "statslog serializeWrite")
|
||||
defer span.End()
|
||||
sblob, err := writer.Finish()
|
||||
|
@ -555,7 +554,7 @@ func statSerializeWrite(ctx context.Context, io io.BinlogIO, startID int64, writ
|
|||
}
|
||||
|
||||
binlogNum := int64(1)
|
||||
key, _ := binlog.BuildLogPath(storage.StatsBinlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), writer.GetPkID(), startID)
|
||||
key, _ := binlog.BuildLogPathWithRootPath(rootPath, storage.StatsBinlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), writer.GetPkID(), startID)
|
||||
kvs := map[string][]byte{key: sblob.GetValue()}
|
||||
statFieldLog := &datapb.FieldBinlog{
|
||||
FieldID: writer.GetPkID(),
|
||||
|
@ -576,7 +575,7 @@ func statSerializeWrite(ctx context.Context, io io.BinlogIO, startID int64, writ
|
|||
return binlogNum, statFieldLog, nil
|
||||
}
|
||||
|
||||
func bm25SerializeWrite(ctx context.Context, io io.BinlogIO, startID int64, writer *compaction.SegmentWriter, finalRowCount int64) (int64, []*datapb.FieldBinlog, error) {
|
||||
func bm25SerializeWrite(ctx context.Context, rootPath string, io io.BinlogIO, startID int64, writer *compaction.SegmentWriter, finalRowCount int64) (int64, []*datapb.FieldBinlog, error) {
|
||||
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "bm25log serializeWrite")
|
||||
defer span.End()
|
||||
stats, err := writer.GetBm25StatsBlob()
|
||||
|
@ -588,7 +587,7 @@ func bm25SerializeWrite(ctx context.Context, io io.BinlogIO, startID int64, writ
|
|||
binlogs := []*datapb.FieldBinlog{}
|
||||
cnt := int64(0)
|
||||
for fieldID, blob := range stats {
|
||||
key, _ := binlog.BuildLogPath(storage.BM25Binlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), fieldID, startID+cnt)
|
||||
key, _ := binlog.BuildLogPathWithRootPath(rootPath, storage.BM25Binlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), fieldID, startID+cnt)
|
||||
kvs[key] = blob.GetValue()
|
||||
fieldLog := &datapb.FieldBinlog{
|
||||
FieldID: fieldID,
|
||||
|
@ -614,10 +613,6 @@ func bm25SerializeWrite(ctx context.Context, io io.BinlogIO, startID int64, writ
|
|||
return cnt, binlogs, nil
|
||||
}
|
||||
|
||||
func buildTextLogPrefix(rootPath string, collID, partID, segID, fieldID, version int64) string {
|
||||
return fmt.Sprintf("%s/%s/%d/%d/%d/%d/%d", rootPath, common.TextIndexPath, collID, partID, segID, fieldID, version)
|
||||
}
|
||||
|
||||
func ParseStorageConfig(s *indexpb.StorageConfig) (*indexcgopb.StorageConfig, error) {
|
||||
bs, err := proto.Marshal(s)
|
||||
if err != nil {
|
||||
|
|
|
@ -86,7 +86,7 @@ func (s *TaskStatsSuite) Testbm25SerializeWriteError() {
|
|||
s.schema = genCollectionSchemaWithBM25()
|
||||
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Once()
|
||||
s.GenSegmentWriterWithBM25(0)
|
||||
cnt, binlogs, err := bm25SerializeWrite(context.Background(), s.mockBinlogIO, 0, s.segWriter, 1)
|
||||
cnt, binlogs, err := bm25SerializeWrite(context.Background(), "root_path", s.mockBinlogIO, 0, s.segWriter, 1)
|
||||
s.Require().NoError(err)
|
||||
s.Equal(int64(1), cnt)
|
||||
s.Equal(1, len(binlogs))
|
||||
|
@ -96,7 +96,7 @@ func (s *TaskStatsSuite) Testbm25SerializeWriteError() {
|
|||
s.schema = genCollectionSchemaWithBM25()
|
||||
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(fmt.Errorf("mock error")).Once()
|
||||
s.GenSegmentWriterWithBM25(0)
|
||||
_, _, err := bm25SerializeWrite(context.Background(), s.mockBinlogIO, 0, s.segWriter, 1)
|
||||
_, _, err := bm25SerializeWrite(context.Background(), "root_path", s.mockBinlogIO, 0, s.segWriter, 1)
|
||||
s.Error(err)
|
||||
})
|
||||
}
|
||||
|
@ -105,7 +105,7 @@ func (s *TaskStatsSuite) TestSortSegmentWithBM25() {
|
|||
s.Run("normal case", func() {
|
||||
s.schema = genCollectionSchemaWithBM25()
|
||||
s.GenSegmentWriterWithBM25(0)
|
||||
_, kvs, fBinlogs, err := serializeWrite(context.TODO(), 0, s.segWriter)
|
||||
_, kvs, fBinlogs, err := serializeWrite(context.TODO(), "root_path", 0, s.segWriter)
|
||||
s.NoError(err)
|
||||
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, paths []string) ([][]byte, error) {
|
||||
result := make([][]byte, len(paths))
|
||||
|
@ -149,7 +149,7 @@ func (s *TaskStatsSuite) TestSortSegmentWithBM25() {
|
|||
s.Run("upload bm25 binlog failed", func() {
|
||||
s.schema = genCollectionSchemaWithBM25()
|
||||
s.GenSegmentWriterWithBM25(0)
|
||||
_, kvs, fBinlogs, err := serializeWrite(context.TODO(), 0, s.segWriter)
|
||||
_, kvs, fBinlogs, err := serializeWrite(context.TODO(), "root_path", 0, s.segWriter)
|
||||
s.NoError(err)
|
||||
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, paths []string) ([][]byte, error) {
|
||||
result := make([][]byte, len(paths))
|
||||
|
|
Loading…
Reference in New Issue