mirror of https://github.com/milvus-io/milvus.git
Change delete data primary key to int64 (#10438)
Signed-off-by: godchen <qingxiang.chen@zilliz.com>pull/10456/head
parent
beb26a1c19
commit
ffc0c07610
|
@ -20,7 +20,6 @@ import (
|
|||
"context"
|
||||
"encoding/binary"
|
||||
"math"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
@ -73,7 +72,7 @@ func (ddb *DelDataBuf) updateTimeRange(tr TimeRange) {
|
|||
func newDelDataBuf() *DelDataBuf {
|
||||
return &DelDataBuf{
|
||||
delData: &DeleteData{
|
||||
Data: make(map[string]int64),
|
||||
Data: make(map[int64]int64),
|
||||
},
|
||||
size: 0,
|
||||
tsFrom: math.MaxUint64,
|
||||
|
@ -120,7 +119,7 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) er
|
|||
delData := delDataBuf.(*DelDataBuf).delData
|
||||
|
||||
for i := 0; i < rows; i++ {
|
||||
delData.Data[strconv.FormatInt(pks[i], 10)] = tss[i]
|
||||
delData.Data[pks[i]] = tss[i]
|
||||
log.Debug("delete", zap.Int64("primary key", pks[i]), zap.Int64("ts", tss[i]))
|
||||
}
|
||||
|
||||
|
@ -141,7 +140,7 @@ func (dn *deleteNode) showDelBuf() {
|
|||
delDataBuf, _ := v.(*DelDataBuf)
|
||||
log.Debug("del data buffer status", zap.Int64("segID", segID), zap.Int64("size", delDataBuf.size))
|
||||
for pk, ts := range delDataBuf.delData.Data {
|
||||
log.Debug("del data", zap.String("pk", pk), zap.Int64("ts", ts))
|
||||
log.Debug("del data", zap.Int64("pk", pk), zap.Int64("ts", ts))
|
||||
}
|
||||
} else {
|
||||
log.Error("segment not exist", zap.Int64("segID", segID))
|
||||
|
|
|
@ -402,15 +402,14 @@ func (loader *segmentLoader) loadDeltaLogs(segment *Segment, deltaLogs []*datapb
|
|||
return err
|
||||
}
|
||||
|
||||
// TODO yukun: implements segment.Delete
|
||||
//rowCount := len(deltaData.Data)
|
||||
pks := make([]string, 0)
|
||||
pks := make([]int64, 0)
|
||||
tss := make([]int64, 0)
|
||||
for pk, ts := range deltaData.Data {
|
||||
pks = append(pks, pk)
|
||||
tss = append(tss, ts)
|
||||
}
|
||||
// segment.Delete(pks, tss, rowCount)
|
||||
//segment.Delete(pks, tss, rowCount)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -600,7 +600,7 @@ func (insertCodec *InsertCodec) Close() error {
|
|||
// DeleteData saves each entity delete message represented as <primarykey,timestamp> map.
|
||||
// timestamp represents the time when this instance was deleted
|
||||
type DeleteData struct {
|
||||
Data map[string]int64 // primary key to timestamp
|
||||
Data map[int64]int64 // primary key to timestamp
|
||||
}
|
||||
|
||||
// DeleteCodec serializes and deserializes the delete data
|
||||
|
@ -631,11 +631,11 @@ func (deleteCodec *DeleteCodec) Serialize(partitionID UniqueID, segmentID Unique
|
|||
if value > int64(endTs) {
|
||||
endTs = int(value)
|
||||
}
|
||||
err := eventWriter.AddOneStringToPayload(fmt.Sprintf("%s,%d", key, value))
|
||||
err := eventWriter.AddOneStringToPayload(fmt.Sprintf("%d,%d", key, value))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sizeTotal += len(key)
|
||||
sizeTotal += binary.Size(key)
|
||||
sizeTotal += binary.Size(value)
|
||||
}
|
||||
eventWriter.SetEventTimestamp(uint64(startTs), uint64(endTs))
|
||||
|
@ -672,7 +672,7 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
|
|||
}
|
||||
|
||||
var pid, sid UniqueID
|
||||
result := &DeleteData{Data: make(map[string]int64)}
|
||||
result := &DeleteData{Data: make(map[int64]int64)}
|
||||
for _, blob := range blobs {
|
||||
binlogReader, err := NewBinlogReader(blob.Value)
|
||||
if err != nil {
|
||||
|
@ -701,12 +701,17 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
|
|||
return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("the format of delta log is incorrect")
|
||||
}
|
||||
|
||||
pk, err := strconv.ParseInt(splits[0], 10, 64)
|
||||
if err != nil {
|
||||
return InvalidUniqueID, InvalidUniqueID, nil, err
|
||||
}
|
||||
|
||||
ts, err := strconv.ParseInt(splits[1], 10, 64)
|
||||
if err != nil {
|
||||
return InvalidUniqueID, InvalidUniqueID, nil, err
|
||||
}
|
||||
|
||||
result.Data[splits[0]] = ts
|
||||
result.Data[pk] = ts
|
||||
}
|
||||
|
||||
deleteCodec.readerCloseFunc = append(deleteCodec.readerCloseFunc, readerClose(binlogReader))
|
||||
|
|
|
@ -316,7 +316,7 @@ func TestDeleteCodec(t *testing.T) {
|
|||
}
|
||||
deleteCodec := NewDeleteCodec(schema)
|
||||
deleteData := &DeleteData{
|
||||
Data: map[string]int64{"1": 43757345, "2": 23578294723},
|
||||
Data: map[int64]int64{1: 43757345, 2: 23578294723},
|
||||
}
|
||||
blob, err := deleteCodec.Serialize(1, 1, deleteData)
|
||||
assert.Nil(t, err)
|
||||
|
|
Loading…
Reference in New Issue