mirror of https://github.com/milvus-io/milvus.git
enhance: Use map PK to timestamp in buffer insert (#33566)
Related to #27675 Store pk to minimal timestamp in `inData` instead of bloom filter to check whether some delete entry hit current insert batch Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/32975/head
parent
d25c755480
commit
7f4698f4a7
|
@ -19,7 +19,6 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/internal/util/bloomfilter"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||
|
@ -83,6 +82,8 @@ type writeBufferBase struct {
|
|||
|
||||
metaWriter syncmgr.MetaWriter
|
||||
collSchema *schemapb.CollectionSchema
|
||||
helper *typeutil.SchemaHelper
|
||||
pkField *schemapb.FieldSchema
|
||||
estSizePerRecord int
|
||||
metaCache metacache.MetaCache
|
||||
syncMgr syncmgr.SyncManager
|
||||
|
@ -130,11 +131,21 @@ func newWriteBufferBase(channel string, metacache metacache.MetaCache, storageV2
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
helper, err := typeutil.CreateSchemaHelper(schema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pkField, err := helper.GetPrimaryKeyField()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
wb := &writeBufferBase{
|
||||
channelName: channel,
|
||||
collectionID: metacache.Collection(),
|
||||
collSchema: schema,
|
||||
helper: helper,
|
||||
pkField: pkField,
|
||||
estSizePerRecord: estSize,
|
||||
syncMgr: syncMgr,
|
||||
metaWriter: option.metaWriter,
|
||||
|
@ -378,49 +389,21 @@ type inData struct {
|
|||
tsField []*storage.Int64FieldData
|
||||
rowNum int64
|
||||
|
||||
batchBF *storage.PkStatistics
|
||||
}
|
||||
|
||||
func (id *inData) generatePkStats() {
|
||||
id.batchBF = &storage.PkStatistics{
|
||||
PkFilter: bloomfilter.NewBloomFilterWithType(
|
||||
uint(id.rowNum),
|
||||
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
|
||||
paramtable.Get().CommonCfg.BloomFilterType.GetValue()),
|
||||
}
|
||||
|
||||
for _, ids := range id.pkField {
|
||||
id.batchBF.UpdatePKRange(ids)
|
||||
}
|
||||
intPKTs map[int64]int64
|
||||
strPKTs map[string]int64
|
||||
}
|
||||
|
||||
func (id *inData) pkExists(pk storage.PrimaryKey, ts uint64) bool {
|
||||
if !id.batchBF.PkExist(pk) {
|
||||
return false
|
||||
var ok bool
|
||||
var minTs int64
|
||||
switch pk.Type() {
|
||||
case schemapb.DataType_Int64:
|
||||
minTs, ok = id.intPKTs[pk.GetValue().(int64)]
|
||||
case schemapb.DataType_VarChar:
|
||||
minTs, ok = id.strPKTs[pk.GetValue().(string)]
|
||||
}
|
||||
|
||||
for batchIdx, timestamps := range id.tsField {
|
||||
ids := id.pkField[batchIdx]
|
||||
var primaryKey storage.PrimaryKey
|
||||
switch pk.Type() {
|
||||
case schemapb.DataType_Int64:
|
||||
primaryKey = storage.NewInt64PrimaryKey(0)
|
||||
case schemapb.DataType_VarChar:
|
||||
primaryKey = storage.NewVarCharPrimaryKey("")
|
||||
}
|
||||
for idx := 0; idx < timestamps.RowNum(); idx++ {
|
||||
timestamp := timestamps.GetRow(idx).(int64)
|
||||
if int64(ts) <= timestamp {
|
||||
continue
|
||||
}
|
||||
primaryKey.SetValue(ids.GetRow(idx))
|
||||
|
||||
if pk.EQ(primaryKey) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
return ok && ts > uint64(minTs)
|
||||
}
|
||||
|
||||
// prepareInsert transfers InsertMsg into organized InsertData grouped by segmentID
|
||||
|
@ -437,6 +420,13 @@ func (wb *writeBufferBase) prepareInsert(insertMsgs []*msgstream.InsertMsg) ([]*
|
|||
data: make([]*storage.InsertData, 0, len(msgs)),
|
||||
pkField: make([]storage.FieldData, 0, len(msgs)),
|
||||
}
|
||||
switch wb.pkField.GetDataType() {
|
||||
case schemapb.DataType_Int64:
|
||||
inData.intPKTs = make(map[int64]int64)
|
||||
case schemapb.DataType_VarChar:
|
||||
inData.strPKTs = make(map[string]int64)
|
||||
}
|
||||
|
||||
for _, msg := range msgs {
|
||||
data, err := storage.InsertMsgToInsertData(msg, wb.collSchema)
|
||||
if err != nil {
|
||||
|
@ -460,12 +450,32 @@ func (wb *writeBufferBase) prepareInsert(insertMsgs []*msgstream.InsertMsg) ([]*
|
|||
return nil, merr.WrapErrServiceInternal("timestamp column row num not match")
|
||||
}
|
||||
|
||||
timestamps := tsFieldData.GetRows().([]int64)
|
||||
|
||||
switch wb.pkField.GetDataType() {
|
||||
case schemapb.DataType_Int64:
|
||||
pks := pkFieldData.GetRows().([]int64)
|
||||
for idx, pk := range pks {
|
||||
ts, ok := inData.intPKTs[pk]
|
||||
if !ok || timestamps[idx] < ts {
|
||||
inData.intPKTs[pk] = timestamps[idx]
|
||||
}
|
||||
}
|
||||
case schemapb.DataType_VarChar:
|
||||
pks := pkFieldData.GetRows().([]string)
|
||||
for idx, pk := range pks {
|
||||
ts, ok := inData.strPKTs[pk]
|
||||
if !ok || timestamps[idx] < ts {
|
||||
inData.strPKTs[pk] = timestamps[idx]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
inData.data = append(inData.data, data)
|
||||
inData.pkField = append(inData.pkField, pkFieldData)
|
||||
inData.tsField = append(inData.tsField, tsFieldData)
|
||||
inData.rowNum += int64(data.GetRowNum())
|
||||
}
|
||||
inData.generatePkStats()
|
||||
result = append(result, inData)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue