mirror of https://github.com/milvus-io/milvus.git
Add tests where segments load delta logs (#16825)
issue: #16821 /kind improvements Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
pull/16875/head
parent
2d0f908dba
commit
216c45fbd6
|
@ -18,6 +18,7 @@ package querynode
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
|
@ -1030,6 +1031,51 @@ func saveBinLog(ctx context.Context,
|
|||
return fieldBinlog, err
|
||||
}
|
||||
|
||||
// saveDeltaLog saves delta logs into MinIO for testing purpose.
|
||||
func saveDeltaLog(collectionID UniqueID,
|
||||
partitionID UniqueID,
|
||||
segmentID UniqueID) ([]*datapb.FieldBinlog, error) {
|
||||
|
||||
binlogWriter := storage.NewDeleteBinlogWriter(schemapb.DataType_String, collectionID, partitionID, segmentID)
|
||||
eventWriter, _ := binlogWriter.NextDeleteEventWriter()
|
||||
dData := &storage.DeleteData{
|
||||
Pks: []storage.PrimaryKey{&storage.Int64PrimaryKey{Value: 1}, &storage.Int64PrimaryKey{Value: 2}},
|
||||
Tss: []Timestamp{100, 200},
|
||||
RowCount: 2,
|
||||
}
|
||||
|
||||
sizeTotal := 0
|
||||
for i := int64(0); i < dData.RowCount; i++ {
|
||||
int64PkValue := dData.Pks[i].(*storage.Int64PrimaryKey).Value
|
||||
ts := dData.Tss[i]
|
||||
eventWriter.AddOneStringToPayload(fmt.Sprintf("%d,%d", int64PkValue, ts))
|
||||
sizeTotal += binary.Size(int64PkValue)
|
||||
sizeTotal += binary.Size(ts)
|
||||
}
|
||||
eventWriter.SetEventTimestamp(100, 200)
|
||||
binlogWriter.SetEventTimeStamp(100, 200)
|
||||
binlogWriter.AddExtra("original_size", fmt.Sprintf("%v", sizeTotal))
|
||||
|
||||
binlogWriter.Finish()
|
||||
buffer, _ := binlogWriter.GetBuffer()
|
||||
blob := &storage.Blob{Key: "deltaLogPath1", Value: buffer}
|
||||
|
||||
kvs := make(map[string][]byte, 1)
|
||||
|
||||
// write insert binlog
|
||||
fieldBinlog := make([]*datapb.FieldBinlog, 0)
|
||||
log.Debug("[query node unittest] save delta log", zap.Int64("fieldID", 999))
|
||||
key := JoinIDPath(collectionID, partitionID, segmentID, 999)
|
||||
kvs[key] = blob.Value[:]
|
||||
fieldBinlog = append(fieldBinlog, &datapb.FieldBinlog{
|
||||
FieldID: 999,
|
||||
Binlogs: []*datapb.Binlog{{LogPath: key}},
|
||||
})
|
||||
log.Debug("[query node unittest] save delta log file to MinIO/S3")
|
||||
|
||||
return fieldBinlog, storage.NewLocalChunkManager(storage.RootPath(defaultLocalStorage)).MultiWrite(kvs)
|
||||
}
|
||||
|
||||
func genSimpleTimestampFieldData(numRows int) []Timestamp {
|
||||
times := make([]Timestamp, numRows)
|
||||
for i := 0; i < numRows; i++ {
|
||||
|
|
|
@ -575,13 +575,13 @@ func (loader *segmentLoader) loadDeltaLogs(segment *Segment, deltaLogs []*datapb
|
|||
dCodec := storage.DeleteCodec{}
|
||||
var blobs []*storage.Blob
|
||||
for _, deltaLog := range deltaLogs {
|
||||
for _, log := range deltaLog.GetBinlogs() {
|
||||
value, err := loader.cm.Read(log.GetLogPath())
|
||||
for _, bLog := range deltaLog.GetBinlogs() {
|
||||
value, err := loader.cm.Read(bLog.GetLogPath())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
blob := &storage.Blob{
|
||||
Key: log.GetLogPath(),
|
||||
Key: bLog.GetLogPath(),
|
||||
Value: value,
|
||||
}
|
||||
blobs = append(blobs, blob)
|
||||
|
|
|
@ -427,6 +427,9 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
|
|||
fieldBinlog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
|
||||
assert.NoError(t, err)
|
||||
|
||||
deltaLogs, err := saveDeltaLog(defaultCollectionID, defaultPartitionID, defaultSegmentID)
|
||||
assert.NoError(t, err)
|
||||
|
||||
t.Run("test load growing and sealed segments", func(t *testing.T) {
|
||||
node, err := genSimpleQueryNode(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
@ -448,6 +451,7 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
|
|||
PartitionID: defaultPartitionID,
|
||||
CollectionID: defaultCollectionID,
|
||||
BinlogPaths: fieldBinlog,
|
||||
Deltalogs: deltaLogs,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -483,6 +487,30 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, segment1.getRowCount(), segment2.getRowCount())
|
||||
|
||||
// Loading growing segments with delta log, expect to fail (this is a bug).
|
||||
// See: https://github.com/milvus-io/milvus/issues/16821
|
||||
segmentID3 := UniqueID(102)
|
||||
req3 := &querypb.LoadSegmentsRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_WatchQueryChannels,
|
||||
MsgID: rand.Int63(),
|
||||
},
|
||||
DstNodeID: 0,
|
||||
Schema: schema,
|
||||
Infos: []*querypb.SegmentLoadInfo{
|
||||
{
|
||||
SegmentID: segmentID3,
|
||||
PartitionID: defaultPartitionID,
|
||||
CollectionID: defaultCollectionID,
|
||||
BinlogPaths: fieldBinlog,
|
||||
Deltalogs: deltaLogs,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err = loader.loadSegment(req3, segmentTypeGrowing)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue