mirror of https://github.com/milvus-io/milvus.git
fix: Remove enableLevelZeroSegment config (#36535)
See also: #36504

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
Branch: pull/36947/head
parent 4d08eec1af
commit b172ea1093
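The functional core of the change is small: level-zero compaction is no longer gated by the separate dataCoord.segment.enableLevelZero switch and now simply follows the auto-compaction setting. A minimal, self-contained Go sketch of the gating change, with plain booleans standing in for the paramtable reads (illustration only, not Milvus code):

package main

import "fmt"

// enableBefore mirrors the old l0CompactionPolicy.Enable(): L0 compaction
// required both auto-compaction and the now-removed enableLevelZero flag.
func enableBefore(enableAutoCompaction, enableLevelZeroSegment bool) bool {
	return enableAutoCompaction && enableLevelZeroSegment
}

// enableAfter mirrors the new l0CompactionPolicy.Enable(): L0 compaction
// follows the auto-compaction setting alone.
func enableAfter(enableAutoCompaction bool) bool {
	return enableAutoCompaction
}

func main() {
	fmt.Println(enableBefore(true, false)) // false: L0 could previously be disabled on its own
	fmt.Println(enableAfter(true))         // true: only auto-compaction matters now
}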
@@ -26,7 +26,7 @@ func newL0CompactionPolicy(meta *meta) *l0CompactionPolicy {
 }
 
 func (policy *l0CompactionPolicy) Enable() bool {
-	return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() && Params.DataCoordCfg.EnableLevelZeroSegment.GetAsBool()
+	return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool()
 }
 
 func (policy *l0CompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) {
@@ -288,10 +288,6 @@ func (node *DataNode) tryToReleaseFlowgraph(channel string) {
 
 // Start will update DataNode state to HEALTHY
 func (node *DataNode) Start() error {
-	if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() && !paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.GetAsBool() {
-		panic("In non-L0 mode, skip loading of bloom filter stats is not allowed.")
-	}
-
 	var startErr error
 	node.startOnce.Do(func() {
 		if err := node.allocator.Start(); err != nil {
@@ -1,126 +0,0 @@ (entire file removed: bfWriteBuffer, the bloom-filter delete-policy write buffer)
package writebuffer

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
	"github.com/milvus-io/milvus/internal/flushcommon/metacache"
	"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/mq/msgstream"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type bfWriteBuffer struct {
	*writeBufferBase

	syncMgr   syncmgr.SyncManager
	metacache metacache.MetaCache
}

func NewBFWriteBuffer(channel string, metacache metacache.MetaCache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (WriteBuffer, error) {
	base, err := newWriteBufferBase(channel, metacache, syncMgr, option)
	if err != nil {
		return nil, err
	}
	return &bfWriteBuffer{
		writeBufferBase: base,
		syncMgr:         syncMgr,
	}, nil
}

func (wb *bfWriteBuffer) dispatchDeleteMsgs(groups []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) {
	batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()

	split := func(pks []storage.PrimaryKey, pkTss []uint64, segments []*metacache.SegmentInfo) {
		lc := storage.NewBatchLocationsCache(pks)
		for _, segment := range segments {
			hits := segment.GetBloomFilterSet().BatchPkExist(lc)
			var deletePks []storage.PrimaryKey
			var deleteTss []typeutil.Timestamp
			for i, hit := range hits {
				if hit {
					deletePks = append(deletePks, pks[i])
					deleteTss = append(deleteTss, pkTss[i])
				}
			}

			if len(deletePks) > 0 {
				wb.bufferDelete(segment.SegmentID(), deletePks, deleteTss, startPos, endPos)
			}
		}
	}

	// distribute delete msg for previous data
	for _, delMsg := range deleteMsgs {
		pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
		pkTss := delMsg.GetTimestamps()
		segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
			metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))

		for idx := 0; idx < len(pks); idx += batchSize {
			endIdx := idx + batchSize
			if endIdx > len(pks) {
				endIdx = len(pks)
			}
			split(pks[idx:endIdx], pkTss[idx:endIdx], segments)
		}

		for _, inData := range groups {
			if delMsg.GetPartitionID() == common.AllPartitionsID || delMsg.GetPartitionID() == inData.partitionID {
				var deletePks []storage.PrimaryKey
				var deleteTss []typeutil.Timestamp
				for idx, pk := range pks {
					ts := delMsg.GetTimestamps()[idx]
					if inData.pkExists(pk, ts) {
						deletePks = append(deletePks, pk)
						deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx])
					}
				}
				if len(deletePks) > 0 {
					wb.bufferDelete(inData.segmentID, deletePks, deleteTss, startPos, endPos)
				}
			}
		}
	}
}

func (wb *bfWriteBuffer) BufferData(insertData []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
	wb.mut.Lock()
	defer wb.mut.Unlock()

	// buffer insert data and add segment if not exists
	for _, inData := range insertData {
		err := wb.bufferInsert(inData, startPos, endPos)
		if err != nil {
			return err
		}
	}

	// distribute delete msg
	// bf write buffer check bloom filter of segment and current insert batch to decide which segment to write delete data
	wb.dispatchDeleteMsgs(insertData, deleteMsgs, startPos, endPos)

	// update pk oracle
	for _, inData := range insertData {
		// segment shall always exists after buffer insert
		segments := wb.metaCache.GetSegmentsBy(
			metacache.WithSegmentIDs(inData.segmentID))
		for _, segment := range segments {
			for _, fieldData := range inData.pkField {
				err := segment.GetBloomFilterSet().UpdatePKRange(fieldData)
				if err != nil {
					return err
				}
			}
		}
	}

	// update buffer last checkpoint
	wb.checkpoint = endPos

	_ = wb.triggerSync()

	return nil
}
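The deleted buffer applied delete messages to historical segments in fixed-size primary-key batches before probing each segment's bloom filter set. A stripped-down sketch of that batching pattern, with ordinary int64 keys and a hard-coded batch size in place of the BloomFilterApplyBatchSize parameter (illustration only):

package main

import "fmt"

// applyInBatches mirrors the slice-batching loop of the deleted
// dispatchDeleteMsgs; in the real code each batch was probed against a
// segment's bloom filter before the matching keys were buffered as deletes.
func applyInBatches(pks []int64, batchSize int, apply func(batch []int64)) {
	for idx := 0; idx < len(pks); idx += batchSize {
		end := idx + batchSize
		if end > len(pks) {
			end = len(pks)
		}
		apply(pks[idx:end])
	}
}

func main() {
	pks := []int64{1, 2, 3, 4, 5, 6, 7}
	applyInBatches(pks, 3, func(batch []int64) {
		fmt.Println(batch) // prints [1 2 3], [4 5 6], [7]
	})
}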
@@ -1,334 +0,0 @@ (entire file removed: the bfWriteBuffer test suite)
package writebuffer

import (
	"fmt"
	"math/rand"
	"testing"
	"time"

	"github.com/samber/lo"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/suite"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/flushcommon/broker"
	"github.com/milvus-io/milvus/internal/flushcommon/metacache"
	"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
	"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/metrics"
	"github.com/milvus-io/milvus/pkg/mq/msgstream"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
	"github.com/milvus-io/milvus/pkg/util/testutils"
	"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

type BFWriteBufferSuite struct {
	testutils.PromMetricsSuite
	collID           int64
	channelName      string
	syncMgr          *syncmgr.MockSyncManager
	metacacheInt64   *metacache.MockMetaCache
	metacacheVarchar *metacache.MockMetaCache
	broker           *broker.MockBroker

	collInt64Schema  *schemapb.CollectionSchema
	collInt64PkField *schemapb.FieldSchema

	collVarcharSchema  *schemapb.CollectionSchema
	collVarcharPkField *schemapb.FieldSchema
}

func (s *BFWriteBufferSuite) SetupSuite() {
	paramtable.Get().Init(paramtable.NewBaseTable())
	s.collID = 100
	s.collInt64Schema = &schemapb.CollectionSchema{
		Name: "test_collection",
		Fields: []*schemapb.FieldSchema{
			{
				FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
			},
			{
				FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
			},
			{
				FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
			},
			{
				FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
				TypeParams: []*commonpb.KeyValuePair{
					{Key: common.DimKey, Value: "128"},
				},
			},
		},
	}

	s.collInt64PkField = &schemapb.FieldSchema{
		FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
	}

	s.collVarcharSchema = &schemapb.CollectionSchema{
		Name: "test_collection",
		Fields: []*schemapb.FieldSchema{
			{
				FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
			},
			{
				FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
			},
			{
				FieldID: 100, Name: "pk", DataType: schemapb.DataType_VarChar, IsPrimaryKey: true, TypeParams: []*commonpb.KeyValuePair{
					{Key: common.MaxLengthKey, Value: "100"},
				},
			},
			{
				FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
				TypeParams: []*commonpb.KeyValuePair{
					{Key: common.DimKey, Value: "128"},
				},
			},
		},
	}
	s.collVarcharPkField = &schemapb.FieldSchema{
		FieldID: 100, Name: "pk", DataType: schemapb.DataType_VarChar, IsPrimaryKey: true, TypeParams: []*commonpb.KeyValuePair{
			{Key: common.MaxLengthKey, Value: "100"},
		},
	}
}

func (s *BFWriteBufferSuite) composeInsertMsg(segmentID int64, rowCount int, dim int, pkType schemapb.DataType) ([]int64, *msgstream.InsertMsg) {
	tss := lo.RepeatBy(rowCount, func(idx int) int64 { return int64(tsoutil.ComposeTSByTime(time.Now(), int64(idx))) })
	vectors := lo.RepeatBy(rowCount, func(_ int) []float32 {
		return lo.RepeatBy(dim, func(_ int) float32 { return rand.Float32() })
	})

	var pkField *schemapb.FieldData
	switch pkType {
	case schemapb.DataType_Int64:
		pkField = &schemapb.FieldData{
			FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_Int64,
			Field: &schemapb.FieldData_Scalars{
				Scalars: &schemapb.ScalarField{
					Data: &schemapb.ScalarField_LongData{
						LongData: &schemapb.LongArray{
							Data: tss,
						},
					},
				},
			},
		}
	case schemapb.DataType_VarChar:
		pkField = &schemapb.FieldData{
			FieldId: common.StartOfUserFieldID, FieldName: "pk", Type: schemapb.DataType_VarChar,
			Field: &schemapb.FieldData_Scalars{
				Scalars: &schemapb.ScalarField{
					Data: &schemapb.ScalarField_StringData{
						StringData: &schemapb.StringArray{
							Data: lo.Map(tss, func(v int64, _ int) string { return fmt.Sprintf("%v", v) }),
						},
					},
				},
			},
		}
	}
	flatten := lo.Flatten(vectors)
	return tss, &msgstream.InsertMsg{
		InsertRequest: &msgpb.InsertRequest{
			SegmentID:  segmentID,
			Version:    msgpb.InsertDataVersion_ColumnBased,
			RowIDs:     tss,
			Timestamps: lo.Map(tss, func(id int64, _ int) uint64 { return uint64(id) }),
			FieldsData: []*schemapb.FieldData{
				{
					FieldId: common.RowIDField, FieldName: common.RowIDFieldName, Type: schemapb.DataType_Int64,
					Field: &schemapb.FieldData_Scalars{
						Scalars: &schemapb.ScalarField{
							Data: &schemapb.ScalarField_LongData{
								LongData: &schemapb.LongArray{
									Data: tss,
								},
							},
						},
					},
				},
				{
					FieldId: common.TimeStampField, FieldName: common.TimeStampFieldName, Type: schemapb.DataType_Int64,
					Field: &schemapb.FieldData_Scalars{
						Scalars: &schemapb.ScalarField{
							Data: &schemapb.ScalarField_LongData{
								LongData: &schemapb.LongArray{
									Data: tss,
								},
							},
						},
					},
				},
				pkField,
				{
					FieldId: common.StartOfUserFieldID + 1, FieldName: "vector", Type: schemapb.DataType_FloatVector,
					Field: &schemapb.FieldData_Vectors{
						Vectors: &schemapb.VectorField{
							Dim: int64(dim),
							Data: &schemapb.VectorField_FloatVector{
								FloatVector: &schemapb.FloatArray{
									Data: flatten,
								},
							},
						},
					},
				},
			},
		},
	}
}

func (s *BFWriteBufferSuite) composeDeleteMsg(pks []storage.PrimaryKey) *msgstream.DeleteMsg {
	delMsg := &msgstream.DeleteMsg{
		DeleteRequest: &msgpb.DeleteRequest{
			PrimaryKeys: storage.ParsePrimaryKeys2IDs(pks),
			Timestamps:  lo.RepeatBy(len(pks), func(idx int) uint64 { return tsoutil.ComposeTSByTime(time.Now(), int64(idx+1)) }),
		},
	}
	return delMsg
}

func (s *BFWriteBufferSuite) SetupTest() {
	s.syncMgr = syncmgr.NewMockSyncManager(s.T())
	s.metacacheInt64 = metacache.NewMockMetaCache(s.T())
	s.metacacheInt64.EXPECT().Schema().Return(s.collInt64Schema).Maybe()
	s.metacacheInt64.EXPECT().Collection().Return(s.collID).Maybe()
	s.metacacheVarchar = metacache.NewMockMetaCache(s.T())
	s.metacacheVarchar.EXPECT().Schema().Return(s.collVarcharSchema).Maybe()
	s.metacacheVarchar.EXPECT().Collection().Return(s.collID).Maybe()

	s.broker = broker.NewMockBroker(s.T())
}

func (s *BFWriteBufferSuite) TestBufferData() {
	s.Run("normal_run_int64", func() {
		wb, err := NewBFWriteBuffer(s.channelName, s.metacacheInt64, s.syncMgr, &writeBufferOption{})
		s.NoError(err)

		seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, pkoracle.NewBloomFilterSet(), nil)
		s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
		s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
		s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return()
		s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()

		pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
		delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))

		metrics.DataNodeFlowGraphBufferDataSize.Reset()
		insertData, err := PrepareInsert(s.collInt64Schema, s.collInt64PkField, []*msgstream.InsertMsg{msg})
		s.NoError(err)

		err = wb.BufferData(insertData, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
		s.NoError(err)

		value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
		s.NoError(err)
		s.MetricsEqual(value, 5607)

		delMsg = s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
		err = wb.BufferData([]*InsertData{}, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
		s.NoError(err)
		s.MetricsEqual(value, 5847)
	})

	s.Run("normal_run_varchar", func() {
		wb, err := NewBFWriteBuffer(s.channelName, s.metacacheVarchar, s.syncMgr, &writeBufferOption{})
		s.NoError(err)

		seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, pkoracle.NewBloomFilterSet(), nil)
		s.metacacheVarchar.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
		s.metacacheVarchar.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false)
		s.metacacheVarchar.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return()
		s.metacacheVarchar.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()

		pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_VarChar)
		delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewVarCharPrimaryKey(fmt.Sprintf("%v", id)) }))

		metrics.DataNodeFlowGraphBufferDataSize.Reset()
		insertData, err := PrepareInsert(s.collVarcharSchema, s.collVarcharPkField, []*msgstream.InsertMsg{msg})
		s.NoError(err)

		err = wb.BufferData(insertData, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
		s.NoError(err)

		value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
		s.NoError(err)
		s.MetricsEqual(value, 7227)
	})
}

func (s *BFWriteBufferSuite) TestPrepareInsert() {
	s.Run("int_pk_type_not_match", func() {
		_, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_VarChar)
		_, err := PrepareInsert(s.collInt64Schema, s.collInt64PkField, []*msgstream.InsertMsg{msg})
		s.Error(err)
	})

	s.Run("varchar_pk_not_match", func() {
		_, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
		_, err := PrepareInsert(s.collVarcharSchema, s.collVarcharPkField, []*msgstream.InsertMsg{msg})
		s.Error(err)
	})
}

func (s *BFWriteBufferSuite) TestAutoSync() {
	paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushInsertBufferSize.Key, "1")

	s.Run("normal_auto_sync", func() {
		wb, err := NewBFWriteBuffer(s.channelName, s.metacacheInt64, s.syncMgr, &writeBufferOption{
			syncPolicies: []SyncPolicy{
				GetFullBufferPolicy(),
				GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
				GetSealedSegmentsPolicy(s.metacacheInt64),
			},
		})
		s.NoError(err)

		seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1000}, pkoracle.NewBloomFilterSet(), nil)
		seg1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 1002}, pkoracle.NewBloomFilterSet(), nil)
		s.metacacheInt64.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
		s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(nil, false).Once()
		s.metacacheInt64.EXPECT().GetSegmentByID(int64(1000)).Return(seg, true).Once()
		s.metacacheInt64.EXPECT().GetSegmentByID(int64(1002)).Return(seg1, true)
		s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything).Return([]int64{1002})
		s.metacacheInt64.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything, mock.Anything).Return([]int64{})
		s.metacacheInt64.EXPECT().AddSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return()
		s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
		s.metacacheInt64.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
		s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything).Return(nil)

		pks, msg := s.composeInsertMsg(1000, 10, 128, schemapb.DataType_Int64)
		delMsg := s.composeDeleteMsg(lo.Map(pks, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))

		insertData, err := PrepareInsert(s.collInt64Schema, s.collInt64PkField, []*msgstream.InsertMsg{msg})
		s.NoError(err)

		metrics.DataNodeFlowGraphBufferDataSize.Reset()
		err = wb.BufferData(insertData, []*msgstream.DeleteMsg{delMsg}, &msgpb.MsgPosition{Timestamp: 100}, &msgpb.MsgPosition{Timestamp: 200})
		s.NoError(err)

		value, err := metrics.DataNodeFlowGraphBufferDataSize.GetMetricWithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(s.metacacheInt64.Collection()))
		s.NoError(err)
		s.MetricsEqual(value, 0)
	})
}

func (s *BFWriteBufferSuite) TestCreateFailure() {
	metacache := metacache.NewMockMetaCache(s.T())
	metacache.EXPECT().Collection().Return(s.collID)
	metacache.EXPECT().Schema().Return(&schemapb.CollectionSchema{})
	_, err := NewBFWriteBuffer(s.channelName, metacache, s.syncMgr, &writeBufferOption{})
	s.Error(err)
}

func TestBFWriteBuffer(t *testing.T) {
	suite.Run(t, new(BFWriteBufferSuite))
}
@@ -9,20 +9,11 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 )
 
-const (
-	// DeletePolicyBFPKOracle is the const config value for using bf pk oracle as delete policy
-	DeletePolicyBFPkOracle = `bloom_filter_pkoracle`
-
-	// DeletePolicyL0Delta is the const config value for using L0 delta as deleta policy.
-	DeletePolicyL0Delta = `l0_delta`
-)
-
 type WriteBufferOption func(opt *writeBufferOption)
 
 type TaskObserverCallback func(t syncmgr.Task, err error)
 
 type writeBufferOption struct {
-	deletePolicy string
 	idAllocator  allocator.Interface
 	syncPolicies []SyncPolicy
@@ -33,13 +24,7 @@ type writeBufferOption struct {
 }
 
 func defaultWBOption(metacache metacache.MetaCache) *writeBufferOption {
-	deletePolicy := DeletePolicyBFPkOracle
-	if paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.GetAsBool() {
-		deletePolicy = DeletePolicyL0Delta
-	}
-
 	return &writeBufferOption{
-		deletePolicy: deletePolicy,
 		syncPolicies: []SyncPolicy{
 			GetFullBufferPolicy(),
 			GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
@@ -53,12 +38,6 @@ func defaultWBOption(metacache metacache.MetaCache) *writeBufferOption {
 	}
 }
 
-func WithDeletePolicy(policy string) WriteBufferOption {
-	return func(opt *writeBufferOption) {
-		opt.deletePolicy = policy
-	}
-}
-
 func WithIDAllocator(allocator allocator.Interface) WriteBufferOption {
 	return func(opt *writeBufferOption) {
 		opt.idAllocator = allocator
@@ -119,14 +119,7 @@ func NewWriteBuffer(channel string, metacache metacache.MetaCache, syncMgr syncm
 		opt(option)
 	}
 
-	switch option.deletePolicy {
-	case DeletePolicyBFPkOracle:
-		return NewBFWriteBuffer(channel, metacache, syncMgr, option)
-	case DeletePolicyL0Delta:
-		return NewL0WriteBuffer(channel, metacache, syncMgr, option)
-	default:
-		return nil, merr.WrapErrParameterInvalid("valid delete policy config", option.deletePolicy)
-	}
+	return NewL0WriteBuffer(channel, metacache, syncMgr, option)
 }
 
 // writeBufferBase is the common component for buffering data
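With the delete-policy switch gone, NewWriteBuffer always takes the L0 path, and in the test hunks that follow the removed WithDeletePolicy option is replaced by WithIDAllocator. For readers unfamiliar with the pattern, a minimal sketch of the functional-options style these helpers use, with simplified stand-in types rather than the real Milvus definitions:

package main

import "fmt"

// Illustrative stand-ins: the option struct and functional options keep the
// same shape as the Milvus code, but allocator.Interface is replaced by a
// plain string here.
type writeBufferOption struct {
	idAllocator string
}

type WriteBufferOption func(opt *writeBufferOption)

func WithIDAllocator(alloc string) WriteBufferOption {
	return func(opt *writeBufferOption) { opt.idAllocator = alloc }
}

// newOption applies the options the same way the constructor does before
// handing them to the L0 write buffer.
func newOption(opts ...WriteBufferOption) *writeBufferOption {
	option := &writeBufferOption{}
	for _, opt := range opts {
		opt(option)
	}
	return option
}

func main() {
	opt := newOption(WithIDAllocator("mock-allocator"))
	fmt.Println(opt.idAllocator)
}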
@@ -60,42 +60,6 @@ func (s *WriteBufferSuite) SetupTest() {
 	s.Require().NoError(err)
 }
 
-func (s *WriteBufferSuite) TestDefaultOption() {
-	s.Run("default BFPkOracle", func() {
-		paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.Key, "false")
-		defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.Key)
-		wb, err := NewWriteBuffer(s.channelName, s.metacache, s.syncMgr)
-		s.NoError(err)
-		_, ok := wb.(*bfWriteBuffer)
-		s.True(ok)
-	})
-
-	s.Run("default L0Delta policy", func() {
-		paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.Key, "true")
-		defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.Key)
-		wb, err := NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithIDAllocator(allocator.NewMockGIDAllocator()))
-		s.NoError(err)
-		_, ok := wb.(*l0WriteBuffer)
-		s.True(ok)
-	})
-}
-
-func (s *WriteBufferSuite) TestWriteBufferType() {
-	wb, err := NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle))
-	s.NoError(err)
-
-	_, ok := wb.(*bfWriteBuffer)
-	s.True(ok)
-
-	wb, err = NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyL0Delta), WithIDAllocator(allocator.NewMockGIDAllocator()))
-	s.NoError(err)
-	_, ok = wb.(*l0WriteBuffer)
-	s.True(ok)
-
-	_, err = NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithDeletePolicy(""))
-	s.Error(err)
-}
-
 func (s *WriteBufferSuite) TestHasSegment() {
 	segmentID := int64(1001)
 
@@ -111,8 +75,7 @@ func (s *WriteBufferSuite) TestFlushSegments() {
 
 	s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything, mock.Anything).Return()
 	s.metacache.EXPECT().GetSegmentByID(mock.Anything, mock.Anything, mock.Anything).Return(nil, true)
 
-	wb, err := NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle))
+	wb, err := NewWriteBuffer(s.channelName, s.metacache, s.syncMgr, WithIDAllocator(allocator.NewMockAllocator(s.T())))
 	s.NoError(err)
 
 	err = wb.SealSegments(context.Background(), []int64{segmentID})
@@ -13,7 +13,6 @@ import (
 	"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
 	_ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar"
 	_ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq"
-	"github.com/milvus-io/milvus/pkg/util/paramtable"
 )
 
 // Server is the streamingnode server.

@@ -44,9 +43,6 @@ func (s *Server) Init(ctx context.Context) (err error) {
 
 // Start starts the streamingnode server.
 func (s *Server) Start() {
-	if !paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.GetAsBool() {
-		panic("In streaming service mode, disable L0 is not allowed.")
-	}
 	resource.Resource().Flusher().Start()
 	log.Info("flusher started")
 }
@@ -3231,7 +3231,6 @@ type dataCoordConfig struct {
 	ClusteringCompactionMaxClusterSize ParamItem `refreshable:"true"`
 
 	// LevelZero Segment
-	EnableLevelZeroSegment                   ParamItem `refreshable:"false"`
 	LevelZeroCompactionTriggerMinSize        ParamItem `refreshable:"true"`
 	LevelZeroCompactionTriggerMaxSize        ParamItem `refreshable:"true"`
 	LevelZeroCompactionTriggerDeltalogMinNum ParamItem `refreshable:"true"`
@@ -3627,15 +3626,6 @@ During compaction, the size of segment # of rows is able to exceed segment max #
 	}
 	p.SyncSegmentsInterval.Init(base.mgr)
 
-	// LevelZeroCompaction
-	p.EnableLevelZeroSegment = ParamItem{
-		Key:          "dataCoord.segment.enableLevelZero",
-		Version:      "2.4.0",
-		Doc:          "Whether to enable LevelZeroCompaction",
-		DefaultValue: "true",
-	}
-	p.EnableLevelZeroSegment.Init(base.mgr)
-
 	p.LevelZeroCompactionTriggerMinSize = ParamItem{
 		Key:     "dataCoord.compaction.levelzero.forceTrigger.minSize",
 		Version: "2.4.0",