mirror of https://github.com/milvus-io/milvus.git
parent 308977e844
commit af173dd2a0
@@ -263,7 +263,7 @@ func TestInsertBinlog(t *testing.T) {
 /* #nosec G103 */
 func TestDeleteBinlog(t *testing.T) {
-	w := NewDeleteBinlogWriter(schemapb.DataType_Int64, 50)
+	w := NewDeleteBinlogWriter(schemapb.DataType_Int64, 50, 1, 1)
 
 	e1, err := w.NextDeleteEventWriter()
 	assert.Nil(t, err)
@@ -332,12 +332,12 @@ func TestDeleteBinlog(t *testing.T) {
 	//descriptor data fix, partition id
 	partID := UnsafeReadInt64(buf, pos)
-	assert.Equal(t, partID, int64(-1))
+	assert.Equal(t, partID, int64(1))
 	pos += int(unsafe.Sizeof(partID))
 
 	//descriptor data fix, segment id
 	segID := UnsafeReadInt64(buf, pos)
-	assert.Equal(t, segID, int64(-1))
+	assert.Equal(t, segID, int64(1))
 	pos += int(unsafe.Sizeof(segID))
 
 	//descriptor data fix, field id
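The two assertions above flip from -1 to 1 because the descriptor event now carries real partition and segment IDs instead of placeholder values. For readers unfamiliar with the test's buffer walk, below is a self-contained sketch of how a fixed-offset int64 read such as UnsafeReadInt64 can work; unsafeReadInt64 here is an illustrative stand-in, not the storage package's implementation.

// Sketch only: reinterpret 8 bytes of a buffer as an int64, then advance
// the cursor by unsafe.Sizeof, exactly as the test walks the descriptor.
package main

import (
	"fmt"
	"unsafe"
)

// unsafeReadInt64 reinterprets the 8 bytes at buf[idx] as an int64 in
// native byte order (hypothetical stand-in for the package's helper).
func unsafeReadInt64(buf []byte, idx int) int64 {
	return *(*int64)(unsafe.Pointer(&buf[idx]))
}

func main() {
	// Lay out two int64 fields back to back: partition id, then segment id.
	buf := make([]byte, 16)
	*(*int64)(unsafe.Pointer(&buf[0])) = 1
	*(*int64)(unsafe.Pointer(&buf[8])) = 1

	pos := 0
	partID := unsafeReadInt64(buf, pos)
	pos += int(unsafe.Sizeof(partID))
	segID := unsafeReadInt64(buf, pos)
	fmt.Println(partID, segID) // 1 1
}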
@@ -1075,7 +1075,7 @@ func TestInsertBinlogWriterCloseError(t *testing.T) {
 }
 
 func TestDeleteBinlogWriteCloseError(t *testing.T) {
-	deleteWriter := NewDeleteBinlogWriter(schemapb.DataType_Int64, 10)
+	deleteWriter := NewDeleteBinlogWriter(schemapb.DataType_Int64, 10, 1, 1)
 	e1, err := deleteWriter.NextDeleteEventWriter()
 	assert.Nil(t, err)
 	err = e1.AddDataToPayload([]int64{1, 2, 3})
@@ -239,10 +239,12 @@ func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID
 }
 
 // NewDeleteBinlogWriter creates DeleteBinlogWriter to write binlog file.
-func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID int64) *DeleteBinlogWriter {
+func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID int64) *DeleteBinlogWriter {
 	descriptorEvent := newDescriptorEvent()
 	descriptorEvent.PayloadDataType = dataType
 	descriptorEvent.CollectionID = collectionID
+	descriptorEvent.PartitionID = partitionID
+	descriptorEvent.SegmentID = segmentID
 	return &DeleteBinlogWriter{
 		baseBinlogWriter: baseBinlogWriter{
 			descriptorEvent: *descriptorEvent,
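With partitionID and segmentID stamped into the descriptor event, a reader can recover both IDs from the binlog itself (as Deserialize below does via binlogReader.PartitionID and binlogReader.SegmentID). A minimal sketch of the new call shape, assuming a hypothetical helper inside the storage package:

// newDeleteWriter is a hypothetical convenience wrapper, not part of this
// patch; it only illustrates the widened constructor signature.
func newDeleteWriter(collectionID, partitionID, segmentID int64) (*DeleteBinlogWriter, error) {
	w := NewDeleteBinlogWriter(schemapb.DataType_String, collectionID, partitionID, segmentID)
	if _, err := w.NextDeleteEventWriter(); err != nil {
		return nil, err
	}
	return w, nil
}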
@@ -14,6 +14,7 @@ package storage
 import (
 	"encoding/json"
 	"fmt"
+	"math"
 	"sort"
 	"strconv"
 	"strings"
@@ -487,6 +488,99 @@ func (insertCodec *InsertCodec) Close() error {
 	return nil
 }
 
+// DeleteData saves each entity delete message represented as a <primarykey,timestamp> map.
+// timestamp represents the time when this instance was deleted.
+type DeleteData struct {
+	Data map[string]int64 // primary key to timestamp
+}
+
+type DeleteCodec struct {
+	Schema          *etcdpb.CollectionMeta
+	readerCloseFunc []func() error
+}
+
+func NewDeleteCodec(schema *etcdpb.CollectionMeta) *DeleteCodec {
+	return &DeleteCodec{Schema: schema}
+}
+
+// Serialize transfers delete data to a blob.
+// For each delete message, it saves a "pk,ts" string to the binlog.
+func (deleteCodec *DeleteCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data *DeleteData) (*Blob, error) {
+	binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, deleteCodec.Schema.ID, partitionID, segmentID)
+	eventWriter, err := binlogWriter.NextDeleteEventWriter()
+	if err != nil {
+		return nil, err
+	}
+	startTs, endTs := math.MaxInt64, math.MinInt64
+	for key, value := range data.Data {
+		if value < int64(startTs) {
+			startTs = int(value)
+		}
+		if value > int64(endTs) {
+			endTs = int(value)
+		}
+		err := eventWriter.AddOneStringToPayload(fmt.Sprintf("%s,%d", key, value))
+		if err != nil {
+			return nil, err
+		}
+	}
+	eventWriter.SetEventTimestamp(uint64(startTs), uint64(endTs))
+	binlogWriter.SetEventTimeStamp(uint64(startTs), uint64(endTs))
+	err = binlogWriter.Close()
+	if err != nil {
+		return nil, err
+	}
+	buffer, err := binlogWriter.GetBuffer()
+	if err != nil {
+		return nil, err
+	}
+	blob := &Blob{
+		Value: buffer,
+	}
+	return blob, nil
+}
+
+func (deleteCodec *DeleteCodec) Deserialize(blob *Blob) (partitionID UniqueID, segmentID UniqueID, data *DeleteData, err error) {
+	if blob == nil {
+		return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("blobs is empty")
+	}
+	readerClose := func(reader *BinlogReader) func() error {
+		return func() error { return reader.Close() }
+	}
+	binlogReader, err := NewBinlogReader(blob.Value)
+	if err != nil {
+		return InvalidUniqueID, InvalidUniqueID, nil, err
+	}
+	pid, sid := binlogReader.PartitionID, binlogReader.SegmentID
+	eventReader, err := binlogReader.NextEventReader()
+	if err != nil {
+		return InvalidUniqueID, InvalidUniqueID, nil, err
+	}
+	result := &DeleteData{
+		Data: make(map[string]int64),
+	}
+	length, err := eventReader.GetPayloadLengthFromReader()
+	if err != nil {
+		return InvalidUniqueID, InvalidUniqueID, nil, err
+	}
+	for i := 0; i < length; i++ {
+		singleString, err := eventReader.GetOneStringFromPayload(i)
+		if err != nil {
+			return InvalidUniqueID, InvalidUniqueID, nil, err
+		}
+		splits := strings.Split(singleString, ",")
+		if len(splits) != 2 {
+			return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("the format of delta log is incorrect")
+		}
+		ts, err := strconv.ParseInt(splits[1], 10, 64)
+		if err != nil {
+			return InvalidUniqueID, InvalidUniqueID, nil, err
+		}
+		result.Data[splits[0]] = ts
+	}
+	deleteCodec.readerCloseFunc = append(deleteCodec.readerCloseFunc, readerClose(binlogReader))
+	return pid, sid, result, nil
+}
+
 // Blob key example:
 // ${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
 // ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx}
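Each delete entry travels as a plain "pk,ts" string in the payload, so the wire format is trivial to reason about. A self-contained sketch of the round trip (encode and decode are hypothetical helpers, not the codec's API):

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// encode renders one delete message the way Serialize does.
func encode(pk string, ts int64) string {
	return fmt.Sprintf("%s,%d", pk, ts)
}

// decode splits a delta-log entry back into primary key and timestamp,
// mirroring the parsing inside Deserialize.
func decode(s string) (string, int64, error) {
	splits := strings.Split(s, ",")
	if len(splits) != 2 {
		return "", 0, fmt.Errorf("the format of delta log is incorrect")
	}
	ts, err := strconv.ParseInt(splits[1], 10, 64)
	if err != nil {
		return "", 0, err
	}
	return splits[0], ts, nil
}

func main() {
	s := encode("1", 43757345)
	pk, ts, err := decode(s)
	fmt.Println(s, pk, ts, err) // 1,43757345 1 43757345 <nil>
}

One consequence of this encoding: a primary key that itself contains a comma would split into more than two parts and be rejected, which is fine for the integer-like keys used in the tests.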
@@ -297,6 +297,25 @@ func TestInsertCodec(t *testing.T) {
 	_, _, _, err = insertCodec.Deserialize(blobs)
 	assert.NotNil(t, err)
 }
 
+func TestDeleteCodec(t *testing.T) {
+	schema := &etcdpb.CollectionMeta{
+		ID: CollectionID,
+	}
+	deleteCodec := NewDeleteCodec(schema)
+	deleteData := &DeleteData{
+		Data: map[string]int64{"1": 43757345, "2": 23578294723},
+	}
+	blob, err := deleteCodec.Serialize(1, 1, deleteData)
+	assert.Nil(t, err)
+
+	pid, sid, data, err := deleteCodec.Deserialize(blob)
+	assert.Nil(t, err)
+	assert.Equal(t, pid, int64(1))
+	assert.Equal(t, sid, int64(1))
+	assert.Equal(t, data, deleteData)
+}
+
 func TestDDCodec(t *testing.T) {
 	dataDefinitionCodec := NewDataDefinitionCodec(int64(1))
 	ts := []Timestamp{1, 2, 3, 4}