Remove frequent bulkload log (#21822)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/21798/head
Xiaofan 2023-01-20 00:27:45 +08:00 committed by GitHub
parent e7558571fe
commit 709bf35b9a
4 changed files with 19 additions and 11 deletions

View File

@@ -40,8 +40,7 @@ import (
 )
 const (
-	MaxPendingCount = 65536 // TODO: Make this configurable.
-	delimiter = "/"
+	delimiter = "/"
 )
 // checkPendingTasksInterval is the default interval to check and send out pending tasks,
@@ -94,7 +93,7 @@ func newImportManager(ctx context.Context, client kv.TxnKV,
 	mgr := &importManager{
 		ctx: ctx,
 		taskStore: client,
-		pendingTasks: make([]*datapb.ImportTaskInfo, 0, MaxPendingCount), // currently task queue max size is 32
+		pendingTasks: make([]*datapb.ImportTaskInfo, 0, Params.RootCoordCfg.ImportMaxPendingTaskCount), // currently task queue max size is 32
 		workingTasks: make(map[int64]*datapb.ImportTaskInfo),
 		busyNodes: make(map[int64]int64),
 		pendingLock: sync.RWMutex{},
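
The two hunks above drop the hard-coded MaxPendingCount constant and size the pending-task slice from the new Params.RootCoordCfg.ImportMaxPendingTaskCount setting instead. As background, here is a minimal, self-contained sketch of the general pattern (a bounded pending queue whose limit comes from configuration); the queue type and function names are invented for illustration and are not part of Milvus.

package main

import (
	"errors"
	"fmt"
	"sync"
)

type task struct{ ID int64 }

// pendingQueue caps the number of queued tasks at a configurable limit,
// analogous to importManager's pendingTasks slice.
type pendingQueue struct {
	mu    sync.Mutex
	limit int // e.g. the value read from rootCoord.importMaxPendingTaskCount
	tasks []*task
}

func newPendingQueue(limit int) *pendingQueue {
	// Pre-allocating with the limit as capacity avoids reallocations; the
	// length check in push is what actually enforces the limit.
	return &pendingQueue{limit: limit, tasks: make([]*task, 0, limit)}
}

// push rejects new work once the configured limit is reached.
func (q *pendingQueue) push(t *task) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.tasks) >= q.limit {
		return errors.New("import task queue max size exceeded")
	}
	q.tasks = append(q.tasks, t)
	return nil
}

func main() {
	q := newPendingQueue(2)
	fmt.Println(q.push(&task{ID: 1})) // <nil>
	fmt.Println(q.push(&task{ID: 2})) // <nil>
	fmt.Println(q.push(&task{ID: 3})) // import task queue max size exceeded
}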

View File

@@ -541,6 +541,7 @@ func TestImportManager_ImportJob(t *testing.T) {
 		return globalCount, 0, nil
 	}
 	Params.RootCoordCfg.ImportTaskSubPath = "test_import_task"
+	Params.RootCoordCfg.ImportMaxPendingTaskCount = 16
 	colID := int64(100)
 	mockKv := memkv.NewMemoryKV()
 	callMarkSegmentsDropped := func(ctx context.Context, segIDs []typeutil.UniqueID) (*commonpb.Status, error) {
@@ -652,9 +653,9 @@ func TestImportManager_ImportJob(t *testing.T) {
 	// the pending list already has one task
 	// once task count exceeds MaxPendingCount, return error
-	for i := 0; i <= MaxPendingCount; i++ {
+	for i := 0; i <= Params.RootCoordCfg.ImportMaxPendingTaskCount; i++ {
 		resp = mgr.importJob(context.TODO(), rowReq, colID, 0)
-		if i < MaxPendingCount-1 {
+		if i < Params.RootCoordCfg.ImportMaxPendingTaskCount-1 {
 			assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
 		} else {
 			assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
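
The test sets ImportMaxPendingTaskCount to 16 in the setup hunk so this overflow loop stays fast, and expects importJob to start returning a non-success status once the pending list is full. Because the parameter is process-global, a test that lowers it may also want to restore the previous value when it finishes; a small helper along these lines (hypothetical, assumed to live in the same test package so Params and testing are in scope) would do that:

// withMaxPendingTaskCount is an illustrative helper, not part of the Milvus
// test suite: it overrides the global limit for one test and restores it.
func withMaxPendingTaskCount(t *testing.T, n int) {
	old := Params.RootCoordCfg.ImportMaxPendingTaskCount
	Params.RootCoordCfg.ImportMaxPendingTaskCount = n
	t.Cleanup(func() {
		Params.RootCoordCfg.ImportMaxPendingTaskCount = old
	})
}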

View File

@@ -189,9 +189,13 @@ func Test_scheduler_updateDdlMinTsLoop(t *testing.T) {
 	Params.ProxyCfg.TimeTickInterval = time.Millisecond
 	s.Start()
 	time.Sleep(time.Millisecond * 4)
-	assert.Greater(t, s.GetMinDdlTs(), Timestamp(100))
+	for i := 0; i < 100; i++ {
+		if s.GetMinDdlTs() > Timestamp(100) {
+			break
+		}
+		assert.True(t, i < 100)
+		time.Sleep(time.Millisecond)
+	}
 	// add task to queue.
 	n := 10
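
The scheduler test previously slept a fixed 4ms and asserted immediately, which is sensitive to scheduling jitter; the replacement loop polls GetMinDdlTs for up to roughly 100ms before giving up. If one preferred to lean on testify rather than a hand-rolled loop (an alternative, not what this patch does), assert.Eventually expresses the same wait; the snippet below assumes it sits inside Test_scheduler_updateDdlMinTsLoop where s and Timestamp are in scope:

// Poll until the minimum DDL timestamp advances past 100, checking every
// millisecond and failing the test after 100ms.
assert.Eventually(t, func() bool {
	return s.GetMinDdlTs() > Timestamp(100)
}, 100*time.Millisecond, time.Millisecond, "min DDL timestamp never advanced")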

View File

@@ -500,8 +500,11 @@ type rootCoordConfig struct {
 	DmlChannelNum int64
 	MaxPartitionNum int64
 	MinSegmentSizeToEnableIndex int64
-	ImportTaskExpiration float64
-	ImportTaskRetention float64
+	// IMPORT
+	ImportMaxPendingTaskCount int
+	ImportTaskExpiration float64
+	ImportTaskRetention float64
 	// --- ETCD Path ---
 	ImportTaskSubPath string
@@ -519,7 +522,8 @@ func (p *rootCoordConfig) init(base *BaseTable) {
 	p.MinSegmentSizeToEnableIndex = p.Base.ParseInt64WithDefault("rootCoord.minSegmentSizeToEnableIndex", 1024)
 	p.ImportTaskExpiration = p.Base.ParseFloatWithDefault("rootCoord.importTaskExpiration", 15*60)
 	p.ImportTaskRetention = p.Base.ParseFloatWithDefault("rootCoord.importTaskRetention", 24*60*60)
-	p.ImportTaskSubPath = "importtask"
+	p.ImportMaxPendingTaskCount = p.Base.ParseIntWithDefault("rootCoord.importMaxPendingTaskCount", 65536)
+	p.ImportTaskSubPath = p.Base.LoadWithDefault("rootCoord.importTaskSubPath", "importtask")
 	p.EnableActiveStandby = p.Base.ParseBool("rootCoord.enableActiveStandby", false)
 	p.NodeID.Store(UniqueID(0))
 }
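
The init hunk reads the new knob from rootCoord.importMaxPendingTaskCount, defaulting to 65536 (the value of the removed constant), and also makes the import task sub-path overridable via rootCoord.importTaskSubPath instead of always being "importtask". As a rough illustration of the defaulting behavior, here is a simplified stand-in for a ParseIntWithDefault-style helper; the real BaseTable implementation in Milvus may handle missing keys and parse errors differently.

package main

import (
	"fmt"
	"strconv"
)

// parseIntWithDefault returns defaultValue when the key is missing, empty, or
// not a valid integer, so an unset rootCoord.importMaxPendingTaskCount falls
// back to 65536.
func parseIntWithDefault(cfg map[string]string, key string, defaultValue int) int {
	raw, ok := cfg[key]
	if !ok || raw == "" {
		return defaultValue
	}
	v, err := strconv.Atoi(raw)
	if err != nil {
		return defaultValue
	}
	return v
}

func main() {
	cfg := map[string]string{"rootCoord.importMaxPendingTaskCount": "1024"}
	fmt.Println(parseIntWithDefault(cfg, "rootCoord.importMaxPendingTaskCount", 65536)) // 1024
	fmt.Println(parseIntWithDefault(cfg, "rootCoord.maxPartitionNum", 4096))            // 4096 (missing key)
}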