mirror of https://github.com/milvus-io/milvus.git

issue: https://github.com/milvus-io/milvus/issues/36890
pr: https://github.com/milvus-io/milvus/pull/36891

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
parent 7847b662d4
commit 4e0f5845a1
@@ -594,6 +594,7 @@ dataCoord:
     checkIntervalHigh: 2 # The interval for checking import, measured in seconds, is set to a high frequency for the import checker.
     checkIntervalLow: 120 # The interval for checking import, measured in seconds, is set to a low frequency for the import checker.
     maxImportFileNumPerReq: 1024 # The maximum number of files allowed per single import request.
+    maxImportJobNum: 1024 # Maximum number of import jobs that are executing or pending.
     waitForIndex: true # Indicates whether the import operation waits for the completion of index building.
   gracefulStopTimeout: 5 # seconds. force stop node without graceful stop
   slot:
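Note: maxImportJobNum is the knob this PR introduces; it caps how many import jobs may be executing or pending at once, and further requests are rejected until jobs finish. Because the param is refreshable, it can be tuned in milvus.yaml without a rebuild. The value below is illustrative only, showing how the flat key dataCoord.import.maxImportJobNum nests in the config file:

    dataCoord:
      import:
        maxImportJobNum: 256 # reject new import jobs once 256 are executing or pending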
@@ -39,6 +39,28 @@ func WithCollectionID(collectionID int64) ImportJobFilter {
 	}
 }
 
+func WithJobStates(states ...internalpb.ImportJobState) ImportJobFilter {
+	return func(job ImportJob) bool {
+		for _, state := range states {
+			if job.GetState() == state {
+				return true
+			}
+		}
+		return false
+	}
+}
+
+func WithoutJobStates(states ...internalpb.ImportJobState) ImportJobFilter {
+	return func(job ImportJob) bool {
+		for _, state := range states {
+			if job.GetState() == state {
+				return false
+			}
+		}
+		return true
+	}
+}
+
 type UpdateJobAction func(job ImportJob)
 
 func UpdateJobState(state internalpb.ImportJobState) UpdateJobAction {
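Note: WithJobStates and WithoutJobStates follow the same predicate-constructor style as the existing WithCollectionID: each closes over the desired states and returns an ImportJobFilter that GetJobBy/CountJobBy apply per job. A self-contained sketch of the pattern — the type and state names below are stand-ins, not the real datacoord types:

    package main

    import "fmt"

    type jobState int

    const (
    	statePending jobState = iota
    	stateImporting
    	stateCompleted
    	stateFailed
    )

    type job struct{ state jobState }

    // jobFilter mirrors the ImportJobFilter shape: a predicate over a job.
    type jobFilter func(j job) bool

    // withStates reports true if the job is in any of the given states.
    func withStates(states ...jobState) jobFilter {
    	return func(j job) bool {
    		for _, s := range states {
    			if j.state == s {
    				return true
    			}
    		}
    		return false
    	}
    }

    // withoutStates is the complement of withStates.
    func withoutStates(states ...jobState) jobFilter {
    	inAny := withStates(states...)
    	return func(j job) bool { return !inAny(j) }
    }

    func main() {
    	jobs := []job{{statePending}, {stateImporting}, {stateCompleted}}
    	unfinished := withoutStates(stateCompleted, stateFailed)
    	n := 0
    	for _, j := range jobs {
    		if unfinished(j) {
    			n++
    		}
    	}
    	fmt.Println("unfinished:", n) // unfinished: 2
    }

One design point visible in the sketch: the negative filter is just the complement of the positive one; the Milvus code writes the loop out inline instead.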
@@ -27,6 +27,7 @@ type ImportMeta interface {
 	UpdateJob(jobID int64, actions ...UpdateJobAction) error
 	GetJob(jobID int64) ImportJob
 	GetJobBy(filters ...ImportJobFilter) []ImportJob
+	CountJobBy(filters ...ImportJobFilter) int
 	RemoveJob(jobID int64) error
 
 	AddTask(task ImportTask) error
@@ -124,6 +125,10 @@ func (m *importMeta) GetJob(jobID int64) ImportJob {
 func (m *importMeta) GetJobBy(filters ...ImportJobFilter) []ImportJob {
 	m.mu.RLock()
 	defer m.mu.RUnlock()
+	return m.getJobBy(filters...)
+}
+
+func (m *importMeta) getJobBy(filters ...ImportJobFilter) []ImportJob {
 	ret := make([]ImportJob, 0)
 OUTER:
 	for _, job := range m.jobs {
@@ -137,6 +142,12 @@ OUTER:
 	return ret
 }
 
+func (m *importMeta) CountJobBy(filters ...ImportJobFilter) int {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	return len(m.getJobBy(filters...))
+}
+
 func (m *importMeta) RemoveJob(jobID int64) error {
 	m.mu.Lock()
 	defer m.mu.Unlock()
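Note: GetJobBy and the new CountJobBy both take the read lock once and then delegate to the unexported getJobBy, which assumes the caller already holds the lock; that avoids nested RLock calls and keeps the filter loop in one place. A minimal standalone sketch of that lock-then-delegate pattern, with toy types in place of the real importMeta:

    package main

    import (
    	"fmt"
    	"sync"
    )

    // Exported methods lock once, then call an unexported helper
    // that assumes the lock is already held.
    type meta struct {
    	mu   sync.RWMutex
    	jobs map[int64]string
    }

    func (m *meta) GetBy() []string {
    	m.mu.RLock()
    	defer m.mu.RUnlock()
    	return m.getBy()
    }

    // CountBy reuses getBy instead of re-taking the lock or
    // duplicating the filtering loop.
    func (m *meta) CountBy() int {
    	m.mu.RLock()
    	defer m.mu.RUnlock()
    	return len(m.getBy())
    }

    // getBy must only be called with m.mu held.
    func (m *meta) getBy() []string {
    	ret := make([]string, 0, len(m.jobs))
    	for _, j := range m.jobs {
    		ret = append(ret, j)
    	}
    	return ret
    }

    func main() {
    	m := &meta{jobs: map[int64]string{1: "a", 2: "b"}}
    	fmt.Println(m.CountBy()) // 2
    }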
@@ -17,6 +17,8 @@
 package datacoord
 
 import (
+	"fmt"
+	"math/rand"
 	"testing"
 
 	"github.com/cockroachdb/errors"
@@ -70,7 +72,7 @@ func TestImportMeta_Restore(t *testing.T) {
 	assert.Error(t, err)
 }
 
-func TestImportMeta_ImportJob(t *testing.T) {
+func TestImportMeta_Job(t *testing.T) {
 	catalog := mocks.NewDataCoordCatalog(t)
 	catalog.EXPECT().ListImportJobs().Return(nil, nil)
 	catalog.EXPECT().ListPreImportTasks().Return(nil, nil)
@@ -81,54 +83,66 @@ func TestImportMeta_ImportJob(t *testing.T) {
 	im, err := NewImportMeta(catalog)
 	assert.NoError(t, err)
 
-	var job ImportJob = &importJob{
-		ImportJob: &datapb.ImportJob{
-			JobID:        0,
-			CollectionID: 1,
-			PartitionIDs: []int64{2},
-			Vchannels:    []string{"ch0"},
-			State:        internalpb.ImportJobState_Pending,
-		},
+	jobIDs := []int64{1000, 2000, 3000}
+
+	for i, jobID := range jobIDs {
+		var job ImportJob = &importJob{
+			ImportJob: &datapb.ImportJob{
+				JobID:        jobID,
+				CollectionID: rand.Int63(),
+				PartitionIDs: []int64{rand.Int63()},
+				Vchannels:    []string{fmt.Sprintf("ch-%d", rand.Int63())},
+				State:        internalpb.ImportJobState_Pending,
+			},
+		}
+		err = im.AddJob(job)
+		assert.NoError(t, err)
+		ret := im.GetJob(jobID)
+		assert.Equal(t, job, ret)
+		jobs := im.GetJobBy()
+		assert.Equal(t, i+1, len(jobs))
+
+		// Add again, test idempotency
+		err = im.AddJob(job)
+		assert.NoError(t, err)
+		ret = im.GetJob(jobID)
+		assert.Equal(t, job, ret)
+		jobs = im.GetJobBy()
+		assert.Equal(t, i+1, len(jobs))
 	}
 
-	err = im.AddJob(job)
-	assert.NoError(t, err)
 	jobs := im.GetJobBy()
+	assert.Equal(t, 3, len(jobs))
+
+	err = im.UpdateJob(jobIDs[0], UpdateJobState(internalpb.ImportJobState_Completed))
+	assert.NoError(t, err)
+	job0 := im.GetJob(jobIDs[0])
+	assert.NotNil(t, job0)
+	assert.Equal(t, internalpb.ImportJobState_Completed, job0.GetState())
+
+	err = im.UpdateJob(jobIDs[1], UpdateJobState(internalpb.ImportJobState_Importing))
+	assert.NoError(t, err)
+	job1 := im.GetJob(jobIDs[1])
+	assert.NotNil(t, job1)
+	assert.Equal(t, internalpb.ImportJobState_Importing, job1.GetState())
+
+	jobs = im.GetJobBy(WithJobStates(internalpb.ImportJobState_Pending))
 	assert.Equal(t, 1, len(jobs))
-	err = im.AddJob(job)
+	jobs = im.GetJobBy(WithoutJobStates(internalpb.ImportJobState_Pending))
+	assert.Equal(t, 2, len(jobs))
+	count := im.CountJobBy()
+	assert.Equal(t, 3, count)
+	count = im.CountJobBy(WithJobStates(internalpb.ImportJobState_Pending))
+	assert.Equal(t, 1, count)
+	count = im.CountJobBy(WithoutJobStates(internalpb.ImportJobState_Pending))
+	assert.Equal(t, 2, count)
+
+	err = im.RemoveJob(jobIDs[0])
 	assert.NoError(t, err)
 	jobs = im.GetJobBy()
-	assert.Equal(t, 1, len(jobs))
-
-	assert.Nil(t, job.GetSchema())
-	err = im.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed))
-	assert.NoError(t, err)
-	job2 := im.GetJob(job.GetJobID())
-	assert.Equal(t, internalpb.ImportJobState_Completed, job2.GetState())
-	assert.Equal(t, job.GetJobID(), job2.GetJobID())
-	assert.Equal(t, job.GetCollectionID(), job2.GetCollectionID())
-	assert.Equal(t, job.GetPartitionIDs(), job2.GetPartitionIDs())
-	assert.Equal(t, job.GetVchannels(), job2.GetVchannels())
-
-	err = im.RemoveJob(job.GetJobID())
-	assert.NoError(t, err)
-	jobs = im.GetJobBy()
-	assert.Equal(t, 0, len(jobs))
-
-	// test failed
-	mockErr := errors.New("mock err")
-	catalog = mocks.NewDataCoordCatalog(t)
-	catalog.EXPECT().SaveImportJob(mock.Anything).Return(mockErr)
-	catalog.EXPECT().DropImportJob(mock.Anything).Return(mockErr)
-	im.(*importMeta).catalog = catalog
-
-	err = im.AddJob(job)
-	assert.Error(t, err)
-	im.(*importMeta).jobs[job.GetJobID()] = job
-	err = im.UpdateJob(job.GetJobID())
-	assert.Error(t, err)
-	err = im.RemoveJob(job.GetJobID())
-	assert.Error(t, err)
+	assert.Equal(t, 2, len(jobs))
+	count = im.CountJobBy()
+	assert.Equal(t, 2, count)
 }
 
 func TestImportMeta_ImportTask(t *testing.T) {
@@ -189,19 +203,37 @@ func TestImportMeta_ImportTask(t *testing.T) {
 	assert.NoError(t, err)
 	tasks = im.GetTaskBy()
 	assert.Equal(t, 1, len(tasks))
+}
 
-	// test failed
+func TestImportMeta_Task_Failed(t *testing.T) {
 	mockErr := errors.New("mock err")
-	catalog = mocks.NewDataCoordCatalog(t)
+	catalog := mocks.NewDataCoordCatalog(t)
+	catalog.EXPECT().ListImportJobs().Return(nil, nil)
+	catalog.EXPECT().ListPreImportTasks().Return(nil, nil)
+	catalog.EXPECT().ListImportTasks().Return(nil, nil)
 	catalog.EXPECT().SaveImportTask(mock.Anything).Return(mockErr)
 	catalog.EXPECT().DropImportTask(mock.Anything).Return(mockErr)
+
+	im, err := NewImportMeta(catalog)
+	assert.NoError(t, err)
 	im.(*importMeta).catalog = catalog
 
-	err = im.AddTask(task1)
+	task := &importTask{
+		ImportTaskV2: &datapb.ImportTaskV2{
+			JobID:        1,
+			TaskID:       2,
+			CollectionID: 3,
+			SegmentIDs:   []int64{5, 6},
+			NodeID:       7,
+			State:        datapb.ImportTaskStateV2_Pending,
+		},
+	}
+
+	err = im.AddTask(task)
 	assert.Error(t, err)
-	im.(*importMeta).tasks[task1.GetTaskID()] = task1
-	err = im.UpdateTask(task1.GetTaskID(), UpdateNodeID(9))
+	im.(*importMeta).tasks[task.GetTaskID()] = task
+	err = im.UpdateTask(task.GetTaskID(), UpdateNodeID(9))
 	assert.Error(t, err)
-	err = im.RemoveTask(task1.GetTaskID())
+	err = im.RemoveTask(task.GetTaskID())
 	assert.Error(t, err)
 }
@@ -1665,12 +1665,24 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInternal
 			return len(file.GetPaths()) > 0
 		})
 		if len(files) == 0 {
-			resp.Status = merr.Status(merr.WrapErrParameterInvalidMsg(fmt.Sprintf("no binlog to import, input=%s", in.GetFiles())))
+			resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("no binlog to import, input=%s", in.GetFiles())))
 			return resp, nil
 		}
 		log.Info("list binlogs prefixes for import", zap.Any("binlog_prefixes", files))
 	}
+
+	// Check if the number of jobs exceeds the limit.
+	maxNum := paramtable.Get().DataCoordCfg.MaxImportJobNum.GetAsInt()
+	executingNum := s.importMeta.CountJobBy(WithoutJobStates(internalpb.ImportJobState_Completed, internalpb.ImportJobState_Failed))
+	if executingNum >= maxNum {
+		resp.Status = merr.Status(merr.WrapErrImportFailed(
+			fmt.Sprintf("The number of jobs has reached the limit, please try again later. " +
+				"If your request is set to only import a single file, " +
+				"please consider importing multiple files in one request for better efficiency.")))
+		return resp, nil
+	}
+
 	// Allocate file ids.
 	idStart, _, err := s.allocator.allocN(int64(len(files)) + 1)
 	if err != nil {
 		resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprint("alloc id failed, err=%w", err)))
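Note: the added block is an admission gate: it counts unfinished jobs — everything not Completed and not Failed — via CountJobBy(WithoutJobStates(...)) and rejects the request before any IDs are allocated, so over-limit requests do no allocation work. A self-contained approximation of the check, with illustrative names rather than the real server types:

    package main

    import (
    	"errors"
    	"fmt"
    )

    type jobState int

    const (
    	statePending jobState = iota
    	stateImporting
    	stateCompleted
    	stateFailed
    )

    var errJobLimit = errors.New("the number of jobs has reached the limit, please try again later")

    // admit mimics the gate in ImportV2: count unfinished jobs and
    // reject the request before doing any allocation work.
    func admit(states []jobState, maxJobNum int) error {
    	executing := 0
    	for _, s := range states {
    		if s != stateCompleted && s != stateFailed {
    			executing++
    		}
    	}
    	if executing >= maxJobNum {
    		return errJobLimit
    	}
    	return nil
    }

    func main() {
    	jobs := []jobState{statePending, stateImporting, stateCompleted}
    	fmt.Println(admit(jobs, 1024)) // <nil>: accepted
    	fmt.Println(admit(jobs, 2))    // job-limit error: 2 unfinished >= 2
    }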
@@ -1618,6 +1618,12 @@ func TestImportV2(t *testing.T) {
 		assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed))
 
 		// alloc failed
+		catalog := mocks.NewDataCoordCatalog(t)
+		catalog.EXPECT().ListImportJobs().Return(nil, nil)
+		catalog.EXPECT().ListPreImportTasks().Return(nil, nil)
+		catalog.EXPECT().ListImportTasks().Return(nil, nil)
+		s.importMeta, err = NewImportMeta(catalog)
+		assert.NoError(t, err)
 		alloc := NewNMockAllocator(t)
 		alloc.EXPECT().allocN(mock.Anything).Return(0, 0, mockErr)
 		s.allocator = alloc
@@ -1629,7 +1635,7 @@
 		s.allocator = alloc
 
 		// add job failed
-		catalog := mocks.NewDataCoordCatalog(t)
+		catalog = mocks.NewDataCoordCatalog(t)
 		catalog.EXPECT().ListImportJobs().Return(nil, nil)
 		catalog.EXPECT().ListPreImportTasks().Return(nil, nil)
 		catalog.EXPECT().ListImportTasks().Return(nil, nil)
@@ -1666,6 +1672,13 @@
 		assert.Equal(t, int32(0), resp.GetStatus().GetCode())
 		jobs = s.importMeta.GetJobBy()
 		assert.Equal(t, 1, len(jobs))
+
+		// number of jobs reached the limit
+		Params.Save(paramtable.Get().DataCoordCfg.MaxImportJobNum.Key, "1")
+		resp, err = s.ImportV2(ctx, &internalpb.ImportRequestInternal{})
+		assert.NoError(t, err)
+		assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed))
+		Params.Reset(paramtable.Get().DataCoordCfg.MaxImportJobNum.Key)
 	})
 
 	t.Run("GetImportProgress", func(t *testing.T) {
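Note: the test pins the limit to 1 with Params.Save and restores it with Params.Reset so later subtests see the default again. A hedged sketch of that save/restore discipline, with a plain map standing in for the real param table:

    package main

    import "fmt"

    // withParam pins key=value for the duration of fn and then restores
    // the previous state, like Params.Save followed by Params.Reset.
    func withParam(params map[string]string, key, value string, fn func()) {
    	old, had := params[key]
    	params[key] = value
    	defer func() {
    		if had {
    			params[key] = old
    		} else {
    			delete(params, key)
    		}
    	}()
    	fn()
    }

    func main() {
    	params := map[string]string{}
    	withParam(params, "dataCoord.import.maxImportJobNum", "1", func() {
    		fmt.Println(params["dataCoord.import.maxImportJobNum"]) // 1
    	})
    	_, ok := params["dataCoord.import.maxImportJobNum"]
    	fmt.Println(ok) // false: override removed after the callback
    }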
@@ -3227,6 +3227,7 @@ type dataCoordConfig struct {
 	ImportCheckIntervalHigh ParamItem `refreshable:"true"`
 	ImportCheckIntervalLow  ParamItem `refreshable:"true"`
 	MaxFilesPerImportReq    ParamItem `refreshable:"true"`
+	MaxImportJobNum         ParamItem `refreshable:"true"`
 	WaitForIndex            ParamItem `refreshable:"true"`
 
 	GracefulStopTimeout     ParamItem `refreshable:"true"`
@@ -3990,6 +3991,16 @@ During compaction, the size of segment # of rows is able to exceed segment max # of rows even if it is many times larger.
 	}
 	p.MaxFilesPerImportReq.Init(base.mgr)
 
+	p.MaxImportJobNum = ParamItem{
+		Key:          "dataCoord.import.maxImportJobNum",
+		Version:      "2.4.14",
+		Doc:          "Maximum number of import jobs that are executing or pending.",
+		DefaultValue: "1024",
+		PanicIfEmpty: false,
+		Export:       true,
+	}
+	p.MaxImportJobNum.Init(base.mgr)
+
 	p.WaitForIndex = ParamItem{
 		Key:     "dataCoord.import.waitForIndex",
 		Version: "2.4.0",
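Note: a ParamItem bundles the config key, the version that introduced it, a doc string, a default, and an export flag, and callers read it through typed accessors such as GetAsInt (as the component-param test below asserts). A toy model of that define/override/read cycle — the map stands in for the real config manager, and this is a sketch, not the paramtable implementation:

    package main

    import (
    	"fmt"
    	"strconv"
    )

    // paramItem is a toy model of paramtable.ParamItem: a key, a default,
    // and a lookup against a config source, read via typed accessors.
    type paramItem struct {
    	key          string
    	defaultValue string
    	source       map[string]string // stand-in for the real config manager
    }

    func (p paramItem) getAsInt() int {
    	v, ok := p.source[p.key]
    	if !ok || v == "" {
    		v = p.defaultValue
    	}
    	n, err := strconv.Atoi(v)
    	if err != nil {
    		panic(err) // illustrative only; real code validates config values
    	}
    	return n
    }

    func main() {
    	src := map[string]string{}
    	maxImportJobNum := paramItem{
    		key:          "dataCoord.import.maxImportJobNum",
    		defaultValue: "1024",
    		source:       src,
    	}
    	fmt.Println(maxImportJobNum.getAsInt()) // 1024 (default)

    	// "refreshable": an override takes effect on the next read.
    	src["dataCoord.import.maxImportJobNum"] = "256"
    	fmt.Println(maxImportJobNum.getAsInt()) // 256
    }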
@@ -474,6 +474,7 @@ func TestComponentParam(t *testing.T) {
 		assert.Equal(t, 2*time.Second, Params.ImportCheckIntervalHigh.GetAsDuration(time.Second))
 		assert.Equal(t, 120*time.Second, Params.ImportCheckIntervalLow.GetAsDuration(time.Second))
 		assert.Equal(t, 1024, Params.MaxFilesPerImportReq.GetAsInt())
+		assert.Equal(t, 1024, Params.MaxImportJobNum.GetAsInt())
 		assert.Equal(t, true, Params.WaitForIndex.GetAsBool())
 
 		params.Save("datacoord.gracefulStopTimeout", "100")