Fix delete operation scope to vchannel (#20866)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/20882/head
congqixia 2022-11-29 15:15:15 +08:00 committed by GitHub
parent 938c09679c
commit a8e6d5d47e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 81 additions and 53 deletions

View File

@ -121,13 +121,14 @@ func TestDataSyncService_DeltaFlowGraphs(t *testing.T) {
assert.NoError(t, err)
dataSyncService.removeFlowGraphsByDeltaChannels([]Channel{defaultDeltaChannel})
replica.removeCollectionVDeltaChannel(defaultCollectionID, defaultDeltaChannel)
assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 0)
fg, err = dataSyncService.getFlowGraphByDeltaChannel(defaultCollectionID, defaultDeltaChannel)
assert.Nil(t, fg)
assert.Error(t, err)
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDMLChannel}, map[string]string{defaultDMLChannel: defaultDMLChannel})
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel}, map[string]string{defaultDeltaChannel: defaultDeltaChannel})
assert.NoError(t, err)
assert.Len(t, dataSyncService.deltaChannel2FlowGraph, 1)
@ -147,7 +148,7 @@ func TestDataSyncService_DeltaFlowGraphs(t *testing.T) {
t.Run("test addFlowGraphsForDeltaChannels checkReplica Failed", func(t *testing.T) {
err = dataSyncService.metaReplica.removeCollection(defaultCollectionID)
assert.NoError(t, err)
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDMLChannel}, map[string]string{defaultDMLChannel: defaultDMLChannel})
_, err = dataSyncService.addFlowGraphsForDeltaChannels(defaultCollectionID, []Channel{defaultDeltaChannel}, map[string]string{defaultDeltaChannel: defaultDeltaChannel})
assert.Error(t, err)
dataSyncService.metaReplica.addCollection(defaultCollectionID, genTestCollectionSchema())
})

View File

@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/trace"
)
@ -41,14 +42,15 @@ var newVarCharPrimaryKey = storage.NewVarCharPrimaryKey
// deleteNode is the one of nodes in delta flow graph
type deleteNode struct {
baseNode
collectionID UniqueID
metaReplica ReplicaInterface // historical
vchannel Channel
collectionID UniqueID
metaReplica ReplicaInterface // historical
deltaVchannel Channel
dmlVchannel Channel
}
// Name returns the name of deleteNode
func (dNode *deleteNode) Name() string {
return fmt.Sprintf("dNode-%s", dNode.vchannel)
return fmt.Sprintf("dNode-%s", dNode.deltaVchannel)
}
// Operate handles input messages, do delete operations
@ -86,7 +88,7 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
for i, delMsg := range dMsg.deleteMessages {
traceID, _, _ := trace.InfoFromSpan(spans[i])
log.Debug("delete in historical replica",
zap.String("vchannel", dNode.vchannel),
zap.String("vchannel", dNode.deltaVchannel),
zap.Int64("collectionID", delMsg.CollectionID),
zap.String("collectionName", delMsg.CollectionName),
zap.Int64("numPKs", delMsg.NumRows),
@ -98,10 +100,10 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
)
if dNode.metaReplica.getSegmentNum(segmentTypeSealed) != 0 {
err := processDeleteMessages(dNode.metaReplica, segmentTypeSealed, delMsg, delData)
err := processDeleteMessages(dNode.metaReplica, segmentTypeSealed, delMsg, delData, dNode.dmlVchannel)
if err != nil {
// error occurs when missing meta info or unexpected pk type, should not happen
err = fmt.Errorf("deleteNode processDeleteMessages failed, collectionID = %d, err = %s, channel = %s", delMsg.CollectionID, err, dNode.vchannel)
err = fmt.Errorf("deleteNode processDeleteMessages failed, collectionID = %d, err = %s, channel = %s", delMsg.CollectionID, err, dNode.deltaVchannel)
log.Error(err.Error())
panic(err)
}
@ -116,7 +118,7 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
log.Warn("failed to get segment",
zap.Int64("collectionID", dNode.collectionID),
zap.Int64("segmentID", segmentID),
zap.String("vchannel", dNode.vchannel),
zap.String("vchannel", dNode.deltaVchannel),
)
continue
}
@ -183,12 +185,12 @@ func (dNode *deleteNode) delete(deleteData *deleteData, segmentID UniqueID, wg *
log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])),
zap.Int64("segmentID", segmentID),
zap.String("SegmentType", targetSegment.getType().String()),
zap.String("vchannel", dNode.vchannel))
zap.String("vchannel", dNode.deltaVchannel))
return nil
}
// newDeleteNode returns a new deleteNode
func newDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, vchannel Channel) *deleteNode {
func newDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, deltaVchannel Channel) (*deleteNode, error) {
maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength
maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism
@ -196,10 +198,17 @@ func newDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, vchannel
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &deleteNode{
baseNode: baseNode,
collectionID: collectionID,
metaReplica: metaReplica,
vchannel: vchannel,
dmlVChannel, err := funcutil.ConvertChannelName(deltaVchannel, Params.CommonCfg.RootCoordDelta, Params.CommonCfg.RootCoordDml)
if err != nil {
log.Error("failed to convert deltaVChannel to dmlVChannel", zap.String("deltaVChannel", deltaVchannel), zap.Error(err))
return nil, err
}
return &deleteNode{
baseNode: baseNode,
collectionID: collectionID,
metaReplica: metaReplica,
deltaVchannel: deltaVchannel,
dmlVchannel: dmlVChannel,
}, nil
}

View File

@ -21,6 +21,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/common"
@ -33,7 +34,8 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) {
t.Run("test delete", func(t *testing.T) {
historical, err := genSimpleReplica()
assert.NoError(t, err)
deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName)
deleteNode, err := newDeleteNode(historical, defaultCollectionID, defaultDeltaChannel)
require.NoError(t, err)
err = historical.addSegment(defaultSegmentID,
defaultPartitionID,
@ -55,8 +57,9 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) {
t.Run("test segment delete error", func(t *testing.T) {
historical, err := genSimpleReplica()
assert.NoError(t, err)
deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName)
require.NoError(t, err)
deleteNode, err := newDeleteNode(historical, defaultCollectionID, defaultDeltaChannel)
require.NoError(t, err)
err = historical.addSegment(defaultSegmentID,
defaultPartitionID,
@ -79,8 +82,10 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) {
t.Run("test no target segment", func(t *testing.T) {
historical, err := genSimpleReplica()
assert.NoError(t, err)
deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName)
require.NoError(t, err)
deleteNode, err := newDeleteNode(historical, defaultCollectionID, defaultDeltaChannel)
require.NoError(t, err)
wg := &sync.WaitGroup{}
wg.Add(1)
err = deleteNode.delete(nil, defaultSegmentID, wg)
@ -89,8 +94,9 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) {
t.Run("test invalid segmentType", func(t *testing.T) {
historical, err := genSimpleReplica()
assert.NoError(t, err)
deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName)
require.NoError(t, err)
deleteNode, err := newDeleteNode(historical, defaultCollectionID, defaultDeltaChannel)
require.NoError(t, err)
err = historical.addSegment(defaultSegmentID,
defaultPartitionID,
@ -111,8 +117,9 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) {
func TestFlowGraphDeleteNode_operate(t *testing.T) {
t.Run("test operate", func(t *testing.T) {
historical, err := genSimpleReplica()
assert.NoError(t, err)
deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName)
require.NoError(t, err)
deleteNode, err := newDeleteNode(historical, defaultCollectionID, defaultDeltaChannel)
require.NoError(t, err)
err = historical.addSegment(defaultSegmentID,
defaultPartitionID,
@ -191,8 +198,9 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
t.Run("test invalid partitionID", func(t *testing.T) {
historical, err := genSimpleReplica()
assert.NoError(t, err)
deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName)
require.NoError(t, err)
deleteNode, err := newDeleteNode(historical, defaultCollectionID, defaultDeltaChannel)
require.NoError(t, err)
err = historical.addSegment(defaultSegmentID,
defaultPartitionID,
@ -217,8 +225,9 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
t.Run("test collection partition not exist", func(t *testing.T) {
historical, err := genSimpleReplica()
assert.NoError(t, err)
deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName)
require.NoError(t, err)
deleteNode, err := newDeleteNode(historical, defaultCollectionID, defaultDeltaChannel)
require.NoError(t, err)
err = historical.addSegment(defaultSegmentID,
defaultPartitionID,
@ -244,8 +253,9 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
t.Run("test partition not exist", func(t *testing.T) {
historical, err := genSimpleReplica()
assert.NoError(t, err)
deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName)
require.NoError(t, err)
deleteNode, err := newDeleteNode(historical, defaultCollectionID, defaultDeltaChannel)
require.NoError(t, err)
err = historical.addSegment(defaultSegmentID,
defaultPartitionID,
@ -270,8 +280,9 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
t.Run("test invalid input length", func(t *testing.T) {
historical, err := genSimpleReplica()
assert.NoError(t, err)
deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName)
require.NoError(t, err)
deleteNode, err := newDeleteNode(historical, defaultCollectionID, defaultDeltaChannel)
require.NoError(t, err)
err = historical.addSegment(defaultSegmentID,
defaultPartitionID,
@ -291,4 +302,11 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
msg := []flowgraph.Msg{&dMsg, &dMsg}
deleteNode.Operate(msg)
})
t.Run("test bad deltaChannelName", func(t *testing.T) {
historical, err := genSimpleReplica()
require.NoError(t, err)
_, err = newDeleteNode(historical, defaultCollectionID, defaultDMLChannel)
assert.Error(t, err)
})
}

View File

@ -243,7 +243,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
zap.Int64("collectionID", delMsg.CollectionID),
zap.String("collectionName", delMsg.CollectionName),
zap.Int64("numPKs", delMsg.NumRows))
err := processDeleteMessages(iNode.metaReplica, segmentTypeGrowing, delMsg, delData)
err := processDeleteMessages(iNode.metaReplica, segmentTypeGrowing, delMsg, delData, iNode.vchannel)
if err != nil {
// error occurs when missing meta info or unexpected pk type, should not happen
err = fmt.Errorf("insertNode processDeleteMessages failed, collectionID = %d, err = %s, vchannel: %s", delMsg.CollectionID, err, iNode.vchannel)
@ -303,7 +303,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
// processDeleteMessages would execute delete operations for growing segments
func processDeleteMessages(replica ReplicaInterface, segType segmentType, msg *msgstream.DeleteMsg, delData *deleteData) error {
func processDeleteMessages(replica ReplicaInterface, segType segmentType, msg *msgstream.DeleteMsg, delData *deleteData, vchannelName string) error {
var partitionIDs []UniqueID
var err error
if msg.PartitionID != -1 {
@ -318,17 +318,13 @@ func processDeleteMessages(replica ReplicaInterface, segType segmentType, msg *m
return err
}
}
resultSegmentIDs := make([]UniqueID, 0)
for _, partitionID := range partitionIDs {
segmentIDs, err := replica.getSegmentIDs(partitionID, segType)
if err != nil {
// Skip this partition
if errors.Is(err, ErrPartitionNotFound) {
continue
}
var resultSegmentIDs []UniqueID
resultSegmentIDs, err = replica.getSegmentIDsByVChannel(partitionIDs, vchannelName, segType)
log.Warn("processDeleteMessage", zap.String("vchannel", vchannelName), zap.Int64s("segmentIDs", resultSegmentIDs), zap.Int64s("paritions", partitionIDs))
if err != nil {
if !errors.Is(err, ErrPartitionNotFound) {
return err
}
resultSegmentIDs = append(resultSegmentIDs, segmentIDs...)
}
primaryKeys := storage.ParseIDs2PrimaryKeys(msg.PrimaryKeys)

View File

@ -198,7 +198,7 @@ func TestFlowGraphInsertNode_delete(t *testing.T) {
}
func TestFlowGraphInsertNode_processDeleteMessages(t *testing.T) {
t.Run("test processDeleteMessages", func(t *testing.T) {
t.Run("test processDeleteMessages growing", func(t *testing.T) {
streaming, err := genSimpleReplica()
assert.NoError(t, err)
@ -206,11 +206,11 @@ func TestFlowGraphInsertNode_processDeleteMessages(t *testing.T) {
dData, err := genFlowGraphDeleteData()
assert.NoError(t, err)
err = processDeleteMessages(streaming, segmentTypeGrowing, dMsg, dData)
err = processDeleteMessages(streaming, segmentTypeGrowing, dMsg, dData, defaultChannelName)
assert.NoError(t, err)
})
t.Run("test processDeleteMessages", func(t *testing.T) {
t.Run("test processDeleteMessages sealed", func(t *testing.T) {
streaming, err := genSimpleReplica()
assert.NoError(t, err)
@ -218,7 +218,7 @@ func TestFlowGraphInsertNode_processDeleteMessages(t *testing.T) {
dData, err := genFlowGraphDeleteData()
assert.NoError(t, err)
err = processDeleteMessages(streaming, segmentTypeGrowing, dMsg, dData)
err = processDeleteMessages(streaming, segmentTypeSealed, dMsg, dData, defaultChannelName)
assert.NoError(t, err)
})
}

View File

@ -144,7 +144,10 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
return nil, err
}
var filterDeleteNode node = newFilteredDeleteNode(metaReplica, collectionID, vchannel)
var deleteNode node = newDeleteNode(metaReplica, collectionID, vchannel)
deleteNode, err := newDeleteNode(metaReplica, collectionID, vchannel)
if err != nil {
return nil, err
}
var serviceTimeNode node = newServiceTimeNode(tSafeReplica, collectionID, vchannel)
q.flowGraph.AddNode(dmStreamNode)

View File

@ -73,8 +73,8 @@ const (
defaultMetricType = L2
defaultNQ = 10
defaultDMLChannel = "query-node-unittest-DML-0"
defaultDeltaChannel = "query-node-unittest-delta-channel-0"
defaultDMLChannel = "by-dev-rootcoord-dml-DML-0"
defaultDeltaChannel = "by-dev-rootcoord-delta-channel-0"
defaultSubName = "query-node-unittest-sub-name-0"
defaultVersion = 1

View File

@ -722,6 +722,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
stream.Close()
}()
vchannelName := position.ChannelName
pChannelName := funcutil.ToPhysicalChannel(position.ChannelName)
position.ChannelName = pChannelName
@ -796,7 +797,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
if dmsg.CollectionID != collectionID {
continue
}
err = processDeleteMessages(loader.metaReplica, segmentTypeSealed, dmsg, delData)
err = processDeleteMessages(loader.metaReplica, segmentTypeSealed, dmsg, delData, vchannelName)
if err != nil {
// TODO: panic?
// error occurs when missing meta info or unexpected pk type, should not happen