mirror of https://github.com/milvus-io/milvus.git
Remove segmentPtr lock and use collection lock instead (#17303)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
branch: pull/17308/head
parent: 42f643e727
commit: 7a59a80506
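This commit applies one pattern across the QueryNode flow-graph nodes: each node now carries the collectionID it serves, resolves the collection once at the top of Operate, and holds the collection's read lock for the whole call, which is what allows the per-segment segPtrMu in segment.go to be dropped. Below is a minimal, self-contained Go sketch of that locking scheme, not the actual Milvus code: the names collection, operate, and releaseSegment are illustrative stand-ins, and it assumes (as the diff suggests) that the segment-release path takes the matching write lock.

package main

import (
    "fmt"
    "sync"
)

// segment stands in for the real Segment; ptr plays the role of the C segment pointer.
type segment struct {
    id  int64
    ptr *int64
}

// collection carries the collection-level RWMutex that replaces the per-segment segPtrMu.
type collection struct {
    sync.RWMutex
    segments map[int64]*segment
}

// operate models a flow-graph node's Operate: take the read lock once, and every
// segment pointer reached through the collection stays valid until the call returns.
func operate(c *collection) {
    c.RLock()
    defer c.RUnlock()
    for id, seg := range c.segments {
        if seg.ptr != nil {
            fmt.Printf("processing segment %d with %d rows\n", id, *seg.ptr)
        }
    }
}

// releaseSegment models segment release: freeing the pointer happens under the
// write lock, so it cannot interleave with a node holding the read lock.
func releaseSegment(c *collection, id int64) {
    c.Lock()
    defer c.Unlock()
    if seg, ok := c.segments[id]; ok {
        seg.ptr = nil // the real code calls C.DeleteSegment here
        delete(c.segments, id)
    }
}

func main() {
    rows := int64(16)
    c := &collection{segments: map[int64]*segment{1: {id: 1, ptr: &rows}}}

    var wg sync.WaitGroup
    wg.Add(2)
    go func() { defer wg.Done(); operate(c) }()
    go func() { defer wg.Done(); releaseSegment(c, 1) }()
    wg.Wait()
}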
@@ -40,7 +40,8 @@ var newVarCharPrimaryKey = storage.NewVarCharPrimaryKey
 // deleteNode is the one of nodes in delta flow graph
 type deleteNode struct {
     baseNode
-    metaReplica ReplicaInterface // historical
+    collectionID UniqueID
+    metaReplica  ReplicaInterface // historical
 }
 
 // Name returns the name of deleteNode
@@ -82,6 +83,14 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
         msg.SetTraceCtx(ctx)
     }
 
+    collection, err := dNode.metaReplica.getCollectionByID(dNode.collectionID)
+    if err != nil {
+        // QueryNode should add collection before start flow graph
+        panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d", dNode.Name(), dNode.collectionID))
+    }
+    collection.RLock()
+    defer collection.RUnlock()
+
     // 1. filter segment by bloom filter
     for i, delMsg := range dMsg.deleteMessages {
         traceID, _, _ := trace.InfoFromSpan(spans[i])
@@ -173,7 +182,7 @@ func (dNode *deleteNode) delete(deleteData *deleteData, segmentID UniqueID, wg *
 }
 
 // newDeleteNode returns a new deleteNode
-func newDeleteNode(metaReplica ReplicaInterface) *deleteNode {
+func newDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID) *deleteNode {
     maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength
     maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism
 
@@ -182,7 +191,8 @@ func newDeleteNode(metaReplica ReplicaInterface) *deleteNode {
     baseNode.SetMaxParallelism(maxParallelism)
 
     return &deleteNode{
-        baseNode: baseNode,
-        metaReplica: metaReplica,
+        baseNode:     baseNode,
+        collectionID: collectionID,
+        metaReplica:  metaReplica,
     }
 }

@@ -32,7 +32,7 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) {
     t.Run("test delete", func(t *testing.T) {
         historical, err := genSimpleReplica()
         assert.NoError(t, err)
-        deleteNode := newDeleteNode(historical)
+        deleteNode := newDeleteNode(historical, defaultCollectionID)
 
         err = historical.addSegment(defaultSegmentID,
             defaultPartitionID,
@@ -53,7 +53,7 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) {
     t.Run("test segment delete error", func(t *testing.T) {
         historical, err := genSimpleReplica()
         assert.NoError(t, err)
-        deleteNode := newDeleteNode(historical)
+        deleteNode := newDeleteNode(historical, defaultCollectionID)
 
         err = historical.addSegment(defaultSegmentID,
             defaultPartitionID,
@@ -75,7 +75,7 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) {
     t.Run("test no target segment", func(t *testing.T) {
         historical, err := genSimpleReplica()
         assert.NoError(t, err)
-        deleteNode := newDeleteNode(historical)
+        deleteNode := newDeleteNode(historical, defaultCollectionID)
         wg := &sync.WaitGroup{}
         wg.Add(1)
         err = deleteNode.delete(nil, defaultSegmentID, wg)
@@ -85,7 +85,7 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) {
     t.Run("test invalid segmentType", func(t *testing.T) {
         historical, err := genSimpleReplica()
         assert.NoError(t, err)
-        deleteNode := newDeleteNode(historical)
+        deleteNode := newDeleteNode(historical, defaultCollectionID)
 
         err = historical.addSegment(defaultSegmentID,
             defaultPartitionID,
@@ -105,7 +105,7 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
     t.Run("test operate", func(t *testing.T) {
         historical, err := genSimpleReplica()
         assert.NoError(t, err)
-        deleteNode := newDeleteNode(historical)
+        deleteNode := newDeleteNode(historical, defaultCollectionID)
 
         err = historical.addSegment(defaultSegmentID,
             defaultPartitionID,
@@ -139,7 +139,7 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
     t.Run("test invalid partitionID", func(t *testing.T) {
         historical, err := genSimpleReplica()
         assert.NoError(t, err)
-        deleteNode := newDeleteNode(historical)
+        deleteNode := newDeleteNode(historical, defaultCollectionID)
 
         err = historical.addSegment(defaultSegmentID,
             defaultPartitionID,
@@ -163,7 +163,7 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
     t.Run("test collection partition not exist", func(t *testing.T) {
         historical, err := genSimpleReplica()
         assert.NoError(t, err)
-        deleteNode := newDeleteNode(historical)
+        deleteNode := newDeleteNode(historical, defaultCollectionID)
 
         err = historical.addSegment(defaultSegmentID,
             defaultPartitionID,
@@ -190,7 +190,7 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
     t.Run("test partition not exist", func(t *testing.T) {
         historical, err := genSimpleReplica()
         assert.NoError(t, err)
-        deleteNode := newDeleteNode(historical)
+        deleteNode := newDeleteNode(historical, defaultCollectionID)
 
         err = historical.addSegment(defaultSegmentID,
             defaultPartitionID,
@@ -216,7 +216,7 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) {
     t.Run("test invalid input length", func(t *testing.T) {
         historical, err := genSimpleReplica()
         assert.NoError(t, err)
-        deleteNode := newDeleteNode(historical)
+        deleteNode := newDeleteNode(historical, defaultCollectionID)
 
         err = historical.addSegment(defaultSegmentID,
             defaultPartitionID,

@@ -78,10 +78,18 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
         },
     }
 
+    collection, err := fddNode.metaReplica.getCollectionByID(fddNode.collectionID)
+    if err != nil {
+        // QueryNode should add collection before start flow graph
+        panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d", fddNode.Name(), fddNode.collectionID))
+    }
+    collection.RLock()
+    defer collection.RUnlock()
+
     for _, msg := range msgStreamMsg.TsMessages() {
         switch msg.Type() {
         case commonpb.MsgType_Delete:
-            resMsg, err := fddNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg))
+            resMsg, err := fddNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg), collection.getLoadType())
             if err != nil {
                 // error occurs when missing meta info or data is misaligned, should not happen
                 err = fmt.Errorf("filterInvalidDeleteMessage failed, err = %s", err)
@@ -103,7 +111,7 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 }
 
 // filterInvalidDeleteMessage would filter invalid delete messages
-func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg) (*msgstream.DeleteMsg, error) {
+func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg, loadType loadType) (*msgstream.DeleteMsg, error) {
     if err := msg.CheckAligned(); err != nil {
         return nil, fmt.Errorf("CheckAligned failed, err = %s", err)
     }
@@ -123,13 +131,7 @@ func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.Delet
         return nil, nil
     }
 
-    // check if collection exists
-    col, err := fddNode.metaReplica.getCollectionByID(msg.CollectionID)
-    if err != nil {
-        // QueryNode should add collection before start flow graph
-        return nil, fmt.Errorf("filter invalid delete message, collection does not exist, collectionID = %d", msg.CollectionID)
-    }
-    if col.getLoadType() == loadTypePartition {
+    if loadType == loadTypePartition {
         if !fddNode.metaReplica.hasPartition(msg.PartitionID) {
             // filter out msg which not belongs to the loaded partitions
             return nil, nil

@@ -48,29 +48,17 @@ func TestFlowGraphFilterDeleteNode_filterInvalidDeleteMessage(t *testing.T) {
         msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
         fg, err := getFilterDeleteNode()
         assert.NoError(t, err)
-        res, err := fg.filterInvalidDeleteMessage(msg)
+        res, err := fg.filterInvalidDeleteMessage(msg, loadTypeCollection)
         assert.NoError(t, err)
         assert.NotNil(t, res)
     })
 
-    t.Run("test delete no collection", func(t *testing.T) {
-        msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
-        msg.CollectionID = UniqueID(1003)
-        fg, err := getFilterDeleteNode()
-        assert.NoError(t, err)
-        fg.collectionID = UniqueID(1003)
-        res, err := fg.filterInvalidDeleteMessage(msg)
-        assert.Error(t, err)
-        assert.Nil(t, res)
-        fg.collectionID = defaultCollectionID
-    })
-
     t.Run("test delete not target collection", func(t *testing.T) {
         msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
         fg, err := getFilterDeleteNode()
         assert.NoError(t, err)
         fg.collectionID = UniqueID(1000)
-        res, err := fg.filterInvalidDeleteMessage(msg)
+        res, err := fg.filterInvalidDeleteMessage(msg, loadTypeCollection)
         assert.NoError(t, err)
         assert.Nil(t, res)
     })
@@ -83,11 +71,11 @@ func TestFlowGraphFilterDeleteNode_filterInvalidDeleteMessage(t *testing.T) {
         msg.Int64PrimaryKeys = make([]IntPrimaryKey, 0)
         msg.PrimaryKeys = &schemapb.IDs{}
         msg.NumRows = 0
-        res, err := fg.filterInvalidDeleteMessage(msg)
+        res, err := fg.filterInvalidDeleteMessage(msg, loadTypeCollection)
         assert.NoError(t, err)
         assert.Nil(t, res)
         msg.PrimaryKeys = storage.ParsePrimaryKeys2IDs([]primaryKey{})
-        res, err = fg.filterInvalidDeleteMessage(msg)
+        res, err = fg.filterInvalidDeleteMessage(msg, loadTypeCollection)
         assert.NoError(t, err)
         assert.Nil(t, res)
     })
@@ -96,13 +84,10 @@ func TestFlowGraphFilterDeleteNode_filterInvalidDeleteMessage(t *testing.T) {
         msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
         fg, err := getFilterDeleteNode()
         assert.NoError(t, err)
-        col, err := fg.metaReplica.getCollectionByID(defaultCollectionID)
-        assert.NoError(t, err)
-        col.setLoadType(loadTypePartition)
         err = fg.metaReplica.removePartition(defaultPartitionID)
         assert.NoError(t, err)
 
-        res, err := fg.filterInvalidDeleteMessage(msg)
+        res, err := fg.filterInvalidDeleteMessage(msg, loadTypePartition)
         assert.NoError(t, err)
         assert.Nil(t, res)
     })

@@ -79,12 +79,20 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
         },
     }
 
+    collection, err := fdmNode.metaReplica.getCollectionByID(fdmNode.collectionID)
+    if err != nil {
+        // QueryNode should add collection before start flow graph
+        panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d", fdmNode.Name(), fdmNode.collectionID))
+    }
+    collection.RLock()
+    defer collection.RUnlock()
+
     for i, msg := range msgStreamMsg.TsMessages() {
         traceID, _, _ := trace.InfoFromSpan(spans[i])
         log.Debug("Filter invalid message in QueryNode", zap.String("traceID", traceID))
         switch msg.Type() {
         case commonpb.MsgType_Insert:
-            resMsg, err := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg))
+            resMsg, err := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg), collection.getLoadType())
             if err != nil {
                 // error occurs when missing meta info or data is misaligned, should not happen
                 err = fmt.Errorf("filterInvalidInsertMessage failed, err = %s", err)
@@ -95,7 +103,7 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
                 iMsg.insertMessages = append(iMsg.insertMessages, resMsg)
             }
         case commonpb.MsgType_Delete:
-            resMsg, err := fdmNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg))
+            resMsg, err := fdmNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg), collection.getLoadType())
             if err != nil {
                 // error occurs when missing meta info or data is misaligned, should not happen
                 err = fmt.Errorf("filterInvalidDeleteMessage failed, err = %s", err)
@@ -118,7 +126,7 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 }
 
 // filterInvalidDeleteMessage would filter out invalid delete messages
-func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg) (*msgstream.DeleteMsg, error) {
+func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg, loadType loadType) (*msgstream.DeleteMsg, error) {
     if err := msg.CheckAligned(); err != nil {
         return nil, fmt.Errorf("CheckAligned failed, err = %s", err)
     }
@@ -139,13 +147,7 @@ func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg
         return nil, nil
     }
 
-    // check if collection exist
-    col, err := fdmNode.metaReplica.getCollectionByID(msg.CollectionID)
-    if err != nil {
-        // QueryNode should add collection before start flow graph
-        return nil, fmt.Errorf("filter invalid delete message, collection does not exist, collectionID = %d", msg.CollectionID)
-    }
-    if col.getLoadType() == loadTypePartition {
+    if loadType == loadTypePartition {
         if !fdmNode.metaReplica.hasPartition(msg.PartitionID) {
             // filter out msg which not belongs to the loaded partitions
             return nil, nil
@@ -156,7 +158,7 @@ func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg
 }
 
 // filterInvalidInsertMessage would filter out invalid insert messages
-func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg) (*msgstream.InsertMsg, error) {
+func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg, loadType loadType) (*msgstream.InsertMsg, error) {
     if err := msg.CheckAligned(); err != nil {
         return nil, fmt.Errorf("CheckAligned failed, err = %s", err)
     }
@@ -180,13 +182,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
         return nil, nil
     }
 
-    // check if collection exists
-    col, err := fdmNode.metaReplica.getCollectionByID(msg.CollectionID)
-    if err != nil {
-        // QueryNode should add collection before start flow graph
-        return nil, fmt.Errorf("filter invalid insert message, collection does not exist, collectionID = %d", msg.CollectionID)
-    }
-    if col.getLoadType() == loadTypePartition {
+    if loadType == loadTypePartition {
         if !fdmNode.metaReplica.hasPartition(msg.PartitionID) {
             // filter out msg which not belongs to the loaded partitions
             return nil, nil

@@ -53,7 +53,7 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) {
         assert.NoError(t, err)
         fg, err := getFilterDMNode()
         assert.NoError(t, err)
-        res, err := fg.filterInvalidInsertMessage(msg)
+        res, err := fg.filterInvalidInsertMessage(msg, loadTypeCollection)
         assert.NoError(t, err)
         assert.NotNil(t, res)
     })
@@ -65,7 +65,7 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) {
         fg, err := getFilterDMNode()
         assert.NoError(t, err)
         fg.collectionID = UniqueID(1000)
-        res, err := fg.filterInvalidInsertMessage(msg)
+        res, err := fg.filterInvalidInsertMessage(msg, loadTypeCollection)
         assert.Error(t, err)
         assert.Nil(t, res)
         fg.collectionID = defaultCollectionID
@@ -78,11 +78,7 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) {
         fg, err := getFilterDMNode()
         assert.NoError(t, err)
 
-        col, err := fg.metaReplica.getCollectionByID(defaultCollectionID)
-        assert.NoError(t, err)
-        col.setLoadType(loadTypePartition)
-
-        res, err := fg.filterInvalidInsertMessage(msg)
+        res, err := fg.filterInvalidInsertMessage(msg, loadTypePartition)
         assert.NoError(t, err)
         assert.Nil(t, res)
     })
@@ -93,7 +89,7 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) {
         fg, err := getFilterDMNode()
         assert.NoError(t, err)
         fg.collectionID = UniqueID(1000)
-        res, err := fg.filterInvalidInsertMessage(msg)
+        res, err := fg.filterInvalidInsertMessage(msg, loadTypeCollection)
         assert.NoError(t, err)
         assert.Nil(t, res)
     })
@@ -104,7 +100,7 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) {
         fg, err := getFilterDMNode()
         assert.NoError(t, err)
         fg.metaReplica.removeExcludedSegments(defaultCollectionID)
-        res, err := fg.filterInvalidInsertMessage(msg)
+        res, err := fg.filterInvalidInsertMessage(msg, loadTypeCollection)
         assert.Error(t, err)
         assert.Nil(t, res)
     })
@@ -124,7 +120,7 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) {
                 },
             },
         })
-        res, err := fg.filterInvalidInsertMessage(msg)
+        res, err := fg.filterInvalidInsertMessage(msg, loadTypeCollection)
         assert.NoError(t, err)
         assert.Nil(t, res)
     })
@@ -135,7 +131,7 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) {
         fg, err := getFilterDMNode()
         assert.NoError(t, err)
         msg.Timestamps = make([]Timestamp, 0)
-        res, err := fg.filterInvalidInsertMessage(msg)
+        res, err := fg.filterInvalidInsertMessage(msg, loadTypeCollection)
         assert.Error(t, err)
         assert.Nil(t, res)
     })
@@ -150,7 +146,7 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) {
         msg.RowData = make([]*commonpb.Blob, 0)
         msg.NumRows = 0
         msg.FieldsData = nil
-        res, err := fg.filterInvalidInsertMessage(msg)
+        res, err := fg.filterInvalidInsertMessage(msg, loadTypeCollection)
         assert.NoError(t, err)
         assert.Nil(t, res)
     })
@@ -161,34 +157,18 @@ func TestFlowGraphFilterDmNode_filterInvalidDeleteMessage(t *testing.T) {
         msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
         fg, err := getFilterDMNode()
         assert.NoError(t, err)
-        res, err := fg.filterInvalidDeleteMessage(msg)
+        res, err := fg.filterInvalidDeleteMessage(msg, loadTypeCollection)
         assert.NoError(t, err)
         assert.NotNil(t, res)
     })
 
-    t.Run("test delete no collection", func(t *testing.T) {
-        msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
-        msg.CollectionID = UniqueID(1003)
-        fg, err := getFilterDMNode()
-        assert.NoError(t, err)
-        fg.collectionID = UniqueID(1003)
-        res, err := fg.filterInvalidDeleteMessage(msg)
-        assert.Error(t, err)
-        assert.Nil(t, res)
-        fg.collectionID = defaultCollectionID
-    })
-
     t.Run("test delete no partition", func(t *testing.T) {
         msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength)
        msg.PartitionID = UniqueID(1000)
         fg, err := getFilterDMNode()
         assert.NoError(t, err)
 
-        col, err := fg.metaReplica.getCollectionByID(defaultCollectionID)
-        assert.NoError(t, err)
-        col.setLoadType(loadTypePartition)
-
-        res, err := fg.filterInvalidDeleteMessage(msg)
+        res, err := fg.filterInvalidDeleteMessage(msg, loadTypePartition)
         assert.NoError(t, err)
         assert.Nil(t, res)
     })
@@ -198,7 +178,7 @@ func TestFlowGraphFilterDmNode_filterInvalidDeleteMessage(t *testing.T) {
         fg, err := getFilterDMNode()
         assert.NoError(t, err)
         fg.collectionID = UniqueID(1000)
-        res, err := fg.filterInvalidDeleteMessage(msg)
+        res, err := fg.filterInvalidDeleteMessage(msg, loadTypeCollection)
         assert.NoError(t, err)
         assert.Nil(t, res)
     })
@@ -208,7 +188,7 @@ func TestFlowGraphFilterDmNode_filterInvalidDeleteMessage(t *testing.T) {
         fg, err := getFilterDMNode()
         assert.NoError(t, err)
         msg.Timestamps = make([]Timestamp, 0)
-        res, err := fg.filterInvalidDeleteMessage(msg)
+        res, err := fg.filterInvalidDeleteMessage(msg, loadTypeCollection)
         assert.Error(t, err)
         assert.Nil(t, res)
     })
@@ -221,11 +201,11 @@ func TestFlowGraphFilterDmNode_filterInvalidDeleteMessage(t *testing.T) {
         msg.NumRows = 0
         msg.Int64PrimaryKeys = make([]IntPrimaryKey, 0)
         msg.PrimaryKeys = &schemapb.IDs{}
-        res, err := fg.filterInvalidDeleteMessage(msg)
+        res, err := fg.filterInvalidDeleteMessage(msg, loadTypeCollection)
         assert.NoError(t, err)
         assert.Nil(t, res)
         msg.PrimaryKeys = storage.ParsePrimaryKeys2IDs([]primaryKey{})
-        res, err = fg.filterInvalidDeleteMessage(msg)
+        res, err = fg.filterInvalidDeleteMessage(msg, loadTypeCollection)
         assert.NoError(t, err)
         assert.Nil(t, res)
     })

@@ -43,7 +43,8 @@ import (
 // insertNode is one of the nodes in query flow graph
 type insertNode struct {
     baseNode
-    metaReplica ReplicaInterface // streaming
+    collectionID UniqueID
+    metaReplica  ReplicaInterface // streaming
 }
 
 // insertData stores the valid insert data
@@ -103,6 +104,13 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
         msg.SetTraceCtx(ctx)
     }
 
+    collection, err := iNode.metaReplica.getCollectionByID(iNode.collectionID)
+    if err != nil {
+        // QueryNode should add collection before start flow graph
+        panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d", iNode.Name(), iNode.collectionID))
+    }
+    collection.RLock()
+    defer collection.RUnlock()
     // 1. hash insertMessages to insertData
     // sort timestamps ensures that the data in iData.insertRecords is sorted in ascending order of timestamp
     // avoiding re-sorting in segCore, which will need data copying
@@ -111,14 +119,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
     })
     for _, insertMsg := range iMsg.insertMessages {
         // if loadType is loadCollection, check if partition exists, if not, create partition
-        col, err := iNode.metaReplica.getCollectionByID(insertMsg.CollectionID)
-        if err != nil {
-            // should not happen, QueryNode should create collection before start flow graph
-            err = fmt.Errorf("insertNode getCollectionByID failed, err = %s", err)
-            log.Error(err.Error())
-            panic(err)
-        }
-        if col.getLoadType() == loadTypeCollection {
+        if collection.getLoadType() == loadTypeCollection {
             err = iNode.metaReplica.addPartition(insertMsg.CollectionID, insertMsg.PartitionID)
             if err != nil {
                 // error occurs only when collection cannot be found, should not happen
@@ -144,7 +145,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
             }
         }
 
-        insertRecord, err := storage.TransferInsertMsgToInsertRecord(col.schema, insertMsg)
+        insertRecord, err := storage.TransferInsertMsgToInsertRecord(collection.schema, insertMsg)
         if err != nil {
             // occurs only when schema doesn't have dim param, this should not happen
             err = fmt.Errorf("failed to transfer msgStream.insertMsg to storage.InsertRecord, err = %s", err)
@@ -503,7 +504,7 @@ func getPKsFromColumnBasedInsertMsg(msg *msgstream.InsertMsg, schema *schemapb.C
 }
 
 // newInsertNode returns a new insertNode
-func newInsertNode(metaReplica ReplicaInterface) *insertNode {
+func newInsertNode(metaReplica ReplicaInterface, collectionID UniqueID) *insertNode {
     maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength
     maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism
 
@@ -512,7 +513,8 @@ func newInsertNode(metaReplica ReplicaInterface) *insertNode {
     baseNode.SetMaxParallelism(maxParallelism)
 
     return &insertNode{
-        baseNode: baseNode,
-        metaReplica: metaReplica,
+        baseNode:     baseNode,
+        collectionID: collectionID,
+        metaReplica:  metaReplica,
     }
 }

@@ -48,7 +48,7 @@ func getInsertNode() (*insertNode, error) {
         return nil, err
     }
 
-    return newInsertNode(streaming), nil
+    return newInsertNode(streaming, defaultCollectionID), nil
 }
 
 func genFlowGraphInsertData(schema *schemapb.CollectionSchema, numRows int) (*insertData, error) {
@@ -128,7 +128,7 @@ func TestFlowGraphInsertNode_insert(t *testing.T) {
     t.Run("test no target segment", func(t *testing.T) {
         streaming, err := genSimpleReplica()
         assert.NoError(t, err)
-        insertNode := newInsertNode(streaming)
+        insertNode := newInsertNode(streaming, defaultCollectionID)
         wg := &sync.WaitGroup{}
         wg.Add(1)
         err = insertNode.insert(nil, defaultSegmentID, wg)
@@ -203,7 +203,7 @@ func TestFlowGraphInsertNode_delete(t *testing.T) {
     t.Run("test no target segment", func(t *testing.T) {
         streaming, err := genSimpleReplica()
         assert.NoError(t, err)
-        insertNode := newInsertNode(streaming)
+        insertNode := newInsertNode(streaming, defaultCollectionID)
         wg := &sync.WaitGroup{}
         wg.Add(1)
         err = insertNode.delete(nil, defaultSegmentID, wg)
@@ -338,7 +338,7 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
     t.Run("test getCollectionByID failed", func(t *testing.T) {
         streaming, err := genSimpleReplica()
         assert.NoError(t, err)
-        insertNode := newInsertNode(streaming)
+        insertNode := newInsertNode(streaming, defaultCollectionID)
 
         msg := []flowgraph.Msg{genInsertMsg()}
 

@@ -65,7 +65,7 @@ func newQueryNodeFlowGraph(ctx context.Context,
         return nil, err
     }
     var filterDmNode node = newFilteredDmNode(metaReplica, collectionID)
-    var insertNode node = newInsertNode(metaReplica)
+    var insertNode node = newInsertNode(metaReplica, collectionID)
     var serviceTimeNode node = newServiceTimeNode(tSafeReplica, collectionID, channel)
 
     q.flowGraph.AddNode(dmStreamNode)
@@ -135,7 +135,7 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context,
         return nil, err
     }
     var filterDeleteNode node = newFilteredDeleteNode(metaReplica, collectionID)
-    var deleteNode node = newDeleteNode(metaReplica)
+    var deleteNode node = newDeleteNode(metaReplica, collectionID)
     var serviceTimeNode node = newServiceTimeNode(tSafeReplica, collectionID, channel)
 
     q.flowGraph.AddNode(dmStreamNode)

@@ -76,7 +76,6 @@ type IndexedFieldInfo struct {
 
 // Segment is a wrapper of the underlying C-structure segment.
 type Segment struct {
-    segPtrMu   sync.RWMutex // guards segmentPtr
     segmentPtr C.CSegmentInterface
 
     segmentID UniqueID
@@ -219,15 +218,11 @@ func deleteSegment(segment *Segment) {
         return
     }
 
-    segment.segPtrMu.Lock()
-    defer segment.segPtrMu.Unlock()
     cPtr := segment.segmentPtr
     C.DeleteSegment(cPtr)
     segment.segmentPtr = nil
-
     log.Info("delete segment from memory", zap.Int64("collectionID", segment.collectionID), zap.Int64("partitionID", segment.partitionID), zap.Int64("segmentID", segment.ID()))
-
     segment = nil
 }
 
 func (s *Segment) getRowCount() int64 {
@@ -235,8 +230,6 @@ func (s *Segment) getRowCount() int64 {
         long int
         getRowCount(CSegmentInterface c_segment);
     */
-    s.segPtrMu.RLock()
-    defer s.segPtrMu.RUnlock()
     if s.segmentPtr == nil {
         return -1
     }
@@ -249,8 +242,6 @@ func (s *Segment) getDeletedCount() int64 {
         long int
         getDeletedCount(CSegmentInterface c_segment);
     */
-    s.segPtrMu.RLock()
-    defer s.segPtrMu.RUnlock()
     if s.segmentPtr == nil {
         return -1
     }
@@ -263,8 +254,6 @@ func (s *Segment) getMemSize() int64 {
         long int
         GetMemoryUsageInBytes(CSegmentInterface c_segment);
     */
-    s.segPtrMu.RLock()
-    defer s.segPtrMu.RUnlock()
     if s.segmentPtr == nil {
         return -1
     }
@@ -283,8 +272,6 @@ func (s *Segment) search(searchReq *searchRequest) (*SearchResult, error) {
         long int* result_ids,
         float* result_distances);
     */
-    s.segPtrMu.RLock()
-    defer s.segPtrMu.RUnlock()
     if s.segmentPtr == nil {
         return nil, errors.New("null seg core pointer")
     }
@@ -317,8 +304,6 @@ func HandleCProto(cRes *C.CProto, msg proto.Message) error {
 }
 
 func (s *Segment) retrieve(plan *RetrievePlan) (*segcorepb.RetrieveResults, error) {
-    s.segPtrMu.RLock()
-    defer s.segPtrMu.RUnlock()
     if s.segmentPtr == nil {
         return nil, errors.New("null seg core pointer")
     }
@@ -566,8 +551,6 @@ func (s *Segment) segmentPreInsert(numOfRecords int) (int64, error) {
         long int
         PreInsert(CSegmentInterface c_segment, long int size);
     */
-    s.segPtrMu.RLock()
-    defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock
     if s.segmentType != segmentTypeGrowing {
         return 0, nil
     }
@@ -585,16 +568,12 @@ func (s *Segment) segmentPreDelete(numOfRecords int) int64 {
         long int
         PreDelete(CSegmentInterface c_segment, long int size);
     */
-    s.segPtrMu.RLock()
-    defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock
     var offset = C.PreDelete(s.segmentPtr, C.int64_t(int64(numOfRecords)))
 
     return int64(offset)
 }
 
 func (s *Segment) segmentInsert(offset int64, entityIDs []UniqueID, timestamps []Timestamp, record *segcorepb.InsertRecord) error {
-    s.segPtrMu.RLock()
-    defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock
     if s.segmentType != segmentTypeGrowing {
         return fmt.Errorf("unexpected segmentType when segmentInsert, segmentType = %s", s.segmentType.String())
     }
@@ -642,8 +621,6 @@ func (s *Segment) segmentDelete(offset int64, entityIDs []primaryKey, timestamps
         return fmt.Errorf("empty pks to delete")
     }
 
-    s.segPtrMu.RLock()
-    defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock
     if s.segmentPtr == nil {
         return errors.New("null seg core pointer")
     }
@@ -702,8 +679,6 @@ func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int64, data *sche
         CStatus
         LoadFieldData(CSegmentInterface c_segment, CLoadFieldDataInfo load_field_data_info);
     */
-    s.segPtrMu.RLock()
-    defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock
     if s.segmentPtr == nil {
         return errors.New("null seg core pointer")
     }
@@ -738,8 +713,6 @@ func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int64, data *sche
 }
 
 func (s *Segment) segmentLoadDeletedRecord(primaryKeys []primaryKey, timestamps []Timestamp, rowCount int64) error {
-    s.segPtrMu.RLock()
-    defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock
     if s.segmentPtr == nil {
         return errors.New("null seg core pointer")
     }
@@ -812,8 +785,6 @@ func (s *Segment) segmentLoadIndexData(bytesIndex [][]byte, indexInfo *querypb.F
         return err
     }
 
-    s.segPtrMu.RLock()
-    defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock
     if s.segmentPtr == nil {
         return errors.New("null seg core pointer")
     }

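With the collection lock held by callers as sketched above, the Segment changes in this file are mechanical: each accessor drops its segPtrMu.RLock()/RUnlock() pair and keeps only the nil check on segmentPtr. A small self-contained stand-in (plain Go instead of cgo, hypothetical names) of what an accessor looks like after the change:

package main

import "fmt"

// segmentSketch stands in for Segment; segmentPtr replaces the C.CSegmentInterface
// handle, and there is deliberately no per-segment mutex anymore.
type segmentSketch struct {
    segmentPtr *int64
}

// getRowCount mirrors the post-change accessors: only the nil check remains.
// Safety relies on the caller holding the collection RLock while the release
// path, which sets segmentPtr to nil, holds the write lock.
func (s *segmentSketch) getRowCount() int64 {
    if s.segmentPtr == nil {
        return -1
    }
    return *s.segmentPtr
}

func main() {
    rows := int64(42)
    fmt.Println((&segmentSketch{segmentPtr: &rows}).getRowCount()) // prints 42
    fmt.Println((&segmentSketch{}).getRowCount())                  // prints -1
}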
@@ -21,7 +21,6 @@ import (
     "fmt"
     "log"
     "math"
-    "sync"
     "testing"
 
     "github.com/milvus-io/milvus/internal/proto/commonpb"
@@ -530,53 +529,6 @@ func TestSegment_segmentLoadFieldData(t *testing.T) {
     assert.NoError(t, err)
 }
 
-func TestSegment_ConcurrentOperation(t *testing.T) {
-    const N = 16
-    var ages = []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
-
-    ageData := &schemapb.FieldData{
-        Type: simpleInt32Field.dataType,
-        FieldId: simpleInt32Field.id,
-        Field: &schemapb.FieldData_Scalars{
-            Scalars: &schemapb.ScalarField{
-                Data: &schemapb.ScalarField_IntData{
-                    IntData: &schemapb.IntArray{
-                        Data: ages,
-                    },
-                },
-            },
-        },
-    }
-
-    collectionID := UniqueID(0)
-    partitionID := UniqueID(0)
-    schema := genTestCollectionSchema()
-    collection := newCollection(collectionID, schema)
-    assert.Equal(t, collection.ID(), collectionID)
-
-    wg := sync.WaitGroup{}
-    for i := 0; i < 100; i++ {
-        segmentID := UniqueID(i)
-        segment, err := newSegment(collection, segmentID, partitionID, collectionID, "", segmentTypeSealed)
-        assert.Equal(t, segmentID, segment.segmentID)
-        assert.Equal(t, partitionID, segment.partitionID)
-        assert.Nil(t, err)
-
-        wg.Add(2)
-        go func() {
-            deleteSegment(segment)
-            wg.Done()
-        }()
-        go func() {
-            // segmentLoadFieldData result error may be nil or not, we just expected this test would not crash.
-            _ = segment.segmentLoadFieldData(simpleInt32Field.id, N, ageData)
-            wg.Done()
-        }()
-    }
-    wg.Wait()
-    deleteCollection(collection)
-}
-
 func TestSegment_indexInfo(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()