enhance: Add max file num limit and max file size limit for import (#31497)

By default, the number of import files per request must not exceed 1024
(configurable).
By default, the size of a single import file must not exceed 16 GB
(configurable).
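
Both limits can be tuned in milvus.yaml. A minimal sketch with illustrative
override values (the keys are the ones added by this commit):

dataCoord:
  import:
    maxImportFileNumPerReq: 2048 # default: 1024
dataNode:
  import:
    maxImportFileSizeInGB: 32 # default: 16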

issue: https://github.com/milvus-io/milvus/issues/28521

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
5 changed files with 38 additions and 0 deletions


@@ -442,6 +442,7 @@ dataCoord:
  import:
    filesPerPreImportTask: 2 # The maximum number of files allowed per pre-import task.
    taskRetention: 10800 # The retention period in seconds for tasks in the Completed or Failed state.
    maxImportFileNumPerReq: 1024 # The maximum number of files allowed per single import request.

  enableGarbageCollection: true
  gc:
@@ -502,6 +503,7 @@ dataNode:
  updateChannelCheckpointMaxParallel: 10
  import:
    maxConcurrentTaskNum: 16 # The maximum number of import/pre-import tasks allowed to run concurrently on a datanode.
    maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.

# Configures the system log output.
log:
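
As the inline comment notes, the 16 GB cap applies to one logical import
file: either a single row-based file (for example, one JSON file) or the
combined size of one set of column-based files (for example, one Numpy file
per field).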


@@ -232,6 +232,12 @@ func GetFileSize(file *internalpb.ImportFile, cm storage.ChunkManager, task Task
		}
		totalSize += size
	}
	maxSize := paramtable.Get().DataNodeCfg.MaxImportFileSizeInGB.GetAsFloat() * 1024 * 1024 * 1024
	if totalSize > int64(maxSize) {
		return 0, merr.WrapErrImportFailed(fmt.Sprintf(
			"The import file size has reached the maximum limit allowed for importing, "+
				"fileSize=%d, maxSize=%d", totalSize, int64(maxSize)))
	}
	return totalSize, nil
}
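
A minimal, self-contained sketch of the size check above, with the paramtable
lookup replaced by a hard-coded default (checkImportFileSize is a hypothetical
helper for illustration, not part of the commit):

package main

import "fmt"

// Assumed default of datanode.import.maxImportFileSizeInGB.
const maxImportFileSizeInGB = 16.0

// checkImportFileSize mirrors the new validation in GetFileSize: convert the
// configured GB limit to bytes and reject anything larger.
func checkImportFileSize(totalSize int64) error {
	maxSize := maxImportFileSizeInGB * 1024 * 1024 * 1024
	if totalSize > int64(maxSize) {
		return fmt.Errorf("import file size %d exceeds limit %d", totalSize, int64(maxSize))
	}
	return nil
}

func main() {
	fmt.Println(checkImportFileSize(17 << 30)) // 17 GiB: error
	fmt.Println(checkImportFileSize(1 << 30))  // 1 GiB: <nil>
}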


@@ -5649,6 +5649,11 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest)
		resp.Status = merr.Status(merr.WrapErrParameterInvalidMsg("import request is empty"))
		return resp, nil
	}
	if len(req.Files) > Params.DataCoordCfg.MaxFilesPerImportReq.GetAsInt() {
		resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("The max number of import files should not exceed %d, but got %d",
			Params.DataCoordCfg.MaxFilesPerImportReq.GetAsInt(), len(req.Files))))
		return resp, nil
	}
	isBackup := importutilv2.IsBackup(req.GetOptions())
	if !isBackup {
		// check file type
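
For illustration, a hypothetical client-side pre-check that mirrors both new
server-side validations; the constant names and the [][]int64 shape are
assumptions, not part of the commit:

package main

import "fmt"

const (
	maxFilesPerImportReq = 1024     // dataCoord.import.maxImportFileNumPerReq
	maxImportFileBytes   = 16 << 30 // datanode.import.maxImportFileSizeInGB, in bytes
)

// validateImport treats each inner slice as one logical import file (a single
// row-based file or one set of column-based files), sized in bytes.
func validateImport(files [][]int64) error {
	if len(files) > maxFilesPerImportReq {
		return fmt.Errorf("the max number of import files should not exceed %d, but got %d",
			maxFilesPerImportReq, len(files))
	}
	for i, group := range files {
		var total int64
		for _, size := range group {
			total += size
		}
		if total > maxImportFileBytes {
			return fmt.Errorf("import file %d is too large: %d > %d bytes", i, total, int64(maxImportFileBytes))
		}
	}
	return nil
}

func main() {
	fmt.Println(validateImport([][]int64{{1 << 30, 2 << 30}})) // within limits: <nil>
	fmt.Println(validateImport([][]int64{{20 << 30}}))         // oversized: error
}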


@@ -2557,6 +2557,7 @@ type dataCoordConfig struct {
	ImportScheduleInterval  ParamItem `refreshable:"true"`
	ImportCheckIntervalHigh ParamItem `refreshable:"true"`
	ImportCheckIntervalLow  ParamItem `refreshable:"true"`
	MaxFilesPerImportReq    ParamItem `refreshable:"true"`

	GracefulStopTimeout ParamItem `refreshable:"true"`
}

@@ -3092,6 +3093,16 @@ During compaction, the size of segment # of rows is able to exceed segment max #
	}
	p.ImportCheckIntervalLow.Init(base.mgr)

	p.MaxFilesPerImportReq = ParamItem{
		Key:          "dataCoord.import.maxImportFileNumPerReq",
		Version:      "2.4.0",
		Doc:          "The maximum number of files allowed per single import request.",
		DefaultValue: "1024",
		PanicIfEmpty: false,
		Export:       true,
	}
	p.MaxFilesPerImportReq.Init(base.mgr)

	p.GracefulStopTimeout = ParamItem{
		Key:     "dataCoord.gracefulStopTimeout",
		Version: "2.3.7",

@@ -3152,7 +3163,9 @@ type dataNodeConfig struct {
	MaxChannelCheckpointsPerRPC          ParamItem `refreshable:"true"`
	ChannelCheckpointUpdateTickInSeconds ParamItem `refreshable:"true"`

	// import
	MaxConcurrentImportTaskNum ParamItem `refreshable:"true"`
	MaxImportFileSizeInGB      ParamItem `refreshable:"true"`

	// Compaction
	L0BatchMemoryRatio ParamItem `refreshable:"true"`

@@ -3411,6 +3424,16 @@ func (p *dataNodeConfig) init(base *BaseTable) {
	}
	p.MaxConcurrentImportTaskNum.Init(base.mgr)

	p.MaxImportFileSizeInGB = ParamItem{
		Key:          "datanode.import.maxImportFileSizeInGB",
		Version:      "2.4.0",
		Doc:          "The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.",
		DefaultValue: "16",
		PanicIfEmpty: false,
		Export:       true,
	}
	p.MaxImportFileSizeInGB.Init(base.mgr)

	p.L0BatchMemoryRatio = ParamItem{
		Key:     "datanode.compaction.levelZeroBatchMemoryRatio",
		Version: "2.4.0",
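
Both new items are declared refreshable:"true", so they can be changed at
runtime without a restart. A hypothetical snippet following the Save/Get
pattern that the test below also uses (the import path and API are taken
from the Milvus repo as assumptions):

import "github.com/milvus-io/milvus/pkg/util/paramtable"

// exampleRuntimeOverride raises the per-request file cap at runtime; the
// value 2048 is illustrative.
func exampleRuntimeOverride() int {
	params := paramtable.Get()
	params.Save(params.DataCoordCfg.MaxFilesPerImportReq.Key, "2048")
	return params.DataCoordCfg.MaxFilesPerImportReq.GetAsInt() // 2048 after the save
}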


@@ -383,6 +383,7 @@ func TestComponentParam(t *testing.T) {
		assert.Equal(t, 2*time.Second, Params.ImportScheduleInterval.GetAsDuration(time.Second))
		assert.Equal(t, 2*time.Second, Params.ImportCheckIntervalHigh.GetAsDuration(time.Second))
		assert.Equal(t, 120*time.Second, Params.ImportCheckIntervalLow.GetAsDuration(time.Second))
		assert.Equal(t, 1024, Params.MaxFilesPerImportReq.GetAsInt())

		params.Save("datacoord.gracefulStopTimeout", "100")
		assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))

@@ -434,6 +435,7 @@ func TestComponentParam(t *testing.T) {
		maxConcurrentImportTaskNum := Params.MaxConcurrentImportTaskNum.GetAsInt()
		t.Logf("maxConcurrentImportTaskNum: %d", maxConcurrentImportTaskNum)
		assert.Equal(t, 16, maxConcurrentImportTaskNum)
		assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64())

		params.Save("datanode.gracefulStopTimeout", "100")
		assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
	})