Add querynode delete ids in sealed segment (#10638)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/10652/head
godchen 2021-10-26 14:46:19 +08:00 committed by GitHub
parent 67cf56b276
commit 664da326e4
12 changed files with 216 additions and 123 deletions

View File

@ -38,9 +38,10 @@ type dataSyncService struct {
collectionFlowGraphs map[UniqueID]map[Channel]*queryNodeFlowGraph // map[collectionID]flowGraphs
partitionFlowGraphs map[UniqueID]map[Channel]*queryNodeFlowGraph // map[partitionID]flowGraphs
streamingReplica ReplicaInterface
tSafeReplica TSafeReplicaInterface
msFactory msgstream.Factory
streamingReplica ReplicaInterface
historicalReplica ReplicaInterface
tSafeReplica TSafeReplicaInterface
msFactory msgstream.Factory
}
// collection flow graph
@ -60,6 +61,7 @@ func (dsService *dataSyncService) addCollectionFlowGraph(collectionID UniqueID,
collectionID,
partitionID,
dsService.streamingReplica,
dsService.historicalReplica,
dsService.tSafeReplica,
vChannel,
dsService.msFactory)
@ -133,6 +135,7 @@ func (dsService *dataSyncService) addPartitionFlowGraph(collectionID UniqueID, p
collectionID,
partitionID,
dsService.streamingReplica,
dsService.historicalReplica,
dsService.tSafeReplica,
vChannel,
dsService.msFactory)
@ -198,6 +201,7 @@ func (dsService *dataSyncService) removePartitionFlowGraph(partitionID UniqueID)
// newDataSyncService returns a new dataSyncService
func newDataSyncService(ctx context.Context,
streamingReplica ReplicaInterface,
historicalReplica ReplicaInterface,
tSafeReplica TSafeReplicaInterface,
factory msgstream.Factory) *dataSyncService {
@ -206,6 +210,7 @@ func newDataSyncService(ctx context.Context,
collectionFlowGraphs: make(map[UniqueID]map[Channel]*queryNodeFlowGraph),
partitionFlowGraphs: make(map[UniqueID]map[Channel]*queryNodeFlowGraph),
streamingReplica: streamingReplica,
historicalReplica: historicalReplica,
tSafeReplica: tSafeReplica,
msFactory: factory,
}
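For orientation, this is how the extended constructor is wired after the change. A minimal sketch, not part of this commit, using the helper constructors that appear elsewhere in this diff (newCollectionReplica, newTSafeReplica); ctx, etcdKV and msFactory are assumed to come from the surrounding query node setup:

// Sketch only: newDataSyncService now takes a historical replica as well.
streamingReplica := newCollectionReplica(etcdKV)
historicalReplica := newCollectionReplica(etcdKV)
tSafeReplica := newTSafeReplica()
dsService := newDataSyncService(ctx, streamingReplica, historicalReplica, tSafeReplica, msFactory)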

View File

@ -135,10 +135,13 @@ func TestDataSyncService_collectionFlowGraphs(t *testing.T) {
streaming, err := genSimpleStreaming(ctx)
assert.NoError(t, err)
historicalReplica, err := genSimpleReplica()
assert.NoError(t, err)
fac, err := genFactory()
assert.NoError(t, err)
dataSyncService := newDataSyncService(ctx, streaming.replica, streaming.tSafeReplica, fac)
dataSyncService := newDataSyncService(ctx, streaming.replica, historicalReplica, streaming.tSafeReplica, fac)
assert.NotNil(t, dataSyncService)
dataSyncService.addCollectionFlowGraph(defaultCollectionID, []Channel{defaultVChannel})
@ -178,10 +181,13 @@ func TestDataSyncService_partitionFlowGraphs(t *testing.T) {
streaming, err := genSimpleStreaming(ctx)
assert.NoError(t, err)
historicalReplica, err := genSimpleReplica()
assert.NoError(t, err)
fac, err := genFactory()
assert.NoError(t, err)
dataSyncService := newDataSyncService(ctx, streaming.replica, streaming.tSafeReplica, fac)
dataSyncService := newDataSyncService(ctx, streaming.replica, historicalReplica, streaming.tSafeReplica, fac)
assert.NotNil(t, dataSyncService)
dataSyncService.addPartitionFlowGraph(defaultPartitionID, defaultPartitionID, []Channel{defaultVChannel})
@ -222,10 +228,13 @@ func TestDataSyncService_removePartitionFlowGraphs(t *testing.T) {
streaming, err := genSimpleStreaming(ctx)
assert.NoError(t, err)
historicalReplica, err := genSimpleReplica()
assert.NoError(t, err)
fac, err := genFactory()
assert.NoError(t, err)
dataSyncService := newDataSyncService(ctx, streaming.replica, streaming.tSafeReplica, fac)
dataSyncService := newDataSyncService(ctx, streaming.replica, historicalReplica, streaming.tSafeReplica, fac)
assert.NotNil(t, dataSyncService)
dataSyncService.addPartitionFlowGraph(defaultPartitionID, defaultPartitionID, []Channel{defaultVChannel})

View File

@ -32,7 +32,8 @@ import (
type insertNode struct {
baseNode
replica ReplicaInterface
streamingReplica ReplicaInterface
historicalReplica ReplicaInterface
}
type insertData struct {
@ -89,8 +90,8 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// 1. hash insertMessages to insertData
for _, task := range iMsg.insertMessages {
// check if partition exists, if not, create partition
if hasPartition := iNode.replica.hasPartition(task.PartitionID); !hasPartition {
err := iNode.replica.addPartition(task.CollectionID, task.PartitionID)
if hasPartition := iNode.streamingReplica.hasPartition(task.PartitionID); !hasPartition {
err := iNode.streamingReplica.addPartition(task.CollectionID, task.PartitionID)
if err != nil {
log.Warn(err.Error())
continue
@ -98,8 +99,8 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
// check if segment exists, if not, create this segment
if !iNode.replica.hasSegment(task.SegmentID) {
err := iNode.replica.addSegment(task.SegmentID, task.PartitionID, task.CollectionID, task.ShardName, segmentTypeGrowing, true)
if !iNode.streamingReplica.hasSegment(task.SegmentID) {
err := iNode.streamingReplica.addSegment(task.SegmentID, task.PartitionID, task.CollectionID, task.ShardName, segmentTypeGrowing, true)
if err != nil {
log.Warn(err.Error())
continue
@ -114,7 +115,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
// 2. do preInsert
for segmentID := range iData.insertRecords {
var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
var targetSegment, err = iNode.streamingReplica.getSegmentByID(segmentID)
if err != nil {
log.Warn(err.Error())
}
@ -146,48 +147,11 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
// 1. filter segment by bloom filter
for _, delMsg := range iMsg.deleteMessages {
var partitionIDs []UniqueID
var err error
if delMsg.PartitionID != -1 {
partitionIDs = []UniqueID{delMsg.PartitionID}
} else {
partitionIDs, err = iNode.replica.getPartitionIDs(delMsg.CollectionID)
if err != nil {
log.Warn(err.Error())
continue
}
if iNode.streamingReplica != nil {
processDeleteMessages(iNode.streamingReplica, delMsg, delData)
}
resultSegmentIDs := make([]UniqueID, 0)
for _, partitionID := range partitionIDs {
segmentIDs, err := iNode.replica.getSegmentIDs(partitionID)
if err != nil {
log.Warn(err.Error())
continue
}
resultSegmentIDs = append(resultSegmentIDs, segmentIDs...)
}
for _, segmentID := range resultSegmentIDs {
segment, err := iNode.replica.getSegmentByID(segmentID)
if err != nil {
log.Warn(err.Error())
continue
}
pks, err := filterSegmentsByPKs(delMsg.PrimaryKeys, segment)
if err != nil {
log.Warn(err.Error())
continue
}
if len(pks) > 0 {
offset := segment.segmentPreDelete(len(pks))
if err != nil {
log.Warn(err.Error())
continue
}
delData.deleteIDs[segmentID] = append(delData.deleteIDs[segmentID], pks...)
// TODO(yukun) get offset of pks
delData.deleteTimestamps[segmentID] = append(delData.deleteTimestamps[segmentID], delMsg.Timestamps[:len(pks)]...)
delData.deleteOffset[segmentID] = offset
}
if iNode.historicalReplica != nil {
processDeleteMessages(iNode.historicalReplica, delMsg, delData)
}
}
@ -208,6 +172,52 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
return []Msg{res}
}
func processDeleteMessages(replica ReplicaInterface, msg *msgstream.DeleteMsg, delData *deleteData) {
var partitionIDs []UniqueID
var err error
if msg.PartitionID != -1 {
partitionIDs = []UniqueID{msg.PartitionID}
} else {
partitionIDs, err = replica.getPartitionIDs(msg.CollectionID)
if err != nil {
log.Warn(err.Error())
return
}
}
resultSegmentIDs := make([]UniqueID, 0)
for _, partitionID := range partitionIDs {
segmentIDs, err := replica.getSegmentIDs(partitionID)
if err != nil {
log.Warn(err.Error())
continue
}
resultSegmentIDs = append(resultSegmentIDs, segmentIDs...)
}
for _, segmentID := range resultSegmentIDs {
segment, err := replica.getSegmentByID(segmentID)
if err != nil {
log.Warn(err.Error())
continue
}
pks, err := filterSegmentsByPKs(msg.PrimaryKeys, segment)
if err != nil {
log.Warn(err.Error())
continue
}
if len(pks) > 0 {
offset := segment.segmentPreDelete(len(pks))
if err != nil {
log.Warn(err.Error())
continue
}
delData.deleteIDs[segmentID] = append(delData.deleteIDs[segmentID], pks...)
// TODO(yukun) get offset of pks
delData.deleteTimestamps[segmentID] = append(delData.deleteTimestamps[segmentID], msg.Timestamps[:len(pks)]...)
delData.deleteOffset[segmentID] = offset
}
}
}
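processDeleteMessages accumulates its results in a deleteData value whose definition lies outside the hunks shown here; the sketch below is an assumed reconstruction of its shape, inferred only from the fields used above:

// Assumed shape of deleteData (inferred from usage, not shown in this diff).
type deleteData struct {
    deleteIDs        map[UniqueID][]int64     // segmentID -> primary keys to delete
    deleteTimestamps map[UniqueID][]Timestamp // segmentID -> timestamps, aligned with deleteIDs
    deleteOffset     map[UniqueID]int64       // segmentID -> offset reserved by segmentPreDelete
}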
func filterSegmentsByPKs(pks []int64, segment *Segment) ([]int64, error) {
if pks == nil {
return nil, fmt.Errorf("pks is nil when getSegmentsByPKs")
@ -218,19 +228,19 @@ func filterSegmentsByPKs(pks []int64, segment *Segment) ([]int64, error) {
buf := make([]byte, 8)
res := make([]int64, 0)
for _, pk := range pks {
binary.BigEndian.PutUint64(buf, uint64(pk))
binary.LittleEndian.PutUint64(buf, uint64(pk))
exist := segment.pkFilter.Test(buf)
if exist {
res = append(res, pk)
}
}
log.Debug("In filterSegmentsByPKs", zap.Any("pk", res), zap.Any("segment", segment.segmentID))
log.Debug("In filterSegmentsByPKs", zap.Any("pk len", len(res)), zap.Any("segment", segment.segmentID))
return res, nil
}
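The encoding switch from binary.BigEndian to binary.LittleEndian matters because the segment's pkFilter can only report a primary key as present when Test is given the same byte layout that was used when the key was added on the insert path. A self-contained sketch of that behaviour, using the bits-and-blooms bloom filter purely as a stand-in (the filter type actually backing pkFilter is an assumption here):

package main

import (
    "encoding/binary"
    "fmt"

    "github.com/bits-and-blooms/bloom/v3" // stand-in filter for illustration only
)

func main() {
    filter := bloom.NewWithEstimates(10000, 0.01)

    // Insert path: register primary key 42 using little-endian bytes.
    add := make([]byte, 8)
    binary.LittleEndian.PutUint64(add, uint64(42))
    filter.Add(add)

    // Delete path with the same encoding: the key is found.
    le := make([]byte, 8)
    binary.LittleEndian.PutUint64(le, uint64(42))
    fmt.Println(filter.Test(le)) // true

    // Delete path with a mismatched encoding: the key is missed in practice.
    be := make([]byte, 8)
    binary.BigEndian.PutUint64(be, uint64(42))
    fmt.Println(filter.Test(be)) // almost certainly false
}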
func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync.WaitGroup) {
log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID))
var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
var targetSegment, err = iNode.streamingReplica.getSegmentByID(segmentID)
if err != nil {
log.Warn("cannot find segment:", zap.Int64("segmentID", segmentID))
// TODO: add error handling
@ -263,13 +273,9 @@ func (iNode *insertNode) insert(iData *insertData, segmentID UniqueID, wg *sync.
func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) {
defer wg.Done()
log.Debug("QueryNode::iNode::delete", zap.Any("SegmentID", segmentID))
var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
if err != nil {
log.Warn("Cannot find segment:", zap.Int64("segmentID", segmentID))
return
}
if targetSegment.segmentType != segmentTypeGrowing {
targetSegment := iNode.getSegmentInReplica(segmentID)
if targetSegment == nil {
log.Warn("targetSegment is nil")
return
}
@ -277,7 +283,7 @@ func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg *
timestamps := deleteData.deleteTimestamps[segmentID]
offset := deleteData.deleteOffset[segmentID]
err = targetSegment.segmentDelete(offset, &ids, &timestamps)
err := targetSegment.segmentDelete(offset, &ids, &timestamps)
if err != nil {
log.Warn("QueryNode: targetSegmentDelete failed", zap.Error(err))
return
@ -286,6 +292,40 @@ func (iNode *insertNode) delete(deleteData *deleteData, segmentID UniqueID, wg *
log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])), zap.Int64("segmentID", segmentID))
}
func (iNode *insertNode) getSegmentInReplica(segmentID int64) *Segment {
streamingSegment, err := iNode.streamingReplica.getSegmentByID(segmentID)
if err != nil {
log.Warn("Cannot find segment in streaming replica:", zap.Int64("segmentID", segmentID))
} else {
return streamingSegment
}
historicalSegment, err := iNode.historicalReplica.getSegmentByID(segmentID)
if err != nil {
log.Warn("Cannot find segment in historical replica:", zap.Int64("segmentID", segmentID))
} else {
return historicalSegment
}
log.Warn("Cannot find segment in both streaming and historical replica:", zap.Int64("segmentID", segmentID))
return nil
}
func (iNode *insertNode) getCollectionInReplica(collectionID int64) *Collection {
streamingCollection, err := iNode.streamingReplica.getCollectionByID(collectionID)
if err != nil {
log.Warn("Cannot find collection in streaming replica:", zap.Int64("collectionID", collectionID))
} else {
return streamingCollection
}
historicalCollection, err := iNode.historicalReplica.getCollectionByID(collectionID)
if err != nil {
log.Warn("Cannot find collection in historical replica:", zap.Int64("collectionID", collectionID))
} else {
return historicalCollection
}
log.Warn("Cannot find collection in both streaming and historical replica:", zap.Int64("collectionID", segmentID))
return nil
}
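Because getSegmentInReplica falls back to the historical replica, a sealed segment that exists only there can now receive deletes. A hypothetical test sketch of that fallback, reusing helpers that appear in the test files of this commit (genSimpleReplica, newInsertNode, the default* constants) and assuming a segmentTypeSealed constant plus the usual testing/testify imports:

// Hypothetical sketch, not part of this commit.
func TestInsertNode_getSegmentInReplica_historicalFallback(t *testing.T) {
    streaming, err := genSimpleReplica()
    assert.NoError(t, err)
    historical, err := genSimpleReplica()
    assert.NoError(t, err)
    insertNode := newInsertNode(streaming, historical)

    // Register the segment only in the historical replica, as a sealed segment.
    err = historical.addSegment(defaultSegmentID,
        defaultPartitionID,
        defaultCollectionID,
        defaultVChannel,
        segmentTypeSealed,
        true)
    assert.NoError(t, err)

    // Lookup misses the streaming replica and falls back to the historical one.
    seg := insertNode.getSegmentInReplica(defaultSegmentID)
    assert.NotNil(t, seg)
}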
func (iNode *insertNode) getPrimaryKeys(msg *msgstream.InsertMsg) []int64 {
if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
log.Warn("misaligned messages detected")
@ -293,12 +333,11 @@ func (iNode *insertNode) getPrimaryKeys(msg *msgstream.InsertMsg) []int64 {
}
collectionID := msg.GetCollectionID()
collection, err := iNode.replica.getCollectionByID(collectionID)
if err != nil {
log.Warn("collection cannot be found")
collection := iNode.getCollectionInReplica(collectionID)
if collection == nil {
log.Warn("collectio is nil")
return nil
}
offset := 0
for _, field := range collection.schema.Fields {
if field.IsPrimaryKey {
@ -332,10 +371,9 @@ func (iNode *insertNode) getPrimaryKeys(msg *msgstream.InsertMsg) []int64 {
}
}
case schemapb.DataType_BinaryVector:
var dim int
for _, t := range field.TypeParams {
if t.Key == "dim" {
dim, err = strconv.Atoi(t.Value)
dim, err := strconv.Atoi(t.Value)
if err != nil {
log.Error("strconv wrong on get dim", zap.Error(err))
return nil
@ -354,7 +392,7 @@ func (iNode *insertNode) getPrimaryKeys(msg *msgstream.InsertMsg) []int64 {
pks := make([]int64, len(blobReaders))
for i, reader := range blobReaders {
err = binary.Read(reader, binary.LittleEndian, &pks[i])
err := binary.Read(reader, binary.LittleEndian, &pks[i])
if err != nil {
log.Warn("binary read blob value failed", zap.Error(err))
}
@ -362,7 +400,7 @@ func (iNode *insertNode) getPrimaryKeys(msg *msgstream.InsertMsg) []int64 {
return pks
}
func newInsertNode(replica ReplicaInterface) *insertNode {
func newInsertNode(streamingReplica ReplicaInterface, historicalReplica ReplicaInterface) *insertNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
@ -371,7 +409,8 @@ func newInsertNode(replica ReplicaInterface) *insertNode {
baseNode.SetMaxParallelism(maxParallelism)
return &insertNode{
baseNode: baseNode,
replica: replica,
baseNode: baseNode,
streamingReplica: streamingReplica,
historicalReplica: historicalReplica,
}
}

View File

@ -69,11 +69,13 @@ func genFlowGraphDeleteData() (*deleteData, error) {
func TestFlowGraphInsertNode_insert(t *testing.T) {
t.Run("test insert", func(t *testing.T) {
replica, err := genSimpleReplica()
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
historical, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming, historical)
err = replica.addSegment(defaultSegmentID,
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
@ -90,11 +92,13 @@ func TestFlowGraphInsertNode_insert(t *testing.T) {
})
t.Run("test segment insert error", func(t *testing.T) {
replica, err := genSimpleReplica()
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
historical, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming, historical)
err = replica.addSegment(defaultSegmentID,
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
@ -112,20 +116,24 @@ func TestFlowGraphInsertNode_insert(t *testing.T) {
})
t.Run("test no target segment", func(t *testing.T) {
replica, err := genSimpleReplica()
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
historical, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming, historical)
wg := &sync.WaitGroup{}
wg.Add(1)
insertNode.insert(nil, defaultSegmentID, wg)
})
t.Run("test invalid segmentType", func(t *testing.T) {
replica, err := genSimpleReplica()
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
historical, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming, historical)
err = replica.addSegment(defaultSegmentID,
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
@ -141,11 +149,13 @@ func TestFlowGraphInsertNode_insert(t *testing.T) {
func TestFlowGraphInsertNode_delete(t *testing.T) {
t.Run("test insert and delete", func(t *testing.T) {
replica, err := genSimpleReplica()
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
historical, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming, historical)
err = replica.addSegment(defaultSegmentID,
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
@ -167,11 +177,13 @@ func TestFlowGraphInsertNode_delete(t *testing.T) {
})
t.Run("test only delete", func(t *testing.T) {
replica, err := genSimpleReplica()
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
historical, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming, historical)
err = replica.addSegment(defaultSegmentID,
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
@ -187,11 +199,13 @@ func TestFlowGraphInsertNode_delete(t *testing.T) {
})
t.Run("test segment delete error", func(t *testing.T) {
replica, err := genSimpleReplica()
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
historical, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming, historical)
err = replica.addSegment(defaultSegmentID,
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
@ -208,9 +222,11 @@ func TestFlowGraphInsertNode_delete(t *testing.T) {
})
t.Run("test no target segment", func(t *testing.T) {
replica, err := genSimpleReplica()
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
historical, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming, historical)
wg := &sync.WaitGroup{}
wg.Add(1)
insertNode.delete(nil, defaultSegmentID, wg)
@ -219,11 +235,13 @@ func TestFlowGraphInsertNode_delete(t *testing.T) {
func TestFlowGraphInsertNode_operate(t *testing.T) {
t.Run("test operate", func(t *testing.T) {
replica, err := genSimpleReplica()
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
historical, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming, historical)
err = replica.addSegment(defaultSegmentID,
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
@ -245,7 +263,7 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
}
msg := []flowgraph.Msg{&iMsg}
insertNode.Operate(msg)
s, err := replica.getSegmentByID(defaultSegmentID)
s, err := streaming.getSegmentByID(defaultSegmentID)
assert.Nil(t, err)
buf := make([]byte, 8)
for i := 0; i < defaultMsgLength; i++ {
@ -256,11 +274,13 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
})
t.Run("test invalid partitionID", func(t *testing.T) {
replica, err := genSimpleReplica()
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
historical, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming, historical)
err = replica.addSegment(defaultSegmentID,
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
@ -282,11 +302,13 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
})
t.Run("test collection partition not exist", func(t *testing.T) {
replica, err := genSimpleReplica()
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
historical, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming, historical)
err = replica.addSegment(defaultSegmentID,
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
@ -308,11 +330,13 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
})
t.Run("test partition not exist", func(t *testing.T) {
replica, err := genSimpleReplica()
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
historical, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming, historical)
err = replica.addSegment(defaultSegmentID,
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
@ -333,11 +357,13 @@ func TestFlowGraphInsertNode_operate(t *testing.T) {
})
t.Run("test invalid input length", func(t *testing.T) {
replica, err := genSimpleReplica()
streaming, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(replica)
historical, err := genSimpleReplica()
assert.NoError(t, err)
insertNode := newInsertNode(streaming, historical)
err = replica.addSegment(defaultSegmentID,
err = streaming.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
@ -375,7 +401,7 @@ func TestGetSegmentsByPKs(t *testing.T) {
}
pks, err := filterSegmentsByPKs([]int64{0, 1, 2, 3, 4}, segment)
assert.Nil(t, err)
assert.Equal(t, len(pks), 3)
assert.Equal(t, len(pks), 1)
pks, err = filterSegmentsByPKs([]int64{}, segment)
assert.Nil(t, err)

View File

@ -39,6 +39,7 @@ func newQueryNodeFlowGraph(ctx context.Context,
collectionID UniqueID,
partitionID UniqueID,
streamingReplica ReplicaInterface,
historicalReplica ReplicaInterface,
tSafeReplica TSafeReplicaInterface,
channel Channel,
factory msgstream.Factory) *queryNodeFlowGraph {
@ -56,7 +57,7 @@ func newQueryNodeFlowGraph(ctx context.Context,
var dmStreamNode node = q.newDmInputNode(ctx1, factory)
var filterDmNode node = newFilteredDmNode(streamingReplica, loadType, collectionID, partitionID)
var insertNode node = newInsertNode(streamingReplica)
var insertNode node = newInsertNode(streamingReplica, historicalReplica)
var serviceTimeNode node = newServiceTimeNode(ctx1, tSafeReplica, loadType, collectionID, partitionID, channel, factory)
q.flowGraph.AddNode(dmStreamNode)

View File

@ -27,6 +27,9 @@ func TestQueryNodeFlowGraph_consumerFlowGraph(t *testing.T) {
streaming, err := genSimpleStreaming(ctx)
assert.NoError(t, err)
historicalReplica, err := genSimpleReplica()
assert.NoError(t, err)
fac, err := genFactory()
assert.NoError(t, err)
@ -35,6 +38,7 @@ func TestQueryNodeFlowGraph_consumerFlowGraph(t *testing.T) {
defaultCollectionID,
defaultPartitionID,
streaming.replica,
historicalReplica,
streaming.tSafeReplica,
defaultVChannel,
fac)
@ -50,6 +54,9 @@ func TestQueryNodeFlowGraph_seekQueryNodeFlowGraph(t *testing.T) {
streaming, err := genSimpleStreaming(ctx)
assert.NoError(t, err)
historicalReplica, err := genSimpleReplica()
assert.NoError(t, err)
fac, err := genFactory()
assert.NoError(t, err)
@ -58,6 +65,7 @@ func TestQueryNodeFlowGraph_seekQueryNodeFlowGraph(t *testing.T) {
defaultCollectionID,
defaultPartitionID,
streaming.replica,
historicalReplica,
streaming.tSafeReplica,
defaultVChannel,
fac)

View File

@ -923,7 +923,11 @@ func genSimpleStreaming(ctx context.Context) (*streaming, error) {
if err != nil {
return nil, err
}
s := newStreaming(ctx, fac, kv)
historicalReplica, err := genSimpleReplica()
if err != nil {
return nil, err
}
s := newStreaming(ctx, fac, kv, historicalReplica)
r, err := genSimpleReplica()
if err != nil {
return nil, err

View File

@ -97,6 +97,7 @@ func updateTSafe(queryCollection *queryCollection, timestamp Timestamp) {
}
func TestQueryCollection_withoutVChannel(t *testing.T) {
ctx := context.Background()
m := map[string]interface{}{
"PulsarAddress": Params.PulsarAddress,
"ReceiveBufSize": 1024,
@ -134,7 +135,7 @@ func TestQueryCollection_withoutVChannel(t *testing.T) {
assert.Nil(t, err)
//create a streaming
streaming := newStreaming(context.Background(), factory, etcdKV)
streaming := newStreaming(ctx, factory, etcdKV, historical.replica)
err = streaming.replica.addCollection(0, schema)
assert.Nil(t, err)
err = streaming.replica.addPartition(0, 1)

View File

@ -180,7 +180,7 @@ func (node *QueryNode) Init() error {
node.indexCoord,
node.msFactory,
node.etcdKV)
node.streaming = newStreaming(node.queryNodeLoopCtx, node.msFactory, node.etcdKV)
node.streaming = newStreaming(node.queryNodeLoopCtx, node.msFactory, node.etcdKV, node.historical.replica)
node.InitSegcore()

View File

@ -193,7 +193,7 @@ func newQueryNodeMock() *QueryNode {
}
svr := NewQueryNode(ctx, msFactory)
svr.historical = newHistorical(svr.queryNodeLoopCtx, nil, nil, svr.msFactory, etcdKV)
svr.streaming = newStreaming(ctx, msFactory, etcdKV)
svr.streaming = newStreaming(ctx, msFactory, etcdKV, svr.historical.replica)
svr.etcdKV = etcdKV
return svr

View File

@ -28,17 +28,18 @@ import (
type streaming struct {
ctx context.Context
replica ReplicaInterface
tSafeReplica TSafeReplicaInterface
replica ReplicaInterface
historicalReplica ReplicaInterface
tSafeReplica TSafeReplicaInterface
dataSyncService *dataSyncService
msFactory msgstream.Factory
}
func newStreaming(ctx context.Context, factory msgstream.Factory, etcdKV *etcdkv.EtcdKV) *streaming {
func newStreaming(ctx context.Context, factory msgstream.Factory, etcdKV *etcdkv.EtcdKV, historicalReplica ReplicaInterface) *streaming {
replica := newCollectionReplica(etcdKV)
tReplica := newTSafeReplica()
newDS := newDataSyncService(ctx, replica, tReplica, factory)
newDS := newDataSyncService(ctx, replica, historicalReplica, tReplica, factory)
return &streaming{
replica: replica,

View File

@ -24,7 +24,6 @@ class TestDeleteParams(TestcaseBase):
Only the `in` operator is supported in the expr
"""
@pytest.mark.xfail(reason="Issues #10431")
@pytest.mark.tags(CaseLabel.L0)
@pytest.mark.parametrize('is_binary', [False, True])
def test_delete_entities(self, is_binary):