enhance: [2.4] Limit the maximum number of segments restored and fail the job if saving the binlog fails (#39473)

1. Limit the maximum number of restored segments to 1024.
2. Fail the import job if saving binlog fails.
3. Fail the import job if saving the import task fails to prevent
repeatedly generating dirty importing segments.
4. Update proto.

issue: https://github.com/milvus-io/milvus/issues/39331

pr: https://github.com/milvus-io/milvus/pull/39344

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/39486/head v2.4.21
yihao.dai 2025-01-21 15:55:05 +08:00 committed by GitHub
parent 6111a0dc22
commit 86ae07d705
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 31 additions and 15 deletions

View File

@ -6,7 +6,7 @@ require (
github.com/blang/semver/v4 v4.0.0
github.com/cockroachdb/errors v1.9.1
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.21
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22
github.com/milvus-io/milvus/pkg v0.0.2-0.20240317152703-17b4938985f3
github.com/quasilyte/go-ruleguard/dsl v0.3.22
github.com/samber/lo v1.27.0

View File

@ -400,8 +400,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr
github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.21 h1:Imb0uGFp/kyPPI5f6dCne8GCJIceuQWzI1H20p5aa4c=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.21/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22 h1:8CV4LUoo0KEFNSmDyDPkfdTapFrX6SBM64cE7UV5EVY=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus/pkg v0.0.2-0.20240317152703-17b4938985f3 h1:ZBpRWhBa7FTFxW4YYVv9AUESoW1Xyb3KNXTzTqfkZmw=
github.com/milvus-io/milvus/pkg v0.0.2-0.20240317152703-17b4938985f3/go.mod h1:jQ2BUZny1COsgv1Qbcv8dmbppW+V9J/c4YQZNb3EOm8=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=

2
go.mod
View File

@ -26,7 +26,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22
github.com/minio/minio-go/v7 v7.0.73
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0

4
go.sum
View File

@ -612,8 +612,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7 h1:putot5l1gpiucE4CBrYzLoPCAci/BdYFe76GP15xsg4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22 h1:8CV4LUoo0KEFNSmDyDPkfdTapFrX6SBM64cE7UV5EVY=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 h1:Z+sp64fmAOxAG7mU0dfVOXvAXlwRB0c8a96rIM5HevI=
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=

View File

@ -235,6 +235,10 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
err = c.imeta.AddTask(t)
if err != nil {
log.Warn("add new import task failed", WrapTaskLog(t, zap.Error(err))...)
updateErr := c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
if updateErr != nil {
log.Warn("failed to update job state to Failed", zap.Error(updateErr))
}
return
}
log.Info("add new import task", WrapTaskLog(t)...)

View File

@ -258,13 +258,17 @@ func (s *ImportCheckerSuite) TestCheckJob_Failed() {
catalog.ExpectedCalls = nil
catalog.EXPECT().SaveImportTask(mock.Anything).Return(mockErr)
catalog.EXPECT().SaveImportJob(mock.Anything).Return(nil)
s.checker.checkPreImportingJob(job)
importTasks := s.imeta.GetTaskBy(WithJob(job.GetJobID()), WithType(ImportTaskType))
s.Equal(0, len(importTasks))
s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(job.GetJobID()).GetState())
s.Equal(internalpb.ImportJobState_Failed, s.imeta.GetJob(job.GetJobID()).GetState())
alloc.ExpectedCalls = nil
alloc.EXPECT().allocN(mock.Anything).Return(0, 0, mockErr)
err := s.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
s.NoError(err)
s.checker.checkPreImportingJob(job)
importTasks = s.imeta.GetTaskBy(WithJob(job.GetJobID()), WithType(ImportTaskType))
s.Equal(0, len(importTasks))
s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(job.GetJobID()).GetState())

View File

@ -279,8 +279,7 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
return
}
if resp.GetState() == datapb.ImportTaskStateV2_Failed {
err = s.imeta.UpdateJob(task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed),
UpdateJobReason(resp.GetReason()))
err = s.imeta.UpdateJob(task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(resp.GetReason()))
if err != nil {
log.Warn("failed to update job state to Failed", zap.Int64("jobID", task.GetJobID()), zap.Error(err))
}
@ -325,7 +324,11 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed)
err = s.meta.UpdateSegmentsInfo(op1, op2)
if err != nil {
log.Warn("update import segment binlogs failed", WrapTaskLog(task, zap.Error(err))...)
updateErr := s.imeta.UpdateJob(task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
if updateErr != nil {
log.Warn("failed to update job state to Failed", zap.Int64("jobID", task.GetJobID()), zap.Error(updateErr))
}
log.Warn("update import segment binlogs failed", WrapTaskLog(task, zap.String("err", err.Error()))...)
return
}
select {

View File

@ -1672,6 +1672,11 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("no binlog to import, input=%s", in.GetFiles())))
return resp, nil
}
if len(files) > paramtable.Get().DataCoordCfg.MaxFilesPerImportReq.GetAsInt() {
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("The max number of import files should not exceed %d, but got %d",
paramtable.Get().DataCoordCfg.MaxFilesPerImportReq.GetAsInt(), len(files))))
return resp, nil
}
log.Info("list binlogs prefixes for import", zap.Any("binlog_prefixes", files))
}

View File

@ -139,7 +139,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
if it.schemaTimestamp != 0 {
if it.schemaTimestamp != colInfo.updateTimestamp {
err := merr.WrapErrCollectionSchemaMisMatch(collectionName)
log.Ctx(ctx).Warn("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err))
log.Ctx(ctx).Info("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err))
return err
}
}

View File

@ -306,7 +306,7 @@ func (it *upsertTask) PreExecute(ctx context.Context) error {
if it.schemaTimestamp != 0 {
if it.schemaTimestamp != colInfo.updateTimestamp {
err := merr.WrapErrCollectionSchemaMisMatch(collectionName)
log.Warn("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err))
log.Info("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err))
return err
}
}

View File

@ -12,7 +12,7 @@ require (
github.com/expr-lang/expr v1.15.7
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.7
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22
github.com/nats-io/nats-server/v2 v2.10.12
github.com/nats-io/nats.go v1.34.1
github.com/panjf2000/ants/v2 v2.7.2

View File

@ -503,8 +503,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7 h1:putot5l1gpiucE4CBrYzLoPCAci/BdYFe76GP15xsg4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22-0.20250117031653-a796005be4f7/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22 h1:8CV4LUoo0KEFNSmDyDPkfdTapFrX6SBM64cE7UV5EVY=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.22/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=