Change deserialize deltelog from 1 blob to blobs (#10085)

See also: #9530

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/10114/head
XuanYang-cn 2021-10-19 10:28:35 +08:00 committed by GitHub
parent a7f7bff651
commit 2255fe0b45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 30 deletions

View File

@ -638,44 +638,58 @@ func (deleteCodec *DeleteCodec) Serialize(partitionID UniqueID, segmentID Unique
}
func (deleteCodec *DeleteCodec) Deserialize(blob *Blob) (partitionID UniqueID, segmentID UniqueID, data *DeleteData, err error) {
if blob == nil {
// Deserialize deserializes the deltalog blobs into DeleteData
func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *DeleteData, err error) {
if len(blobs) == 0 {
return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("blobs is empty")
}
readerClose := func(reader *BinlogReader) func() error {
return func() error { return reader.Close() }
}
binlogReader, err := NewBinlogReader(blob.Value)
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
}
pid, sid := binlogReader.PartitionID, binlogReader.SegmentID
eventReader, err := binlogReader.NextEventReader()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
}
result := &DeleteData{
Data: make(map[string]int64),
}
length, err := eventReader.GetPayloadLengthFromReader()
for i := 0; i < length; i++ {
singleString, err := eventReader.GetOneStringFromPayload(i)
var pid, sid UniqueID
result := &DeleteData{Data: make(map[string]int64)}
for _, blob := range blobs {
binlogReader, err := NewBinlogReader(blob.Value)
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
}
splits := strings.Split(singleString, ",")
if len(splits) != 2 {
return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("the format of delta log is incorrect")
}
ts, err := strconv.ParseInt(splits[1], 10, 64)
if err != nil {
return -1, -1, nil, err
}
result.Data[splits[0]] = ts
}
deleteCodec.readerCloseFunc = append(deleteCodec.readerCloseFunc, readerClose(binlogReader))
return pid, sid, result, nil
pid, sid = binlogReader.PartitionID, binlogReader.SegmentID
eventReader, err := binlogReader.NextEventReader()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
}
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
}
for i := 0; i < length; i++ {
singleString, err := eventReader.GetOneStringFromPayload(i)
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
}
splits := strings.Split(singleString, ",")
if len(splits) != 2 {
return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("the format of delta log is incorrect")
}
ts, err := strconv.ParseInt(splits[1], 10, 64)
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
}
result.Data[splits[0]] = ts
}
deleteCodec.readerCloseFunc = append(deleteCodec.readerCloseFunc, readerClose(binlogReader))
}
return pid, sid, result, nil
}
// Blob key example:

View File

@ -321,7 +321,7 @@ func TestDeleteCodec(t *testing.T) {
blob, err := deleteCodec.Serialize(1, 1, deleteData)
assert.Nil(t, err)
pid, sid, data, err := deleteCodec.Deserialize(blob)
pid, sid, data, err := deleteCodec.Deserialize([]*Blob{blob})
assert.Nil(t, err)
assert.Equal(t, pid, int64(1))
assert.Equal(t, sid, int64(1))