mirror of https://github.com/milvus-io/milvus.git
Fix compaction target segment rowNum is always 0 (#20937)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/20914/head
parent e5c9da7152
commit f745d7f489
@@ -285,7 +285,7 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta
 		kvs[key] = value
 		inpaths[fID] = &datapb.FieldBinlog{
 			FieldID: fID,
-			Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key}},
+			Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key, EntriesNum: blob.RowNum}},
 		}
 	}
@@ -302,7 +302,7 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta
 		kvs[key] = value
 		statspaths[fID] = &datapb.FieldBinlog{
 			FieldID: fID,
-			Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key}},
+			Binlogs: []*datapb.Binlog{{LogSize: int64(fileLen), LogPath: key, EntriesNum: blob.RowNum}},
 		}
 	}
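With EntriesNum recorded on each generated binlog, the row count of a compaction's target segment can be derived from binlog metadata instead of always coming out as 0. A minimal consumer-side sketch, assuming the generated protobuf getters on datapb.Binlog and the datapb package already imported here; segmentRowCount is a hypothetical helper, not part of this commit:

// segmentRowCount is a hypothetical helper, not part of this commit.
// All fields of a segment share one row count, so summing EntriesNum
// over a single field's binlogs yields the segment's total row count.
func segmentRowCount(fieldBinlog *datapb.FieldBinlog) int64 {
	var total int64
	for _, binlog := range fieldBinlog.GetBinlogs() {
		total += binlog.GetEntriesNum()
	}
	return total
}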
@@ -64,9 +64,10 @@ const InvalidUniqueID = UniqueID(-1)
 
 // Blob is a pack of key&value
 type Blob struct {
-	Key   string
-	Value []byte
-	Size  int64
+	Key    string
+	Value  []byte
+	Size   int64
+	RowNum int64
 }
 
 // BlobList implements sort.Interface for a list of Blob
@@ -277,6 +278,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
 	if timeFieldData.RowNum() <= 0 {
 		return nil, nil, fmt.Errorf("there's no data in InsertData")
 	}
+	rowNum := int64(timeFieldData.RowNum())
 
 	ts := timeFieldData.(*Int64FieldData).Data
 	startTs := ts[0]
@@ -420,8 +422,9 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
 		}
 		blobKey := fmt.Sprintf("%d", field.FieldID)
 		blobs = append(blobs, &Blob{
-			Key:   blobKey,
-			Value: buffer,
+			Key:    blobKey,
+			Value:  buffer,
+			RowNum: rowNum,
 		})
 		eventWriter.Close()
 		writer.Close()
@@ -435,8 +438,9 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
 		}
 		statsBuffer := statsWriter.GetBuffer()
 		statsBlobs = append(statsBlobs, &Blob{
-			Key:   blobKey,
-			Value: statsBuffer,
+			Key:    blobKey,
+			Value:  statsBuffer,
+			RowNum: rowNum,
 		})
 	}
 }
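On the producer side, Serialize now stamps the batch's row count onto every data blob and stats blob. A sketch of a caller observing the new field, assuming the three-value return shown in the hunk above and imports of fmt and the internal/storage package; logBlobRows and its arguments are illustrative only:

// logBlobRows is a hypothetical caller, not part of this commit.
func logBlobRows(codec *storage.InsertCodec, partID, segID storage.UniqueID, data *storage.InsertData) error {
	blobs, statsBlobs, err := codec.Serialize(partID, segID, data)
	if err != nil {
		return err
	}
	for _, blob := range blobs {
		// RowNum was previously always the zero value; it now carries the
		// number of rows serialized into this blob's payload.
		fmt.Printf("field blob %s holds %d rows\n", blob.Key, blob.RowNum)
	}
	for _, blob := range statsBlobs {
		fmt.Printf("stats blob %s holds %d rows\n", blob.Key, blob.RowNum)
	}
	return nil
}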
@@ -118,19 +118,19 @@ func TestIndexCodec(t *testing.T) {
 	indexCodec := NewIndexCodec()
 	blobs := []*Blob{
 		{
-			"12345",
-			[]byte{1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7},
-			14,
+			Key:   "12345",
+			Value: []byte{1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7},
+			Size:  14,
 		},
 		{
-			"6666",
-			[]byte{6, 6, 6, 6, 6, 1, 2, 3, 4, 5, 6, 7},
-			12,
+			Key:   "6666",
+			Value: []byte{6, 6, 6, 6, 6, 1, 2, 3, 4, 5, 6, 7},
+			Size:  12,
 		},
 		{
-			"8885",
-			[]byte{8, 8, 8, 8, 8, 8, 8, 8, 2, 3, 4, 5, 6, 7},
-			14,
+			Key:   "8885",
+			Value: []byte{8, 8, 8, 8, 8, 8, 8, 8, 2, 3, 4, 5, 6, 7},
+			Size:  14,
 		},
 	}
 	indexParams := map[string]string{
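A natural follow-up assertion (not part of this diff) is that Serialize actually propagates the count into both blob lists. A sketch in the style of these tests, assuming the testify assert package they already use, an insertCodec and insertData fixture like the ones elsewhere in this file, and hypothetical identifiers PartitionID, SegmentID, and rowCount (an int64 equal to the fixture's row count):

// Hypothetical check, not part of this commit.
blobs, statsBlobs, err := insertCodec.Serialize(PartitionID, SegmentID, insertData)
assert.NoError(t, err)
for _, blob := range blobs {
	assert.Equal(t, rowCount, blob.RowNum)
}
for _, blob := range statsBlobs {
	assert.Equal(t, rowCount, blob.RowNum)
}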