Add flow_graph for delta channel in querynode (#11293)

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
pull/11360/head
yukun 2021-11-06 11:02:58 +08:00 committed by GitHub
parent 0699176d24
commit 4e5c8811da
28 changed files with 958 additions and 118 deletions

View File

@ -34,9 +34,10 @@ const (
type dataSyncService struct {
ctx context.Context
mu sync.Mutex // guards FlowGraphs
collectionFlowGraphs map[UniqueID]map[Channel]*queryNodeFlowGraph // map[collectionID]flowGraphs
partitionFlowGraphs map[UniqueID]map[Channel]*queryNodeFlowGraph // map[partitionID]flowGraphs
mu sync.Mutex // guards FlowGraphs
collectionFlowGraphs map[UniqueID]map[Channel]*queryNodeFlowGraph // map[collectionID]flowGraphs
collectionDeltaFlowGraphs map[UniqueID]map[Channel]*queryNodeFlowGraph // map[collectionID]flowGraphs for delta channel
partitionFlowGraphs map[UniqueID]map[Channel]*queryNodeFlowGraph // map[partitionID]flowGraphs
streamingReplica ReplicaInterface
historicalReplica ReplicaInterface
@ -72,6 +73,33 @@ func (dsService *dataSyncService) addCollectionFlowGraph(collectionID UniqueID,
}
}
// collection delta flow graph
// addCollectionDeltaFlowGraph adds a delta channel flowGraph to collectionDeltaFlowGraphs
func (dsService *dataSyncService) addCollectionDeltaFlowGraph(collectionID UniqueID, vChannels []string) {
dsService.mu.Lock()
defer dsService.mu.Unlock()
if _, ok := dsService.collectionDeltaFlowGraphs[collectionID]; !ok {
dsService.collectionDeltaFlowGraphs[collectionID] = make(map[Channel]*queryNodeFlowGraph)
}
for _, vChannel := range vChannels {
// collection flow graph doesn't need partition id
partitionID := UniqueID(0)
newFlowGraph := newQueryNodeDeltaFlowGraph(dsService.ctx,
loadTypeCollection,
collectionID,
partitionID,
dsService.historicalReplica,
dsService.tSafeReplica,
vChannel,
dsService.msFactory)
dsService.collectionDeltaFlowGraphs[collectionID][vChannel] = newFlowGraph
log.Debug("add collection flow graph",
zap.Any("collectionID", collectionID),
zap.Any("channel", vChannel))
}
}
func (dsService *dataSyncService) getCollectionFlowGraphs(collectionID UniqueID, vChannels []string) (map[Channel]*queryNodeFlowGraph, error) {
dsService.mu.Lock()
defer dsService.mu.Unlock()
@ -90,6 +118,24 @@ func (dsService *dataSyncService) getCollectionFlowGraphs(collectionID UniqueID,
return tmpFGs, nil
}
func (dsService *dataSyncService) getCollectionDeltaFlowGraphs(collectionID UniqueID, vChannels []string) (map[Channel]*queryNodeFlowGraph, error) {
dsService.mu.Lock()
defer dsService.mu.Unlock()
if _, ok := dsService.collectionDeltaFlowGraphs[collectionID]; !ok {
return nil, errors.New("collection flow graph doesn't existed, collectionID = " + fmt.Sprintln(collectionID))
}
tmpFGs := make(map[Channel]*queryNodeFlowGraph)
for _, channel := range vChannels {
if _, ok := dsService.collectionDeltaFlowGraphs[collectionID][channel]; ok {
tmpFGs[channel] = dsService.collectionDeltaFlowGraphs[collectionID][channel]
}
}
return tmpFGs, nil
}
func (dsService *dataSyncService) startCollectionFlowGraph(collectionID UniqueID, vChannels []string) error {
dsService.mu.Lock()
defer dsService.mu.Unlock()
@ -107,6 +153,23 @@ func (dsService *dataSyncService) startCollectionFlowGraph(collectionID UniqueID
return nil
}
func (dsService *dataSyncService) startCollectionDeltaFlowGraph(collectionID UniqueID, vChannels []string) error {
dsService.mu.Lock()
defer dsService.mu.Unlock()
if _, ok := dsService.collectionDeltaFlowGraphs[collectionID]; !ok {
return errors.New("collection flow graph doesn't existed, collectionID = " + fmt.Sprintln(collectionID))
}
for _, channel := range vChannels {
if _, ok := dsService.collectionDeltaFlowGraphs[collectionID][channel]; ok {
// start flow graph
log.Debug("start collection flow graph", zap.Any("channel", channel))
dsService.collectionDeltaFlowGraphs[collectionID][channel].flowGraph.Start()
}
}
return nil
}
func (dsService *dataSyncService) removeCollectionFlowGraph(collectionID UniqueID) {
dsService.mu.Lock()
defer dsService.mu.Unlock()
@ -121,6 +184,20 @@ func (dsService *dataSyncService) removeCollectionFlowGraph(collectionID UniqueI
delete(dsService.collectionFlowGraphs, collectionID)
}
func (dsService *dataSyncService) removeCollectionDeltaFlowGraph(collectionID UniqueID) {
dsService.mu.Lock()
defer dsService.mu.Unlock()
if _, ok := dsService.collectionDeltaFlowGraphs[collectionID]; ok {
for _, nodeFG := range dsService.collectionDeltaFlowGraphs[collectionID] {
// close flow graph
nodeFG.close()
}
dsService.collectionDeltaFlowGraphs[collectionID] = nil
}
delete(dsService.collectionDeltaFlowGraphs, collectionID)
}
// partition flow graph
func (dsService *dataSyncService) addPartitionFlowGraph(collectionID UniqueID, partitionID UniqueID, vChannels []string) {
dsService.mu.Lock()
@ -206,13 +283,14 @@ func newDataSyncService(ctx context.Context,
factory msgstream.Factory) *dataSyncService {
return &dataSyncService{
ctx: ctx,
collectionFlowGraphs: make(map[UniqueID]map[Channel]*queryNodeFlowGraph),
partitionFlowGraphs: make(map[UniqueID]map[Channel]*queryNodeFlowGraph),
streamingReplica: streamingReplica,
historicalReplica: historicalReplica,
tSafeReplica: tSafeReplica,
msFactory: factory,
ctx: ctx,
collectionFlowGraphs: make(map[UniqueID]map[Channel]*queryNodeFlowGraph),
collectionDeltaFlowGraphs: make(map[UniqueID]map[Channel]*queryNodeFlowGraph),
partitionFlowGraphs: make(map[UniqueID]map[Channel]*queryNodeFlowGraph),
streamingReplica: streamingReplica,
historicalReplica: historicalReplica,
tSafeReplica: tSafeReplica,
msFactory: factory,
}
}
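
The service keeps one mutex-guarded, two-level map per flow graph kind, keyed by collection ID and then by channel. A minimal, self-contained sketch of that registry pattern, with hypothetical names in place of the Milvus types:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// flowGraph stands in for *queryNodeFlowGraph; the real type wraps a
// TimeTickedFlowGraph. All names here are illustrative only.
type flowGraph struct{ channel string }

type registry struct {
	mu     sync.Mutex // guards graphs, as dsService.mu guards the flow graph maps
	graphs map[int64]map[string]*flowGraph
}

func (r *registry) add(collectionID int64, channels []string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.graphs[collectionID]; !ok {
		r.graphs[collectionID] = make(map[string]*flowGraph)
	}
	for _, ch := range channels {
		r.graphs[collectionID][ch] = &flowGraph{channel: ch}
	}
}

func (r *registry) get(collectionID int64, channels []string) (map[string]*flowGraph, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.graphs[collectionID]; !ok {
		return nil, errors.New("flow graph doesn't exist")
	}
	res := make(map[string]*flowGraph)
	for _, ch := range channels {
		if fg, ok := r.graphs[collectionID][ch]; ok {
			res[ch] = fg
		}
	}
	return res, nil
}

func (r *registry) remove(collectionID int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.graphs, collectionID)
}

func main() {
	r := &registry{graphs: make(map[int64]map[string]*flowGraph)}
	r.add(1, []string{"delta-ch-0", "delta-ch-1"})
	fgs, _ := r.get(1, []string{"delta-ch-0"})
	fmt.Println(len(fgs)) // 1
	r.remove(1)
}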

View File

@ -117,12 +117,12 @@ func TestDataSyncService_Start(t *testing.T) {
assert.Nil(t, err)
channels := []Channel{"0"}
node.streaming.dataSyncService.addCollectionFlowGraph(collectionID, channels)
err = node.streaming.dataSyncService.startCollectionFlowGraph(collectionID, channels)
node.dataSyncService.addCollectionFlowGraph(collectionID, channels)
err = node.dataSyncService.startCollectionFlowGraph(collectionID, channels)
assert.NoError(t, err)
<-node.queryNodeLoopCtx.Done()
node.streaming.dataSyncService.close()
node.dataSyncService.close()
err = node.Stop()
assert.NoError(t, err)
@ -132,7 +132,7 @@ func TestDataSyncService_collectionFlowGraphs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
streaming, err := genSimpleStreaming(ctx)
streaming, err := genSimpleReplica()
assert.NoError(t, err)
historicalReplica, err := genSimpleReplica()
@ -141,7 +141,8 @@ func TestDataSyncService_collectionFlowGraphs(t *testing.T) {
fac, err := genFactory()
assert.NoError(t, err)
dataSyncService := newDataSyncService(ctx, streaming.replica, historicalReplica, streaming.tSafeReplica, fac)
tSafe := newTSafeReplica()
dataSyncService := newDataSyncService(ctx, streaming, historicalReplica, tSafe, fac)
assert.NotNil(t, dataSyncService)
dataSyncService.addCollectionFlowGraph(defaultCollectionID, []Channel{defaultVChannel})
@ -178,7 +179,7 @@ func TestDataSyncService_partitionFlowGraphs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
streaming, err := genSimpleStreaming(ctx)
streaming, err := genSimpleReplica()
assert.NoError(t, err)
historicalReplica, err := genSimpleReplica()
@ -187,7 +188,9 @@ func TestDataSyncService_partitionFlowGraphs(t *testing.T) {
fac, err := genFactory()
assert.NoError(t, err)
dataSyncService := newDataSyncService(ctx, streaming.replica, historicalReplica, streaming.tSafeReplica, fac)
tSafe := newTSafeReplica()
dataSyncService := newDataSyncService(ctx, streaming, historicalReplica, tSafe, fac)
assert.NotNil(t, dataSyncService)
dataSyncService.addPartitionFlowGraph(defaultPartitionID, defaultPartitionID, []Channel{defaultVChannel})
@ -225,7 +228,7 @@ func TestDataSyncService_removePartitionFlowGraphs(t *testing.T) {
defer cancel()
t.Run("test no tSafe", func(t *testing.T) {
streaming, err := genSimpleStreaming(ctx)
streaming, err := genSimpleReplica()
assert.NoError(t, err)
historicalReplica, err := genSimpleReplica()
@ -234,7 +237,10 @@ func TestDataSyncService_removePartitionFlowGraphs(t *testing.T) {
fac, err := genFactory()
assert.NoError(t, err)
dataSyncService := newDataSyncService(ctx, streaming.replica, historicalReplica, streaming.tSafeReplica, fac)
tSafe := newTSafeReplica()
tSafe.addTSafe(defaultVChannel)
dataSyncService := newDataSyncService(ctx, streaming, historicalReplica, tSafe, fac)
assert.NotNil(t, dataSyncService)
dataSyncService.addPartitionFlowGraph(defaultPartitionID, defaultPartitionID, []Channel{defaultVChannel})

View File

@ -0,0 +1,133 @@
package querynode
import (
"sync"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
)
type deleteNode struct {
baseNode
replica ReplicaInterface // historical
}
func (dNode *deleteNode) Name() string {
return "dNode"
}
func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if len(in) != 1 {
log.Error("Invalid operate message input in deleteNode", zap.Int("input length", len(in)))
// TODO: add error handling
}
dMsg, ok := in[0].(*deleteMsg)
if !ok {
log.Warn("type assertion failed for deleteMsg")
// TODO: add error handling
}
delData := &deleteData{
deleteIDs: map[UniqueID][]int64{},
deleteTimestamps: map[UniqueID][]Timestamp{},
deleteOffset: map[UniqueID]int64{},
}
if dMsg == nil {
return []Msg{}
}
var spans []opentracing.Span
for _, msg := range dMsg.deleteMessages {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
spans = append(spans, sp)
msg.SetTraceCtx(ctx)
}
// 1. filter segment by bloom filter
for _, delMsg := range dMsg.deleteMessages {
if dNode.replica.getSegmentNum() != 0 {
processDeleteMessages(dNode.replica, delMsg, delData)
}
}
// 2. do preDelete
for segmentID, pks := range delData.deleteIDs {
segment, err := dNode.replica.getSegmentByID(segmentID)
if err != nil {
log.Warn("Cannot find segment in historical replica, skip preDelete", zap.Int64("segmentID", segmentID))
continue
}
offset := segment.segmentPreDelete(len(pks))
delData.deleteOffset[segmentID] = offset
}
// 3. do delete
wg := sync.WaitGroup{}
for segmentID := range delData.deleteIDs {
wg.Add(1)
go dNode.delete(delData, segmentID, &wg)
}
wg.Wait()
var res Msg = &serviceTimeMsg{
timeRange: dMsg.timeRange,
}
for _, sp := range spans {
sp.Finish()
}
return []Msg{res}
}
func (dNode *deleteNode) delete(deleteData *deleteData, segmentID UniqueID, wg *sync.WaitGroup) {
defer wg.Done()
log.Debug("QueryNode::dNode::delete", zap.Any("SegmentID", segmentID))
targetSegment := dNode.getSegmentInReplica(segmentID)
if targetSegment == nil {
log.Warn("targetSegment is nil")
return
}
if targetSegment.segmentType != segmentTypeSealed {
return
}
ids := deleteData.deleteIDs[segmentID]
timestamps := deleteData.deleteTimestamps[segmentID]
offset := deleteData.deleteOffset[segmentID]
err := targetSegment.segmentDelete(offset, &ids, &timestamps)
if err != nil {
log.Warn("QueryNode: targetSegmentDelete failed", zap.Error(err))
return
}
log.Debug("Do delete done", zap.Int("len", len(deleteData.deleteIDs[segmentID])), zap.Int64("segmentID", segmentID))
}
func (dNode *deleteNode) getSegmentInReplica(segmentID int64) *Segment {
segment, err := dNode.replica.getSegmentByID(segmentID)
if err != nil {
return nil
}
return segment
}
func newDeleteNode(historicalReplica ReplicaInterface) *deleteNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
baseNode := baseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &deleteNode{
baseNode: baseNode,
replica: historicalReplica,
}
}
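
Operate fans the delete out with one goroutine per segment and blocks on a sync.WaitGroup, as in deleteNode.delete above. A self-contained sketch of that fan-out, where doDelete is a hypothetical stand-in for segment.segmentDelete:

package main

import (
	"fmt"
	"sync"
)

// deleteBatch mirrors the shape of deleteData: per-segment primary keys
// and the offsets reserved by preDelete.
type deleteBatch struct {
	ids    map[int64][]int64
	offset map[int64]int64
}

// doDelete is a stand-in for segment.segmentDelete, which in QueryNode
// applies the delete through the segcore CGo binding.
func doDelete(segmentID int64, pks []int64, offset int64) {
	fmt.Printf("segment %d: delete %d pks at offset %d\n", segmentID, len(pks), offset)
}

func main() {
	batch := &deleteBatch{
		ids:    map[int64][]int64{100: {1, 2, 3}, 101: {4, 5}},
		offset: map[int64]int64{100: 0, 101: 0},
	}
	var wg sync.WaitGroup
	for segmentID := range batch.ids {
		wg.Add(1)
		go func(id int64) {
			defer wg.Done() // mirrors the defer wg.Done() in deleteNode.delete
			doDelete(id, batch.ids[id], batch.offset[id])
		}(segmentID)
	}
	wg.Wait() // Operate blocks here until every per-segment delete finishes
}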

View File

@ -0,0 +1,235 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
import (
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
func TestFlowGraphDeleteNode_delete(t *testing.T) {
t.Run("test delete", func(t *testing.T) {
historical, err := genSimpleReplica()
assert.NoError(t, err)
deleteNode := newDeleteNode(historical)
err = historical.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
segmentTypeSealed,
true)
assert.NoError(t, err)
deleteData, err := genFlowGraphDeleteData()
assert.NoError(t, err)
wg := &sync.WaitGroup{}
wg.Add(1)
deleteNode.delete(deleteData, defaultSegmentID, wg)
})
t.Run("test segment delete error", func(t *testing.T) {
historical, err := genSimpleReplica()
assert.NoError(t, err)
deleteNode := newDeleteNode(historical)
err = historical.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
segmentTypeSealed,
true)
assert.NoError(t, err)
deleteData, err := genFlowGraphDeleteData()
assert.NoError(t, err)
wg := &sync.WaitGroup{}
wg.Add(1)
deleteData.deleteTimestamps[defaultSegmentID] = deleteData.deleteTimestamps[defaultSegmentID][:len(deleteData.deleteTimestamps[defaultSegmentID])/2]
deleteNode.delete(deleteData, defaultSegmentID, wg)
})
t.Run("test no target segment", func(t *testing.T) {
historical, err := genSimpleReplica()
assert.NoError(t, err)
deleteNode := newDeleteNode(historical)
wg := &sync.WaitGroup{}
wg.Add(1)
deleteNode.delete(nil, defaultSegmentID, wg)
})
t.Run("test invalid segmentType", func(t *testing.T) {
historical, err := genSimpleReplica()
assert.NoError(t, err)
deleteNode := newDeleteNode(historical)
err = historical.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
segmentTypeGrowing,
true)
assert.NoError(t, err)
wg := &sync.WaitGroup{}
wg.Add(1)
deleteNode.delete(&deleteData{}, defaultSegmentID, wg)
})
}
func TestFlowGraphDeleteNode_operate(t *testing.T) {
t.Run("test operate", func(t *testing.T) {
historical, err := genSimpleReplica()
assert.NoError(t, err)
deleteNode := newDeleteNode(historical)
err = historical.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
segmentTypeSealed,
true)
assert.NoError(t, err)
msgDeleteMsg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
dMsg := deleteMsg{
deleteMessages: []*msgstream.DeleteMsg{
msgDeleteMsg,
},
}
msg := []flowgraph.Msg{&dMsg}
deleteNode.Operate(msg)
s, err := historical.getSegmentByID(defaultSegmentID)
pks := make([]int64, defaultMsgLength)
for i := 0; i < defaultMsgLength; i++ {
pks[i] = int64(i)
}
s.updateBloomFilter(pks)
assert.Nil(t, err)
buf := make([]byte, 8)
for i := 0; i < defaultMsgLength; i++ {
common.Endian.PutUint64(buf, uint64(i))
assert.True(t, s.pkFilter.Test(buf))
}
})
t.Run("test invalid partitionID", func(t *testing.T) {
historical, err := genSimpleReplica()
assert.NoError(t, err)
deleteNode := newDeleteNode(historical)
err = historical.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
segmentTypeSealed,
true)
assert.NoError(t, err)
msgDeleteMsg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
msgDeleteMsg.PartitionID = common.InvalidPartitionID
assert.NoError(t, err)
dMsg := deleteMsg{
deleteMessages: []*msgstream.DeleteMsg{
msgDeleteMsg,
},
}
msg := []flowgraph.Msg{&dMsg}
deleteNode.Operate(msg)
})
t.Run("test collection partition not exist", func(t *testing.T) {
historical, err := genSimpleReplica()
assert.NoError(t, err)
deleteNode := newDeleteNode(historical)
err = historical.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
segmentTypeSealed,
true)
assert.NoError(t, err)
msgDeleteMsg, err := genSimpleDeleteMsg()
msgDeleteMsg.CollectionID = 9999
msgDeleteMsg.PartitionID = -1
assert.NoError(t, err)
dMsg := deleteMsg{
deleteMessages: []*msgstream.DeleteMsg{
msgDeleteMsg,
},
}
msg := []flowgraph.Msg{&dMsg}
deleteNode.Operate(msg)
})
t.Run("test partition not exist", func(t *testing.T) {
historical, err := genSimpleReplica()
assert.NoError(t, err)
deleteNode := newDeleteNode(historical)
err = historical.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
segmentTypeSealed,
true)
assert.NoError(t, err)
msgDeleteMsg, err := genSimpleDeleteMsg()
msgDeleteMsg.PartitionID = 9999
assert.NoError(t, err)
dMsg := deleteMsg{
deleteMessages: []*msgstream.DeleteMsg{
msgDeleteMsg,
},
}
msg := []flowgraph.Msg{&dMsg}
deleteNode.Operate(msg)
})
t.Run("test invalid input length", func(t *testing.T) {
historical, err := genSimpleReplica()
assert.NoError(t, err)
deleteNode := newDeleteNode(historical)
err = historical.addSegment(defaultSegmentID,
defaultPartitionID,
defaultCollectionID,
defaultVChannel,
segmentTypeSealed,
true)
assert.NoError(t, err)
msgDeleteMsg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
dMsg := deleteMsg{
deleteMessages: []*msgstream.DeleteMsg{
msgDeleteMsg,
},
}
msg := []flowgraph.Msg{&dMsg, &dMsg}
deleteNode.Operate(msg)
})
}

View File

@ -0,0 +1,113 @@
package querynode
import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
)
type filterDeleteNode struct {
baseNode
collectionID UniqueID
partitionID UniqueID
replica ReplicaInterface
}
func (fddNode *filterDeleteNode) Name() string {
return "fddNode"
}
func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if len(in) != 1 {
log.Error("Invalid operate message input in filterDDNode", zap.Int("input length", len(in)))
// TODO: add error handling
}
msgStreamMsg, ok := in[0].(*MsgStreamMsg)
if !ok {
log.Warn("type assertion failed for MsgStreamMsg")
// TODO: add error handling
}
if msgStreamMsg == nil {
return []Msg{}
}
var spans []opentracing.Span
for _, msg := range msgStreamMsg.TsMessages() {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
spans = append(spans, sp)
msg.SetTraceCtx(ctx)
}
var dMsg = deleteMsg{
deleteMessages: make([]*msgstream.DeleteMsg, 0),
timeRange: TimeRange{
timestampMin: msgStreamMsg.TimestampMin(),
timestampMax: msgStreamMsg.TimestampMax(),
},
}
for _, msg := range msgStreamMsg.TsMessages() {
switch msg.Type() {
case commonpb.MsgType_Delete:
resMsg := fddNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg))
if resMsg != nil {
dMsg.deleteMessages = append(dMsg.deleteMessages, resMsg)
}
default:
log.Warn("Non supporting", zap.Int32("message type", int32(msg.Type())))
}
}
var res Msg = &dMsg
for _, sp := range spans {
sp.Finish()
}
return []Msg{res}
}
func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg) *msgstream.DeleteMsg {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
msg.SetTraceCtx(ctx)
defer sp.Finish()
if msg.CollectionID != fddNode.collectionID {
return nil
}
if len(msg.PrimaryKeys) != len(msg.Timestamps) {
log.Warn("Error, misaligned messages detected")
return nil
}
if len(msg.Timestamps) <= 0 {
log.Debug("filter invalid delete message, no message",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
return nil
}
return msg
}
func newFilteredDeleteNode(replica ReplicaInterface,
collectionID UniqueID,
partitionID UniqueID) *filterDeleteNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
baseNode := baseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
return &filterDeleteNode{
baseNode: baseNode,
collectionID: collectionID,
partitionID: partitionID,
replica: replica,
}
}
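
filterInvalidDeleteMessage drops a message for any of three reasons: it targets another collection, its primary keys and timestamps are misaligned, or it carries no data. A self-contained sketch of those checks, using a simplified message struct rather than msgstream.DeleteMsg:

package main

import "fmt"

// deleteMessage is a simplified stand-in for msgstream.DeleteMsg.
type deleteMessage struct {
	CollectionID int64
	PrimaryKeys  []int64
	Timestamps   []uint64
}

// filterDelete returns nil for messages the delta flow graph should drop,
// mirroring the three conditions in filterInvalidDeleteMessage.
func filterDelete(msg *deleteMessage, targetCollection int64) *deleteMessage {
	if msg.CollectionID != targetCollection {
		return nil // message belongs to another collection
	}
	if len(msg.PrimaryKeys) != len(msg.Timestamps) {
		return nil // misaligned message
	}
	if len(msg.Timestamps) == 0 {
		return nil // nothing to delete
	}
	return msg
}

func main() {
	msg := &deleteMessage{CollectionID: 1, PrimaryKeys: []int64{7}, Timestamps: []uint64{42}}
	fmt.Println(filterDelete(msg, 1) != nil) // true
	fmt.Println(filterDelete(msg, 2) != nil) // false: wrong collection
}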

View File

@ -0,0 +1,115 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
func getFilterDeleteNode(ctx context.Context) (*filterDeleteNode, error) {
historical, err := genSimpleReplica()
if err != nil {
return nil, err
}
historical.addExcludedSegments(defaultCollectionID, nil)
return newFilteredDeleteNode(historical, defaultCollectionID, defaultPartitionID), nil
}
func TestFlowGraphFilterDeleteNode_filterDeleteNode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fg, err := getFilterDeleteNode(ctx)
assert.NoError(t, err)
fg.Name()
}
func TestFlowGraphFilterDeleteNode_filterInvalidDeleteMessage(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Run("delete valid test", func(t *testing.T) {
msg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
fg, err := getFilterDeleteNode(ctx)
assert.NoError(t, err)
res := fg.filterInvalidDeleteMessage(msg)
assert.NotNil(t, res)
})
t.Run("test delete no collection", func(t *testing.T) {
msg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
msg.CollectionID = UniqueID(1003)
fg, err := getFilterDeleteNode(ctx)
assert.NoError(t, err)
res := fg.filterInvalidDeleteMessage(msg)
assert.Nil(t, res)
})
t.Run("test delete not target collection", func(t *testing.T) {
msg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
fg, err := getFilterDeleteNode(ctx)
assert.NoError(t, err)
fg.collectionID = UniqueID(1000)
res := fg.filterInvalidDeleteMessage(msg)
assert.Nil(t, res)
})
t.Run("test delete no data", func(t *testing.T) {
msg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
fg, err := getFilterDeleteNode(ctx)
assert.NoError(t, err)
msg.Timestamps = make([]Timestamp, 0)
msg.PrimaryKeys = make([]IntPrimaryKey, 0)
res := fg.filterInvalidDeleteMessage(msg)
assert.Nil(t, res)
})
}
func TestFlowGraphFilterDeleteNode_Operate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
genFilterDeleteMsg := func() []flowgraph.Msg {
dMsg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
msg := flowgraph.GenerateMsgStreamMsg([]msgstream.TsMsg{dMsg}, 0, 1000, nil, nil)
return []flowgraph.Msg{msg}
}
t.Run("valid test", func(t *testing.T) {
msg := genFilterDeleteMsg()
fg, err := getFilterDeleteNode(ctx)
assert.NoError(t, err)
res := fg.Operate(msg)
assert.NotNil(t, res)
})
t.Run("invalid input length", func(t *testing.T) {
msg := genFilterDeleteMsg()
fg, err := getFilterDeleteNode(ctx)
assert.NoError(t, err)
var m flowgraph.Msg
msg = append(msg, m)
res := fg.Operate(msg)
assert.NotNil(t, res)
})
}

View File

@ -25,13 +25,13 @@ import (
)
func getFilterDMNode(ctx context.Context) (*filterDmNode, error) {
streaming, err := genSimpleStreaming(ctx)
streaming, err := genSimpleReplica()
if err != nil {
return nil, err
}
streaming.replica.addExcludedSegments(defaultCollectionID, nil)
return newFilteredDmNode(streaming.replica, loadTypeCollection, defaultCollectionID, defaultPartitionID), nil
streaming.addExcludedSegments(defaultCollectionID, nil)
return newFilteredDmNode(streaming, loadTypeCollection, defaultCollectionID, defaultPartitionID), nil
}
func TestFlowGraphFilterDmNode_filterDmNode(t *testing.T) {
@ -183,7 +183,7 @@ func TestFlowGraphFilterDmNode_filterInvalidDeleteMessage(t *testing.T) {
t.Run("test delete no collection", func(t *testing.T) {
msg, err := genSimpleDeleteMsg()
assert.NoError(t, err)
msg.CollectionID = UniqueID(1000)
msg.CollectionID = UniqueID(1003)
fg, err := getFilterDMNode(ctx)
assert.NoError(t, err)
res := fg.filterInvalidDeleteMessage(msg)

View File

@ -28,6 +28,11 @@ type insertMsg struct {
timeRange TimeRange
}
type deleteMsg struct {
deleteMessages []*msgstream.DeleteMsg
timeRange TimeRange
}
type serviceTimeMsg struct {
timeRange TimeRange
}
@ -36,6 +41,10 @@ func (iMsg *insertMsg) TimeTick() Timestamp {
return iMsg.timeRange.timestampMax
}
func (dMsg *deleteMsg) TimeTick() Timestamp {
return dMsg.timeRange.timestampMax
}
func (stMsg *serviceTimeMsg) TimeTick() Timestamp {
return stMsg.timeRange.timestampMax
}
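
insertMsg, deleteMsg, and serviceTimeMsg all report the upper bound of their time range through TimeTick, which is what lets downstream nodes advance tSafe uniformly. A sketch of that shared shape, with a stand-in interface modeled on flowgraph.Msg (names here are illustrative):

package main

import "fmt"

type Timestamp = uint64

// timeRange mirrors the TimeRange carried by the flow graph messages.
type timeRange struct {
	timestampMin Timestamp
	timestampMax Timestamp
}

// msg is a stand-in for the flowgraph message interface: anything that
// reports a time tick.
type msg interface {
	TimeTick() Timestamp
}

type deleteMsg struct{ timeRange timeRange }

func (d *deleteMsg) TimeTick() Timestamp { return d.timeRange.timestampMax }

func main() {
	var m msg = &deleteMsg{timeRange: timeRange{timestampMin: 0, timestampMax: 1000}}
	fmt.Println(m.TimeTick()) // 1000: downstream nodes advance tSafe to this value
}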

View File

@ -104,6 +104,75 @@ func newQueryNodeFlowGraph(ctx context.Context,
return q
}
func newQueryNodeDeltaFlowGraph(ctx context.Context,
loadType loadType,
collectionID UniqueID,
partitionID UniqueID,
historicalReplica ReplicaInterface,
tSafeReplica TSafeReplicaInterface,
channel Channel,
factory msgstream.Factory) *queryNodeFlowGraph {
ctx1, cancel := context.WithCancel(ctx)
q := &queryNodeFlowGraph{
ctx: ctx1,
cancel: cancel,
collectionID: collectionID,
partitionID: partitionID,
channel: channel,
flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx1),
}
var dmStreamNode node = q.newDmInputNode(ctx1, factory)
var filterDeleteNode node = newFilteredDeleteNode(historicalReplica, collectionID, partitionID)
var deleteNode node = newDeleteNode(historicalReplica)
var serviceTimeNode node = newServiceTimeNode(ctx1, tSafeReplica, loadType, collectionID, partitionID, channel, factory)
q.flowGraph.AddNode(dmStreamNode)
q.flowGraph.AddNode(filterDeleteNode)
q.flowGraph.AddNode(deleteNode)
q.flowGraph.AddNode(serviceTimeNode)
// dmStreamNode
var err = q.flowGraph.SetEdges(dmStreamNode.Name(),
[]string{},
[]string{filterDeleteNode.Name()},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", dmStreamNode.Name()))
}
// filterDeleteNode
err = q.flowGraph.SetEdges(filterDeleteNode.Name(),
[]string{dmStreamNode.Name()},
[]string{deleteNode.Name()},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", filterDeleteNode.Name()))
}
// deleteNode
err = q.flowGraph.SetEdges(deleteNode.Name(),
[]string{filterDeleteNode.Name()},
[]string{serviceTimeNode.Name()},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", deleteNode.Name()))
}
// serviceTimeNode
err = q.flowGraph.SetEdges(serviceTimeNode.Name(),
[]string{deleteNode.Name()},
[]string{},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", serviceTimeNode.Name()))
}
return q
}
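
The delta flow graph is a straight pipeline, dmStreamNode -> filterDeleteNode -> deleteNode -> serviceTimeNode, and each SetEdges call names only the immediate neighbors. A toy sketch of wiring such a linear topology, with a simplified graph type in place of flowgraph.TimeTickedFlowGraph:

package main

import "fmt"

// graph is a toy stand-in for the flow graph: it only records the in/out
// edges registered per node name.
type graph struct {
	in, out map[string][]string
}

func (g *graph) setEdges(node string, in, out []string) error {
	if g.in == nil {
		g.in, g.out = map[string][]string{}, map[string][]string{}
	}
	g.in[node], g.out[node] = in, out
	return nil
}

func main() {
	nodes := []string{"dmStreamNode", "fddNode", "dNode", "stNode"}
	g := &graph{}
	// wire a linear pipeline: each node reads from its predecessor only
	for i, name := range nodes {
		var in, out []string
		if i > 0 {
			in = []string{nodes[i-1]}
		}
		if i < len(nodes)-1 {
			out = []string{nodes[i+1]}
		}
		if err := g.setEdges(name, in, out); err != nil {
			fmt.Println("set edges failed in node:", name)
		}
	}
	fmt.Println(g.out["fddNode"]) // [dNode]
}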
func (q *queryNodeFlowGraph) newDmInputNode(ctx context.Context, factory msgstream.Factory) *flowgraph.InputNode {
insertStream, err := factory.NewTtMsgStream(ctx)
if err != nil {

View File

@ -24,7 +24,9 @@ func TestQueryNodeFlowGraph_consumerFlowGraph(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
streaming, err := genSimpleStreaming(ctx)
tSafe := newTSafeReplica()
streamingReplica, err := genSimpleReplica()
assert.NoError(t, err)
historicalReplica, err := genSimpleReplica()
@ -37,9 +39,9 @@ func TestQueryNodeFlowGraph_consumerFlowGraph(t *testing.T) {
loadTypeCollection,
defaultCollectionID,
defaultPartitionID,
streaming.replica,
streamingReplica,
historicalReplica,
streaming.tSafeReplica,
tSafe,
defaultVChannel,
fac)
@ -51,7 +53,7 @@ func TestQueryNodeFlowGraph_seekQueryNodeFlowGraph(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
streaming, err := genSimpleStreaming(ctx)
streamingReplica, err := genSimpleReplica()
assert.NoError(t, err)
historicalReplica, err := genSimpleReplica()
@ -60,13 +62,15 @@ func TestQueryNodeFlowGraph_seekQueryNodeFlowGraph(t *testing.T) {
fac, err := genFactory()
assert.NoError(t, err)
tSafe := newTSafeReplica()
fg := newQueryNodeFlowGraph(ctx,
loadTypeCollection,
defaultCollectionID,
defaultPartitionID,
streaming.replica,
streamingReplica,
historicalReplica,
streaming.tSafeReplica,
tSafe,
defaultVChannel,
fac)

View File

@ -25,14 +25,14 @@ func TestServiceTimeNode_Operate(t *testing.T) {
defer cancel()
genServiceTimeNode := func() *serviceTimeNode {
streaming, err := genSimpleStreaming(ctx)
assert.NoError(t, err)
tSafe := newTSafeReplica()
tSafe.addTSafe(defaultVChannel)
fac, err := genFactory()
assert.NoError(t, err)
node := newServiceTimeNode(ctx,
streaming.tSafeReplica,
tSafe,
loadTypeCollection,
defaultCollectionID,
defaultPartitionID,

View File

@ -43,6 +43,7 @@ type historical struct {
replica ReplicaInterface
loader *segmentLoader
statsService *statsService
tSafeReplica TSafeReplicaInterface
mu sync.Mutex // guards globalSealedSegments
globalSealedSegments map[UniqueID]*querypb.SegmentInfo
@ -52,11 +53,12 @@ type historical struct {
// newHistorical returns a new historical
func newHistorical(ctx context.Context,
replica ReplicaInterface,
rootCoord types.RootCoord,
indexCoord types.IndexCoord,
factory msgstream.Factory,
etcdKV *etcdkv.EtcdKV) *historical {
replica := newCollectionReplica(etcdKV)
etcdKV *etcdkv.EtcdKV,
tSafeReplica TSafeReplicaInterface) *historical {
loader := newSegmentLoader(ctx, rootCoord, indexCoord, replica, etcdKV)
ss := newStatsService(ctx, replica, loader.indexLoader.fieldStatsChan, factory)
@ -67,6 +69,7 @@ func newHistorical(ctx context.Context,
statsService: ss,
globalSealedSegments: make(map[UniqueID]*querypb.SegmentInfo),
etcdKV: etcdKV,
tSafeReplica: tSafeReplica,
}
}

View File

@ -95,7 +95,8 @@ func TestHistorical_Search(t *testing.T) {
defer cancel()
t.Run("test search", func(t *testing.T) {
his, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
his, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
plan, searchReqs, err := genSimpleSearchPlanAndRequests()
@ -106,7 +107,8 @@ func TestHistorical_Search(t *testing.T) {
})
t.Run("test no collection - search partitions", func(t *testing.T) {
his, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
his, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
plan, searchReqs, err := genSimpleSearchPlanAndRequests()
@ -120,7 +122,8 @@ func TestHistorical_Search(t *testing.T) {
})
t.Run("test no collection - search all collection", func(t *testing.T) {
his, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
his, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
plan, searchReqs, err := genSimpleSearchPlanAndRequests()
@ -134,7 +137,8 @@ func TestHistorical_Search(t *testing.T) {
})
t.Run("test load partition and partition has been released", func(t *testing.T) {
his, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
his, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
plan, searchReqs, err := genSimpleSearchPlanAndRequests()
@ -152,7 +156,8 @@ func TestHistorical_Search(t *testing.T) {
})
t.Run("test no partition in collection", func(t *testing.T) {
his, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
his, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
plan, searchReqs, err := genSimpleSearchPlanAndRequests()
@ -168,7 +173,8 @@ func TestHistorical_Search(t *testing.T) {
})
t.Run("test load collection partition released in collection", func(t *testing.T) {
his, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
his, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
plan, searchReqs, err := genSimpleSearchPlanAndRequests()

View File

@ -241,7 +241,7 @@ func TestImpl_WatchDmChannels(t *testing.T) {
req := &queryPb.WatchDmChannelsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgType: commonpb.MsgType_WatchDmChannels,
MsgID: rand.Int63(),
},
NodeID: 0,

View File

@ -25,7 +25,8 @@ func TestIndexLoader_setIndexInfo(t *testing.T) {
defer cancel()
t.Run("test setIndexInfo", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
historical, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
segment, err := genSimpleSealedSegment()
@ -39,7 +40,8 @@ func TestIndexLoader_setIndexInfo(t *testing.T) {
})
t.Run("test nil root and index", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
historical, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
segment, err := genSimpleSealedSegment()
@ -55,7 +57,8 @@ func TestIndexLoader_getIndexBinlog(t *testing.T) {
defer cancel()
t.Run("test getIndexBinlog", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
historical, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
paths, err := generateIndex(defaultSegmentID)
@ -66,7 +69,8 @@ func TestIndexLoader_getIndexBinlog(t *testing.T) {
})
t.Run("test invalid path", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
historical, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
_, _, _, err = historical.loader.indexLoader.getIndexBinlog([]string{""})
@ -78,7 +82,8 @@ func TestIndexLoader_printIndexParams(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
historical, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
historical, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
indexKV := []*commonpb.KeyValuePair{
@ -95,7 +100,8 @@ func TestIndexLoader_loadIndex(t *testing.T) {
defer cancel()
t.Run("test loadIndex", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
historical, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
segment, err := genSimpleSealedSegment()
@ -112,7 +118,8 @@ func TestIndexLoader_loadIndex(t *testing.T) {
})
t.Run("test set indexinfo with empty indexFilePath", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
historical, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
segment, err := genSimpleSealedSegment()
@ -144,7 +151,8 @@ func TestIndexLoader_loadIndex(t *testing.T) {
//})
t.Run("test checkIndexReady failed", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
historical, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
segment, err := genSimpleSealedSegment()

View File

@ -58,8 +58,9 @@ const (
defaultNProb = 10
defaultMetricType = "JACCARD"
defaultKVRootPath = "query-node-unittest"
defaultVChannel = "query-node-unittest-channel-0"
defaultKVRootPath = "query-node-unittest"
defaultVChannel = "query-node-unittest-channel-0"
defaultHistoricalVChannel = "query-node-unittest-historical-channel-0"
//defaultQueryChannel = "query-node-unittest-query-channel-0"
//defaultQueryResultChannel = "query-node-unittest-query-result-channel-0"
defaultSubName = "query-node-unittest-sub-name-0"
@ -881,7 +882,7 @@ func genSimpleReplica() (ReplicaInterface, error) {
return r, err
}
func genSimpleHistorical(ctx context.Context) (*historical, error) {
func genSimpleHistorical(ctx context.Context, tSafeReplica TSafeReplicaInterface) (*historical, error) {
fac, err := genFactory()
if err != nil {
return nil, err
@ -890,7 +891,11 @@ func genSimpleHistorical(ctx context.Context) (*historical, error) {
if err != nil {
return nil, err
}
h := newHistorical(ctx, newMockRootCoord(), newMockIndexCoord(), fac, kv)
replica, err := genSimpleReplica()
if err != nil {
return nil, err
}
h := newHistorical(ctx, replica, newMockRootCoord(), newMockIndexCoord(), fac, kv, tSafeReplica)
r, err := genSimpleReplica()
if err != nil {
return nil, err
@ -910,12 +915,14 @@ func genSimpleHistorical(ctx context.Context) (*historical, error) {
return nil, err
}
col.addVChannels([]Channel{
// defaultHistoricalVChannel,
defaultVChannel,
})
// h.tSafeReplica.addTSafe(defaultHistoricalVChannel)
return h, nil
}
func genSimpleStreaming(ctx context.Context) (*streaming, error) {
func genSimpleStreaming(ctx context.Context, tSafeReplica TSafeReplicaInterface) (*streaming, error) {
kv, err := genEtcdKV()
if err != nil {
return nil, err
@ -924,11 +931,11 @@ func genSimpleStreaming(ctx context.Context) (*streaming, error) {
if err != nil {
return nil, err
}
historicalReplica, err := genSimpleReplica()
replica, err := genSimpleReplica()
if err != nil {
return nil, err
}
s := newStreaming(ctx, fac, kv, historicalReplica)
s := newStreaming(ctx, replica, fac, kv, tSafeReplica)
r, err := genSimpleReplica()
if err != nil {
return nil, err
@ -1304,15 +1311,18 @@ func genSimpleQueryNode(ctx context.Context) (*QueryNode, error) {
node.etcdKV = etcdKV
streaming, err := genSimpleStreaming(ctx)
node.tSafeReplica = newTSafeReplica()
streaming, err := genSimpleStreaming(ctx, node.tSafeReplica)
if err != nil {
return nil, err
}
historical, err := genSimpleHistorical(ctx)
historical, err := genSimpleHistorical(ctx, node.tSafeReplica)
if err != nil {
return nil, err
}
node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, streaming.replica, historical.replica, node.tSafeReplica, node.msFactory)
node.streaming = streaming
node.historical = historical

View File

@ -47,7 +47,8 @@ func TestPlan_createSearchPlanByExpr(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
historical, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
historical, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
col, err := historical.replica.getCollectionByID(defaultCollectionID)

View File

@ -147,10 +147,15 @@ func (q *queryCollection) registerCollectionTSafe() error {
if err != nil {
return err
}
// historicalCollection, err := q.historical.replica.getCollectionByID(q.collectionID)
// if err != nil {
// return err
// }
log.Debug("register tSafe watcher and init watcher select case",
zap.Any("collectionID", collection.ID()),
zap.Any("dml channels", collection.getVChannels()),
// zap.Any("delta channels", collection.getVChannels()),
)
for _, channel := range collection.getVChannels() {
err = q.addTSafeWatcher(channel)
@ -158,6 +163,12 @@ func (q *queryCollection) registerCollectionTSafe() error {
return err
}
}
// for _, channel := range historicalCollection.getVChannels() {
// err := q.addTSafeWatcher(channel)
// if err != nil {
// return err
// }
// }
return nil
}
@ -1216,7 +1227,7 @@ func mergeRetrieveResults(retrieveResults []*segcorepb.RetrieveResults) (*segcor
// merge results and remove duplicates
for _, rr := range retrieveResults {
// skip empty result, it will break merge result
if rr == nil || rr.Offset == nil || len(rr.Offset) == 0 {
if rr == nil || len(rr.Offset) == 0 {
continue
}
@ -1234,7 +1245,7 @@ func mergeRetrieveResults(retrieveResults []*segcorepb.RetrieveResults) (*segcor
}
if len(ret.FieldsData) != len(rr.FieldsData) {
return nil, fmt.Errorf("mismatch FieldData in querynode RetrieveResults, expect %d get %d", len(ret.FieldsData), len(rr.FieldsData))
return nil, fmt.Errorf("mismatch FieldData in RetrieveResults")
}
dstIds := ret.Ids.GetIntId()

View File

@ -40,12 +40,13 @@ import (
)
func genSimpleQueryCollection(ctx context.Context, cancel context.CancelFunc) (*queryCollection, error) {
historical, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
historical, err := genSimpleHistorical(ctx, tSafe)
if err != nil {
return nil, err
}
streaming, err := genSimpleStreaming(ctx)
streaming, err := genSimpleStreaming(ctx, tSafe)
if err != nil {
return nil, err
}
@ -127,7 +128,10 @@ func TestQueryCollection_withoutVChannel(t *testing.T) {
assert.Nil(t, err)
schema := genTestCollectionSchema(0, false, 2)
historical := newHistorical(context.Background(), nil, nil, factory, etcdKV)
historicalReplica := newCollectionReplica(etcdKV)
tsReplica := newTSafeReplica()
streamingReplica := newCollectionReplica(etcdKV)
historical := newHistorical(context.Background(), historicalReplica, nil, nil, factory, etcdKV, tsReplica)
//add a segment to historical data
err = historical.replica.addCollection(0, schema)
@ -153,7 +157,7 @@ func TestQueryCollection_withoutVChannel(t *testing.T) {
assert.Nil(t, err)
//create a streaming
streaming := newStreaming(ctx, factory, etcdKV, historical.replica)
streaming := newStreaming(ctx, streamingReplica, factory, etcdKV, tsReplica)
err = streaming.replica.addCollection(0, schema)
assert.Nil(t, err)
err = streaming.replica.addPartition(0, 1)

View File

@ -79,6 +79,12 @@ type QueryNode struct {
historical *historical
streaming *streaming
// tSafeReplica
tSafeReplica TSafeReplicaInterface
// dataSyncService
dataSyncService *dataSyncService
// internal services
queryService *queryService
@ -179,13 +185,27 @@ func (node *QueryNode) Init() error {
zap.Any("EtcdEndpoints", Params.EtcdEndpoints),
zap.Any("MetaRootPath", Params.MetaRootPath),
)
node.tSafeReplica = newTSafeReplica()
streamingReplica := newCollectionReplica(node.etcdKV)
historicalReplica := newCollectionReplica(node.etcdKV)
node.historical = newHistorical(node.queryNodeLoopCtx,
historicalReplica,
node.rootCoord,
node.indexCoord,
node.msFactory,
node.etcdKV)
node.streaming = newStreaming(node.queryNodeLoopCtx, node.msFactory, node.etcdKV, node.historical.replica)
node.etcdKV,
node.tSafeReplica,
)
node.streaming = newStreaming(node.queryNodeLoopCtx,
streamingReplica,
node.msFactory,
node.etcdKV,
node.tSafeReplica,
)
node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, streamingReplica, historicalReplica, node.tSafeReplica, node.msFactory)
node.InitSegcore()
@ -240,6 +260,9 @@ func (node *QueryNode) Stop() error {
node.queryNodeLoopCancel()
// close services
if node.dataSyncService != nil {
node.dataSyncService.close()
}
if node.historical != nil {
node.historical.close()
}

View File

@ -192,8 +192,12 @@ func newQueryNodeMock() *QueryNode {
panic(err)
}
svr := NewQueryNode(ctx, msFactory)
svr.historical = newHistorical(svr.queryNodeLoopCtx, nil, nil, svr.msFactory, etcdKV)
svr.streaming = newStreaming(ctx, msFactory, etcdKV, svr.historical.replica)
tsReplica := newTSafeReplica()
streamingReplica := newCollectionReplica(etcdKV)
historicalReplica := newCollectionReplica(etcdKV)
svr.historical = newHistorical(svr.queryNodeLoopCtx, historicalReplica, nil, nil, svr.msFactory, etcdKV, tsReplica)
svr.streaming = newStreaming(ctx, streamingReplica, msFactory, etcdKV, tsReplica)
svr.dataSyncService = newDataSyncService(ctx, svr.streaming.replica, svr.historical.replica, tsReplica, msFactory)
svr.etcdKV = etcdKV
return svr

View File

@ -216,10 +216,11 @@ func TestQueryService_addQueryCollection(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
his, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
his, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
str, err := genSimpleStreaming(ctx)
str, err := genSimpleStreaming(ctx, tSafe)
assert.NoError(t, err)
fac, err := genFactory()

View File

@ -37,7 +37,8 @@ func TestSegmentLoader_loadSegment(t *testing.T) {
assert.NoError(t, err)
t.Run("test load segment", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
historical, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
err = historical.replica.removeSegment(defaultSegmentID)
@ -68,7 +69,8 @@ func TestSegmentLoader_loadSegment(t *testing.T) {
})
t.Run("test set segment error", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
historical, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
err = historical.replica.removePartition(defaultPartitionID)
@ -104,7 +106,8 @@ func TestSegmentLoader_loadSegmentFieldsData(t *testing.T) {
defer cancel()
runLoadSegmentFieldData := func(dataType schemapb.DataType) {
historical, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
historical, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
fieldUID := genConstantField(uidField)
@ -185,7 +188,8 @@ func TestSegmentLoader_invalid(t *testing.T) {
defer cancel()
t.Run("test no collection", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
historical, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
err = historical.replica.removeCollection(defaultCollectionID)
@ -247,7 +251,8 @@ func TestSegmentLoader_invalid(t *testing.T) {
//})
t.Run("test no vec field 2", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
historical, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
err = historical.replica.removeCollection(defaultCollectionID)
@ -291,7 +296,8 @@ func TestSegmentLoader_checkSegmentSize(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
historical, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
historical, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
err = historical.loader.checkSegmentSize(defaultSegmentID, map[UniqueID]int64{defaultSegmentID: 1024})
@ -307,7 +313,8 @@ func TestSegmentLoader_estimateSegmentSize(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
historical, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
historical, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
seg, err := historical.replica.getSegmentByID(defaultSegmentID)

View File

@ -886,7 +886,8 @@ func TestSegment_indexInfoTest(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
h, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
seg, err := h.replica.getSegmentByID(defaultSegmentID)
@ -939,7 +940,8 @@ func TestSegment_indexInfoTest(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h, err := genSimpleHistorical(ctx)
tSafe := newTSafeReplica()
h, err := genSimpleHistorical(ctx, tSafe)
assert.NoError(t, err)
seg, err := h.replica.getSegmentByID(defaultSegmentID)

View File

@ -28,23 +28,17 @@ import (
type streaming struct {
ctx context.Context
replica ReplicaInterface
historicalReplica ReplicaInterface
tSafeReplica TSafeReplicaInterface
replica ReplicaInterface
tSafeReplica TSafeReplicaInterface
dataSyncService *dataSyncService
msFactory msgstream.Factory
msFactory msgstream.Factory
}
func newStreaming(ctx context.Context, factory msgstream.Factory, etcdKV *etcdkv.EtcdKV, historicalReplica ReplicaInterface) *streaming {
replica := newCollectionReplica(etcdKV)
tReplica := newTSafeReplica()
newDS := newDataSyncService(ctx, replica, historicalReplica, tReplica, factory)
func newStreaming(ctx context.Context, replica ReplicaInterface, factory msgstream.Factory, etcdKV *etcdkv.EtcdKV, tSafeReplica TSafeReplicaInterface) *streaming {
return &streaming{
replica: replica,
tSafeReplica: tReplica,
dataSyncService: newDS,
replica: replica,
tSafeReplica: tSafeReplica,
}
}
@ -55,10 +49,6 @@ func (s *streaming) start() {
func (s *streaming) close() {
// TODO: stop stats
if s.dataSyncService != nil {
s.dataSyncService.close()
}
// free collectionReplica
s.replica.freeAll()
}
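
After this refactor, streaming no longer constructs its own replica, tSafeReplica, and dataSyncService; QueryNode builds one tSafeReplica and injects the same instance into streaming, historical, and the data sync service, so the dml and delta flow graphs publish into a single tSafe store. A sketch of that constructor injection, with hypothetical minimal types:

package main

import "fmt"

// tSafeReplica is shared state: one instance, many consumers.
type tSafeReplica struct{ ts map[string]uint64 }

func newTSafeReplica() *tSafeReplica { return &tSafeReplica{ts: map[string]uint64{}} }

type streaming struct{ tSafe *tSafeReplica }
type historical struct{ tSafe *tSafeReplica }
type dataSyncService struct{ tSafe *tSafeReplica }

func main() {
	// the owner (QueryNode.Init in this commit) builds the shared replica once...
	shared := newTSafeReplica()
	// ...and injects the same instance into every component
	s := &streaming{tSafe: shared}
	h := &historical{tSafe: shared}
	d := &dataSyncService{tSafe: shared}
	fmt.Println(s.tSafe == h.tSafe && h.tSafe == d.tSafe) // true
}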

View File

@ -22,7 +22,8 @@ func TestStreaming_streaming(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
streaming, err := genSimpleStreaming(ctx)
tSafe := newTSafeReplica()
streaming, err := genSimpleStreaming(ctx, tSafe)
assert.NoError(t, err)
defer streaming.close()
@ -34,7 +35,8 @@ func TestStreaming_search(t *testing.T) {
defer cancel()
t.Run("test search", func(t *testing.T) {
streaming, err := genSimpleStreaming(ctx)
tSafe := newTSafeReplica()
streaming, err := genSimpleStreaming(ctx, tSafe)
assert.NoError(t, err)
defer streaming.close()
@ -52,7 +54,8 @@ func TestStreaming_search(t *testing.T) {
})
t.Run("test run empty partition", func(t *testing.T) {
streaming, err := genSimpleStreaming(ctx)
tSafe := newTSafeReplica()
streaming, err := genSimpleStreaming(ctx, tSafe)
assert.NoError(t, err)
defer streaming.close()
@ -70,7 +73,8 @@ func TestStreaming_search(t *testing.T) {
})
t.Run("test run empty partition and loadCollection", func(t *testing.T) {
streaming, err := genSimpleStreaming(ctx)
tSafe := newTSafeReplica()
streaming, err := genSimpleStreaming(ctx, tSafe)
assert.NoError(t, err)
defer streaming.close()
@ -95,7 +99,8 @@ func TestStreaming_search(t *testing.T) {
})
t.Run("test run empty partition and loadPartition", func(t *testing.T) {
streaming, err := genSimpleStreaming(ctx)
tSafe := newTSafeReplica()
streaming, err := genSimpleStreaming(ctx, tSafe)
assert.NoError(t, err)
defer streaming.close()
@ -119,7 +124,8 @@ func TestStreaming_search(t *testing.T) {
})
t.Run("test no partitions in collection", func(t *testing.T) {
streaming, err := genSimpleStreaming(ctx)
tSafe := newTSafeReplica()
streaming, err := genSimpleStreaming(ctx, tSafe)
assert.NoError(t, err)
defer streaming.close()
@ -140,7 +146,8 @@ func TestStreaming_search(t *testing.T) {
})
t.Run("test search failed", func(t *testing.T) {
streaming, err := genSimpleStreaming(ctx)
tSafe := newTSafeReplica()
streaming, err := genSimpleStreaming(ctx, tSafe)
assert.NoError(t, err)
defer streaming.close()
@ -166,7 +173,8 @@ func TestStreaming_retrieve(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
streaming, err := genSimpleStreaming(ctx)
tSafe := newTSafeReplica()
streaming, err := genSimpleStreaming(ctx, tSafe)
assert.NoError(t, err)
defer streaming.close()

View File

@ -220,15 +220,15 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
// create tSafe
for _, channel := range vChannels {
w.node.streaming.tSafeReplica.addTSafe(channel)
w.node.tSafeReplica.addTSafe(channel)
}
// add flow graph
if loadPartition {
w.node.streaming.dataSyncService.addPartitionFlowGraph(collectionID, partitionID, vChannels)
w.node.dataSyncService.addPartitionFlowGraph(collectionID, partitionID, vChannels)
log.Debug("Query node add partition flow graphs", zap.Any("channels", vChannels))
} else {
w.node.streaming.dataSyncService.addCollectionFlowGraph(collectionID, vChannels)
w.node.dataSyncService.addCollectionFlowGraph(collectionID, vChannels)
log.Debug("Query node add collection flow graphs", zap.Any("channels", vChannels))
}
@ -247,12 +247,12 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
// channels as consumer
var nodeFGs map[Channel]*queryNodeFlowGraph
if loadPartition {
nodeFGs, err = w.node.streaming.dataSyncService.getPartitionFlowGraphs(partitionID, vChannels)
nodeFGs, err = w.node.dataSyncService.getPartitionFlowGraphs(partitionID, vChannels)
if err != nil {
return err
}
} else {
nodeFGs, err = w.node.streaming.dataSyncService.getCollectionFlowGraphs(collectionID, vChannels)
nodeFGs, err = w.node.dataSyncService.getCollectionFlowGraphs(collectionID, vChannels)
if err != nil {
return err
}
@ -296,12 +296,12 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
// start flow graphs
if loadPartition {
err = w.node.streaming.dataSyncService.startPartitionFlowGraph(partitionID, vChannels)
err = w.node.dataSyncService.startPartitionFlowGraph(partitionID, vChannels)
if err != nil {
return err
}
} else {
err = w.node.streaming.dataSyncService.startCollectionFlowGraph(collectionID, vChannels)
err = w.node.dataSyncService.startCollectionFlowGraph(collectionID, vChannels)
if err != nil {
return err
}
@ -447,7 +447,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
)
// remove collection flow graph
r.node.streaming.dataSyncService.removeCollectionFlowGraph(r.req.CollectionID)
r.node.dataSyncService.removeCollectionFlowGraph(r.req.CollectionID)
// remove partition flow graphs which partitions belong to the target collection
partitionIDs, err := r.node.streaming.replica.getPartitionIDs(r.req.CollectionID)
@ -456,7 +456,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
return err
}
for _, partitionID := range partitionIDs {
r.node.streaming.dataSyncService.removePartitionFlowGraph(partitionID)
r.node.dataSyncService.removePartitionFlowGraph(partitionID)
}
// remove all tSafes of the target collection
@ -466,7 +466,7 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
zap.Any("vChannel", channel),
)
// no tSafe in tSafeReplica, don't return error
err = r.node.streaming.tSafeReplica.removeTSafe(channel)
err = r.node.tSafeReplica.removeTSafe(channel)
if err != nil {
log.Warn(err.Error())
}
@ -554,8 +554,8 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
// release partitions
vChannels := sCol.getVChannels()
for _, id := range r.req.PartitionIDs {
if _, err = r.node.streaming.dataSyncService.getPartitionFlowGraphs(id, vChannels); err == nil {
r.node.streaming.dataSyncService.removePartitionFlowGraph(id)
if _, err = r.node.dataSyncService.getPartitionFlowGraphs(id, vChannels); err == nil {
r.node.dataSyncService.removePartitionFlowGraph(id)
// remove all tSafes of the target partition
for _, channel := range vChannels {
log.Debug("Releasing tSafe in releasePartitionTask...",
@ -564,7 +564,7 @@ func (r *releasePartitionsTask) Execute(ctx context.Context) error {
zap.Any("vChannel", channel),
)
// no tSafe in tSafeReplica, don't return error
err = r.node.streaming.tSafeReplica.removeTSafe(channel)
err = r.node.tSafeReplica.removeTSafe(channel)
if err != nil {
log.Warn(err.Error())
}

View File

@ -392,7 +392,7 @@ func TestTask_releasePartitionTask(t *testing.T) {
req: genReleasePartitionsRequest(),
node: node,
}
task.node.streaming.dataSyncService.addPartitionFlowGraph(defaultCollectionID,
task.node.dataSyncService.addPartitionFlowGraph(defaultCollectionID,
defaultPartitionID,
[]Channel{defaultVChannel})
err = task.Execute(ctx)