mirror of https://github.com/milvus-io/milvus.git
Add channel information to flow graph node (#17349)
Signed-off-by: longjiquan <jiquan.long@zilliz.com>pull/17571/head
parent
51a9e54b21
commit
875b6f88b0
|
@ -42,6 +42,7 @@ type deleteNode struct {
|
|||
baseNode
|
||||
collectionID UniqueID
|
||||
metaReplica ReplicaInterface // historical
|
||||
channel Channel
|
||||
}
|
||||
|
||||
// Name returns the name of deleteNode
|
||||
|
@ -86,7 +87,7 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
collection, err := dNode.metaReplica.getCollectionByID(dNode.collectionID)
|
||||
if err != nil {
|
||||
// QueryNode should add collection before start flow graph
|
||||
panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d", dNode.Name(), dNode.collectionID))
|
||||
panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d, channel = %s", dNode.Name(), dNode.collectionID, dNode.channel))
|
||||
}
|
||||
collection.RLock()
|
||||
defer collection.RUnlock()
|
||||
|
@ -95,6 +96,7 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
for i, delMsg := range dMsg.deleteMessages {
|
||||
traceID, _, _ := trace.InfoFromSpan(spans[i])
|
||||
log.Debug("delete in historical replica",
|
||||
zap.String("channel", dNode.channel),
|
||||
zap.Any("collectionID", delMsg.CollectionID),
|
||||
zap.Any("collectionName", delMsg.CollectionName),
|
||||
zap.Int64("numPKs", delMsg.NumRows),
|
||||
|
@ -109,7 +111,7 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
err := processDeleteMessages(dNode.metaReplica, segmentTypeSealed, delMsg, delData)
|
||||
if err != nil {
|
||||
// error occurs when missing meta info or unexpected pk type, should not happen
|
||||
err = fmt.Errorf("deleteNode processDeleteMessages failed, collectionID = %d, err = %s", delMsg.CollectionID, err)
|
||||
err = fmt.Errorf("deleteNode processDeleteMessages failed, collectionID = %d, err = %s, channel = %s", delMsg.CollectionID, err, dNode.channel)
|
||||
log.Error(err.Error())
|
||||
panic(err)
|
||||
}
|
||||
|
@ -177,12 +179,12 @@ func (dNode *deleteNode) delete(deleteData *deleteData, segmentID UniqueID, wg *
|
|||
return fmt.Errorf("segmentDelete failed, segmentID = %d", segmentID)
|
||||
}
|
||||
|
||||
log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])), zap.Int64("segmentID", segmentID), zap.Any("SegmentType", targetSegment.segmentType))
|
||||
log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])), zap.Int64("segmentID", segmentID), zap.Any("SegmentType", targetSegment.segmentType), zap.String("channel", dNode.channel))
|
||||
return nil
|
||||
}
|
||||
|
||||
// newDeleteNode returns a new deleteNode
|
||||
func newDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID) *deleteNode {
|
||||
func newDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, channel Channel) *deleteNode {
|
||||
maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength
|
||||
maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism
|
||||
|
||||
|
@ -194,5 +196,6 @@ func newDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID) *deleteN
|
|||
baseNode: baseNode,
|
||||
collectionID: collectionID,
|
||||
metaReplica: metaReplica,
|
||||
channel: channel,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) {
|
|||
t.Run("test delete", func(t *testing.T) {
|
||||
historical, err := genSimpleReplica()
|
||||
assert.NoError(t, err)
|
||||
deleteNode := newDeleteNode(historical, defaultCollectionID)
|
||||
deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName)
|
||||
|
||||
err = historical.addSegment(defaultSegmentID,
|
||||
defaultPartitionID,
|
||||
|
@ -53,7 +53,7 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) {
|
|||
t.Run("test segment delete error", func(t *testing.T) {
|
||||
historical, err := genSimpleReplica()
|
||||
assert.NoError(t, err)
|
||||
deleteNode := newDeleteNode(historical, defaultCollectionID)
|
||||
deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName)
|
||||
|
||||
err = historical.addSegment(defaultSegmentID,
|
||||
defaultPartitionID,
|
||||
|
@ -75,7 +75,7 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) {
|
|||
t.Run("test no target segment", func(t *testing.T) {
|
||||
historical, err := genSimpleReplica()
|
||||
assert.NoError(t, err)
|
||||
deleteNode := newDeleteNode(historical, defaultCollectionID)
|
||||
deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName)
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
err = deleteNode.delete(nil, defaultSegmentID, wg)
|
||||
|
@ -85,7 +85,7 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) {
|
|||
t.Run("test invalid segmentType", func(t *testing.T) {
|
||||
historical, err := genSimpleReplica()
|
||||
assert.NoError(t, err)
|
||||
deleteNode := newDeleteNode(historical, defaultCollectionID)
|
||||
deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName)
|
||||
|
||||
err = historical.addSegment(defaultSegmentID,
|
||||
defaultPartitionID,
|
||||
|
@ -105,7 +105,7 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
|
|||
t.Run("test operate", func(t *testing.T) {
|
||||
historical, err := genSimpleReplica()
|
||||
assert.NoError(t, err)
|
||||
deleteNode := newDeleteNode(historical, defaultCollectionID)
|
||||
deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName)
|
||||
|
||||
err = historical.addSegment(defaultSegmentID,
|
||||
defaultPartitionID,
|
||||
|
@ -139,7 +139,7 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
|
|||
t.Run("test invalid partitionID", func(t *testing.T) {
|
||||
historical, err := genSimpleReplica()
|
||||
assert.NoError(t, err)
|
||||
deleteNode := newDeleteNode(historical, defaultCollectionID)
|
||||
deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName)
|
||||
|
||||
err = historical.addSegment(defaultSegmentID,
|
||||
defaultPartitionID,
|
||||
|
@ -163,7 +163,7 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
|
|||
t.Run("test collection partition not exist", func(t *testing.T) {
|
||||
historical, err := genSimpleReplica()
|
||||
assert.NoError(t, err)
|
||||
deleteNode := newDeleteNode(historical, defaultCollectionID)
|
||||
deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName)
|
||||
|
||||
err = historical.addSegment(defaultSegmentID,
|
||||
defaultPartitionID,
|
||||
|
@ -190,7 +190,7 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
|
|||
t.Run("test partition not exist", func(t *testing.T) {
|
||||
historical, err := genSimpleReplica()
|
||||
assert.NoError(t, err)
|
||||
deleteNode := newDeleteNode(historical, defaultCollectionID)
|
||||
deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName)
|
||||
|
||||
err = historical.addSegment(defaultSegmentID,
|
||||
defaultPartitionID,
|
||||
|
@ -216,7 +216,7 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
|
|||
t.Run("test invalid input length", func(t *testing.T) {
|
||||
historical, err := genSimpleReplica()
|
||||
assert.NoError(t, err)
|
||||
deleteNode := newDeleteNode(historical, defaultCollectionID)
|
||||
deleteNode := newDeleteNode(historical, defaultCollectionID, defaultChannelName)
|
||||
|
||||
err = historical.addSegment(defaultSegmentID,
|
||||
defaultPartitionID,
|
||||
|
|
|
@ -35,6 +35,7 @@ type filterDeleteNode struct {
|
|||
baseNode
|
||||
collectionID UniqueID
|
||||
metaReplica ReplicaInterface
|
||||
channel Channel
|
||||
}
|
||||
|
||||
// Name returns the name of filterDeleteNode
|
||||
|
@ -81,7 +82,7 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
collection, err := fddNode.metaReplica.getCollectionByID(fddNode.collectionID)
|
||||
if err != nil {
|
||||
// QueryNode should add collection before start flow graph
|
||||
panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d", fddNode.Name(), fddNode.collectionID))
|
||||
panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d, channel = %s", fddNode.Name(), fddNode.collectionID, fddNode.channel))
|
||||
}
|
||||
collection.RLock()
|
||||
defer collection.RUnlock()
|
||||
|
@ -92,7 +93,7 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
resMsg, err := fddNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg), collection.getLoadType())
|
||||
if err != nil {
|
||||
// error occurs when missing meta info or data is misaligned, should not happen
|
||||
err = fmt.Errorf("filterInvalidDeleteMessage failed, err = %s", err)
|
||||
err = fmt.Errorf("filterInvalidDeleteMessage failed, err = %s, collection = %d, channel = %s", err, fddNode.collectionID, fddNode.channel)
|
||||
log.Error(err.Error())
|
||||
panic(err)
|
||||
}
|
||||
|
@ -100,7 +101,10 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
dMsg.deleteMessages = append(dMsg.deleteMessages, resMsg)
|
||||
}
|
||||
default:
|
||||
log.Warn("invalid message type in filterDeleteNode", zap.String("message type", msg.Type().String()))
|
||||
log.Warn("invalid message type in filterDeleteNode",
|
||||
zap.String("message type", msg.Type().String()),
|
||||
zap.Int64("collection", fddNode.collectionID),
|
||||
zap.String("channel", fddNode.channel))
|
||||
}
|
||||
}
|
||||
var res Msg = &dMsg
|
||||
|
@ -126,6 +130,7 @@ func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.Delet
|
|||
|
||||
if len(msg.Timestamps) <= 0 {
|
||||
log.Debug("filter invalid delete message, no message",
|
||||
zap.String("channel", fddNode.channel),
|
||||
zap.Any("collectionID", msg.CollectionID),
|
||||
zap.Any("partitionID", msg.PartitionID))
|
||||
return nil, nil
|
||||
|
@ -141,7 +146,7 @@ func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.Delet
|
|||
}
|
||||
|
||||
// newFilteredDeleteNode returns a new filterDeleteNode
|
||||
func newFilteredDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID) *filterDeleteNode {
|
||||
func newFilteredDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, channel Channel) *filterDeleteNode {
|
||||
|
||||
maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength
|
||||
maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism
|
||||
|
@ -154,5 +159,6 @@ func newFilteredDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID)
|
|||
baseNode: baseNode,
|
||||
collectionID: collectionID,
|
||||
metaReplica: metaReplica,
|
||||
channel: channel,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ func getFilterDeleteNode() (*filterDeleteNode, error) {
|
|||
}
|
||||
|
||||
historical.addExcludedSegments(defaultCollectionID, nil)
|
||||
return newFilteredDeleteNode(historical, defaultCollectionID), nil
|
||||
return newFilteredDeleteNode(historical, defaultCollectionID, defaultChannelName), nil
|
||||
}
|
||||
|
||||
func TestFlowGraphFilterDeleteNode_filterDeleteNode(t *testing.T) {
|
||||
|
|
|
@ -35,6 +35,7 @@ type filterDmNode struct {
|
|||
baseNode
|
||||
collectionID UniqueID
|
||||
metaReplica ReplicaInterface
|
||||
channel Channel
|
||||
}
|
||||
|
||||
// Name returns the name of filterDmNode
|
||||
|
@ -82,7 +83,7 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
collection, err := fdmNode.metaReplica.getCollectionByID(fdmNode.collectionID)
|
||||
if err != nil {
|
||||
// QueryNode should add collection before start flow graph
|
||||
panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d", fdmNode.Name(), fdmNode.collectionID))
|
||||
panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d, channel: %s", fdmNode.Name(), fdmNode.collectionID, fdmNode.channel))
|
||||
}
|
||||
collection.RLock()
|
||||
defer collection.RUnlock()
|
||||
|
@ -96,7 +97,7 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
if err != nil {
|
||||
// error occurs when missing meta info or data is misaligned, should not happen
|
||||
err = fmt.Errorf("filterInvalidInsertMessage failed, err = %s", err)
|
||||
log.Error(err.Error())
|
||||
log.Error(err.Error(), zap.Int64("collection", fdmNode.collectionID), zap.String("channel", fdmNode.channel))
|
||||
panic(err)
|
||||
}
|
||||
if resMsg != nil {
|
||||
|
@ -107,14 +108,17 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
if err != nil {
|
||||
// error occurs when missing meta info or data is misaligned, should not happen
|
||||
err = fmt.Errorf("filterInvalidDeleteMessage failed, err = %s", err)
|
||||
log.Error(err.Error())
|
||||
log.Error(err.Error(), zap.Int64("collection", fdmNode.collectionID), zap.String("channel", fdmNode.channel))
|
||||
panic(err)
|
||||
}
|
||||
if resMsg != nil {
|
||||
iMsg.deleteMessages = append(iMsg.deleteMessages, resMsg)
|
||||
}
|
||||
default:
|
||||
log.Warn("invalid message type in filterDmNode", zap.String("message type", msg.Type().String()))
|
||||
log.Warn("invalid message type in filterDmNode",
|
||||
zap.String("message type", msg.Type().String()),
|
||||
zap.Int64("collection", fdmNode.collectionID),
|
||||
zap.String("channel", fdmNode.channel))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -133,6 +137,7 @@ func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg
|
|||
|
||||
if len(msg.Timestamps) <= 0 {
|
||||
log.Debug("filter invalid delete message, no message",
|
||||
zap.String("channel", fdmNode.channel),
|
||||
zap.Any("collectionID", msg.CollectionID),
|
||||
zap.Any("partitionID", msg.PartitionID))
|
||||
return nil, nil
|
||||
|
@ -165,6 +170,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
|
|||
|
||||
if len(msg.Timestamps) <= 0 {
|
||||
log.Debug("filter invalid insert message, no message",
|
||||
zap.String("channel", fdmNode.channel),
|
||||
zap.Any("collectionID", msg.CollectionID),
|
||||
zap.Any("partitionID", msg.PartitionID))
|
||||
return nil, nil
|
||||
|
@ -201,12 +207,14 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
|
|||
// unFlushed segment may not have checkPoint, so `segmentInfo.DmlPosition` may be nil
|
||||
if segmentInfo.DmlPosition == nil {
|
||||
log.Warn("filter unFlushed segment without checkPoint",
|
||||
zap.String("channel", fdmNode.channel),
|
||||
zap.Any("collectionID", msg.CollectionID),
|
||||
zap.Any("partitionID", msg.PartitionID))
|
||||
continue
|
||||
}
|
||||
if msg.SegmentID == segmentInfo.ID && msg.EndTs() < segmentInfo.DmlPosition.Timestamp {
|
||||
log.Debug("filter invalid insert message, segments are excluded segments",
|
||||
zap.String("channel", fdmNode.channel),
|
||||
zap.Any("collectionID", msg.CollectionID),
|
||||
zap.Any("partitionID", msg.PartitionID))
|
||||
return nil, nil
|
||||
|
@ -217,7 +225,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
|
|||
}
|
||||
|
||||
// newFilteredDmNode returns a new filterDmNode
|
||||
func newFilteredDmNode(metaReplica ReplicaInterface, collectionID UniqueID) *filterDmNode {
|
||||
func newFilteredDmNode(metaReplica ReplicaInterface, collectionID UniqueID, channel Channel) *filterDmNode {
|
||||
|
||||
maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength
|
||||
maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism
|
||||
|
@ -230,5 +238,6 @@ func newFilteredDmNode(metaReplica ReplicaInterface, collectionID UniqueID) *fil
|
|||
baseNode: baseNode,
|
||||
collectionID: collectionID,
|
||||
metaReplica: metaReplica,
|
||||
channel: channel,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ func getFilterDMNode() (*filterDmNode, error) {
|
|||
}
|
||||
|
||||
streaming.addExcludedSegments(defaultCollectionID, nil)
|
||||
return newFilteredDmNode(streaming, defaultCollectionID), nil
|
||||
return newFilteredDmNode(streaming, defaultCollectionID, defaultChannelName), nil
|
||||
}
|
||||
|
||||
func TestFlowGraphFilterDmNode_filterDmNode(t *testing.T) {
|
||||
|
|
|
@ -45,6 +45,7 @@ type insertNode struct {
|
|||
baseNode
|
||||
collectionID UniqueID
|
||||
metaReplica ReplicaInterface // streaming
|
||||
channel Channel
|
||||
}
|
||||
|
||||
// insertData stores the valid insert data
|
||||
|
@ -107,7 +108,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
collection, err := iNode.metaReplica.getCollectionByID(iNode.collectionID)
|
||||
if err != nil {
|
||||
// QueryNode should add collection before start flow graph
|
||||
panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d", iNode.Name(), iNode.collectionID))
|
||||
panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d, channel: %s", iNode.Name(), iNode.collectionID, iNode.channel))
|
||||
}
|
||||
collection.RLock()
|
||||
defer collection.RUnlock()
|
||||
|
@ -124,7 +125,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
if err != nil {
|
||||
// error occurs only when collection cannot be found, should not happen
|
||||
err = fmt.Errorf("insertNode addPartition failed, err = %s", err)
|
||||
log.Error(err.Error())
|
||||
log.Error(err.Error(), zap.Int64("collection", iNode.collectionID), zap.String("channel", iNode.channel))
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
@ -140,7 +141,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
if err != nil {
|
||||
// error occurs when collection or partition cannot be found, collection and partition should be created before
|
||||
err = fmt.Errorf("insertNode addSegment failed, err = %s", err)
|
||||
log.Error(err.Error())
|
||||
log.Error(err.Error(), zap.Int64("collection", iNode.collectionID), zap.String("channel", iNode.channel))
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
@ -149,7 +150,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
if err != nil {
|
||||
// occurs only when schema doesn't have dim param, this should not happen
|
||||
err = fmt.Errorf("failed to transfer msgStream.insertMsg to storage.InsertRecord, err = %s", err)
|
||||
log.Error(err.Error())
|
||||
log.Error(err.Error(), zap.Int64("collection", iNode.collectionID), zap.String("channel", iNode.channel))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
|
@ -164,7 +165,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
if err != nil {
|
||||
// error occurs when cannot find collection or data is misaligned, should not happen
|
||||
err = fmt.Errorf("failed to get primary keys, err = %d", err)
|
||||
log.Error(err.Error())
|
||||
log.Error(err.Error(), zap.Int64("collection", iNode.collectionID), zap.String("channel", iNode.channel))
|
||||
panic(err)
|
||||
}
|
||||
iData.insertPKs[insertMsg.SegmentID] = append(iData.insertPKs[insertMsg.SegmentID], pks...)
|
||||
|
@ -176,7 +177,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
if err != nil {
|
||||
// should not happen, segment should be created before
|
||||
err = fmt.Errorf("insertNode getSegmentByID failed, err = %s", err)
|
||||
log.Error(err.Error())
|
||||
log.Error(err.Error(), zap.Int64("collection", iNode.collectionID), zap.String("channel", iNode.channel))
|
||||
panic(err)
|
||||
}
|
||||
|
||||
|
@ -186,11 +187,11 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
if err != nil {
|
||||
// error occurs when cgo function `PreInsert` failed
|
||||
err = fmt.Errorf("segmentPreInsert failed, segmentID = %d, err = %s", segmentID, err)
|
||||
log.Error(err.Error())
|
||||
log.Error(err.Error(), zap.Int64("collection", iNode.collectionID), zap.String("channel", iNode.channel))
|
||||
panic(err)
|
||||
}
|
||||
iData.insertOffset[segmentID] = offset
|
||||
log.Debug("insertNode operator", zap.Int("insert size", numOfRecords), zap.Int64("insert offset", offset), zap.Int64("segment id", segmentID))
|
||||
log.Debug("insertNode operator", zap.Int("insert size", numOfRecords), zap.Int64("insert offset", offset), zap.Int64("segment id", segmentID), zap.Int64("collection", iNode.collectionID), zap.String("channel", iNode.channel))
|
||||
targetSegment.updateBloomFilter(iData.insertPKs[segmentID])
|
||||
}
|
||||
}
|
||||
|
@ -205,7 +206,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
if err != nil {
|
||||
// error occurs when segment cannot be found or cgo function `Insert` failed
|
||||
err = fmt.Errorf("segment insert failed, segmentID = %d, err = %s", segmentID, err)
|
||||
log.Error(err.Error())
|
||||
log.Error(err.Error(), zap.Int64("collection", iNode.collectionID), zap.String("channel", iNode.channel))
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
|
@ -221,13 +222,14 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
for _, delMsg := range iMsg.deleteMessages {
|
||||
if iNode.metaReplica.getSegmentNum(segmentTypeGrowing) != 0 {
|
||||
log.Debug("delete in streaming replica",
|
||||
zap.String("channel", iNode.channel),
|
||||
zap.Any("collectionID", delMsg.CollectionID),
|
||||
zap.Any("collectionName", delMsg.CollectionName),
|
||||
zap.Int64("numPKs", delMsg.NumRows))
|
||||
err := processDeleteMessages(iNode.metaReplica, segmentTypeGrowing, delMsg, delData)
|
||||
if err != nil {
|
||||
// error occurs when missing meta info or unexpected pk type, should not happen
|
||||
err = fmt.Errorf("insertNode processDeleteMessages failed, collectionID = %d, err = %s", delMsg.CollectionID, err)
|
||||
err = fmt.Errorf("insertNode processDeleteMessages failed, collectionID = %d, err = %s, channel: %s", delMsg.CollectionID, err, iNode.channel)
|
||||
log.Error(err.Error())
|
||||
panic(err)
|
||||
}
|
||||
|
@ -503,7 +505,7 @@ func getPKsFromColumnBasedInsertMsg(msg *msgstream.InsertMsg, schema *schemapb.C
|
|||
}
|
||||
|
||||
// newInsertNode returns a new insertNode
|
||||
func newInsertNode(metaReplica ReplicaInterface, collectionID UniqueID) *insertNode {
|
||||
func newInsertNode(metaReplica ReplicaInterface, collectionID UniqueID, channel Channel) *insertNode {
|
||||
maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength
|
||||
maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism
|
||||
|
||||
|
@ -515,5 +517,6 @@ func newInsertNode(metaReplica ReplicaInterface, collectionID UniqueID) *insertN
|
|||
baseNode: baseNode,
|
||||
collectionID: collectionID,
|
||||
metaReplica: metaReplica,
|
||||
channel: channel,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,7 +48,7 @@ func getInsertNode() (*insertNode, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return newInsertNode(streaming, defaultCollectionID), nil
|
||||
return newInsertNode(streaming, defaultCollectionID, defaultDMLChannel), nil
|
||||
}
|
||||
|
||||
func genFlowGraphInsertData(schema *schemapb.CollectionSchema, numRows int) (*insertData, error) {
|
||||
|
@ -128,7 +128,7 @@ func TestFlowGraphInsertNode_insert(t *testing.T) {
|
|||
t.Run("test no target segment", func(t *testing.T) {
|
||||
streaming, err := genSimpleReplica()
|
||||
assert.NoError(t, err)
|
||||
insertNode := newInsertNode(streaming, defaultCollectionID)
|
||||
insertNode := newInsertNode(streaming, defaultCollectionID, defaultDMLChannel)
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
err = insertNode.insert(nil, defaultSegmentID, wg)
|
||||
|
@ -203,7 +203,7 @@ func TestFlowGraphInsertNode_delete(t *testing.T) {
|
|||
t.Run("test no target segment", func(t *testing.T) {
|
||||
streaming, err := genSimpleReplica()
|
||||
assert.NoError(t, err)
|
||||
insertNode := newInsertNode(streaming, defaultCollectionID)
|
||||
insertNode := newInsertNode(streaming, defaultCollectionID, defaultDMLChannel)
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
err = insertNode.delete(nil, defaultSegmentID, wg)
|
||||
|
@ -338,7 +338,7 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
|
|||
t.Run("test getCollectionByID failed", func(t *testing.T) {
|
||||
streaming, err := genSimpleReplica()
|
||||
assert.NoError(t, err)
|
||||
insertNode := newInsertNode(streaming, defaultCollectionID)
|
||||
insertNode := newInsertNode(streaming, defaultCollectionID, defaultDMLChannel)
|
||||
|
||||
msg := []flowgraph.Msg{genInsertMsg()}
|
||||
|
||||
|
|
|
@ -64,8 +64,8 @@ func newQueryNodeFlowGraph(ctx context.Context,
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var filterDmNode node = newFilteredDmNode(metaReplica, collectionID)
|
||||
var insertNode node = newInsertNode(metaReplica, collectionID)
|
||||
var filterDmNode node = newFilteredDmNode(metaReplica, collectionID, channel)
|
||||
var insertNode node = newInsertNode(metaReplica, collectionID, channel)
|
||||
var serviceTimeNode node = newServiceTimeNode(tSafeReplica, collectionID, channel)
|
||||
|
||||
q.flowGraph.AddNode(dmStreamNode)
|
||||
|
@ -134,8 +134,8 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var filterDeleteNode node = newFilteredDeleteNode(metaReplica, collectionID)
|
||||
var deleteNode node = newDeleteNode(metaReplica, collectionID)
|
||||
var filterDeleteNode node = newFilteredDeleteNode(metaReplica, collectionID, channel)
|
||||
var deleteNode node = newDeleteNode(metaReplica, collectionID, channel)
|
||||
var serviceTimeNode node = newServiceTimeNode(tSafeReplica, collectionID, channel)
|
||||
|
||||
q.flowGraph.AddNode(dmStreamNode)
|
||||
|
|
|
@ -83,6 +83,8 @@ const (
|
|||
|
||||
defaultCollectionName = "query-node-unittest-default-collection"
|
||||
defaultPartitionName = "query-node-unittest-default-partition"
|
||||
|
||||
defaultChannelName = "default-channel"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
Loading…
Reference in New Issue