Fix upsert msgid (#22839)

Signed-off-by: lixinguo <xinguo.li@zilliz.com>
Co-authored-by: lixinguo <xinguo.li@zilliz.com>
pull/22884/head
smellthemoon 2023-03-21 14:06:01 +08:00 committed by GitHub
parent 8259ca6929
commit 4c603cd02c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 35 additions and 11 deletions

View File

@ -2184,8 +2184,9 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
// RowData: transfer column based request to this
},
},
chMgr: node.chMgr,
chTicker: node.chTicker,
idAllocator: node.rowIDAllocator,
chMgr: node.chMgr,
chTicker: node.chTicker,
}
log.Debug("Enqueue delete request in Proxy",

View File

@ -5,6 +5,7 @@ import (
"fmt"
"strconv"
"github.com/cockroachdb/errors"
"go.opentelemetry.io/otel"
"go.uber.org/zap"
@ -12,6 +13,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/msgpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
@ -37,6 +39,7 @@ type deleteTask struct {
vChannels []vChan
pChannels []pChan
idAllocator *allocator.IDAllocator
collectionID UniqueID
schema *schemapb.CollectionSchema
}
@ -252,10 +255,16 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) {
ts := dt.deleteMsg.Timestamps[index]
_, ok := result[key]
if !ok {
msgid, err := dt.idAllocator.AllocOne()
if err != nil {
return errors.Wrap(err, "failed to allocate MsgID of delete")
}
sliceRequest := msgpb.DeleteRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_Delete),
commonpbutil.WithMsgID(dt.deleteMsg.Base.MsgID),
// msgid of delete msg must be set
// or it will be seen as duplicated msg in mq
commonpbutil.WithMsgID(msgid),
commonpbutil.WithTimeStamp(ts),
commonpbutil.WithSourceID(proxyID),
),

View File

@ -1425,8 +1425,9 @@ func TestTask_Int64PrimaryKey(t *testing.T) {
PartitionName: partitionName,
},
},
deleteExpr: "int64 in [0, 1]",
ctx: ctx,
idAllocator: idAllocator,
deleteExpr: "int64 in [0, 1]",
ctx: ctx,
result: &milvuspb.MutationResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -1479,8 +1480,9 @@ func TestTask_Int64PrimaryKey(t *testing.T) {
PartitionName: partitionName,
},
},
deleteExpr: "int64 not in [0, 1]",
ctx: ctx,
idAllocator: idAllocator,
deleteExpr: "int64 not in [0, 1]",
ctx: ctx,
result: &milvuspb.MutationResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -1768,8 +1770,9 @@ func TestTask_VarCharPrimaryKey(t *testing.T) {
PartitionName: partitionName,
},
},
deleteExpr: "varChar in [\"milvus\", \"test\"]",
ctx: ctx,
idAllocator: idAllocator,
deleteExpr: "varChar in [\"milvus\", \"test\"]",
ctx: ctx,
result: &milvuspb.MutationResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -1822,8 +1825,9 @@ func TestTask_VarCharPrimaryKey(t *testing.T) {
PartitionName: partitionName,
},
},
deleteExpr: "varChar not in [\"milvus\", \"test\"]",
ctx: ctx,
idAllocator: idAllocator,
deleteExpr: "varChar not in [\"milvus\", \"test\"]",
ctx: ctx,
result: &milvuspb.MutationResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,

View File

@ -20,6 +20,7 @@ import (
"fmt"
"strconv"
"github.com/cockroachdb/errors"
"go.opentelemetry.io/otel"
"go.uber.org/zap"
@ -415,10 +416,18 @@ func (it *upsertTask) deleteExecute(ctx context.Context, msgPack *msgstream.MsgP
ts := it.upsertMsg.DeleteMsg.Timestamps[index]
_, ok := result[key]
if !ok {
msgid, err := it.idAllocator.AllocOne()
if err != nil {
errors.Wrap(err, "failed to allocate MsgID for delete of upsert")
}
sliceRequest := msgpb.DeleteRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_Delete),
commonpbutil.WithTimeStamp(ts),
// id of upsertTask were set as ts in scheduler
// msgid of delete msg must be set
// or it will be seen as duplicated msg in mq
commonpbutil.WithMsgID(msgid),
commonpbutil.WithSourceID(proxyID),
),
CollectionID: collectionID,

View File

@ -108,6 +108,7 @@ func (s *ErrSuite) TestWrap() {
// Metrics related
s.ErrorIs(WrapErrMetricNotFound("unknown", "failed to get metric"), ErrMetricNotFound)
}
func (s *ErrSuite) TestCombine() {