enhance: only buffer delete if it match insert has smaller timestamp (#30122)

See also: #30121 #27675

This PR changes the delete buffering logic:
- Write buffer shall buffer insert first
- Then the delete messages shall be evaluated
  - Whether PK matches previous Bloom filter, which ts is always smaller
  - Whether PK matches insert data which has smaller timestamp
- Then the segment bloom filter is updates by the newly buffered pk rows

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/30141/head
congqixia 2024-01-19 17:28:53 +08:00 committed by GitHub
parent 4ef6217fc0
commit 0d356b0545
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 562 additions and 289 deletions

View File

@ -6,6 +6,7 @@ import (
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -28,30 +29,8 @@ func NewBFWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Ca
}, nil
}
func (wb *bfWriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
wb.mut.Lock()
defer wb.mut.Unlock()
// process insert msgs
pkData, err := wb.bufferInsert(insertMsgs, startPos, endPos)
if err != nil {
return err
}
// update pk oracle
for segmentID, dataList := range pkData {
segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(segmentID))
for _, segment := range segments {
for _, fieldData := range dataList {
err := segment.GetBloomFilterSet().UpdatePKRange(fieldData)
if err != nil {
return err
}
}
}
}
// distribute delete msg
func (wb *bfWriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) {
// distribute delete msg for previous data
for _, delMsg := range deleteMsgs {
pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
@ -72,6 +51,59 @@ func (wb *bfWriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
wb.bufferDelete(segment.SegmentID(), deletePks, deleteTss, startPos, endPos)
}
}
for _, inData := range groups {
if delMsg.GetPartitionID() == common.InvalidPartitionID || delMsg.GetPartitionID() == inData.partitionID {
var deletePks []storage.PrimaryKey
var deleteTss []typeutil.Timestamp
for idx, pk := range pks {
ts := delMsg.GetTimestamps()[idx]
if inData.pkExists(pk, ts) {
deletePks = append(deletePks, pk)
deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx])
}
}
if len(deletePks) > 0 {
wb.bufferDelete(inData.segmentID, deletePks, deleteTss, startPos, endPos)
}
}
}
}
}
func (wb *bfWriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
wb.mut.Lock()
defer wb.mut.Unlock()
groups, err := wb.prepareInsert(insertMsgs)
if err != nil {
return err
}
// buffer insert data and add segment if not exists
for _, inData := range groups {
err := wb.bufferInsert(inData, startPos, endPos)
if err != nil {
return err
}
}
// distribute delete msg
// bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data
wb.dispatchDeleteMsgs(groups, deleteMsgs, startPos, endPos)
// update pk oracle
for _, inData := range groups {
// segment shall always exists after buffer insert
segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID))
for _, segment := range segments {
for _, fieldData := range inData.pkField {
err := segment.GetBloomFilterSet().UpdatePKRange(fieldData)
if err != nil {
return err
}
}
}
}
// update buffer last checkpoint

View File

@ -33,19 +33,21 @@ import (
type BFWriteBufferSuite struct {
testutils.PromMetricsSuite
collID int64
channelName string
collSchema *schemapb.CollectionSchema
syncMgr *syncmgr.MockSyncManager
metacache *metacache.MockMetaCache
broker *broker.MockBroker
storageV2Cache *metacache.StorageV2Cache
collID int64
channelName string
collInt64Schema *schemapb.CollectionSchema
collVarcharSchema *schemapb.CollectionSchema
syncMgr *syncmgr.MockSyncManager
metacacheInt64 *metacache.MockMetaCache
metacacheVarchar *metacache.MockMetaCache
broker *broker.MockBroker
storageV2Cache *metacache.StorageV2Cache
}
func (s *BFWriteBufferSuite) SetupSuite() {
paramtable.Get().Init(paramtable.NewBaseTable())
s.collID = 100
s.collSchema = &schemapb.CollectionSchema{
s.collInt64Schema = &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
@ -65,17 +67,69 @@ func (s *BFWriteBufferSuite) SetupSuite() {
},
},
}
s.collVarcharSchema = &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_VarChar, IsPrimaryKey: true, TypeParams: []*commonpb.KeyValuePair{
{Key: common.MaxLengthKey, Value: "100"},
},
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}
storageCache, err := metacache.NewStorageV2Cache(s.collSchema)
storageCache, err := metacache.NewStorageV2Cache(s.collInt64Schema)
s.Require().NoError(err)
s.storageV2Cache = storageCache
}
func (s *BFWriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim int) ([]int64, *msgstream.InsertMsg) {
func (s *BFWriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim int, pkType schemapb.DataType) ([]int64, *msgstream.InsertMsg) {
tss := lo.RepeatBy(rowCount, func(idx int) int64 { return int64(tsoutil.ComposeTSByTime(time.Now(), int64(idx))) })
vectors := lo.RepeatBy(rowCount, func(_ int) []float32 {
return lo.RepeatBy(dim, func(_ int) float32 { return rand.Float32() })
})
var pkField *schemapb.FieldData
switch pkType {
case schemapb.DataType_Int64:
pkField = &schemapb.FieldData{
FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: tss,
},
},
},
},
}
case schemapb.DataType_VarChar:
pkField = &schemapb.FieldData{
FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_VarChar,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_StringData{
StringData: &schemapb.StringArray{
Data: lo.Map(tss, func(v int64, _ int) string { return fmt.Sprintf("%v", v) }),
},
},
},
},
}
}
flatten := lo.Flatten(vectors)
return tss, &msgstream.InsertMsg{
InsertRequest: msgpb.InsertRequest{
@ -108,18 +162,7 @@ func (s *BFWriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim
},
},
},
{
FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: tss,
},
},
},
},
},
pkField,
{
FieldId: common.StartOfUserFieldID + 1, FieldName: "vector", Type: schemapb.DataType_FloatVector,
Field: &schemapb.FieldData_Vectors{
@ -142,7 +185,7 @@ func (s *BFWriteBufferSuite) composeDeleteMsg(pks []storage.PrimaryKey) *msgstre
delMsg := &msgstream.DeleteMsg{
DeleteRequest: msgpb.DeleteRequest{
PrimaryKeys: storage.ParsePrimaryKeys2IDs(pks),
Timestamps: lo.RepeatBy(len(pks), func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx)) }),
Timestamps: lo.RepeatBy(len(pks), func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx+1)) }),
},
}
return delMsg
@ -150,74 +193,152 @@ func (s *BFWriteBufferSuite) composeDeleteMsg(pks []storage.PrimaryKey) *msgstre
func (s *BFWriteBufferSuite) SetupTest() {
s.syncMgr = syncmgr.NewMockSyncManager(s.T())
s.metacache = metacache.NewMockMetaCache(s.T())
s.metacache.EXPECT().Schema().Return(s.collSchema).Maybe()
s.metacache.EXPECT().Collection().Return(s.collID).Maybe()
s.metacacheInt64 = metacache.NewMockMetaCache(s.T())
s.metacacheInt64.EXPECT().Schema().Return(s.collInt64Schema).Maybe()
s.metacacheInt64.EXPECT().Collection().Return(s.collID).Maybe()
s.metacacheVarchar = metacache.NewMockMetaCache(s.T())
s.metacacheVarchar.EXPECT().Schema().Return(s.collVarcharSchema).Maybe()
s.metacacheVarchar.EXPECT().Collection().Return(s.collID).Maybe()
s.broker = broker.NewMockBroker(s.T())
var err error
s.storageV2Cache, err = metacache.NewStorageV2Cache(s.collSchema)
s.storageV2Cache, err = metacache.NewStorageV2Cache(s.collInt64Schema)
s.Require().NoError(err)
}
func (s *BFWriteBufferSuite) TestBufferData() {
storageCache, err := metacache.NewStorageV2Cache(s.collSchema)
s.Require().NoError(err)
wb, err := NewBFWriteBuffer(s.channelName, s.metacache, storageCache, s.syncMgr, &writeBufferOption{})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
pks, msg := s.composeInsertMsg(1000, 10, 128)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
metrics.DataNodeFlowGraphBufferDataSize.Reset()
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacache.Collection()))
s.NoError(err)
s.MetricsEqual(value, 5524)
}
func (s *BFWriteBufferSuite) TestAutoSync() {
paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushInsertBufferSize.Key, "1")
s.Run("normal_auto_sync", func() {
wb, err := NewBFWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, &writeBufferOption{
syncPolicies: []SyncPolicy{
GetFullBufferPolicy(),
GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
GetFlushingSegmentsPolicy(s.metacache),
},
})
s.Run("normal_run_int64", func() {
storageCache, err := metacache.NewStorageV2Cache(s.collInt64Schema)
s.Require().NoError(err)
wb, err := NewBFWriteBuffer(s.channelName, s.metacacheInt64, storageCache, s.syncMgr, &writeBufferOption{})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
seg1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1002}, metacache.NewBloomFilterSet())
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(seg, true).Once()
s.metacache.EXPECT().GetSegmentByID(int64(1002)).Return(seg1, true)
s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil)
s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
pks, msg := s.composeInsertMsg(1000, 10, 128)
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
metrics.DataNodeFlowGraphBufferDataSize.Reset()
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacache.Collection()))
value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
s.NoError(err)
s.MetricsEqual(value, 5524)
delMsg = s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
err = wb.BufferData([]*msgstream.InsertMsg{}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
s.MetricsEqual(value, 5684)
})
s.Run("normal_run_varchar", func() {
storageCache, err := metacache.NewStorageV2Cache(s.collVarcharSchema)
s.Require().NoError(err)
wb, err := NewBFWriteBuffer(s.channelName, s.metacacheVarchar, storageCache, s.syncMgr, &writeBufferOption{})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacacheVarchar.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacacheVarchar.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacacheVarchar.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheVarchar.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacacheVarchar.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_VarChar)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewVarCharPrimaryKey(fmt.Sprintf("%v", id)) }))
metrics.DataNodeFlowGraphBufferDataSize.Reset()
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
s.NoError(err)
s.MetricsEqual(value, 5884)
})
s.Run("int_pk_type_not_match", func() {
storageCache, err := metacache.NewStorageV2Cache(s.collInt64Schema)
s.Require().NoError(err)
wb, err := NewBFWriteBuffer(s.channelName, s.metacacheInt64, storageCache, s.syncMgr, &writeBufferOption{})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_VarChar)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
metrics.DataNodeFlowGraphBufferDataSize.Reset()
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.Error(err)
})
s.Run("varchar_pk_not_match", func() {
storageCache, err := metacache.NewStorageV2Cache(s.collVarcharSchema)
s.Require().NoError(err)
wb, err := NewBFWriteBuffer(s.channelName, s.metacacheVarchar, storageCache, s.syncMgr, &writeBufferOption{})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacacheVarchar.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacacheVarchar.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacacheVarchar.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheVarchar.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacacheVarchar.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
metrics.DataNodeFlowGraphBufferDataSize.Reset()
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.Error(err)
})
}
func (s *BFWriteBufferSuite) TestAutoSync() {
paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushInsertBufferSize.Key, "1")
s.Run("normal_auto_sync", func() {
wb, err := NewBFWriteBuffer(s.channelName, s.metacacheInt64, nil, s.syncMgr, &writeBufferOption{
syncPolicies: []SyncPolicy{
GetFullBufferPolicy(),
GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
GetFlushingSegmentsPolicy(s.metacacheInt64),
},
})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
seg1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1002}, metacache.NewBloomFilterSet())
s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(seg, true).Once()
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1002)).Return(seg1, true)
s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil)
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
metrics.DataNodeFlowGraphBufferDataSize.Reset()
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
s.NoError(err)
s.MetricsEqual(value, 0)
})
@ -228,7 +349,7 @@ func (s *BFWriteBufferSuite) TestBufferDataWithStorageV2() {
defer paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("false")
params.Params.CommonCfg.StorageScheme.SwapTempValue("file")
tmpDir := s.T().TempDir()
arrowSchema, err := typeutil.ConvertToArrowSchema(s.collSchema.Fields)
arrowSchema, err := typeutil.ConvertToArrowSchema(s.collInt64Schema.Fields)
s.Require().NoError(err)
space, err := milvus_storage.Open(fmt.Sprintf("file:///%s", tmpDir), options.NewSpaceOptionBuilder().
SetSchema(schema.NewSchema(arrowSchema, &schema.SchemaOptions{
@ -236,17 +357,17 @@ func (s *BFWriteBufferSuite) TestBufferDataWithStorageV2() {
})).Build())
s.Require().NoError(err)
s.storageV2Cache.SetSpace(1000, space)
wb, err := NewBFWriteBuffer(s.channelName, s.metacache, s.storageV2Cache, s.syncMgr, &writeBufferOption{})
wb, err := NewBFWriteBuffer(s.channelName, s.metacacheInt64, s.storageV2Cache, s.syncMgr, &writeBufferOption{})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
pks, msg := s.composeInsertMsg(1000, 10, 128)
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
@ -258,7 +379,7 @@ func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() {
defer paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("false")
paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushInsertBufferSize.Key, "1")
tmpDir := s.T().TempDir()
arrowSchema, err := typeutil.ConvertToArrowSchema(s.collSchema.Fields)
arrowSchema, err := typeutil.ConvertToArrowSchema(s.collInt64Schema.Fields)
s.Require().NoError(err)
space, err := milvus_storage.Open(fmt.Sprintf("file:///%s", tmpDir), options.NewSpaceOptionBuilder().
@ -269,11 +390,11 @@ func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() {
s.storageV2Cache.SetSpace(1002, space)
s.Run("normal_auto_sync", func() {
wb, err := NewBFWriteBuffer(s.channelName, s.metacache, s.storageV2Cache, s.syncMgr, &writeBufferOption{
wb, err := NewBFWriteBuffer(s.channelName, s.metacacheInt64, s.storageV2Cache, s.syncMgr, &writeBufferOption{
syncPolicies: []SyncPolicy{
GetFullBufferPolicy(),
GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
GetFlushingSegmentsPolicy(s.metacache),
GetFlushingSegmentsPolicy(s.metacacheInt64),
},
})
s.NoError(err)
@ -283,26 +404,26 @@ func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() {
segCompacted := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
metacache.CompactTo(2001)(segCompacted)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg, segCompacted})
s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(seg, true).Once()
s.metacache.EXPECT().GetSegmentByID(int64(1002)).Return(seg1, true)
s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{1003}) // mocked compacted
s.metacache.EXPECT().RemoveSegments(mock.Anything).Return([]int64{1003})
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg, segCompacted})
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(seg, true).Once()
s.metacacheInt64.EXPECT().GetSegmentByID(int64(1002)).Return(seg1, true)
s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{1003}) // mocked compacted
s.metacacheInt64.EXPECT().RemoveSegments(mock.Anything).Return([]int64{1003})
s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil)
pks, msg := s.composeInsertMsg(1000, 10, 128)
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
metrics.DataNodeFlowGraphBufferDataSize.Reset()
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacache.Collection()))
value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
s.NoError(err)
s.MetricsEqual(value, 0)
})

View File

@ -10,8 +10,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -118,38 +116,17 @@ func (ib *InsertBuffer) Yield() *storage.InsertData {
return ib.buffer
}
func (ib *InsertBuffer) Buffer(msgs []*msgstream.InsertMsg, startPos, endPos *msgpb.MsgPosition) ([]storage.FieldData, int64, error) {
pkData := make([]storage.FieldData, 0, len(msgs))
var totalMemSize int64 = 0
for _, msg := range msgs {
tmpBuffer, err := storage.InsertMsgToInsertData(msg, ib.collSchema)
if err != nil {
log.Warn("failed to transfer insert msg to insert data", zap.Error(err))
return nil, 0, err
}
pkFieldData, err := storage.GetPkFromInsertData(ib.collSchema, tmpBuffer)
if err != nil {
return nil, 0, err
}
if pkFieldData.RowNum() != tmpBuffer.GetRowNum() {
return nil, 0, merr.WrapErrServiceInternal("pk column row num not match")
}
pkData = append(pkData, pkFieldData)
storage.MergeInsertData(ib.buffer, tmpBuffer)
tsData, err := storage.GetTimestampFromInsertData(tmpBuffer)
if err != nil {
log.Warn("no timestamp field found in insert msg", zap.Error(err))
return nil, 0, err
}
func (ib *InsertBuffer) Buffer(inData *inData, startPos, endPos *msgpb.MsgPosition) int64 {
totalMemSize := int64(0)
for idx, data := range inData.data {
storage.MergeInsertData(ib.buffer, data)
tsData := inData.tsField[idx]
// update buffer size
ib.UpdateStatistics(int64(tmpBuffer.GetRowNum()), int64(tmpBuffer.GetMemorySize()), ib.getTimestampRange(tsData), startPos, endPos)
totalMemSize += int64(tmpBuffer.GetMemorySize())
ib.UpdateStatistics(int64(data.GetRowNum()), int64(data.GetMemorySize()), ib.getTimestampRange(tsData), startPos, endPos)
totalMemSize += int64(data.GetMemorySize())
}
return pkData, totalMemSize, nil
return totalMemSize
}
func (ib *InsertBuffer) getTimestampRange(tsData *storage.Int64FieldData) TimeRange {

View File

@ -11,7 +11,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -128,65 +127,28 @@ func (s *InsertBufferSuite) TestBasic() {
}
func (s *InsertBufferSuite) TestBuffer() {
s.Run("normal_buffer", func() {
pks, insertMsg := s.composeInsertMsg(10, 128)
wb := &writeBufferBase{
collSchema: s.collSchema,
}
_, insertMsg := s.composeInsertMsg(10, 128)
insertBuffer, err := NewInsertBuffer(s.collSchema)
s.Require().NoError(err)
insertBuffer, err := NewInsertBuffer(s.collSchema)
s.Require().NoError(err)
fieldData, memSize, err := insertBuffer.Buffer([]*msgstream.InsertMsg{insertMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
groups, err := wb.prepareInsert([]*msgstream.InsertMsg{insertMsg})
s.Require().NoError(err)
s.Require().Len(groups, 1)
pkData := lo.Map(fieldData, func(fd storage.FieldData, _ int) []int64 {
return lo.RepeatBy(fd.RowNum(), func(idx int) int64 { return fd.GetRow(idx).(int64) })
})
s.ElementsMatch(pks, lo.Flatten(pkData))
s.EqualValues(100, insertBuffer.MinTimestamp())
s.EqualValues(5364, memSize)
})
memSize := insertBuffer.Buffer(groups[0], &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.Run("pk_not_found", func() {
_, insertMsg := s.composeInsertMsg(10, 128)
insertMsg.FieldsData = []*schemapb.FieldData{insertMsg.FieldsData[0], insertMsg.FieldsData[1], insertMsg.FieldsData[3]}
insertBuffer, err := NewInsertBuffer(s.collSchema)
s.Require().NoError(err)
_, _, err = insertBuffer.Buffer([]*msgstream.InsertMsg{insertMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.Error(err)
})
s.Run("schema_without_pk", func() {
badSchema := &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}
_, insertMsg := s.composeInsertMsg(10, 128)
insertBuffer, err := NewInsertBuffer(badSchema)
s.Require().NoError(err)
_, _, err = insertBuffer.Buffer([]*msgstream.InsertMsg{insertMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.Error(err)
})
s.EqualValues(100, insertBuffer.MinTimestamp())
s.EqualValues(5364, memSize)
}
func (s *InsertBufferSuite) TestYield() {
wb := &writeBufferBase{
collSchema: s.collSchema,
}
insertBuffer, err := NewInsertBuffer(s.collSchema)
s.Require().NoError(err)
@ -197,8 +159,11 @@ func (s *InsertBufferSuite) TestYield() {
s.Require().NoError(err)
pks, insertMsg := s.composeInsertMsg(10, 128)
_, _, err = insertBuffer.Buffer([]*msgstream.InsertMsg{insertMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
groups, err := wb.prepareInsert([]*msgstream.InsertMsg{insertMsg})
s.Require().NoError(err)
s.Require().Len(groups, 1)
insertBuffer.Buffer(groups[0], &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
result = insertBuffer.Yield()
s.NotNil(result)

View File

@ -12,10 +12,12 @@ import (
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type l0WriteBuffer struct {
@ -45,22 +47,75 @@ func NewL0WriteBuffer(channel string, metacache metacache.MetaCache, storageV2Ca
}, nil
}
func (wb *l0WriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) {
for _, delMsg := range deleteMsgs {
l0SegmentID := wb.getL0SegmentID(delMsg.GetPartitionID(), startPos)
pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
for _, segment := range segments {
if segment.CompactTo() != 0 {
continue
}
var deletePks []storage.PrimaryKey
var deleteTss []typeutil.Timestamp
for idx, pk := range pks {
if segment.GetBloomFilterSet().PkExists(pk) {
deletePks = append(deletePks, pk)
deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx])
}
}
if len(deletePks) > 0 {
wb.bufferDelete(l0SegmentID, deletePks, deleteTss, startPos, endPos)
}
}
for _, inData := range groups {
if delMsg.GetPartitionID() == common.InvalidPartitionID || delMsg.GetPartitionID() == inData.partitionID {
var deletePks []storage.PrimaryKey
var deleteTss []typeutil.Timestamp
for idx, pk := range pks {
ts := delMsg.GetTimestamps()[idx]
if inData.pkExists(pk, ts) {
deletePks = append(deletePks, pk)
deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx])
}
}
if len(deletePks) > 0 {
wb.bufferDelete(l0SegmentID, deletePks, deleteTss, startPos, endPos)
}
}
}
}
}
func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
wb.mut.Lock()
defer wb.mut.Unlock()
// process insert msgs
pkData, err := wb.bufferInsert(insertMsgs, startPos, endPos)
groups, err := wb.prepareInsert(insertMsgs)
if err != nil {
log.Warn("failed to buffer insert data", zap.Error(err))
return err
}
// buffer insert data and add segment if not exists
for _, inData := range groups {
err := wb.bufferInsert(inData, startPos, endPos)
if err != nil {
return err
}
}
// distribute delete msg
// bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data
wb.dispatchDeleteMsgs(groups, deleteMsgs, startPos, endPos)
// update pk oracle
for segmentID, dataList := range pkData {
segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(segmentID))
for _, inData := range groups {
// segment shall always exists after buffer insert
segments := wb.metaCache.GetSegmentsBy(metacache.WithSegmentIDs(inData.segmentID))
for _, segment := range segments {
for _, fieldData := range dataList {
for _, fieldData := range inData.pkField {
err := segment.GetBloomFilterSet().UpdatePKRange(fieldData)
if err != nil {
return err
@ -69,16 +124,6 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
}
}
for _, msg := range deleteMsgs {
l0SegmentID := wb.getL0SegmentID(msg.GetPartitionID(), startPos)
pks := storage.ParseIDs2PrimaryKeys(msg.GetPrimaryKeys())
err := wb.bufferDelete(l0SegmentID, pks, msg.GetTimestamps(), startPos, endPos)
if err != nil {
log.Warn("failed to buffer delete data", zap.Error(err))
return err
}
}
// update buffer last checkpoint
wb.checkpoint = endPos

View File

@ -67,12 +67,41 @@ func (s *L0WriteBufferSuite) SetupSuite() {
s.storageCache = storageCache
}
func (s *L0WriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim int) ([]int64, *msgstream.InsertMsg) {
func (s *L0WriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim int, pkType schemapb.DataType) ([]int64, *msgstream.InsertMsg) {
tss := lo.RepeatBy(rowCount, func(idx int) int64 { return int64(tsoutil.ComposeTSByTime(time.Now(), int64(idx))) })
vectors := lo.RepeatBy(rowCount, func(_ int) []float32 {
return lo.RepeatBy(dim, func(_ int) float32 { return rand.Float32() })
})
flatten := lo.Flatten(vectors)
var pkField *schemapb.FieldData
switch pkType {
case schemapb.DataType_Int64:
pkField = &schemapb.FieldData{
FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: tss,
},
},
},
},
}
case schemapb.DataType_VarChar:
pkField = &schemapb.FieldData{
FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_VarChar,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_StringData{
StringData: &schemapb.StringArray{
Data: lo.Map(tss, func(v int64, _ int) string { return fmt.Sprintf("%v", v) }),
},
},
},
},
}
}
return tss, &msgstream.InsertMsg{
InsertRequest: msgpb.InsertRequest{
SegmentID: segmentID,
@ -104,18 +133,7 @@ func (s *L0WriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim
},
},
},
{
FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_Int64,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: tss,
},
},
},
},
},
pkField,
{
FieldId: common.StartOfUserFieldID + 1, FieldName: "vector", Type: schemapb.DataType_FloatVector,
Field: &schemapb.FieldData_Vectors{
@ -138,7 +156,7 @@ func (s *L0WriteBufferSuite) composeDeleteMsg(pks []storage.PrimaryKey) *msgstre
delMsg := &msgstream.DeleteMsg{
DeleteRequest: msgpb.DeleteRequest{
PrimaryKeys: storage.ParsePrimaryKeys2IDs(pks),
Timestamps: lo.RepeatBy(len(pks), func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx)) }),
Timestamps: lo.RepeatBy(len(pks), func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx)+1) }),
},
}
return delMsg
@ -154,28 +172,55 @@ func (s *L0WriteBufferSuite) SetupTest() {
}
func (s *L0WriteBufferSuite) TestBufferData() {
wb, err := NewL0WriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, &writeBufferOption{
idAllocator: s.allocator,
s.Run("normal_run", func() {
wb, err := NewL0WriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, &writeBufferOption{
idAllocator: s.allocator,
})
s.NoError(err)
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
metrics.DataNodeFlowGraphBufferDataSize.Reset()
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacache.Collection()))
s.NoError(err)
s.MetricsEqual(value, 5524)
delMsg = s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
err = wb.BufferData([]*msgstream.InsertMsg{}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
s.MetricsEqual(value, 5684)
})
s.NoError(err)
pks, msg := s.composeInsertMsg(1000, 10, 128)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
s.Run("pk_type_not_match", func() {
wb, err := NewL0WriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, &writeBufferOption{
idAllocator: s.allocator,
})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_VarChar)
delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
metrics.DataNodeFlowGraphBufferDataSize.Reset()
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.NoError(err)
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, metacache.NewBloomFilterSet())
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
s.metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return([]int64{})
value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacache.Collection()))
s.NoError(err)
s.MetricsEqual(value, 5524)
metrics.DataNodeFlowGraphBufferDataSize.Reset()
err = wb.BufferData([]*msgstream.InsertMsg{msg}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
s.Error(err)
})
}
func (s *L0WriteBufferSuite) TestCreateFailure() {

View File

@ -5,6 +5,7 @@ import (
"fmt"
"sync"
"github.com/bits-and-blooms/bloom/v3"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/atomic"
@ -325,51 +326,138 @@ func (wb *writeBufferBase) yieldBuffer(segmentID int64) (*storage.InsertData, *s
return insert, delta, timeRange, start
}
// bufferInsert transform InsertMsg into bufferred InsertData and returns primary key field data for future usage.
func (wb *writeBufferBase) bufferInsert(insertMsgs []*msgstream.InsertMsg, startPos, endPos *msgpb.MsgPosition) (map[int64][]storage.FieldData, error) {
insertGroups := lo.GroupBy(insertMsgs, func(msg *msgstream.InsertMsg) int64 { return msg.GetSegmentID() })
segmentPKData := make(map[int64][]storage.FieldData)
segmentPartition := lo.SliceToMap(insertMsgs, func(msg *msgstream.InsertMsg) (int64, int64) { return msg.GetSegmentID(), msg.GetPartitionID() })
type inData struct {
segmentID int64
partitionID int64
data []*storage.InsertData
pkField []storage.FieldData
tsField []*storage.Int64FieldData
rowNum int64
for segmentID, msgs := range insertGroups {
_, ok := wb.metaCache.GetSegmentByID(segmentID)
// new segment
if !ok {
wb.metaCache.AddSegment(&datapb.SegmentInfo{
ID: segmentID,
PartitionID: segmentPartition[segmentID],
CollectionID: wb.collectionID,
InsertChannel: wb.channelName,
StartPosition: startPos,
State: commonpb.SegmentState_Growing,
}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSetWithBatchSize(wb.getEstBatchSize())
}, metacache.SetStartPosRecorded(false))
}
batchBF *storage.PkStatistics
}
segBuf := wb.getOrCreateBuffer(segmentID)
pkData, totalMemSize, err := segBuf.insertBuffer.Buffer(msgs, startPos, endPos)
if err != nil {
log.Warn("failed to buffer insert data", zap.Int64("segmentID", segmentID), zap.Error(err))
return nil, err
}
segmentPKData[segmentID] = pkData
wb.metaCache.UpdateSegments(metacache.UpdateBufferedRows(segBuf.insertBuffer.rows),
metacache.WithSegmentIDs(segmentID))
metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Add(float64(totalMemSize))
func (id *inData) generatePkStats() {
id.batchBF = &storage.PkStatistics{
PkFilter: bloom.NewWithEstimates(uint(id.rowNum), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
}
return segmentPKData, nil
for _, ids := range id.pkField {
id.batchBF.UpdatePKRange(ids)
}
}
func (id *inData) pkExists(pk storage.PrimaryKey, ts uint64) bool {
if !id.batchBF.PkExist(pk) {
return false
}
for batchIdx, timestamps := range id.tsField {
ids := id.pkField[batchIdx]
var primaryKey storage.PrimaryKey
switch pk.Type() {
case schemapb.DataType_Int64:
primaryKey = storage.NewInt64PrimaryKey(0)
case schemapb.DataType_VarChar:
primaryKey = storage.NewVarCharPrimaryKey("")
}
for idx := 0; idx < timestamps.RowNum(); idx++ {
timestamp := timestamps.GetRow(idx).(int64)
if int64(ts) <= timestamp {
continue
}
primaryKey.SetValue(ids.GetRow(idx))
if pk.EQ(primaryKey) {
return true
}
}
}
return false
}
// prepareInsert transfers InsertMsg into organized InsertData grouped by segmentID
// also returns primary key field data
func (wb *writeBufferBase) prepareInsert(insertMsgs []*msgstream.InsertMsg) ([]*inData, error) {
groups := lo.GroupBy(insertMsgs, func(msg *msgstream.InsertMsg) int64 { return msg.SegmentID })
segmentPartition := lo.SliceToMap(insertMsgs, func(msg *msgstream.InsertMsg) (int64, int64) { return msg.GetSegmentID(), msg.GetPartitionID() })
result := make([]*inData, 0, len(groups))
for segment, msgs := range groups {
inData := &inData{
segmentID: segment,
partitionID: segmentPartition[segment],
data: make([]*storage.InsertData, 0, len(msgs)),
pkField: make([]storage.FieldData, 0, len(msgs)),
}
for _, msg := range msgs {
data, err := storage.InsertMsgToInsertData(msg, wb.collSchema)
if err != nil {
log.Warn("failed to transfer insert msg to insert data", zap.Error(err))
return nil, err
}
pkFieldData, err := storage.GetPkFromInsertData(wb.collSchema, data)
if err != nil {
return nil, err
}
if pkFieldData.RowNum() != data.GetRowNum() {
return nil, merr.WrapErrServiceInternal("pk column row num not match")
}
tsFieldData, err := storage.GetTimestampFromInsertData(data)
if err != nil {
return nil, err
}
if tsFieldData.RowNum() != data.GetRowNum() {
return nil, merr.WrapErrServiceInternal("timestamp column row num not match")
}
inData.data = append(inData.data, data)
inData.pkField = append(inData.pkField, pkFieldData)
inData.tsField = append(inData.tsField, tsFieldData)
inData.rowNum += int64(data.GetRowNum())
}
inData.generatePkStats()
result = append(result, inData)
}
return result, nil
}
// bufferInsert transform InsertMsg into bufferred InsertData and returns primary key field data for future usage.
func (wb *writeBufferBase) bufferInsert(inData *inData, startPos, endPos *msgpb.MsgPosition) error {
_, ok := wb.metaCache.GetSegmentByID(inData.segmentID)
// new segment
if !ok {
wb.metaCache.AddSegment(&datapb.SegmentInfo{
ID: inData.segmentID,
PartitionID: inData.partitionID,
CollectionID: wb.collectionID,
InsertChannel: wb.channelName,
StartPosition: startPos,
State: commonpb.SegmentState_Growing,
}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSetWithBatchSize(wb.getEstBatchSize())
}, metacache.SetStartPosRecorded(false))
}
segBuf := wb.getOrCreateBuffer(inData.segmentID)
totalMemSize := segBuf.insertBuffer.Buffer(inData, startPos, endPos)
wb.metaCache.UpdateSegments(metacache.UpdateBufferedRows(segBuf.insertBuffer.rows),
metacache.WithSegmentIDs(inData.segmentID))
metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Add(float64(totalMemSize))
return nil
}
// bufferDelete buffers DeleteMsg into DeleteData.
func (wb *writeBufferBase) bufferDelete(segmentID int64, pks []storage.PrimaryKey, tss []typeutil.Timestamp, startPos, endPos *msgpb.MsgPosition) error {
func (wb *writeBufferBase) bufferDelete(segmentID int64, pks []storage.PrimaryKey, tss []typeutil.Timestamp, startPos, endPos *msgpb.MsgPosition) {
segBuf := wb.getOrCreateBuffer(segmentID)
bufSize := segBuf.deltaBuffer.Buffer(pks, tss, startPos, endPos)
metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Add(float64(bufSize))
return nil
}
func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (syncmgr.Task, error) {