Fix: flush didn't respect BinaryVector and other schemas (#21120)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>

pull/21134/head
Xiaofan 2022-12-12 10:33:26 +08:00 committed by GitHub
parent 18ef74007f
commit e977e014a9
7 changed files with 136 additions and 85 deletions

View File

@ -271,11 +271,11 @@ func (t *compactionTrigger) estimateDiskSegmentMaxNumOfRows(collectionID UniqueI
return t.estimateDiskSegmentPolicy(collMeta.Schema)
}
func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) error {
func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool, error) {
ctx := context.Background()
if len(segments) == 0 {
return nil
return false, nil
}
collectionID := segments[0].GetCollectionID()
@ -284,24 +284,26 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) error
IndexName: "",
})
if err != nil {
return err
return false, err
}
isDiskIndex := false
for _, indexInfo := range resp.IndexInfos {
indexParamsMap := funcutil.KeyValuePair2Map(indexInfo.IndexParams)
if indexType, ok := indexParamsMap["index_type"]; ok {
if indexType == indexparamcheck.IndexDISKANN {
diskSegmentMaxRows, err := t.estimateDiskSegmentMaxNumOfRows(collectionID)
if err != nil {
return err
return false, err
}
for _, segment := range segments {
segment.MaxRowNum = int64(diskSegmentMaxRows)
}
isDiskIndex = true
}
}
}
return nil
return isDiskIndex, nil
}
func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
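The signature change above is the thread that carries index-type information into the planner: every caller of updateSegmentMaxSize now receives an isDiskIndex flag and forwards it to generatePlans. A condensed sketch of that call path, with the error handling and locking of the real handleGlobalSignal/handleSignal elided (the helper name triggerForGroup is illustrative, not part of this diff):

func (t *compactionTrigger) triggerForGroup(segments []*SegmentInfo, force bool, ct *compactTime) []*datapb.CompactionPlan {
	// The returned bool tells the planner whether DISKANN sizing rules apply to this collection.
	isDiskIndex, err := t.updateSegmentMaxSize(segments)
	if err != nil {
		log.Warn("failed to update segment max size", zap.Error(err))
		return nil
	}
	return t.generatePlans(segments, force, isDiskIndex, ct)
}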
@ -336,7 +338,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
group.segments = FilterInIndexedSegments(t.handler, t.indexCoord, group.segments...)
err := t.updateSegmentMaxSize(group.segments)
isDiskIndex, err := t.updateSegmentMaxSize(group.segments)
if err != nil {
log.Warn("failed to update segment max size,", zap.Error(err))
continue
@ -351,7 +353,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
return
}
plans := t.generatePlans(group.segments, signal.isForce, ct)
plans := t.generatePlans(group.segments, signal.isForce, isDiskIndex, ct)
for _, plan := range plans {
segIDs := fetchSegIDs(plan.GetSegmentBinlogs())
@ -419,7 +421,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
return
}
err := t.updateSegmentMaxSize(segments)
isDiskIndex, err := t.updateSegmentMaxSize(segments)
if err != nil {
log.Warn("failed to update segment max size", zap.Error(err))
}
@ -438,7 +440,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
return
}
plans := t.generatePlans(segments, signal.isForce, ct)
plans := t.generatePlans(segments, signal.isForce, isDiskIndex, ct)
for _, plan := range plans {
if t.compactionHandler.isFull() {
log.Warn("compaction plan skipped due to handler full", zap.Int64("collection", signal.collectionID), zap.Int64("planID", plan.PlanID))
@ -467,7 +469,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
}
}
func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, compactTime *compactTime) []*datapb.CompactionPlan {
func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, isDiskIndex bool, compactTime *compactTime) []*datapb.CompactionPlan {
// find segments need internal compaction
// TODO add low priority candidates, for example if the segment is smaller than full 0.9 * max segment size but larger than small segment boundary, we only execute compaction when there are no compaction running actively
var prioritizedCandidates []*SegmentInfo
@ -478,7 +480,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, c
for _, segment := range segments {
segment := segment.ShadowClone()
// TODO should we trigger compaction periodically even if the segment has no obvious reason to be compacted?
if force || t.ShouldDoSingleCompaction(segment, compactTime) {
if force || t.ShouldDoSingleCompaction(segment, isDiskIndex, compactTime) {
prioritizedCandidates = append(prioritizedCandidates, segment)
} else if t.isSmallSegment(segment) {
smallCandidates = append(smallCandidates, segment)
@ -741,24 +743,43 @@ func (t *compactionTrigger) isStaleSegment(segment *SegmentInfo) bool {
return time.Since(segment.lastFlushTime).Minutes() >= segmentTimedFlushDuration
}
func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compactTime *compactTime) bool {
// count all the binlog file count
var totalLogNum int
func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, isDiskIndex bool, compactTime *compactTime) bool {
// binlog count is no longer restricted on its own, since it now scales with the number of fields in the schema
var binLog int
for _, binlogs := range segment.GetBinlogs() {
totalLogNum += len(binlogs.GetBinlogs())
binLog += len(binlogs.GetBinlogs())
}
// count all statslog files, only for flush-generated segments
if len(segment.CompactionFrom) == 0 {
var statsLog int
for _, statsLogs := range segment.GetStatslogs() {
statsLog += len(statsLogs.GetBinlogs())
}
var maxSize int
if isDiskIndex {
maxSize = int(Params.DataCoordCfg.DiskSegmentMaxSize.GetAsInt64() * 1024 * 1024 / Params.DataNodeCfg.BinLogMaxSize.GetAsInt64())
} else {
maxSize = int(Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024 / Params.DataNodeCfg.BinLogMaxSize.GetAsInt64())
}
// if there are more statslogs than expected, trigger compaction to reduce their number.
// TODO maybe compact down to a single statslog to reduce the cost of watching the DML channel
// TODO avoid rebuild index twice.
if statsLog > maxSize*2.0 {
log.Info("stats number is too much, trigger compaction", zap.Int64("segment", segment.ID), zap.Int("Bin logs", binLog), zap.Int("Stat logs", statsLog))
return true
}
}
var deltaLog int
for _, deltaLogs := range segment.GetDeltalogs() {
totalLogNum += len(deltaLogs.GetBinlogs())
deltaLog += len(deltaLogs.GetBinlogs())
}
for _, statsLogs := range segment.GetStatslogs() {
totalLogNum += len(statsLogs.GetBinlogs())
}
// avoid segment has too many bin logs and the etcd meta is too large, force trigger compaction
if totalLogNum > Params.DataCoordCfg.SingleCompactionBinlogMaxNum.GetAsInt() {
log.Info("total binlog number is too much, trigger compaction", zap.Int64("segment", segment.ID),
zap.Int("Delta logs", len(segment.GetDeltalogs())), zap.Int("Bin Logs", len(segment.GetBinlogs())), zap.Int("Stat logs", len(segment.GetStatslogs())))
if deltaLog > Params.DataCoordCfg.SingleCompactionDeltalogMaxNum.GetAsInt() {
log.Info("total delta number is too much, trigger compaction", zap.Int64("segment", segment.ID), zap.Int("Bin logs", binLog), zap.Int("Delta logs", deltaLog))
return true
}
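The statslog threshold above is derived rather than configured directly: the segment byte budget (disk or memory index) divided by the per-binlog byte cap gives the expected number of binlogs per field, and compaction fires once the statslog count exceeds twice that. A small runnable sketch of the arithmetic, assuming the BinLogMaxSize default of 67108864 bytes that appears later in this diff and the usual 512 MB / 2048 MB segment sizes:

package main

import "fmt"

// statslogTriggerPoint mirrors the check above: segment budget in MB, converted to
// bytes, divided by the per-binlog byte cap, then doubled.
func statslogTriggerPoint(segmentMaxMB, binlogMaxBytes int64) int {
	return int(segmentMaxMB*1024*1024/binlogMaxBytes) * 2
}

func main() {
	const binlogMaxBytes = 67108864                         // assumed 64 MiB default
	fmt.Println(statslogTriggerPoint(512, binlogMaxBytes))  // 16 statslogs for a memory index
	fmt.Println(statslogTriggerPoint(2048, binlogMaxBytes)) // 64 statslogs for a DISKANN index
}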

View File

@ -825,6 +825,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
compactTime *compactTime
}
Params.Init()
Params.DataCoordCfg.MinSegmentToMerge.DefaultValue = "4"
vecFieldID := int64(201)
tests := []struct {
name string
@ -1522,9 +1523,9 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
trigger := newCompactionTrigger(&meta{}, &compactionPlanHandler{}, newMockAllocator(),
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, indexCoord, newMockHandler())
// Test too many files.
// Test too many deltalogs.
var binlogs []*datapb.FieldBinlog
for i := UniqueID(0); i < 5000; i++ {
for i := UniqueID(0); i < 1000; i++ {
binlogs = append(binlogs, &datapb.FieldBinlog{
Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogPath: "log1", LogSize: 100},
@ -1541,13 +1542,46 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: binlogs,
Deltalogs: binlogs,
},
}
couldDo := trigger.ShouldDoSingleCompaction(info, &compactTime{travelTime: 200, expireTime: 0})
couldDo := trigger.ShouldDoSingleCompaction(info, false, &compactTime{travelTime: 200, expireTime: 0})
assert.True(t, couldDo)
//Test too many stats log
info = &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 2,
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: 100,
MaxRowNum: 300,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Statslogs: binlogs,
},
}
couldDo = trigger.ShouldDoSingleCompaction(info, false, &compactTime{travelTime: 200, expireTime: 0})
assert.True(t, couldDo)
couldDo = trigger.ShouldDoSingleCompaction(info, true, &compactTime{travelTime: 200, expireTime: 0})
assert.True(t, couldDo)
// with only 20 statslogs, a memory index still triggers compaction, but a disk index does not
info.Statslogs = binlogs[0:20]
couldDo = trigger.ShouldDoSingleCompaction(info, false, &compactTime{travelTime: 200, expireTime: 0})
assert.True(t, couldDo)
couldDo = trigger.ShouldDoSingleCompaction(info, true, &compactTime{travelTime: 200, expireTime: 0})
assert.False(t, couldDo)
//Test too many stats log but compacted
info.CompactionFrom = []int64{0, 1}
couldDo = trigger.ShouldDoSingleCompaction(info, false, &compactTime{travelTime: 200, expireTime: 0})
assert.False(t, couldDo)
//Test expire triggered compaction
var binlogs2 []*datapb.FieldBinlog
for i := UniqueID(0); i < 100; i++ {
@ -1580,15 +1614,15 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
}
// expire time < Timestamp To
couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{travelTime: 200, expireTime: 300})
couldDo = trigger.ShouldDoSingleCompaction(info2, false, &compactTime{travelTime: 200, expireTime: 300})
assert.False(t, couldDo)
// didn't reach single compaction size 10 * 1024 * 1024
couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{travelTime: 200, expireTime: 600})
couldDo = trigger.ShouldDoSingleCompaction(info2, false, &compactTime{travelTime: 200, expireTime: 600})
assert.False(t, couldDo)
// expire time < Timestamp False
couldDo = trigger.ShouldDoSingleCompaction(info2, &compactTime{travelTime: 200, expireTime: 1200})
couldDo = trigger.ShouldDoSingleCompaction(info2, false, &compactTime{travelTime: 200, expireTime: 1200})
assert.True(t, couldDo)
// Test Delete triggered compaction
@ -1623,11 +1657,11 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
}
// expire time < Timestamp To
couldDo = trigger.ShouldDoSingleCompaction(info3, &compactTime{travelTime: 600, expireTime: 0})
couldDo = trigger.ShouldDoSingleCompaction(info3, false, &compactTime{travelTime: 600, expireTime: 0})
assert.False(t, couldDo)
// deltalog is large enough, should do compaction
couldDo = trigger.ShouldDoSingleCompaction(info3, &compactTime{travelTime: 800, expireTime: 0})
couldDo = trigger.ShouldDoSingleCompaction(info3, false, &compactTime{travelTime: 800, expireTime: 0})
assert.True(t, couldDo)
}

View File

@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// DelBufferManager is in charge of managing insertBuf and delBuf from an overall perspective
@ -332,32 +333,20 @@ func (ddb *DelDataBuf) updateStartAndEndPosition(startPos *internalpb.MsgPositio
// * This need to change for string field support and multi-vector fields support.
func newBufferData(collSchema *schemapb.CollectionSchema) (*BufferData, error) {
// Get Dimension
// TODO GOOSE: under assumption that there's only 1 Vector field in one collection schema
var vectorSize int
for _, field := range collSchema.Fields {
if field.DataType == schemapb.DataType_FloatVector ||
field.DataType == schemapb.DataType_BinaryVector {
dimension, err := storage.GetDimFromParams(field.TypeParams)
switch field.DataType {
case schemapb.DataType_FloatVector:
vectorSize = dimension * 4
case schemapb.DataType_BinaryVector:
vectorSize = dimension / 8
}
if err != nil {
log.Error("failed to get dim from field", zap.Error(err))
return nil, err
}
break
}
size, err := typeutil.EstimateSizePerRecord(collSchema)
if err != nil {
log.Warn("failed to estimate size per record", zap.Error(err))
return nil, err
}
if vectorSize == 0 {
return nil, errors.New("Invalid dimension")
if size == 0 {
return nil, errors.New("Invalid schema")
}
limit := Params.DataNodeCfg.FlushInsertBufferSize.GetAsInt64() / int64(vectorSize)
limit := Params.DataNodeCfg.FlushInsertBufferSize.GetAsInt64() / int64(size)
if Params.DataNodeCfg.FlushInsertBufferSize.GetAsInt64()%int64(size) != 0 {
limit++
}
//TODO::xige-16 eval vec and string field
return &BufferData{
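This is the core of the fix for BinaryVector and scalar-heavy schemas: the flush row limit now comes from typeutil.EstimateSizePerRecord over the whole schema instead of a hard-coded dim * 4 float-vector size. A minimal sketch of the rounding rule used above (the function name ceilRowLimit is illustrative):

// ceilRowLimit: rows per flush buffer is the buffer byte budget divided by the
// estimated per-record size, rounded up so the limit never drops to zero when a
// single record is larger than the budget.
func ceilRowLimit(bufferBytes, bytesPerRecord int64) int64 {
	limit := bufferBytes / bytesPerRecord
	if bufferBytes%bytesPerRecord != 0 {
		limit++
	}
	return limit
}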

View File

@ -240,12 +240,10 @@ func (t *compactionTask) merge(
mergeStart := time.Now()
var (
dim int // dimension of float/binary vector field
maxRowsPerBinlog int // maximum rows populating one binlog
numBinlogs int // binlog number
numRows int64 // the number of rows uploaded
expired int64 // the number of expired entity
err error
// statslog generation
pkID UniqueID
@ -300,25 +298,25 @@ func (t *compactionTask) merge(
pkID = fs.GetFieldID()
pkType = fs.GetDataType()
}
if fs.GetDataType() == schemapb.DataType_FloatVector ||
fs.GetDataType() == schemapb.DataType_BinaryVector {
for _, t := range fs.GetTypeParams() {
if t.Key == "dim" {
if dim, err = strconv.Atoi(t.Value); err != nil {
log.Warn("strconv wrong on get dim", zap.Error(err))
return nil, nil, 0, err
}
break
}
}
}
}
// estimate Rows per binlog
// TODO: avoid converting size back to a row count since the size is already known; this matters especially for varchar fields.
size, err := typeutil.EstimateSizePerRecord(meta.GetSchema())
if err != nil {
log.Warn("failed to estimate size per record", zap.Error(err))
return nil, nil, 0, err
}
maxRowsPerBinlog = int(Params.DataNodeCfg.BinLogMaxSize.GetAsInt64() / int64(size))
if Params.DataNodeCfg.BinLogMaxSize.GetAsInt64()%int64(size) != 0 {
maxRowsPerBinlog++
}
expired = 0
numRows = 0
numBinlogs = 0
currentTs := t.GetCurrentTime()
maxRowsPerBinlog = int(Params.DataNodeCfg.FlushInsertBufferSize.GetAsInt64() / (int64(dim) * 4))
currentRows := 0
downloadTimeCost := time.Duration(0)
uploadInsertTimeCost := time.Duration(0)
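The same schema-aware sizing replaces the old maxRowsPerBinlog formula, which divided the flush buffer size by dim * 4 and therefore only made sense for a single FloatVector field. An illustrative comparison (not code from the diff) for a schema with one 128-dim BinaryVector and an int64 primary key; the numerator is held constant here to isolate the per-row size difference, even though the old code used FlushInsertBufferSize rather than BinLogMaxSize:

// rowsPerBinlogExample contrasts the old and new per-row size assumptions.
func rowsPerBinlogExample() (oldRows, newRows int64) {
	const binlogMaxBytes = 67108864 // assumed datanode.segment.binlog.maxsize default (64 MiB)
	const dim = 128

	oldBytesPerRow := int64(dim * 4)   // old assumption: 4 bytes per dimension, as for a FloatVector
	newBytesPerRow := int64(dim/8 + 8) // BinaryVector is dim/8 bytes, plus an 8-byte int64 PK

	return binlogMaxBytes / oldBytesPerRow, binlogMaxBytes / newBytesPerRow
}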
@ -327,14 +325,14 @@ func (t *compactionTask) merge(
downloadStart := time.Now()
data, err := t.download(ctxTimeout, path)
if err != nil {
log.Warn("download insertlogs wrong")
log.Warn("download insertlogs wrong", zap.Error(err))
return nil, nil, 0, err
}
downloadTimeCost += time.Since(downloadStart)
iter, err := storage.NewInsertBinlogIterator(data, pkID, pkType)
if err != nil {
log.Warn("new insert binlogs Itr wrong")
log.Warn("new insert binlogs Itr wrong", zap.Error(err))
return nil, nil, 0, err
}
for iter.HasNext() {
@ -370,11 +368,11 @@ func (t *compactionTask) merge(
}
currentRows++
if currentRows == maxRowsPerBinlog {
if currentRows >= maxRowsPerBinlog {
uploadInsertStart := time.Now()
inPaths, statsPaths, err := t.uploadSingleInsertLog(ctxTimeout, targetSegID, partID, meta, fID2Content, fID2Type)
if err != nil {
log.Warn("failed to upload single insert log", zap.Error(err))
return nil, nil, 0, err
}
uploadInsertTimeCost += time.Since(uploadInsertStart)
@ -392,6 +390,7 @@ func (t *compactionTask) merge(
uploadInsertStart := time.Now()
inPaths, statsPaths, err := t.uploadSingleInsertLog(ctxTimeout, targetSegID, partID, meta, fID2Content, fID2Type)
if err != nil {
log.Warn("failed to upload single insert log", zap.Error(err))
return nil, nil, 0, err
}
uploadInsertTimeCost += time.Since(uploadInsertStart)

View File

@ -307,11 +307,11 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
alloc := NewAllocatorFactory(1)
mockbIO := &binlogIO{cm, alloc}
paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
flushInsertBufferSize := Params.DataNodeCfg.FlushInsertBufferSize
BinLogMaxSize := Params.DataNodeCfg.BinLogMaxSize
defer func() {
Params.DataNodeCfg.FlushInsertBufferSize = flushInsertBufferSize
Params.DataNodeCfg.BinLogMaxSize = BinLogMaxSize
}()
paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, "128")
paramtable.Get().Save(Params.DataNodeCfg.BinLogMaxSize.Key, "128")
iData := genInsertDataWithExpiredTS()
meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)

View File

@ -373,7 +373,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
t.Run("Pure auto flush", func(t *testing.T) {
// iBNode.insertBuffer.maxSize = 2
tmp := Params.DataNodeCfg.FlushInsertBufferSize
paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, "16")
paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, "200")
defer func() {
Params.DataNodeCfg.FlushInsertBufferSize = tmp
}()
@ -465,7 +465,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
t.Run("Auto with manual flush", func(t *testing.T) {
tmp := Params.DataNodeCfg.FlushInsertBufferSize
paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, "16")
paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, "200")
defer func() {
Params.DataNodeCfg.FlushInsertBufferSize = tmp
}()
@ -607,7 +607,7 @@ func TestRollBF(t *testing.T) {
t.Run("Pure roll BF", func(t *testing.T) {
tmp := Params.DataNodeCfg.FlushInsertBufferSize
paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, "16")
paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, "200")
defer func() {
Params.DataNodeCfg.FlushInsertBufferSize = tmp
}()
@ -697,7 +697,7 @@ func (s *InsertBufferNodeSuit) SetupSuite() {
s.originalConfig = Params.DataNodeCfg.FlushInsertBufferSize.GetAsInt64()
// change flushing size to 2
paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, "16")
paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, "200")
}
func (s *InsertBufferNodeSuit) TearDownSuite() {

View File

@ -1252,7 +1252,7 @@ type dataCoordConfig struct {
SingleCompactionRatioThreshold ParamItem
SingleCompactionDeltaLogMaxSize ParamItem
SingleCompactionExpiredLogMaxSize ParamItem
SingleCompactionBinlogMaxNum ParamItem
SingleCompactionDeltalogMaxNum ParamItem
GlobalCompactionInterval ParamItem
// Garbage Collection
@ -1338,7 +1338,7 @@ func (p *dataCoordConfig) init(base *BaseTable) {
p.MinSegmentToMerge = ParamItem{
Key: "dataCoord.compaction.min.segment",
Version: "2.0.0",
DefaultValue: "4",
DefaultValue: "3",
}
p.MinSegmentToMerge.Init(base.mgr)
@ -1405,12 +1405,12 @@ func (p *dataCoordConfig) init(base *BaseTable) {
}
p.SingleCompactionExpiredLogMaxSize.Init(base.mgr)
p.SingleCompactionBinlogMaxNum = ParamItem{
Key: "dataCoord.compaction.single.binlog.maxnum",
p.SingleCompactionDeltalogMaxNum = ParamItem{
Key: "dataCoord.compaction.single.deltalog.maxnum",
Version: "2.0.0",
DefaultValue: "1000",
}
p.SingleCompactionBinlogMaxNum.Init(base.mgr)
p.SingleCompactionDeltalogMaxNum.Init(base.mgr)
p.GlobalCompactionInterval = ParamItem{
Key: "dataCoord.compaction.global.interval",
@ -1464,6 +1464,7 @@ type dataNodeConfig struct {
// segment
FlushInsertBufferSize ParamItem
FlushDeleteBufferBytes ParamItem
BinLogMaxSize ParamItem
SyncPeriod ParamItem
// io concurrency to fetch stats logs
@ -1501,6 +1502,13 @@ func (p *dataNodeConfig) init(base *BaseTable) {
}
p.FlushDeleteBufferBytes.Init(base.mgr)
p.BinLogMaxSize = ParamItem{
Key: "datanode.segment.binlog.maxsize",
Version: "2.0.0",
DefaultValue: "67108864",
}
p.BinLogMaxSize.Init(base.mgr)
p.SyncPeriod = ParamItem{
Key: "datanode.segment.syncPeriod",
Version: "2.0.0",
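The new BinLogMaxSize item is read as a byte count wherever it appears above, so its default of 67108864 is 64 MiB. Following the pattern used by the test hunks in this diff, it can be overridden through paramtable; a small sketch (the test name TestWithLargerBinlogCap is illustrative):

func TestWithLargerBinlogCap(t *testing.T) {
	// Same pattern as the compactor test above: save the ParamItem, override via paramtable,
	// and restore it when the test ends.
	orig := Params.DataNodeCfg.BinLogMaxSize
	defer func() { Params.DataNodeCfg.BinLogMaxSize = orig }()
	paramtable.Get().Save(Params.DataNodeCfg.BinLogMaxSize.Key, "134217728") // 128 MiB
	// ... exercise the compaction / flush paths here ...
}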