mirror of https://github.com/milvus-io/milvus.git
Refine merge operation during compacting phase (#18399)
Signed-off-by: yun.zhang <yun.zhang@zilliz.com>
pull/18439/head
parent
a287a2b3fd
commit
c9174d55ba
|
@ -253,7 +253,12 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
|
|||
continue
|
||||
}
|
||||
|
||||
log.Info("time cost of generating global compaction", zap.Int64("planID", plan.PlanID), zap.Any("time cost", time.Since(start).Milliseconds()),
|
||||
segIDs := make(map[int64][]*datapb.FieldBinlog, len(plan.SegmentBinlogs))
|
||||
for _, seg := range plan.SegmentBinlogs {
|
||||
segIDs[seg.SegmentID] = seg.Deltalogs
|
||||
}
|
||||
|
||||
log.Info("time cost of generating global compaction", zap.Any("segID2DeltaLogs", segIDs), zap.Int64("planID", plan.PlanID), zap.Any("time cost", time.Since(start).Milliseconds()),
|
||||
zap.Int64("collectionID", signal.collectionID), zap.String("channel", group.channelName), zap.Int64("partitionID", group.partitionID))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -133,12 +133,12 @@ func (t *compactionTask) getChannelName() string {
|
|||
return t.plan.GetChannel()
|
||||
}
|
||||
|
||||
func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) (map[primaryKey]Timestamp, *DelDataBuf, error) {
|
||||
func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) (map[interface{}]Timestamp, *DelDataBuf, error) {
|
||||
mergeStart := time.Now()
|
||||
dCodec := storage.NewDeleteCodec()
|
||||
|
||||
var (
|
||||
pk2ts = make(map[primaryKey]Timestamp)
|
||||
pk2ts = make(map[interface{}]Timestamp)
|
||||
dbuff = &DelDataBuf{
|
||||
delData: &DeleteData{
|
||||
Pks: make([]primaryKey, 0),
|
||||
|
@ -162,7 +162,7 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT
|
|||
ts := dData.Tss[i]
|
||||
|
||||
if timetravelTs != Timestamp(0) && dData.Tss[i] <= timetravelTs {
|
||||
pk2ts[pk] = ts
|
||||
pk2ts[pk.GetValue()] = ts
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -191,7 +191,7 @@ func nano2Milli(nano time.Duration) float64 {
|
|||
return float64(nano) / float64(time.Millisecond)
|
||||
}
|
||||
|
||||
func (t *compactionTask) merge(mergeItr iterator, delta map[primaryKey]Timestamp, schema *schemapb.CollectionSchema, currentTs Timestamp) ([]*InsertData, int64, error) {
|
||||
func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestamp, schema *schemapb.CollectionSchema, currentTs Timestamp) ([]*InsertData, int64, error) {
|
||||
mergeStart := time.Now()
|
||||
|
||||
var (
|
||||
|
@ -207,12 +207,10 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[primaryKey]Timestamp
|
|||
)
|
||||
|
||||
isDeletedValue := func(v *storage.Value) bool {
|
||||
for pk, ts := range delta {
|
||||
if pk.EQ(v.PK) && uint64(v.Timestamp) <= ts {
|
||||
return true
|
||||
}
|
||||
ts, ok := delta[v.PK.GetValue()]
|
||||
if ok && uint64(v.Timestamp) <= ts {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
@ -259,9 +259,8 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
|
|||
|
||||
mitr := storage.NewMergeIterator([]iterator{iitr})
|
||||
|
||||
pk := newInt64PrimaryKey(1)
|
||||
dm := map[primaryKey]Timestamp{
|
||||
pk: 10000,
|
||||
dm := map[interface{}]Timestamp{
|
||||
1: 10000,
|
||||
}
|
||||
|
||||
ct := &compactionTask{}
|
||||
|
@ -289,7 +288,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
|
|||
|
||||
mitr := storage.NewMergeIterator([]iterator{iitr})
|
||||
|
||||
dm := map[primaryKey]Timestamp{}
|
||||
dm := map[interface{}]Timestamp{}
|
||||
|
||||
ct := &compactionTask{}
|
||||
idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime())
|
||||
|
@ -311,10 +310,8 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
mitr := storage.NewMergeIterator([]iterator{iitr})
|
||||
|
||||
pk := newInt64PrimaryKey(1)
|
||||
dm := map[primaryKey]Timestamp{
|
||||
pk: 10000,
|
||||
dm := map[interface{}]Timestamp{
|
||||
1: 10000,
|
||||
}
|
||||
|
||||
ct := &compactionTask{}
|
||||
|
|
|
@ -34,6 +34,7 @@ type PrimaryKey interface {
|
|||
MarshalJSON() ([]byte, error)
|
||||
UnmarshalJSON(data []byte) error
|
||||
SetValue(interface{}) error
|
||||
GetValue() interface{}
|
||||
Type() schemapb.DataType
|
||||
}
|
||||
|
||||
|
@ -147,6 +148,10 @@ func (ip *Int64PrimaryKey) Type() schemapb.DataType {
|
|||
return schemapb.DataType_Int64
|
||||
}
|
||||
|
||||
// GetValue returns the key's Value field wrapped in an interface{}, so callers
// can read the raw value without knowing the concrete primary-key type.
func (ip *Int64PrimaryKey) GetValue() interface{} {
|
||||
return ip.Value
|
||||
}
|
||||
|
||||
// BaseStringPrimaryKey is a primary key whose value is stored as a string.
type BaseStringPrimaryKey struct {
|
||||
// Value is the string content of the key.
Value string
|
||||
}
|
||||
|
@ -199,6 +204,10 @@ func (sp *BaseStringPrimaryKey) SetValue(data interface{}) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// GetValue returns the key's string Value wrapped in an interface{}, so callers
// can read the raw value without knowing the concrete primary-key type.
func (sp *BaseStringPrimaryKey) GetValue() interface{} {
|
||||
return sp.Value
|
||||
}
|
||||
|
||||
// VarCharPrimaryKey is a primary key type built by embedding
// BaseStringPrimaryKey; the embedded type's Value field and methods are
// promoted onto it.
type VarCharPrimaryKey struct {
|
||||
BaseStringPrimaryKey
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue