Refactor flow graph and load/watchDML in query node (#5682)

* Refactor flow graph and load/watchDML in query node

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/5688/head^2
bigsheeper 2021-06-09 11:37:55 +08:00 committed by GitHub
parent cf8f52ee87
commit 8aae0f7cc9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 635 additions and 429 deletions

View File

@ -1475,11 +1475,11 @@ func (node *ProxyNode) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.Ge
},
SegmentIDs: segments,
})
log.Debug("GetQuerySegmentInfo ", zap.Any("infos", infoResp.Infos), zap.Any("status", infoResp.Status))
if err != nil {
resp.Status.Reason = err.Error()
return resp, nil
}
log.Debug("GetQuerySegmentInfo ", zap.Any("infos", infoResp.Infos), zap.Any("status", infoResp.Status))
if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
resp.Status.Reason = infoResp.Status.Reason
return resp, nil

View File

@ -23,11 +23,13 @@ package querynode
*/
import "C"
import (
"unsafe"
"go.uber.org/zap"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"go.uber.org/zap"
"unsafe"
)
type Collection struct {
@ -61,6 +63,9 @@ func (c *Collection) removePartitionID(partitionID UniqueID) {
}
func (c *Collection) addWatchedDmChannels(channels []VChannel) {
log.Debug("add watch dm channels to collection",
zap.Any("channels", channels),
zap.Any("collectionID", c.ID()))
c.watchedChannels = append(c.watchedChannels, channels...)
}
@ -82,6 +87,7 @@ func newCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Co
collectionPtr: collection,
id: collectionID,
schema: schema,
watchedChannels: make([]VChannel, 0),
}
C.free(unsafe.Pointer(cSchemaBlob))

View File

@ -23,12 +23,11 @@ package querynode
*/
import "C"
import (
"errors"
"fmt"
"strconv"
"sync"
"errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"

View File

@ -13,127 +13,193 @@ package querynode
import (
"context"
"errors"
"fmt"
"sync"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
type flowGraphType = int32
const (
flowGraphTypeCollection = 0
flowGraphTypePartition = 1
)
type dataSyncService struct {
ctx context.Context
cancel context.CancelFunc
collectionID UniqueID
fg *flowgraph.TimeTickedFlowGraph
mu sync.Mutex // guards FlowGraphs
collectionFlowGraphs map[UniqueID][]*queryNodeFlowGraph // map[collectionID]flowGraphs
partitionFlowGraphs map[UniqueID][]*queryNodeFlowGraph // map[partitionID]flowGraphs
dmStream msgstream.MsgStream
msFactory msgstream.Factory
replica ReplicaInterface
streamingReplica ReplicaInterface
tSafeReplica TSafeReplicaInterface
msFactory msgstream.Factory
}
func newDataSyncService(ctx context.Context,
replica ReplicaInterface,
tSafeReplica TSafeReplicaInterface,
factory msgstream.Factory,
collectionID UniqueID) *dataSyncService {
// collection flow graph
func (dsService *dataSyncService) addCollectionFlowGraph(collectionID UniqueID, vChannels []string) error {
dsService.mu.Lock()
defer dsService.mu.Unlock()
ctx1, cancel := context.WithCancel(ctx)
service := &dataSyncService{
ctx: ctx1,
cancel: cancel,
collectionID: collectionID,
fg: nil,
replica: replica,
tSafeReplica: tSafeReplica,
msFactory: factory,
if _, ok := dsService.collectionFlowGraphs[collectionID]; ok {
return errors.New("collection flow graph has been existed, collectionID = " + fmt.Sprintln(collectionID))
}
service.initNodes()
return service
}
func (dsService *dataSyncService) start() {
dsService.fg.Start()
}
func (dsService *dataSyncService) close() {
dsService.cancel()
if dsService.fg != nil {
dsService.fg.Close()
}
log.Debug("dataSyncService closed", zap.Int64("collectionID", dsService.collectionID))
}
func (dsService *dataSyncService) initNodes() {
// TODO: add delete pipeline support
dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
var dmStreamNode node = dsService.newDmInputNode(dsService.ctx)
var filterDmNode node = newFilteredDmNode(dsService.replica, dsService.collectionID)
var insertNode node = newInsertNode(dsService.replica, dsService.collectionID)
var serviceTimeNode node = newServiceTimeNode(dsService.ctx,
dsService.replica,
dsService.collectionFlowGraphs[collectionID] = make([]*queryNodeFlowGraph, 0)
for _, vChannel := range vChannels {
// collection flow graph doesn't need partition id
partitionID := UniqueID(0)
newFlowGraph := newQueryNodeFlowGraph(dsService.ctx,
flowGraphTypeCollection,
collectionID,
partitionID,
dsService.streamingReplica,
dsService.tSafeReplica,
dsService.msFactory,
dsService.collectionID)
dsService.fg.AddNode(dmStreamNode)
dsService.fg.AddNode(filterDmNode)
dsService.fg.AddNode(insertNode)
dsService.fg.AddNode(serviceTimeNode)
// dmStreamNode
var err = dsService.fg.SetEdges(dmStreamNode.Name(),
[]string{},
[]string{filterDmNode.Name()},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", dmStreamNode.Name()))
}
// filterDmNode
err = dsService.fg.SetEdges(filterDmNode.Name(),
[]string{dmStreamNode.Name()},
[]string{insertNode.Name()},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", filterDmNode.Name()))
}
// insertNode
err = dsService.fg.SetEdges(insertNode.Name(),
[]string{filterDmNode.Name()},
[]string{serviceTimeNode.Name()},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", insertNode.Name()))
}
// serviceTimeNode
err = dsService.fg.SetEdges(serviceTimeNode.Name(),
[]string{insertNode.Name()},
[]string{},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", serviceTimeNode.Name()))
}
}
func (dsService *dataSyncService) seekSegment(position *internalpb.MsgPosition) error {
err := dsService.dmStream.Seek([]*internalpb.MsgPosition{position})
if err != nil {
return err
vChannel,
dsService.msFactory)
dsService.collectionFlowGraphs[collectionID] = append(dsService.collectionFlowGraphs[collectionID], newFlowGraph)
log.Debug("add collection flow graph",
zap.Any("collectionID", collectionID),
zap.Any("channel", vChannel))
}
return nil
}
func (dsService *dataSyncService) getCollectionFlowGraphs(collectionID UniqueID) ([]*queryNodeFlowGraph, error) {
dsService.mu.Lock()
defer dsService.mu.Unlock()
if _, ok := dsService.collectionFlowGraphs[collectionID]; !ok {
return nil, errors.New("collection flow graph doesn't existed, collectionID = " + fmt.Sprintln(collectionID))
}
return dsService.collectionFlowGraphs[collectionID], nil
}
func (dsService *dataSyncService) startCollectionFlowGraph(collectionID UniqueID) error {
dsService.mu.Lock()
defer dsService.mu.Unlock()
if _, ok := dsService.collectionFlowGraphs[collectionID]; !ok {
return errors.New("collection flow graph doesn't existed, collectionID = " + fmt.Sprintln(collectionID))
}
for _, fg := range dsService.collectionFlowGraphs[collectionID] {
// start flow graph
log.Debug("start flow graph", zap.Any("channel", fg.channel))
go fg.flowGraph.Start()
}
return nil
}
func (dsService *dataSyncService) removeCollectionFlowGraph(collectionID UniqueID) {
dsService.mu.Lock()
defer dsService.mu.Unlock()
if _, ok := dsService.collectionFlowGraphs[collectionID]; ok {
for _, nodeFG := range dsService.collectionFlowGraphs[collectionID] {
// close flow graph
nodeFG.close()
}
dsService.collectionFlowGraphs[collectionID] = nil
}
delete(dsService.collectionFlowGraphs, collectionID)
}
// partition flow graph
func (dsService *dataSyncService) addPartitionFlowGraph(collectionID UniqueID, partitionID UniqueID, vChannels []string) error {
dsService.mu.Lock()
defer dsService.mu.Unlock()
if _, ok := dsService.partitionFlowGraphs[partitionID]; ok {
return errors.New("partition flow graph has been existed, partitionID = " + fmt.Sprintln(partitionID))
}
dsService.partitionFlowGraphs[partitionID] = make([]*queryNodeFlowGraph, 0)
for _, vChannel := range vChannels {
newFlowGraph := newQueryNodeFlowGraph(dsService.ctx,
flowGraphTypePartition,
collectionID,
partitionID,
dsService.streamingReplica,
dsService.tSafeReplica,
vChannel,
dsService.msFactory)
dsService.partitionFlowGraphs[partitionID] = append(dsService.partitionFlowGraphs[partitionID], newFlowGraph)
}
return nil
}
func (dsService *dataSyncService) getPartitionFlowGraphs(partitionID UniqueID) ([]*queryNodeFlowGraph, error) {
dsService.mu.Lock()
defer dsService.mu.Unlock()
if _, ok := dsService.partitionFlowGraphs[partitionID]; !ok {
return nil, errors.New("partition flow graph doesn't existed, partitionID = " + fmt.Sprintln(partitionID))
}
return dsService.partitionFlowGraphs[partitionID], nil
}
func (dsService *dataSyncService) startPartitionFlowGraph(partitionID UniqueID) error {
dsService.mu.Lock()
defer dsService.mu.Unlock()
if _, ok := dsService.partitionFlowGraphs[partitionID]; !ok {
return errors.New("partition flow graph doesn't existed, partitionID = " + fmt.Sprintln(partitionID))
}
for _, fg := range dsService.partitionFlowGraphs[partitionID] {
// start flow graph
go fg.flowGraph.Start()
}
return nil
}
func (dsService *dataSyncService) removePartitionFlowGraph(partitionID UniqueID) {
dsService.mu.Lock()
defer dsService.mu.Unlock()
if _, ok := dsService.partitionFlowGraphs[partitionID]; ok {
for _, nodeFG := range dsService.partitionFlowGraphs[partitionID] {
// close flow graph
nodeFG.close()
}
dsService.partitionFlowGraphs[partitionID] = nil
}
delete(dsService.partitionFlowGraphs, partitionID)
}
func newDataSyncService(ctx context.Context,
streamingReplica ReplicaInterface,
tSafeReplica TSafeReplicaInterface,
factory msgstream.Factory) *dataSyncService {
return &dataSyncService{
ctx: ctx,
collectionFlowGraphs: make(map[UniqueID][]*queryNodeFlowGraph),
partitionFlowGraphs: make(map[UniqueID][]*queryNodeFlowGraph),
streamingReplica: streamingReplica,
tSafeReplica: tSafeReplica,
msFactory: factory,
}
}
func (dsService *dataSyncService) close() {
for _, nodeFGs := range dsService.collectionFlowGraphs {
for _, nodeFG := range nodeFGs {
if nodeFG != nil {
nodeFG.flowGraph.Close()
}
}
}
for _, nodeFGs := range dsService.partitionFlowGraphs {
for _, nodeFG := range nodeFGs {
if nodeFG != nil {
nodeFG.flowGraph.Close()
}
}
}
dsService.collectionFlowGraphs = make(map[UniqueID][]*queryNodeFlowGraph)
dsService.partitionFlowGraphs = make(map[UniqueID][]*queryNodeFlowGraph)
}

View File

@ -115,13 +115,11 @@ func TestDataSyncService_Start(t *testing.T) {
err := msFactory.SetParams(m)
assert.Nil(t, err)
// dataSync
node.streaming.dataSyncServices[collectionID] = newDataSyncService(node.queryNodeLoopCtx,
node.streaming.replica,
node.streaming.tSafeReplica,
msFactory,
collectionID)
go node.streaming.dataSyncServices[collectionID].start()
channels := []VChannel{"0"}
err = node.streaming.dataSyncService.addCollectionFlowGraph(collectionID, channels)
assert.NoError(t, err)
err = node.streaming.dataSyncService.startCollectionFlowGraph(collectionID)
assert.NoError(t, err)
<-node.queryNodeLoopCtx.Done()
node.Stop()

View File

@ -13,7 +13,6 @@ package querynode
import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
@ -21,6 +20,7 @@ import (
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/trace"
)
type ddNode struct {

View File

@ -12,20 +12,23 @@
package querynode
import (
"fmt"
"errors"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
)
type filterDmNode struct {
baseNode
graphType flowGraphType
collectionID UniqueID
partitionID UniqueID
replica ReplicaInterface
}
@ -72,8 +75,6 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if resMsg != nil {
iMsg.insertMessages = append(iMsg.insertMessages, resMsg)
}
// case commonpb.MsgType_kDelete:
// dmMsg.deleteMessages = append(dmMsg.deleteMessages, (*msg).(*msgstream.DeleteTask))
default:
log.Warn("Non supporting", zap.Int32("message type", int32(msg.Type())))
}
@ -102,9 +103,14 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
return nil
}
// if the flow graph type is partition, check if the partition is target partition
if fdmNode.graphType == flowGraphTypePartition && msg.PartitionID != fdmNode.partitionID {
return nil
}
// check if the segment is in excluded segments
excludedSegments, err := fdmNode.replica.getExcludedSegments(fdmNode.collectionID)
log.Debug("excluded segments", zap.String("segmentIDs", fmt.Sprintln(excludedSegments)))
//log.Debug("excluded segments", zap.String("segmentIDs", fmt.Sprintln(excludedSegments)))
if err != nil {
log.Error(err.Error())
return nil
@ -115,12 +121,6 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
}
}
// TODO: If the last record is drop type, all insert requests are invalid.
//if !records[len(records)-1].createOrDrop {
// return nil
//}
// Filter insert requests before last record.
if len(msg.RowIDs) != len(msg.Timestamps) || len(msg.RowIDs) != len(msg.RowData) {
// TODO: what if the messages are misaligned? Here, we ignore those messages and print error
log.Error("Error, misaligned messages detected")
@ -134,7 +134,11 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
return msg
}
func newFilteredDmNode(replica ReplicaInterface, collectionID UniqueID) *filterDmNode {
func newFilteredDmNode(replica ReplicaInterface,
graphType flowGraphType,
collectionID UniqueID,
partitionID UniqueID) *filterDmNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
@ -142,9 +146,16 @@ func newFilteredDmNode(replica ReplicaInterface, collectionID UniqueID) *filterD
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
if graphType != flowGraphTypeCollection && graphType != flowGraphTypePartition {
err := errors.New("invalid flow graph type")
log.Error(err.Error())
}
return &filterDmNode{
baseNode: baseNode,
graphType: graphType,
collectionID: collectionID,
partitionID: partitionID,
replica: replica,
}
}

View File

@ -12,11 +12,10 @@
package querynode
import (
"github.com/milvus-io/milvus/internal/util/flowgraph"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
type gcNode struct {

View File

@ -15,17 +15,17 @@ import (
"context"
"sync"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
)
type insertNode struct {
baseNode
collectionID UniqueID
replica ReplicaInterface
}
@ -84,15 +84,6 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
}
segment, err := iNode.replica.getSegmentByID(task.SegmentID)
if err != nil {
log.Error(err.Error())
continue
}
if segment.enableLoadBinLog {
continue
}
insertData.insertIDs[task.SegmentID] = append(insertData.insertIDs[task.SegmentID], task.RowIDs...)
insertData.insertTimestamps[task.SegmentID] = append(insertData.insertTimestamps[task.SegmentID], task.Timestamps...)
insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...)
@ -132,20 +123,14 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
for _, sp := range spans {
sp.Finish()
}
return []Msg{res}
}
func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) {
log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID))
var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID),
zap.Any("targetSegment", targetSegment),
zap.Error(err),
zap.Any("SegmentType", targetSegment.segmentType),
zap.Any("enableLoadBinLog", targetSegment.enableLoadBinLog),
)
if targetSegment.segmentType != segmentTypeGrowing || targetSegment.enableLoadBinLog {
if targetSegment.segmentType != segmentTypeGrowing {
wg.Done()
return
}
@ -175,12 +160,11 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn
}
log.Debug("Do insert done", zap.Int("len", len(insertData.insertIDs[segmentID])),
zap.Int64("segmentID", segmentID),
zap.Int64("collectionID", iNode.collectionID))
zap.Int64("segmentID", segmentID))
wg.Done()
}
func newInsertNode(replica ReplicaInterface, collectionID UniqueID) *insertNode {
func newInsertNode(replica ReplicaInterface) *insertNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
@ -190,7 +174,6 @@ func newInsertNode(replica ReplicaInterface, collectionID UniqueID) *insertNode
return &insertNode{
baseNode: baseNode,
collectionID: collectionID,
replica: replica,
}
}

View File

@ -1,30 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
import (
"context"
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
func (dsService *dataSyncService) newDmInputNode(ctx context.Context) *flowgraph.InputNode {
// query node doesn't need to consume any topic
insertStream, _ := dsService.msFactory.NewTtMsgStream(ctx)
dsService.dmStream = insertStream
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
node := flowgraph.NewInputNode(&insertStream, "dmInputNode", maxQueueLength, maxParallelism)
return node
}

View File

@ -0,0 +1,142 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
import (
"context"
"errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
)
type queryNodeFlowGraph struct {
ctx context.Context
cancel context.CancelFunc
collectionID UniqueID
partitionID UniqueID
channel VChannel
flowGraph *flowgraph.TimeTickedFlowGraph
dmlStream msgstream.MsgStream
}
func newQueryNodeFlowGraph(ctx context.Context,
flowGraphType flowGraphType,
collectionID UniqueID,
partitionID UniqueID,
streamingReplica ReplicaInterface,
tSafeReplica TSafeReplicaInterface,
channel VChannel,
factory msgstream.Factory) *queryNodeFlowGraph {
ctx1, cancel := context.WithCancel(ctx)
q := &queryNodeFlowGraph{
ctx: ctx1,
cancel: cancel,
collectionID: collectionID,
partitionID: partitionID,
channel: channel,
flowGraph: flowgraph.NewTimeTickedFlowGraph(ctx1),
}
var dmStreamNode node = q.newDmInputNode(ctx1, factory)
var filterDmNode node = newFilteredDmNode(streamingReplica, flowGraphType, collectionID, partitionID)
var insertNode node = newInsertNode(streamingReplica)
var serviceTimeNode node = newServiceTimeNode(ctx1, tSafeReplica, collectionID, channel, factory)
q.flowGraph.AddNode(dmStreamNode)
q.flowGraph.AddNode(filterDmNode)
q.flowGraph.AddNode(insertNode)
q.flowGraph.AddNode(serviceTimeNode)
// dmStreamNode
var err = q.flowGraph.SetEdges(dmStreamNode.Name(),
[]string{},
[]string{filterDmNode.Name()},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", dmStreamNode.Name()))
}
// filterDmNode
err = q.flowGraph.SetEdges(filterDmNode.Name(),
[]string{dmStreamNode.Name()},
[]string{insertNode.Name()},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", filterDmNode.Name()))
}
// insertNode
err = q.flowGraph.SetEdges(insertNode.Name(),
[]string{filterDmNode.Name()},
[]string{serviceTimeNode.Name()},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", insertNode.Name()))
}
// serviceTimeNode
err = q.flowGraph.SetEdges(serviceTimeNode.Name(),
[]string{insertNode.Name()},
[]string{},
)
if err != nil {
log.Error("set edges failed in node:", zap.String("node name", serviceTimeNode.Name()))
}
return q
}
func (q *queryNodeFlowGraph) newDmInputNode(ctx context.Context, factory msgstream.Factory) *flowgraph.InputNode {
insertStream, err := factory.NewTtMsgStream(ctx)
if err != nil {
log.Error(err.Error())
} else {
q.dmlStream = insertStream
}
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
node := flowgraph.NewInputNode(&insertStream, "dmlInputNode", maxQueueLength, maxParallelism)
return node
}
func (q *queryNodeFlowGraph) consumerFlowGraph(channel VChannel, subName ConsumeSubName) error {
if q.dmlStream == nil {
return errors.New("null dml message stream in flow graph")
}
q.dmlStream.AsConsumer([]string{channel}, subName)
log.Debug("query node flow graph consumes from virtual channel", zap.Any("vChannel", channel))
return nil
}
func (q *queryNodeFlowGraph) seekQueryNodeFlowGraph(position *internalpb.MsgPosition) error {
err := q.dmlStream.Seek([]*internalpb.MsgPosition{position})
return err
}
func (q *queryNodeFlowGraph) close() {
q.cancel()
q.flowGraph.Close()
log.Debug("stop query node flow graph",
zap.Any("collectionID", q.collectionID),
zap.Any("partitionID", q.partitionID),
zap.Any("channel", q.channel),
)
}

View File

@ -13,21 +13,21 @@ package querynode
import (
"context"
"strconv"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"go.uber.org/zap"
)
type serviceTimeNode struct {
baseNode
collectionID UniqueID
replica ReplicaInterface
vChannel VChannel
tSafeReplica TSafeReplicaInterface
timeTickMsgStream msgstream.MsgStream
//timeTickMsgStream msgstream.MsgStream
}
func (stNode *serviceTimeNode) Name() string {
@ -35,7 +35,7 @@ func (stNode *serviceTimeNode) Name() string {
}
func (stNode *serviceTimeNode) Close() {
stNode.timeTickMsgStream.Close()
//stNode.timeTickMsgStream.Close()
}
func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
@ -57,16 +57,16 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
// update service time
// TODO: remove and use vChannel
vChannel := collectionIDToChannel(stNode.collectionID)
stNode.tSafeReplica.setTSafe(vChannel, serviceTimeMsg.timeRange.timestampMax)
channel := stNode.vChannel + strconv.FormatInt(stNode.collectionID, 10)
stNode.tSafeReplica.setTSafe(channel, serviceTimeMsg.timeRange.timestampMax)
//log.Debug("update tSafe:",
// zap.Int64("tSafe", int64(serviceTimeMsg.timeRange.timestampMax)),
// zap.Int64("collectionID", stNode.collectionID))
// zap.Any("collectionID", stNode.collectionID),
//)
if err := stNode.sendTimeTick(serviceTimeMsg.timeRange.timestampMax); err != nil {
log.Error("Error: send time tick into pulsar channel failed", zap.Error(err))
}
//if err := stNode.sendTimeTick(serviceTimeMsg.timeRange.timestampMax); err != nil {
// log.Error("Error: send time tick into pulsar channel failed", zap.Error(err))
//}
var res Msg = &gcMsg{
gcRecord: serviceTimeMsg.gcRecord,
@ -75,32 +75,32 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
return []Msg{res}
}
func (stNode *serviceTimeNode) sendTimeTick(ts Timestamp) error {
msgPack := msgstream.MsgPack{}
timeTickMsg := msgstream.TimeTickMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: ts,
EndTimestamp: ts,
HashValues: []uint32{0},
},
TimeTickMsg: internalpb.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
MsgID: 0,
Timestamp: ts,
SourceID: Params.QueryNodeID,
},
},
}
msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
return stNode.timeTickMsgStream.Produce(&msgPack)
}
//func (stNode *serviceTimeNode) sendTimeTick(ts Timestamp) error {
// msgPack := msgstream.MsgPack{}
// timeTickMsg := msgstream.TimeTickMsg{
// BaseMsg: msgstream.BaseMsg{
// BeginTimestamp: ts,
// EndTimestamp: ts,
// HashValues: []uint32{0},
// },
// TimeTickMsg: internalpb.TimeTickMsg{
// Base: &commonpb.MsgBase{
// MsgType: commonpb.MsgType_TimeTick,
// MsgID: 0,
// Timestamp: ts,
// SourceID: Params.QueryNodeID,
// },
// },
// }
// msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
// return stNode.timeTickMsgStream.Produce(&msgPack)
//}
func newServiceTimeNode(ctx context.Context,
replica ReplicaInterface,
tSafeReplica TSafeReplicaInterface,
factory msgstream.Factory,
collectionID UniqueID) *serviceTimeNode {
collectionID UniqueID,
channel VChannel,
factory msgstream.Factory) *serviceTimeNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
@ -109,15 +109,19 @@ func newServiceTimeNode(ctx context.Context,
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
timeTimeMsgStream, _ := factory.NewMsgStream(ctx)
timeTimeMsgStream.AsProducer([]string{Params.QueryTimeTickChannelName})
log.Debug("querynode AsProducer: " + Params.QueryTimeTickChannelName)
//timeTimeMsgStream, err := factory.NewMsgStream(ctx)
//if err != nil {
// log.Error(err.Error())
//} else {
// timeTimeMsgStream.AsProducer([]string{Params.QueryTimeTickChannelName})
// log.Debug("query node AsProducer: " + Params.QueryTimeTickChannelName)
//}
return &serviceTimeNode{
baseNode: baseNode,
collectionID: collectionID,
replica: replica,
vChannel: channel,
tSafeReplica: tSafeReplica,
timeTickMsgStream: timeTimeMsgStream,
//timeTickMsgStream: timeTimeMsgStream,
}
}

View File

@ -20,12 +20,11 @@ package querynode
*/
import "C"
import (
"errors"
"path/filepath"
"strconv"
"unsafe"
"errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"

View File

@ -22,8 +22,9 @@ package querynode
import "C"
import (
"errors"
"github.com/milvus-io/milvus/internal/proto/planpb"
"unsafe"
"github.com/milvus-io/milvus/internal/proto/planpb"
)
type Plan struct {

View File

@ -122,7 +122,7 @@ func (node *QueryNode) Init() error {
node.dataService,
node.indexService,
node.msFactory)
node.streaming = newStreaming()
node.streaming = newStreaming(node.queryNodeLoopCtx, node.msFactory)
C.SegcoreInit()
registerReq := &queryPb.RegisterNodeRequest{

View File

@ -173,14 +173,17 @@ func newQueryNodeMock() *QueryNode {
}()
}
msFactory := msgstream.NewPmsFactory()
msFactory, err := newMessageStreamFactory()
if err != nil {
panic(err)
}
svr := NewQueryNode(ctx, Params.QueryNodeID, msFactory)
err := svr.SetQueryService(&queryServiceMock{})
err = svr.SetQueryService(&queryServiceMock{})
if err != nil {
panic(err)
}
svr.historical = newHistorical(svr.queryNodeLoopCtx, nil, nil, nil, svr.msFactory)
svr.streaming = newStreaming()
svr.streaming = newStreaming(ctx, msFactory)
return svr
}

View File

@ -15,6 +15,8 @@ import (
"context"
"errors"
"fmt"
"math"
"reflect"
"sync"
"github.com/golang/protobuf/proto"
@ -43,8 +45,8 @@ type retrieveCollection struct {
unsolvedMsgMu sync.Mutex
unsolvedMsg []*msgstream.RetrieveMsg
tSafeMutex sync.Mutex
tSafeWatcher *tSafeWatcher
tSafeWatchers map[VChannel]*tSafeWatcher
watcherSelectCase []reflect.SelectCase
serviceableTimeMutex sync.Mutex
serviceableTime Timestamp
@ -72,13 +74,15 @@ func newRetrieveCollection(releaseCtx context.Context,
streamingReplica: streamingReplica,
tSafeReplica: tSafeReplica,
tSafeWatchers: make(map[VChannel]*tSafeWatcher),
msgBuffer: msgBuffer,
unsolvedMsg: unsolvedMsg,
retrieveResultMsgStream: retrieveResultStream,
}
rc.register(collectionID)
rc.register()
return rc
}
@ -101,13 +105,20 @@ func (rc *retrieveCollection) setServiceableTime(t Timestamp) {
}
func (rc *retrieveCollection) waitNewTSafe() Timestamp {
// block until dataSyncService updating tSafe
// TODO: remove and use vChannel
vChannel := collectionIDToChannel(rc.collectionID)
// block until dataSyncService updating tSafe
rc.tSafeWatcher.hasUpdate()
ts := rc.tSafeReplica.getTSafe(vChannel)
return ts
// block until any vChannel updating tSafe
_, _, recvOK := reflect.Select(rc.watcherSelectCase)
if !recvOK {
log.Error("tSafe has been closed")
return invalidTimestamp
}
t := Timestamp(math.MaxInt64)
for channel := range rc.tSafeWatchers {
ts := rc.tSafeReplica.getTSafe(channel)
if ts <= t {
t = ts
}
}
return t
}
func (rc *retrieveCollection) start() {
@ -115,13 +126,24 @@ func (rc *retrieveCollection) start() {
go rc.doUnsolvedMsgRetrieve()
}
func (rc *retrieveCollection) register(collectionID UniqueID) {
vChannel := collectionIDToChannel(collectionID)
rc.tSafeReplica.addTSafe(vChannel)
rc.tSafeMutex.Lock()
rc.tSafeWatcher = newTSafeWatcher()
rc.tSafeMutex.Unlock()
rc.tSafeReplica.registerTSafeWatcher(vChannel, rc.tSafeWatcher)
func (rc *retrieveCollection) register() {
// register tSafe watcher and init watcher select case
collection, err := rc.historicalReplica.getCollectionByID(rc.collectionID)
if err != nil {
log.Error(err.Error())
return
}
rc.watcherSelectCase = make([]reflect.SelectCase, 0)
for _, channel := range collection.getWatchedDmChannels() {
rc.tSafeReplica.addTSafe(channel)
rc.tSafeWatchers[channel] = newTSafeWatcher()
rc.tSafeReplica.registerTSafeWatcher(channel, rc.tSafeWatchers[channel])
rc.watcherSelectCase = append(rc.watcherSelectCase, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(rc.tSafeWatchers[channel].watcherChan()),
})
}
}
func (rc *retrieveCollection) addToUnsolvedMsg(msg *msgstream.RetrieveMsg) {

View File

@ -16,10 +16,11 @@ import (
"errors"
"strconv"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/util/trace"
"go.uber.org/zap"
)
type retrieveService struct {

View File

@ -14,9 +14,12 @@ package querynode
import (
"context"
"errors"
"math"
"reflect"
"sync"
"github.com/golang/protobuf/proto"
oplog "github.com/opentracing/opentracing-go/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
@ -26,7 +29,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/tsoutil"
oplog "github.com/opentracing/opentracing-go/log"
)
type searchCollection struct {
@ -42,8 +44,8 @@ type searchCollection struct {
unsolvedMsgMu sync.Mutex // guards unsolvedMsg
unsolvedMsg []*msgstream.SearchMsg
tSafeMutex sync.Mutex
tSafeWatcher *tSafeWatcher
tSafeWatchers map[VChannel]*tSafeWatcher
watcherSelectCase []reflect.SelectCase
serviceableTimeMutex sync.Mutex // guards serviceableTime
serviceableTime Timestamp
@ -60,6 +62,7 @@ func newSearchCollection(releaseCtx context.Context,
streamingReplica ReplicaInterface,
tSafeReplica TSafeReplicaInterface,
searchResultStream msgstream.MsgStream) *searchCollection {
receiveBufSize := Params.SearchReceiveBufSize
msgBuffer := make(chan *msgstream.SearchMsg, receiveBufSize)
unsolvedMsg := make([]*msgstream.SearchMsg, 0)
@ -73,13 +76,15 @@ func newSearchCollection(releaseCtx context.Context,
streamingReplica: streamingReplica,
tSafeReplica: tSafeReplica,
tSafeWatchers: make(map[VChannel]*tSafeWatcher),
msgBuffer: msgBuffer,
unsolvedMsg: unsolvedMsg,
searchResultMsgStream: searchResultStream,
}
sc.register(collectionID)
sc.register()
return sc
}
@ -88,14 +93,25 @@ func (s *searchCollection) start() {
go s.doUnsolvedMsgSearch()
}
func (s *searchCollection) register(collectionID UniqueID) {
// TODO: remove and use vChannel
vChannel := collectionIDToChannel(collectionID)
s.tSafeReplica.addTSafe(vChannel)
s.tSafeMutex.Lock()
s.tSafeWatcher = newTSafeWatcher()
s.tSafeMutex.Unlock()
s.tSafeReplica.registerTSafeWatcher(vChannel, s.tSafeWatcher)
func (s *searchCollection) register() {
collection, err := s.streamingReplica.getCollectionByID(s.collectionID)
if err != nil {
log.Error(err.Error())
return
}
s.watcherSelectCase = make([]reflect.SelectCase, 0)
log.Debug("register tSafe watcher and init watcher select case",
zap.Any("dml channels", collection.getWatchedDmChannels()),
zap.Any("collectionID", collection.ID()))
for _, channel := range collection.getWatchedDmChannels() {
s.tSafeWatchers[channel] = newTSafeWatcher()
s.tSafeReplica.registerTSafeWatcher(channel, s.tSafeWatchers[channel])
s.watcherSelectCase = append(s.watcherSelectCase, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(s.tSafeWatchers[channel].watcherChan()),
})
}
}
func (s *searchCollection) addToUnsolvedMsg(msg *msgstream.SearchMsg) {
@ -113,12 +129,20 @@ func (s *searchCollection) popAllUnsolvedMsg() []*msgstream.SearchMsg {
}
func (s *searchCollection) waitNewTSafe() Timestamp {
// TODO: remove and use vChannel
vChannel := collectionIDToChannel(s.collectionID)
// block until dataSyncService updating tSafe
s.tSafeWatcher.hasUpdate()
ts := s.tSafeReplica.getTSafe(vChannel)
return ts
// block until any vChannel updating tSafe
_, _, recvOK := reflect.Select(s.watcherSelectCase)
if !recvOK {
log.Error("tSafe has been closed", zap.Any("collectionID", s.collectionID))
return invalidTimestamp
}
t := Timestamp(math.MaxInt64)
for channel := range s.tSafeWatchers {
ts := s.tSafeReplica.getTSafe(channel)
if ts <= t {
t = ts
}
}
return t
}
func (s *searchCollection) getServiceableTime() Timestamp {
@ -129,6 +153,12 @@ func (s *searchCollection) getServiceableTime() Timestamp {
func (s *searchCollection) setServiceableTime(t Timestamp) {
s.serviceableTimeMutex.Lock()
defer s.serviceableTimeMutex.Unlock()
if t < s.serviceableTime {
return
}
gracefulTimeInMilliSecond := Params.GracefulTime
if gracefulTimeInMilliSecond > 0 {
gracefulTime := tsoutil.ComposeTS(gracefulTimeInMilliSecond, 0)
@ -136,7 +166,6 @@ func (s *searchCollection) setServiceableTime(t Timestamp) {
} else {
s.serviceableTime = t
}
s.serviceableTimeMutex.Unlock()
}
func (s *searchCollection) emptySearch(searchMsg *msgstream.SearchMsg) {
@ -174,6 +203,7 @@ func (s *searchCollection) receiveSearchMsg() {
zap.Any("serviceTime", st),
zap.Any("delta seconds", (sm.BeginTs()-serviceTime)/(1000*1000*1000)),
zap.Any("collectionID", s.collectionID),
zap.Any("msgID", sm.ID()),
)
s.addToUnsolvedMsg(sm)
sp.LogFields(
@ -216,7 +246,7 @@ func (s *searchCollection) doUnsolvedMsgSearch() {
default:
serviceTime := s.waitNewTSafe()
s.setServiceableTime(serviceTime)
log.Debug("querynode::doUnsolvedMsgSearch: setServiceableTime",
log.Debug("query node::doUnsolvedMsgSearch: setServiceableTime",
zap.Any("serviceTime", serviceTime),
)
log.Debug("get tSafe from flow graph",

View File

@ -17,10 +17,11 @@ import (
"errors"
"strconv"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/util/trace"
"go.uber.org/zap"
)
type searchService struct {
@ -78,6 +79,7 @@ func newSearchService(ctx context.Context,
}
func (s *searchService) start() {
log.Debug("start search service")
s.searchMsgStream.Start()
s.searchResultMsgStream.Start()
s.startEmptySearchCollection()
@ -145,6 +147,7 @@ func (s *searchService) consumeSearch() {
}
func (s *searchService) close() {
log.Debug("search service closed")
if s.searchMsgStream != nil {
s.searchMsgStream.Close()
}

View File

@ -140,18 +140,6 @@ func TestSearch_Search(t *testing.T) {
msFactory, err := newMessageStreamFactory()
assert.NoError(t, err)
// start dataSync
newDS := newDataSyncService(node.queryNodeLoopCtx,
node.streaming.replica,
node.streaming.tSafeReplica,
msFactory,
collectionID)
err = node.streaming.addDataSyncService(collectionID, newDS)
assert.NoError(t, err)
ds, err := node.streaming.getDataSyncService(collectionID)
assert.NoError(t, err)
go ds.start()
// start search service
node.searchService = newSearchService(node.queryNodeLoopCtx,
node.historical.replica,
@ -161,10 +149,6 @@ func TestSearch_Search(t *testing.T) {
go node.searchService.start()
node.searchService.startSearchCollection(collectionID)
// TODO: remove and use vChannel
vChannel := collectionIDToChannel(collectionID)
node.streaming.tSafeReplica.setTSafe(vChannel, 1000)
// load segment
err = node.historical.replica.addSegment(segmentID, defaultPartitionID, collectionID, segmentTypeSealed)
assert.NoError(t, err)
@ -196,18 +180,6 @@ func TestSearch_SearchMultiSegments(t *testing.T) {
msFactory, err := newMessageStreamFactory()
assert.NoError(t, err)
// start dataSync
newDS := newDataSyncService(node.queryNodeLoopCtx,
node.streaming.replica,
node.streaming.tSafeReplica,
msFactory,
collectionID)
err = node.streaming.addDataSyncService(collectionID, newDS)
assert.NoError(t, err)
ds, err := node.streaming.getDataSyncService(collectionID)
assert.NoError(t, err)
go ds.start()
// start search service
node.searchService = newSearchService(node.queryNodeLoopCtx,
node.streaming.replica,
@ -217,10 +189,6 @@ func TestSearch_SearchMultiSegments(t *testing.T) {
go node.searchService.start()
node.searchService.startSearchCollection(collectionID)
// TODO: remove and use vChannel
vChannel := collectionIDToChannel(collectionID)
node.streaming.tSafeReplica.setTSafe(vChannel, 1000)
// load segments
err = node.historical.replica.addSegment(segmentID1, defaultPartitionID, collectionID, segmentTypeSealed)
assert.NoError(t, err)

View File

@ -23,6 +23,7 @@ package querynode
*/
import "C"
import (
"errors"
"fmt"
"github.com/milvus-io/milvus/internal/proto/planpb"
"strconv"
@ -32,8 +33,6 @@ import (
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"errors"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
)

View File

@ -12,27 +12,30 @@
package querynode
import (
"errors"
"fmt"
"sync"
"context"
"github.com/milvus-io/milvus/internal/msgstream"
)
type streaming struct {
ctx context.Context
replica ReplicaInterface
tSafeReplica TSafeReplicaInterface
dsServicesMu sync.Mutex // guards dataSyncServices
dataSyncServices map[UniqueID]*dataSyncService
dataSyncService *dataSyncService
msFactory msgstream.Factory
}
func newStreaming() *streaming {
func newStreaming(ctx context.Context, factory msgstream.Factory) *streaming {
replica := newCollectionReplica()
tReplica := newTSafeReplica()
ds := make(map[UniqueID]*dataSyncService)
newDS := newDataSyncService(ctx, replica, tReplica, factory)
return &streaming{
replica: replica,
tSafeReplica: tReplica,
dataSyncServices: ds,
dataSyncService: newDS,
}
}
@ -42,37 +45,11 @@ func (s *streaming) start() {
func (s *streaming) close() {
// TODO: stop stats
for _, ds := range s.dataSyncServices {
ds.close()
if s.dataSyncService != nil {
s.dataSyncService.close()
}
s.dataSyncServices = make(map[UniqueID]*dataSyncService)
// free collectionReplica
s.replica.freeAll()
}
func (s *streaming) getDataSyncService(collectionID UniqueID) (*dataSyncService, error) {
s.dsServicesMu.Lock()
defer s.dsServicesMu.Unlock()
ds, ok := s.dataSyncServices[collectionID]
if !ok {
return nil, errors.New("cannot found dataSyncService, collectionID =" + fmt.Sprintln(collectionID))
}
return ds, nil
}
func (s *streaming) addDataSyncService(collectionID UniqueID, ds *dataSyncService) error {
s.dsServicesMu.Lock()
defer s.dsServicesMu.Unlock()
if _, ok := s.dataSyncServices[collectionID]; ok {
return errors.New("dataSyncService has been existed, collectionID =" + fmt.Sprintln(collectionID))
}
s.dataSyncServices[collectionID] = ds
return nil
}
func (s *streaming) removeDataSyncService(collectionID UniqueID) {
s.dsServicesMu.Lock()
defer s.dsServicesMu.Unlock()
delete(s.dataSyncServices, collectionID)
}

View File

@ -18,11 +18,11 @@ import (
"math/rand"
"strconv"
"strings"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/internalpb"
queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
)
@ -110,35 +110,35 @@ func (w *watchDmChannelsTask) PreExecute(ctx context.Context) error {
func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
log.Debug("starting WatchDmChannels ...", zap.String("ChannelIDs", fmt.Sprintln(w.req.ChannelIDs)))
// TODO: pass load type, col or partition
// 1. init channels in collection meta
collectionID := w.req.CollectionID
ds, err := w.node.streaming.getDataSyncService(collectionID)
if err != nil || ds.dmStream == nil {
errMsg := "null data sync service or null data manipulation stream, collectionID = " + fmt.Sprintln(collectionID)
log.Error(errMsg)
return errors.New(errMsg)
// TODO: Remove this and use unique vChannel
channelTmp := make([]string, 0)
for _, channel := range w.req.ChannelIDs {
channelTmp = append(channelTmp, channel+strconv.FormatInt(collectionID, 10))
}
switch t := ds.dmStream.(type) {
case *msgstream.MqTtMsgStream:
default:
_ = t
errMsg := "type assertion failed for dm message stream"
log.Error(errMsg)
return errors.New(errMsg)
collection, err := w.node.streaming.replica.getCollectionByID(collectionID)
if err != nil {
log.Error(err.Error())
return err
}
collection.addWatchedDmChannels(channelTmp)
// 2. get subscription name
getUniqueSubName := func() string {
prefixName := Params.MsgChannelSubName
return prefixName + "-" + strconv.FormatInt(collectionID, 10)
}
consumeSubName := getUniqueSubName()
// add request channel
// 3. group channels by to seeking or consuming
consumeChannels := w.req.ChannelIDs
toSeekInfo := make([]*internalpb.MsgPosition, 0)
toDirSubChannels := make([]string, 0)
consumeSubName := getUniqueSubName()
for _, info := range w.req.Infos {
if len(info.Pos.MsgID) == 0 {
toDirSubChannels = append(toDirSubChannels, info.ChannelID)
@ -155,23 +155,61 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) error {
}
}
ds.dmStream.AsConsumer(toDirSubChannels, consumeSubName)
// 4. add flow graph
err = w.node.streaming.dataSyncService.addCollectionFlowGraph(collectionID, consumeChannels)
if err != nil {
return err
}
log.Debug("query node add flow graphs, channels = " + strings.Join(consumeChannels, ", "))
// 5. channels as consumer
nodeFGs, err := w.node.streaming.dataSyncService.getCollectionFlowGraphs(collectionID)
if err != nil {
return err
}
for _, channel := range toDirSubChannels {
for _, fg := range nodeFGs {
if fg.channel == channel {
err := fg.consumerFlowGraph(channel, consumeSubName)
if err != nil {
errMsg := "msgStream consume error :" + err.Error()
log.Error(errMsg)
return errors.New(errMsg)
}
}
}
}
log.Debug("as consumer channels", zap.Any("channels", consumeChannels))
// 6. seek channel
for _, pos := range toSeekInfo {
err := ds.dmStream.Seek([]*internalpb.MsgPosition{pos})
for _, fg := range nodeFGs {
if fg.channel == pos.ChannelName {
err := fg.seekQueryNodeFlowGraph(pos)
if err != nil {
errMsg := "msgStream seek error :" + err.Error()
log.Error(errMsg)
return errors.New(errMsg)
}
}
}
}
collection, err := w.node.streaming.replica.getCollectionByID(collectionID)
// add tSafe
for _, channel := range channelTmp {
w.node.streaming.tSafeReplica.addTSafe(channel)
}
// 7. start search collection
w.node.searchService.startSearchCollection(collectionID)
log.Debug("start search collection", zap.Any("collectionID", collectionID))
// 8. start flow graphs
err = w.node.streaming.dataSyncService.startCollectionFlowGraph(collectionID)
if err != nil {
log.Error(err.Error())
return err
}
collection.addWatchedDmChannels(w.req.ChannelIDs)
log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
log.Debug("WatchDmChannels done", zap.String("ChannelIDs", fmt.Sprintln(w.req.ChannelIDs)))
return nil
}
@ -229,19 +267,6 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
}
}
l.node.streaming.replica.initExcludedSegments(collectionID)
newDS := newDataSyncService(l.node.queryNodeLoopCtx,
l.node.streaming.replica,
l.node.streaming.tSafeReplica,
l.node.msFactory,
collectionID)
// ignore duplicated dataSyncService error
_ = l.node.streaming.addDataSyncService(collectionID, newDS)
ds, err := l.node.streaming.getDataSyncService(collectionID)
if err != nil {
return err
}
go ds.start()
l.node.searchService.startSearchCollection(collectionID)
}
if !hasPartitionInHistorical {
err := l.node.historical.replica.addPartition(collectionID, partitionID)
@ -302,15 +327,19 @@ func (r *releaseCollectionTask) PreExecute(ctx context.Context) error {
}
func (r *releaseCollectionTask) Execute(ctx context.Context) error {
ds, err := r.node.streaming.getDataSyncService(r.req.CollectionID)
if err == nil && ds != nil {
ds.close()
r.node.streaming.removeDataSyncService(r.req.CollectionID)
// TODO: remove and use vChannel
vChannel := collectionIDToChannel(r.req.CollectionID)
r.node.streaming.tSafeReplica.removeTSafe(vChannel)
r.node.streaming.replica.removeExcludedSegments(r.req.CollectionID)
log.Debug("receive release collection task", zap.Any("collectionID", r.req.CollectionID))
r.node.streaming.dataSyncService.removeCollectionFlowGraph(r.req.CollectionID)
collection, err := r.node.historical.replica.getCollectionByID(r.req.CollectionID)
if err != nil {
log.Error(err.Error())
} else {
// remove all tSafes of the target collection
for _, channel := range collection.getWatchedDmChannels() {
r.node.streaming.tSafeReplica.removeTSafe(channel)
}
}
r.node.streaming.replica.removeExcludedSegments(r.req.CollectionID)
if r.node.searchService.hasSearchCollection(r.req.CollectionID) {
r.node.searchService.stopSearchCollection(r.req.CollectionID)
@ -326,12 +355,15 @@ func (r *releaseCollectionTask) Execute(ctx context.Context) error {
hasCollectionInStreaming := r.node.streaming.replica.hasCollection(r.req.CollectionID)
if hasCollectionInStreaming {
err := r.node.streaming.replica.removePartition(r.req.CollectionID)
err := r.node.streaming.replica.removeCollection(r.req.CollectionID)
if err != nil {
return err
}
}
// TODO: for debugging, remove this
time.Sleep(2 * time.Second)
log.Debug("ReleaseCollection done", zap.Int64("collectionID", r.req.CollectionID))
return nil
}

View File

@ -31,10 +31,15 @@ func (watcher *tSafeWatcher) notify() {
}
}
// deprecated
func (watcher *tSafeWatcher) hasUpdate() {
<-watcher.notifyChan
}
func (watcher *tSafeWatcher) watcherChan() <-chan bool {
return watcher.notifyChan
}
type tSafer interface {
get() Timestamp
set(t Timestamp)

View File

@ -13,11 +13,7 @@ package querynode
import (
"errors"
"fmt"
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/log"
)
// TSafeReplicaInterface is the interface wrapper of tSafeReplica
@ -39,6 +35,7 @@ func (t *tSafeReplica) getTSafe(vChannel VChannel) Timestamp {
defer t.mu.Unlock()
safer, err := t.getTSaferPrivate(vChannel)
if err != nil {
//log.Error("get tSafe failed", zap.Error(err))
return 0
}
return safer.get()
@ -49,6 +46,7 @@ func (t *tSafeReplica) setTSafe(vChannel VChannel, timestamp Timestamp) {
defer t.mu.Unlock()
safer, err := t.getTSaferPrivate(vChannel)
if err != nil {
//log.Error("set tSafe failed", zap.Error(err))
return
}
safer.set(timestamp)
@ -57,7 +55,7 @@ func (t *tSafeReplica) setTSafe(vChannel VChannel, timestamp Timestamp) {
func (t *tSafeReplica) getTSaferPrivate(vChannel VChannel) (tSafer, error) {
if _, ok := t.tSafes[vChannel]; !ok {
err := errors.New("cannot found tSafer, vChannel = " + vChannel)
log.Error(err.Error())
//log.Error(err.Error())
return nil, err
}
return t.tSafes[vChannel], nil
@ -67,6 +65,7 @@ func (t *tSafeReplica) addTSafe(vChannel VChannel) {
t.mu.Lock()
defer t.mu.Unlock()
t.tSafes[vChannel] = newTSafe()
//log.Debug("add tSafe done", zap.Any("channel", vChannel))
}
func (t *tSafeReplica) removeTSafe(vChannel VChannel) {
@ -85,25 +84,12 @@ func (t *tSafeReplica) registerTSafeWatcher(vChannel VChannel, watcher *tSafeWat
defer t.mu.Unlock()
safer, err := t.getTSaferPrivate(vChannel)
if err != nil {
//log.Error("register tSafe watcher failed", zap.Error(err))
return
}
safer.registerTSafeWatcher(watcher)
}
// TODO: remove and use real vChannel
func collectionIDToChannel(collectionID UniqueID) VChannel {
return fmt.Sprintln(collectionID)
}
// TODO: remove and use real vChannel
func channelTOCollectionID(channel VChannel) UniqueID {
collectionID, err := strconv.ParseInt(channel, 10, 64)
if err != nil {
return 0
}
return collectionID
}
func newTSafeReplica() TSafeReplicaInterface {
var replica TSafeReplicaInterface = &tSafeReplica{
tSafes: make(map[string]tSafer),

View File

@ -20,6 +20,8 @@ const (
timestampFieldID = 1
)
const invalidTimestamp = Timestamp(0)
type (
UniqueID = typeutil.UniqueID
// Timestamp is timestamp

View File

@ -58,7 +58,7 @@ func (nodeCtx *nodeCtx) Start(ctx context.Context, wg *sync.WaitGroup) {
select {
case <-ctx.Done():
wg.Done()
//fmt.Println(nodeCtx.node.Name(), "closed")
fmt.Println(nodeCtx.node.Name(), "closed")
return
default:
// inputs from inputsMessages for Operate