Support sync all segments while close (#21421)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
Xiaofan 2023-01-06 14:49:36 +08:00 committed by GitHub
parent 6fb3542f2a
commit 4b4944ecee
26 changed files with 782 additions and 248 deletions

View File

@ -47,6 +47,8 @@ type DelBufferManager struct {
}
func (bm *DelBufferManager) GetSegDelBufMemSize(segID UniqueID) int64 {
bm.mu.Lock()
defer bm.mu.Unlock()
if delDataBuf, ok := bm.channel.getCurDeleteBuffer(segID); ok {
return delDataBuf.item.memorySize
}
@ -54,6 +56,8 @@ func (bm *DelBufferManager) GetSegDelBufMemSize(segID UniqueID) int64 {
}
func (bm *DelBufferManager) GetEntriesNum(segID UniqueID) int64 {
bm.mu.Lock()
defer bm.mu.Unlock()
if delDataBuf, ok := bm.channel.getCurDeleteBuffer(segID); ok {
return delDataBuf.GetEntriesNum()
}
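
A minimal sketch of the locking pattern these two hunks introduce, using hypothetical simplified types: every read of per-segment delete-buffer state now takes the manager mutex, so readers cannot race with concurrent buffer updates.

	import "sync"

	// bufferManager is a hypothetical stand-in for DelBufferManager.
	type bufferManager struct {
		mu    sync.Mutex
		sizes map[int64]int64 // segment ID -> buffered delete bytes
	}

	// memSize mirrors the shape of GetSegDelBufMemSize: lock, read, unlock.
	func (bm *bufferManager) memSize(segID int64) int64 {
		bm.mu.Lock()
		defer bm.mu.Unlock()
		return bm.sizes[segID] // zero when the segment has no buffer
	}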

View File

@ -191,10 +191,10 @@ func (c *ChannelMeta) maxRowCountPerSegment(ts Timestamp) (int64, error) {
// Make sure to verify `channel.hasSegment(segID)` == false before calling `channel.addSegment()`.
func (c *ChannelMeta) addSegment(req addSegmentReq) error {
if req.collID != c.collectionID {
log.Warn("collection mismatch",
log.Warn("failed to addSegment, collection mismatch",
zap.Int64("current collection ID", req.collID),
zap.Int64("expected collection ID", c.collectionID))
return fmt.Errorf("mismatch collection, ID=%d", req.collID)
return fmt.Errorf("failed to addSegment, mismatch collection, ID=%d", req.collID)
}
log.Info("adding segment",
zap.String("type", req.segType.String()),
@ -500,8 +500,11 @@ func (c *ChannelMeta) getCollectionID() UniqueID {
//
// If you want the latest collection schema, ts should be 0.
func (c *ChannelMeta) getCollectionSchema(collID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error) {
if !c.validCollection(collID) {
return nil, fmt.Errorf("mismatch collection, want %d, actual %d", c.collectionID, collID)
if collID != c.collectionID {
log.Warn("failed to getCollectionSchema, collection mismatch",
zap.Int64("current collection ID", collID),
zap.Int64("expected collection ID", c.collectionID))
return nil, fmt.Errorf("failed to getCollectionSchema, mismatch collection, want %d, actual %d", c.collectionID, collID)
}
c.schemaMut.RLock()
@ -524,12 +527,7 @@ func (c *ChannelMeta) getCollectionSchema(collID UniqueID, ts Timestamp) (*schem
return c.collSchema, nil
}
func (c *ChannelMeta) validCollection(collID UniqueID) bool {
return collID == c.collectionID
}
func (c *ChannelMeta) mergeFlushedSegments(seg *Segment, planID UniqueID, compactedFrom []UniqueID) error {
log := log.With(
zap.Int64("segment ID", seg.segmentID),
zap.Int64("collection ID", seg.collectionID),
@ -539,9 +537,10 @@ func (c *ChannelMeta) mergeFlushedSegments(seg *Segment, planID UniqueID, compac
zap.String("channel name", c.channelName))
if seg.collectionID != c.collectionID {
log.Warn("Mismatch collection",
zap.Int64("expected collectionID", c.collectionID))
return fmt.Errorf("mismatch collection, ID=%d", seg.collectionID)
log.Warn("failed to mergeFlushedSegments, collection mismatch",
zap.Int64("current collection ID", seg.collectionID),
zap.Int64("expected collection ID", c.collectionID))
return fmt.Errorf("failed to mergeFlushedSegments, mismatch collection, ID=%d", seg.collectionID)
}
compactedFrom = lo.Filter(compactedFrom, func(segID int64, _ int) bool {
@ -578,10 +577,10 @@ func (c *ChannelMeta) mergeFlushedSegments(seg *Segment, planID UniqueID, compac
// for tests only
func (c *ChannelMeta) addFlushedSegmentWithPKs(segID, collID, partID UniqueID, numOfRows int64, ids storage.FieldData) error {
if collID != c.collectionID {
log.Warn("Mismatch collection",
zap.Int64("input ID", collID),
zap.Int64("expected ID", c.collectionID))
return fmt.Errorf("mismatch collection, ID=%d", collID)
log.Warn("failed to addFlushedSegmentWithPKs, collection mismatch",
zap.Int64("current collection ID", collID),
zap.Int64("expected collection ID", c.collectionID))
return fmt.Errorf("failed to addFlushedSegmentWithPKs, mismatch collection, ID=%d", collID)
}
log.Info("Add Flushed segment",

View File

@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"go.uber.org/zap"
@ -44,10 +45,10 @@ type dataSyncService struct {
ctx context.Context
cancelFn context.CancelFunc
fg *flowgraph.TimeTickedFlowGraph // internal flowgraph processes insert/delta messages
flushCh chan flushMsg // chan to notify flush
resendTTCh chan resendTTMsg // chan to ask for resending DataNode time tick message.
channel Channel // channel stores meta of channel
idAllocator allocatorInterface // id/timestamp allocator
flushCh chan flushMsg
resendTTCh chan resendTTMsg // chan to ask for resending DataNode time tick message.
channel Channel // channel stores meta of channel
idAllocator allocatorInterface // id/timestamp allocator
msFactory msgstream.Factory
collectionID UniqueID // collection id of vchan for which this data sync service serves
vchannelName string
@ -59,6 +60,9 @@ type dataSyncService struct {
flushManager flushManager // flush manager handles flush process
chunkManager storage.ChunkManager
compactor *compactionExecutor // reference to compaction executor
stopOnce sync.Once
flushListener chan *segmentFlushPack // chan to listen flush event
}
func newDataSyncService(ctx context.Context,
@ -132,7 +136,7 @@ func newParallelConfig() parallelConfig {
return parallelConfig{Params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32(), Params.DataNodeCfg.FlowGraphMaxParallelism.GetAsInt32()}
}
// start starts the flow graph in datasyncservice
// start the flow graph in datasyncservice
func (dsService *dataSyncService) start() {
if dsService.fg != nil {
log.Info("dataSyncService starting flow graph", zap.Int64("collectionID", dsService.collectionID),
@ -145,18 +149,20 @@ func (dsService *dataSyncService) start() {
}
func (dsService *dataSyncService) close() {
if dsService.fg != nil {
log.Info("dataSyncService closing flowgraph", zap.Int64("collectionID", dsService.collectionID),
zap.String("vChanName", dsService.vchannelName))
dsService.fg.Close()
metrics.DataNodeNumConsumers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Sub(2) // timeTickChannel + deltaChannel
}
dsService.stopOnce.Do(func() {
if dsService.fg != nil {
log.Info("dataSyncService closing flowgraph", zap.Int64("collectionID", dsService.collectionID),
zap.String("vChanName", dsService.vchannelName))
dsService.fg.Close()
metrics.DataNodeNumConsumers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Sub(2) // timeTickChannel + deltaChannel
}
dsService.clearGlobalFlushingCache()
dsService.cancelFn()
dsService.flushManager.close()
dsService.clearGlobalFlushingCache()
close(dsService.flushCh)
dsService.flushManager.close()
dsService.cancelFn()
})
}
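
The rewritten close() above encodes a specific shutdown order; a condensed sketch of that contract, with hypothetical names: a sync.Once guard makes close idempotent, the flush channel is closed and the flush manager drained before the context is cancelled, so segments still in flight can complete their final sync.

	import (
		"context"
		"sync"
	)

	type service struct {
		stopOnce sync.Once
		flushCh  chan struct{}
		cancelFn context.CancelFunc
	}

	// close is safe to call more than once thanks to sync.Once.
	func (s *service) close(drainFlushes func()) {
		s.stopOnce.Do(func() {
			close(s.flushCh) // stop accepting new flush requests
			drainFlushes()   // wait for queued flushes to finish
			s.cancelFn()     // tear down the context only after draining
		})
	}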
func (dsService *dataSyncService) clearGlobalFlushingCache() {
@ -171,6 +177,11 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, dsService.chunkManager, dsService.channel,
flushNotifyFunc(dsService, retry.Attempts(50)), dropVirtualChannelFunc(dsService))
log.Info("begin to init data sync service", zap.Int64("collection", vchanInfo.CollectionID),
zap.String("Chan", vchanInfo.ChannelName),
zap.Int64s("unflushed", vchanInfo.GetUnflushedSegmentIds()),
zap.Int64s("flushed", vchanInfo.GetFlushedSegmentIds()),
)
var err error
// recover segment checkpoints
unflushedSegmentInfos, err := dsService.getSegmentInfos(vchanInfo.GetUnflushedSegmentIds())
@ -187,7 +198,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
for _, us := range unflushedSegmentInfos {
if us.CollectionID != dsService.collectionID ||
us.GetInsertChannel() != vchanInfo.ChannelName {
log.Warn("Collection ID or ChannelName not compact",
log.Warn("Collection ID or ChannelName not match",
zap.Int64("Wanted ID", dsService.collectionID),
zap.Int64("Actual ID", us.CollectionID),
zap.String("Wanted Channel Name", vchanInfo.ChannelName),
@ -224,7 +235,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
for _, fs := range flushedSegmentInfos {
if fs.CollectionID != dsService.collectionID ||
fs.GetInsertChannel() != vchanInfo.ChannelName {
log.Warn("Collection ID or ChannelName not compact",
log.Warn("Collection ID or ChannelName not match",
zap.Int64("Wanted ID", dsService.collectionID),
zap.Int64("Actual ID", fs.CollectionID),
zap.String("Wanted Channel Name", vchanInfo.ChannelName),

View File

@ -21,6 +21,7 @@ import (
"context"
"encoding/binary"
"math"
"os"
"testing"
"time"
@ -37,6 +38,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/tsoutil"
)
var dataSyncServiceTestDir = "/tmp/milvus_test/data_sync_service"
@ -190,38 +192,36 @@ func TestDataSyncService_newDataSyncService(te *testing.T) {
// NOTE: start pulsar before test
func TestDataSyncService_Start(t *testing.T) {
t.Skip()
const ctxTimeInMillisecond = 2000
const ctxTimeInMillisecond = 10000
delay := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), delay)
defer cancel()
// init data node
insertChannelName := "data_sync_service_test_dml"
ddlChannelName := "data_sync_service_test_ddl"
insertChannelName := "by-dev-rootcoord-dml"
ddlChannelName := "by-dev-rootcoord-ddl"
Factory := &MetaFactory{}
collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
mockRootCoord := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
collectionID := UniqueID(1)
flushChan := make(chan flushMsg, 100)
resendTTChan := make(chan resendTTMsg, 100)
cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
channel := newChannel(insertChannelName, collectionID, collMeta.GetSchema(), mockRootCoord, cm)
channel := newChannel(insertChannelName, collMeta.ID, collMeta.GetSchema(), mockRootCoord, cm)
allocFactory := NewAllocatorFactory(1)
factory := dependency.NewDefaultFactory(true)
defer os.RemoveAll("/tmp/milvus")
paramtable.Get().Save(Params.DataNodeCfg.FlushInsertBufferSize.Key, "1")
ufs := []*datapb.SegmentInfo{{
CollectionID: collMeta.ID,
PartitionID: 1,
InsertChannel: insertChannelName,
ID: 0,
NumOfRows: 0,
@ -251,18 +251,38 @@ func TestDataSyncService_Start(t *testing.T) {
}
signalCh := make(chan string, 100)
sync, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, allocFactory, factory, vchan, signalCh, &DataCoordFactory{}, newCache(), cm, newCompactionExecutor())
dataCoord := &DataCoordFactory{}
dataCoord.UserSegmentInfo = map[int64]*datapb.SegmentInfo{
0: {
ID: 0,
CollectionID: collMeta.ID,
PartitionID: 1,
InsertChannel: insertChannelName,
},
1: {
ID: 1,
CollectionID: collMeta.ID,
PartitionID: 1,
InsertChannel: insertChannelName,
},
}
sync, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, allocFactory, factory, vchan, signalCh, dataCoord, newCache(), cm, newCompactionExecutor())
assert.Nil(t, err)
// sync.channel.addCollection(collMeta.ID, collMeta.Schema)
sync.flushListener = make(chan *segmentFlushPack)
defer close(sync.flushListener)
sync.start()
defer sync.close()
timeRange := TimeRange{
timestampMin: 0,
timestampMax: math.MaxUint64,
}
dataFactory := NewDataFactory()
insertMessages := dataFactory.GetMsgStreamTsInsertMsgs(2, insertChannelName)
insertMessages := dataFactory.GetMsgStreamTsInsertMsgs(2, insertChannelName, tsoutil.GetCurrentTime())
msgPack := msgstream.MsgPack{
BeginTs: timeRange.timestampMin,
@ -315,10 +335,213 @@ func TestDataSyncService_Start(t *testing.T) {
_, err = ddMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
// dataSync
<-sync.ctx.Done()
select {
case flushPack := <-sync.flushListener:
assert.True(t, flushPack.segmentID == 1)
return
case <-sync.ctx.Done():
assert.Fail(t, "test timeout")
}
}
func TestDataSyncService_Close(t *testing.T) {
const ctxTimeInMillisecond = 1000000
delay := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), delay)
defer cancel()
os.RemoveAll("/tmp/milvus")
// init data node
insertChannelName := "by-dev-rootcoord-dml2"
ddlChannelName := "by-dev-rootcoord-ddl2"
Factory := &MetaFactory{}
collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
mockRootCoord := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
flushChan := make(chan flushMsg, 100)
resendTTChan := make(chan resendTTMsg, 100)
cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
channel := newChannel(insertChannelName, collMeta.ID, collMeta.GetSchema(), mockRootCoord, cm)
allocFactory := NewAllocatorFactory(1)
factory := dependency.NewDefaultFactory(true)
defer os.RemoveAll("/tmp/milvus")
paramtable.Get().Remove(Params.DataNodeCfg.FlushInsertBufferSize.Key)
ufs := []*datapb.SegmentInfo{{
CollectionID: collMeta.ID,
PartitionID: 1,
InsertChannel: insertChannelName,
ID: 0,
NumOfRows: 0,
DmlPosition: &internalpb.MsgPosition{},
}}
fs := []*datapb.SegmentInfo{{
CollectionID: collMeta.ID,
PartitionID: 1,
InsertChannel: insertChannelName,
ID: 1,
NumOfRows: 0,
DmlPosition: &internalpb.MsgPosition{},
}}
var ufsIds []int64
var fsIds []int64
for _, segmentInfo := range ufs {
ufsIds = append(ufsIds, segmentInfo.ID)
}
for _, segmentInfo := range fs {
fsIds = append(fsIds, segmentInfo.ID)
}
vchan := &datapb.VchannelInfo{
CollectionID: collMeta.ID,
ChannelName: insertChannelName,
UnflushedSegmentIds: ufsIds,
FlushedSegmentIds: fsIds,
}
signalCh := make(chan string, 100)
dataCoord := &DataCoordFactory{}
dataCoord.UserSegmentInfo = map[int64]*datapb.SegmentInfo{
0: {
ID: 0,
CollectionID: collMeta.ID,
PartitionID: 1,
InsertChannel: insertChannelName,
},
1: {
ID: 1,
CollectionID: collMeta.ID,
PartitionID: 1,
InsertChannel: insertChannelName,
},
}
sync, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, allocFactory, factory, vchan, signalCh, dataCoord, newCache(), cm, newCompactionExecutor())
assert.Nil(t, err)
sync.flushListener = make(chan *segmentFlushPack, 10)
defer close(sync.flushListener)
sync.start()
dataFactory := NewDataFactory()
ts := tsoutil.GetCurrentTime()
insertMessages := dataFactory.GetMsgStreamTsInsertMsgs(2, insertChannelName, ts)
msgPack := msgstream.MsgPack{
BeginTs: ts,
EndTs: ts,
Msgs: insertMessages,
StartPositions: []*internalpb.MsgPosition{{
ChannelName: insertChannelName,
}},
EndPositions: []*internalpb.MsgPosition{{
ChannelName: insertChannelName,
}},
}
// 400 is a primary key present in the inserted data
int64Pks := []primaryKey{
newInt64PrimaryKey(400),
}
deleteMessages := dataFactory.GenMsgStreamDeleteMsgWithTs(0, int64Pks, insertChannelName, ts+1)
inMsgs := make([]msgstream.TsMsg, 0)
inMsgs = append(inMsgs, deleteMessages)
msgPackDelete := msgstream.MsgPack{
BeginTs: ts + 1,
EndTs: ts + 1,
Msgs: inMsgs,
StartPositions: []*internalpb.MsgPosition{{
ChannelName: insertChannelName,
}},
EndPositions: []*internalpb.MsgPosition{{
ChannelName: insertChannelName,
}},
}
// generate timeTick
timeTickMsg := &msgstream.TimeTickMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: ts,
EndTimestamp: ts + 2,
HashValues: []uint32{0},
},
TimeTickMsg: internalpb.TimeTickMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_TimeTick,
MsgID: UniqueID(2),
Timestamp: ts + 2,
SourceID: 0,
},
},
}
timeTickMsgPack := msgstream.MsgPack{
BeginTs: ts + 2,
EndTs: ts + 2,
StartPositions: []*internalpb.MsgPosition{{
ChannelName: insertChannelName,
}},
EndPositions: []*internalpb.MsgPosition{{
ChannelName: insertChannelName,
}},
}
timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg)
// pulsar produce
assert.NoError(t, err)
insertStream, _ := factory.NewMsgStream(ctx)
insertStream.AsProducer([]string{insertChannelName})
ddStream, _ := factory.NewMsgStream(ctx)
ddStream.AsProducer([]string{ddlChannelName})
var insertMsgStream msgstream.MsgStream = insertStream
var ddMsgStream msgstream.MsgStream = ddStream
err = insertMsgStream.Produce(&msgPack)
assert.NoError(t, err)
err = insertMsgStream.Produce(&msgPackDelete)
assert.NoError(t, err)
_, err = insertMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
_, err = ddMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
// wait for delete
for sync.delBufferManager.GetEntriesNum(1) == 0 {
time.Sleep(100 * time.Millisecond)
}
// close and wait for flush
sync.close()
for {
select {
case flushPack, ok := <-sync.flushListener:
assert.True(t, ok)
if flushPack.segmentID == 1 {
assert.True(t, len(flushPack.insertLogs) == 12)
assert.True(t, len(flushPack.statsLogs) == 1)
assert.True(t, len(flushPack.deltaLogs) == 1)
return
}
if flushPack.segmentID == 0 {
assert.True(t, len(flushPack.insertLogs) == 0)
assert.True(t, len(flushPack.statsLogs) == 0)
assert.True(t, len(flushPack.deltaLogs) == 0)
}
case <-sync.ctx.Done():
}
}
}
func genBytes() (rawData []byte) {
@ -405,6 +628,22 @@ func TestGetSegmentInfos(t *testing.T) {
segmentInfos3, err := dsService.getSegmentInfos([]int64{1})
assert.Error(t, err)
assert.Empty(t, segmentInfos3)
dataCoord.GetSegmentInfosError = false
dataCoord.GetSegmentInfosNotSuccess = false
dataCoord.UserSegmentInfo = map[int64]*datapb.SegmentInfo{
5: {
ID: 100,
CollectionID: 101,
PartitionID: 102,
InsertChannel: "by-dev-rootcoord-dml-test_v1",
},
}
segmentInfos, err = dsService.getSegmentInfos([]int64{5})
assert.NoError(t, err)
assert.Equal(t, 1, len(segmentInfos))
assert.Equal(t, int64(100), segmentInfos[0].ID)
}
func TestClearGlobalFlushingCache(t *testing.T) {

View File

@ -86,37 +86,60 @@ func (ddn *ddNode) Name() string {
return fmt.Sprintf("ddNode-%d-%s", ddn.collectionID, ddn.vChannelName)
}
func (ddn *ddNode) IsValidInMsg(in []Msg) bool {
if !ddn.BaseNode.IsValidInMsg(in) {
return false
}
_, ok := in[0].(*MsgStreamMsg)
if !ok {
log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
return false
}
return true
}
// Operate handles input messages, implementing flowgraph.Node
func (ddn *ddNode) Operate(in []Msg) []Msg {
if in == nil {
log.Debug("type assertion failed for MsgStreamMsg because it's nil")
return []Msg{}
}
if len(in) != 1 {
log.Warn("Invalid operate message input in ddNode", zap.Int("input length", len(in)))
return []Msg{}
}
msMsg, ok := in[0].(*MsgStreamMsg)
if !ok {
log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
return []Msg{}
}
if msMsg.IsCloseMsg() {
var fgMsg = flowGraphMsg{
BaseMsg: flowgraph.NewBaseMsg(true),
insertMessages: make([]*msgstream.InsertMsg, 0),
timeRange: TimeRange{
timestampMin: msMsg.TimestampMin(),
timestampMax: msMsg.TimestampMax(),
},
startPositions: msMsg.StartPositions(),
endPositions: msMsg.EndPositions(),
dropCollection: false,
}
log.Warn("MsgStream closed", zap.Any("ddNode node", ddn.Name()), zap.Int64("collection", ddn.collectionID), zap.String("channel", ddn.vChannelName))
return []Msg{&fgMsg}
}
if load := ddn.dropMode.Load(); load != nil && load.(bool) {
log.Debug("ddNode in dropMode",
zap.String("vChannelName", ddn.vChannelName),
zap.Int64("collection ID", ddn.collectionID))
return []Msg{}
}
var spans []opentracing.Span
for _, msg := range msMsg.TsMessages() {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
spans = append(spans, sp)
msg.SetTraceCtx(ctx)
}
if load := ddn.dropMode.Load(); load != nil && load.(bool) {
log.Debug("ddNode in dropMode",
zap.String("vChannelName", ddn.vChannelName),
zap.Int64("collection ID", ddn.collectionID))
return []Msg{}
}
defer func() {
for _, sp := range spans {
sp.Finish()
}
}()
var fgMsg = flowGraphMsg{
insertMessages: make([]*msgstream.InsertMsg, 0),
@ -193,7 +216,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
case commonpb.MsgType_Delete:
dmsg := msg.(*msgstream.DeleteMsg)
log.Debug("DDNode receive delete messages",
zap.Int64("num", dmsg.NumRows),
zap.Int64("numRows", dmsg.NumRows),
zap.String("vChannelName", ddn.vChannelName))
for i := int64(0); i < dmsg.NumRows; i++ {
dmsg.HashValues = append(dmsg.HashValues, uint32(0))
@ -231,10 +254,6 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
fgMsg.startPositions = append(fgMsg.startPositions, msMsg.StartPositions()...)
fgMsg.endPositions = append(fgMsg.endPositions, msMsg.EndPositions()...)
for _, sp := range spans {
sp.Finish()
}
return []Msg{&fgMsg}
}

View File

@ -110,11 +110,9 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) {
for _, test := range invalidInTests {
t.Run(test.description, func(t *testing.T) {
ddn := ddNode{}
rt := ddn.Operate(test.in)
assert.Empty(t, rt)
assert.False(t, ddn.IsValidInMsg(test.in))
})
}
// valid inputs
tests := []struct {
ddnCollID UniqueID

View File

@ -67,23 +67,21 @@ func (dn *deleteNode) showDelBuf(segIDs []UniqueID, ts Timestamp) {
}
}
// Operate implementing flowgraph.Node, performs delete data process
func (dn *deleteNode) Operate(in []Msg) []Msg {
if in == nil {
log.Debug("type assertion failed for flowGraphMsg because it's nil")
return []Msg{}
func (dn *deleteNode) IsValidInMsg(in []Msg) bool {
if !dn.BaseNode.IsValidInMsg(in) {
return false
}
if len(in) != 1 {
log.Warn("Invalid operate message input in deleteNode", zap.Int("input length", len(in)))
return []Msg{}
}
fgMsg, ok := in[0].(*flowGraphMsg)
_, ok := in[0].(*flowGraphMsg)
if !ok {
log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
return []Msg{}
return false
}
return true
}
// Operate implementing flowgraph.Node, performs delete data process
func (dn *deleteNode) Operate(in []Msg) []Msg {
fgMsg := in[0].(*flowGraphMsg)
var spans []opentracing.Span
for _, msg := range fgMsg.deleteMessages {
@ -127,6 +125,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
// no related delta data to flush, send empty buf to complete flush life-cycle
dn.flushManager.flushDelData(nil, segmentToFlush, fgMsg.endPositions[0])
} else {
// TODO, this has to be async, no need to block here
err := retry.Do(dn.ctx, func() error {
return dn.flushManager.flushDelData(buf, segmentToFlush, fgMsg.endPositions[0])
}, getFlowGraphRetryOpt())
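The TODO above flags this blocking retry; one possible async shape (a sketch, not part of this diff) moves the retry loop into a goroutine so Operate can return while the delete flush completes in the background:

	go func(buf *DelDataBuf, segID UniqueID, pos *internalpb.MsgPosition) {
		if err := retry.Do(dn.ctx, func() error {
			return dn.flushManager.flushDelData(buf, segID, pos)
		}, getFlowGraphRetryOpt()); err != nil {
			log.Warn("failed to flush delete data asynchronously", zap.Error(err))
		}
	}(buf, segmentToFlush, fgMsg.endPositions[0])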

View File

@ -151,8 +151,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
for _, test := range invalidInTests {
te.Run(test.desc, func(t *testing.T) {
dn := deleteNode{}
rt := dn.Operate(test.in)
assert.Empty(t, rt)
assert.False(t, dn.IsValidInMsg(test.in))
})
}
})
@ -439,6 +438,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
//2. here we set flushing segments inside fgmsg to empty
//in order to verify the validity of auto flush function
msg := genFlowGraphDeleteMsg(int64Pks, chanName)
// delete has to match segment partition ID
for _, msg := range msg.deleteMessages {
msg.PartitionID = 0
}
msg.segmentsToSync = []UniqueID{}
var fgMsg flowgraph.Msg = &msg

View File

@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/retry"
@ -103,10 +104,36 @@ func (ibNode *insertBufferNode) Close() {
}
}
func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
fgMsg, ok := ibNode.verifyInMsg(in)
func (ibNode *insertBufferNode) IsValidInMsg(in []Msg) bool {
if !ibNode.BaseNode.IsValidInMsg(in) {
return false
}
_, ok := in[0].(*flowGraphMsg)
if !ok {
return []Msg{}
log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
return false
}
return true
}
func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
fgMsg := in[0].(*flowGraphMsg)
if fgMsg.IsCloseMsg() {
if len(fgMsg.endPositions) != 0 {
// try to sync all segments
segmentsToSync := ibNode.Sync(fgMsg, make([]UniqueID, 0), fgMsg.endPositions[0])
res := flowGraphMsg{
deleteMessages: []*msgstream.DeleteMsg{},
timeRange: fgMsg.timeRange,
startPositions: fgMsg.startPositions,
endPositions: fgMsg.endPositions,
segmentsToSync: segmentsToSync,
dropCollection: fgMsg.dropCollection,
BaseMsg: flowgraph.NewBaseMsg(true),
}
return []Msg{&res}
}
return in
}
if fgMsg.dropCollection {
@ -120,6 +147,12 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
msg.SetTraceCtx(ctx)
}
defer func() {
for _, sp := range spans {
sp.Finish()
}
}()
// replace pchannel with vchannel
startPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.startPositions))
for idx := range fgMsg.startPositions {
@ -181,33 +214,10 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
dropCollection: fgMsg.dropCollection,
}
for _, sp := range spans {
sp.Finish()
}
// send delete msg to DeleteNode
return []Msg{&res}
}
func (ibNode *insertBufferNode) verifyInMsg(in []Msg) (*flowGraphMsg, bool) {
// while closing
if in == nil {
log.Warn("type assertion failed for flowGraphMsg because it's nil")
return nil, false
}
if len(in) != 1 {
log.Warn("Invalid operate message input in insertBufferNode", zap.Int("input length", len(in)))
return nil, false
}
fgMsg, ok := in[0].(*flowGraphMsg)
if !ok {
log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
}
return fgMsg, ok
}
func (ibNode *insertBufferNode) GetBufferIfFull(segID UniqueID) (*BufferData, bool) {
if bd, ok := ibNode.channel.getCurInsertBuffer(segID); ok && bd.effectiveCap() <= 0 {
return bd, true
@ -301,7 +311,6 @@ func (ibNode *insertBufferNode) FillInSyncTasks(fgMsg *flowGraphMsg, seg2Upload
for _, segID := range segmentIDs {
buf := ibNode.GetBuffer(segID)
syncTasks[segID] = &syncTask{
buffer: buf, // nil is valid
segmentID: segID,
@ -312,6 +321,32 @@ func (ibNode *insertBufferNode) FillInSyncTasks(fgMsg *flowGraphMsg, seg2Upload
return syncTasks
}
if fgMsg.IsCloseMsg() {
// All segments in the collection will be synced, no matter whether their buffers are empty or not
segmentIDs := ibNode.channel.listAllSegmentIDs()
log.Info("Receive close request and syncing all segments",
zap.Int64s("segments", segmentIDs),
zap.String("channel", ibNode.channelName),
)
for _, segID := range segmentIDs {
// if segment has data or delete then force sync
insertBuf, hasInsert := ibNode.channel.getCurInsertBuffer(segID)
deleteEntry := ibNode.delBufferManager.GetEntriesNum(segID)
// if the insert buf or the delete buf is not empty, trigger sync
if (hasInsert && insertBuf.size > 0) || (deleteEntry > 0) {
syncTasks[segID] = &syncTask{
buffer: insertBuf, // nil is valid
segmentID: segID,
flushed: false,
dropped: false,
auto: true,
}
}
}
return syncTasks
}
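
A compact restatement of the close branch above, with hypothetical helper signatures, to make the selection rule explicit: on close, every segment with a non-empty insert buffer or pending delete entries gets an auto sync task; empty segments are skipped.

	// closeTimeSyncSegments returns the segment IDs that need one final sync.
	func closeTimeSyncSegments(segIDs []int64,
		insertBufSize func(int64) int64,
		deleteEntries func(int64) int64) []int64 {
		var toSync []int64
		for _, segID := range segIDs {
			if insertBufSize(segID) > 0 || deleteEntries(segID) > 0 {
				toSync = append(toSync, segID) // auto sync, neither flushed nor dropped
			}
		}
		return toSync
	}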
// Auto Sync // TODO: move to segment_sync_policy
for _, segID := range seg2Upload {
if ibuffer, ok := ibNode.GetBufferIfFull(segID); ok {
@ -353,7 +388,7 @@ func (ibNode *insertBufferNode) FillInSyncTasks(fgMsg *flowGraphMsg, seg2Upload
}
}
if len(syncSegmentIDs) > 0 {
log.Debug("sync segments", zap.String("vChannel", ibNode.channelName),
log.Info("sync segments", zap.String("vChannel", ibNode.channelName),
zap.Int64s("segIDs", syncSegmentIDs)) // TODO: maybe too many prints here
}
@ -418,6 +453,7 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID,
)
// use the flushed pk stats to take current stat
var pkStats []*storage.PrimaryKeyStats
// TODO, this has to be async flush, no need to block here.
err := retry.Do(ibNode.ctx, func() error {
statBlobs, err := ibNode.flushManager.flushBufferData(task.buffer,
task.segmentID,

View File

@ -129,12 +129,18 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
assert.Error(t, err)
}
type mockMsg struct{}
type mockMsg struct {
BaseMsg
}
func (*mockMsg) TimeTick() Timestamp {
return 0
}
func (*mockMsg) IsClose() bool {
return false
}
func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
t.Run("Test iBNode Operate invalid Msg", func(te *testing.T) {
invalidInTests := []struct {
@ -154,8 +160,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
ibn := &insertBufferNode{
ttMerger: newMergedTimeTickerSender(func(Timestamp, []int64) error { return nil }),
}
rt := ibn.Operate(test.in)
assert.Empty(t0, rt)
assert.False(t0, ibn.IsValidInMsg(test.in))
})
}
})
@ -711,16 +716,15 @@ func (s *InsertBufferNodeSuite) SetupSuite() {
pkType: schemapb.DataType_Int64,
}
delBufManager := &DelBufferManager{
s.collID = 1
s.partID = 10
s.channel = newChannel("channel", s.collID, nil, rc, s.cm)
s.delBufManager = &DelBufferManager{
channel: s.channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
s.collID = 1
s.partID = 10
s.channel = newChannel("channel", s.collID, nil, rc, s.cm)
s.delBufManager = delBufManager
s.cm = storage.NewLocalChunkManager(storage.RootPath(insertBufferNodeTestDir))
s.originalConfig = Params.DataNodeCfg.FlushInsertBufferSize.GetAsInt64()
@ -886,6 +890,27 @@ func (s *InsertBufferNodeSuite) TestFillInSyncTasks() {
s.Assert().False(task.dropped)
}
})
s.Run("test close", func() {
fgMsg := &flowGraphMsg{BaseMsg: flowgraph.NewBaseMsg(true)}
node := &insertBufferNode{
channelName: s.channel.channelName,
channel: s.channel,
delBufferManager: s.delBufManager,
flushChan: make(chan flushMsg, 100),
}
syncTasks := node.FillInSyncTasks(fgMsg, nil)
s.Assert().Equal(1, len(syncTasks))
for _, task := range syncTasks {
s.Assert().Equal(task.segmentID, int64(1))
s.Assert().False(task.dropped)
s.Assert().False(task.flushed)
s.Assert().True(task.auto)
}
})
}
func TestInsertBufferNodeSuite(t *testing.T) {

View File

@ -27,6 +27,8 @@ type (
// Msg is flowgraph.Msg
Msg = flowgraph.Msg
BaseMsg = flowgraph.BaseMsg
// MsgStreamMsg is flowgraph.MsgStreamMsg
MsgStreamMsg = flowgraph.MsgStreamMsg
@ -41,6 +43,7 @@ type (
)
type flowGraphMsg struct {
BaseMsg
insertMessages []*msgstream.InsertMsg
deleteMessages []*msgstream.DeleteMsg
timeRange TimeRange
@ -56,12 +59,18 @@ func (fgMsg *flowGraphMsg) TimeTick() Timestamp {
return fgMsg.timeRange.timestampMax
}
func (fgMsg *flowGraphMsg) IsClose() bool {
return fgMsg.BaseMsg.IsCloseMsg()
}
// flush Msg is used in flowgraph insertBufferNode to flush the given segment
type flushMsg struct {
msgID UniqueID
timestamp Timestamp
segmentID UniqueID
collectionID UniqueID
// isFlush indicates whether this is a flush or a normal sync
isFlush bool
}
type resendTTMsg struct {

View File

@ -56,22 +56,23 @@ func (ttn *ttNode) Name() string {
return fmt.Sprintf("ttNode-%s", ttn.vChannelName)
}
// Operate handles input messages, implementing flowgraph.Node
func (ttn *ttNode) Operate(in []Msg) []Msg {
if in == nil {
log.Warn("type assertion failed for flowGraphMsg because it's nil")
return []Msg{}
func (ttn *ttNode) IsValidInMsg(in []Msg) bool {
if !ttn.BaseNode.IsValidInMsg(in) {
return false
}
if len(in) != 1 {
log.Warn("Invalid operate message input in ttNode", zap.Int("input length", len(in)))
return []Msg{}
}
fgMsg, ok := in[0].(*flowGraphMsg)
_, ok := in[0].(*flowGraphMsg)
if !ok {
log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
return []Msg{}
return false
}
return true
}
// Operate handles input messages, implementing flowgraph.Node
func (ttn *ttNode) Operate(in []Msg) []Msg {
fgMsg := in[0].(*flowGraphMsg)
if fgMsg.IsCloseMsg() {
return in
}
curTs, _ := tsoutil.ParseTS(fgMsg.timeRange.timestampMax)

View File

@ -312,6 +312,7 @@ func (m *rendezvousFlushManager) handleInsertTask(segmentID UniqueID, task flush
}
func (m *rendezvousFlushManager) handleDeleteTask(segmentID UniqueID, task flushDeleteTask, deltaLogs *DelDataBuf, pos *internalpb.MsgPosition) {
log.Info("handling delete task", zap.Int64("segment ID", segmentID))
// in dropping mode
if m.dropping.Load() {
// preventing separate delete, check position exists in queue first
@ -567,6 +568,7 @@ func (m *rendezvousFlushManager) close() {
queue.injectMut.Unlock()
return true
})
m.waitForAllFlushQueue()
}
type flushBufferInsertTask struct {
@ -788,6 +790,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
zap.Int64("SegmentID", pack.segmentID),
zap.Int64("CollectionID", dsService.collectionID),
zap.Any("startPos", startPos),
zap.Any("checkPoints", checkPoints),
zap.Int("Length of Field2BinlogPaths", len(fieldInsert)),
zap.Int("Length of Field2Stats", len(fieldStats)),
zap.Int("Length of Field2Deltalogs", len(deltaInfos[0].GetBinlogs())),
@ -816,7 +819,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
rsp, err := dsService.dataCoord.SaveBinlogPaths(context.Background(), req)
// should be network issue, return error and retry
if err != nil {
return fmt.Errorf(err.Error())
return err
}
// Segment not found during stale segment flush. Segment might get compacted already.
@ -854,6 +857,10 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
if pack.flushed || pack.dropped {
dsService.channel.segmentFlushed(pack.segmentID)
}
if dsService.flushListener != nil {
dsService.flushListener <- pack
}
dsService.flushingSegCache.Remove(req.GetSegmentID())
dsService.channel.evictHistoryInsertBuffer(req.GetSegmentID(), pack.pos)
dsService.channel.evictHistoryDeleteBuffer(req.GetSegmentID(), pack.pos)
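
The flushListener hook above exists so tests can observe completed flushes; a usage sketch (setup details elided, names from this diff): install the channel before start(), then block on it until the expected segment's pack arrives.

	sync.flushListener = make(chan *segmentFlushPack, 10)
	sync.start()
	pack := <-sync.flushListener // delivered by flushNotifyFunc after each flush
	_ = pack.segmentID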

View File

@ -218,7 +218,14 @@ func (t *flushTaskRunner) getFlushPack() *segmentFlushPack {
dropped: t.dropped,
}
log.Debug("flush pack composed",
zap.Any("pack", pack))
zap.Int64("segmentID", t.segmentID),
zap.Int("insertLogs", len(t.insertLogs)),
zap.Int("statsLogs", len(t.statsLogs)),
zap.Int("deleteLogs", len(t.deltaLogs)),
zap.Bool("flushed", t.flushed),
zap.Bool("dropped", t.dropped),
)
if t.insertErr != nil || t.deleteErr != nil {
log.Warn("flush task error detected", zap.Error(t.insertErr), zap.Error(t.deleteErr))
pack.err = errors.New("execution failed")

View File

@ -207,6 +207,7 @@ type DataCoordFactory struct {
GetSegmentInfosError bool
GetSegmentInfosNotSuccess bool
UserSegmentInfo map[int64]*datapb.SegmentInfo
AddSegmentError bool
AddSegmentNotSuccess bool
@ -310,7 +311,9 @@ func (ds *DataCoordFactory) GetSegmentInfo(ctx context.Context, req *datapb.GetS
}
var segmentInfos []*datapb.SegmentInfo
for _, segmentID := range req.SegmentIDs {
if segInfo, ok := segID2SegInfo[segmentID]; ok {
if segInfo, ok := ds.UserSegmentInfo[segmentID]; ok {
segmentInfos = append(segmentInfos, segInfo)
} else if segInfo, ok := segID2SegInfo[segmentID]; ok {
segmentInfos = append(segmentInfos, segInfo)
} else {
segmentInfos = append(segmentInfos, &datapb.SegmentInfo{
@ -780,9 +783,39 @@ func (df *DataFactory) GenMsgStreamInsertMsg(idx int, chanName string) *msgstrea
return msg
}
func (df *DataFactory) GetMsgStreamTsInsertMsgs(n int, chanName string) (inMsgs []msgstream.TsMsg) {
func (df *DataFactory) GenMsgStreamInsertMsgWithTs(idx int, chanName string, ts Timestamp) *msgstream.InsertMsg {
var msg = &msgstream.InsertMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{uint32(idx)},
BeginTimestamp: ts,
EndTimestamp: ts,
},
InsertRequest: internalpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
MsgID: 0,
Timestamp: ts,
SourceID: 0,
},
CollectionName: "col1",
PartitionName: "default",
SegmentID: 1,
CollectionID: UniqueID(0),
ShardName: chanName,
Timestamps: []Timestamp{ts},
RowIDs: []UniqueID{UniqueID(idx)},
// RowData: []*commonpb.Blob{{Value: df.rawData}},
FieldsData: df.columnData,
Version: internalpb.InsertDataVersion_ColumnBased,
NumRows: 1,
},
}
return msg
}
func (df *DataFactory) GetMsgStreamTsInsertMsgs(n int, chanName string, ts Timestamp) (inMsgs []msgstream.TsMsg) {
for i := 0; i < n; i++ {
var msg = df.GenMsgStreamInsertMsg(i, chanName)
var msg = df.GenMsgStreamInsertMsgWithTs(i, chanName, ts)
var tsMsg msgstream.TsMsg = msg
inMsgs = append(inMsgs, tsMsg)
}
@ -816,6 +849,7 @@ func (df *DataFactory) GenMsgStreamDeleteMsg(pks []primaryKey, chanName string)
},
CollectionName: "col1",
PartitionName: "default",
PartitionID: 1,
ShardName: chanName,
PrimaryKeys: s.ParsePrimaryKeys2IDs(pks),
Timestamps: timestamps,
@ -825,6 +859,33 @@ func (df *DataFactory) GenMsgStreamDeleteMsg(pks []primaryKey, chanName string)
return msg
}
func (df *DataFactory) GenMsgStreamDeleteMsgWithTs(idx int, pks []primaryKey, chanName string, ts Timestamp) *msgstream.DeleteMsg {
var msg = &msgstream.DeleteMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{uint32(idx)},
BeginTimestamp: ts,
EndTimestamp: ts,
},
DeleteRequest: internalpb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Delete,
MsgID: 1,
Timestamp: ts,
SourceID: 0,
},
CollectionName: "col1",
PartitionName: "default",
PartitionID: 1,
CollectionID: UniqueID(0),
ShardName: chanName,
PrimaryKeys: s.ParsePrimaryKeys2IDs(pks),
Timestamps: []Timestamp{ts},
NumRows: int64(len(pks)),
},
}
return msg
}
func genFlowGraphInsertMsg(chanName string) flowGraphMsg {
timeRange := TimeRange{
timestampMin: 0,

View File

@ -19,7 +19,9 @@ package datanode
import (
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"go.uber.org/zap"
)
// segmentSyncPolicy sync policy applies to segment
@ -30,7 +32,10 @@ func syncPeriodically() segmentSyncPolicy {
return func(segment *Segment, ts Timestamp) bool {
endTime := tsoutil.PhysicalTime(ts)
lastSyncTime := tsoutil.PhysicalTime(segment.lastSyncTs)
return endTime.Sub(lastSyncTime) >= Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second) &&
!segment.isBufferEmpty()
shouldSync := endTime.Sub(lastSyncTime) >= Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second) && !segment.isBufferEmpty()
if shouldSync {
log.Info("sync segment periodically ", zap.Time("now", endTime), zap.Time("last sync", lastSyncTime))
}
return shouldSync
}
}
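
A minimal restatement of the policy above with plain-time inputs (hypothetical helper, standard time package), to make the trigger condition explicit: a segment syncs only when its buffer is non-empty and the configured period has elapsed since the last sync.

	import "time"

	func shouldSyncPeriodically(now, lastSync time.Time, period time.Duration, bufferEmpty bool) bool {
		return now.Sub(lastSync) >= period && !bufferEmpty
	}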

View File

@ -53,30 +53,26 @@ func (dNode *deleteNode) Name() string {
return fmt.Sprintf("dNode-%s", dNode.deltaVchannel)
}
func (dNode *deleteNode) IsValidInMsg(in []Msg) bool {
if !dNode.baseNode.IsValidInMsg(in) {
return false
}
_, ok := in[0].(*deleteMsg)
if !ok {
log.Warn("type assertion failed for deleteMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", dNode.Name()))
return false
}
return true
}
// Operate handles input messages, do delete operations
func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if in == nil {
log.Debug("type assertion failed for deleteMsg because it's nil", zap.String("name", dNode.Name()))
return []Msg{}
}
if len(in) != 1 {
log.Warn("Invalid operate message input in deleteNode", zap.Int("input length", len(in)), zap.String("name", dNode.Name()))
return []Msg{}
}
dMsg, ok := in[0].(*deleteMsg)
if !ok {
log.Warn("type assertion failed for deleteMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", dNode.Name()))
return []Msg{}
}
delData := &deleteData{
deleteIDs: map[UniqueID][]primaryKey{},
deleteTimestamps: map[UniqueID][]Timestamp{},
deleteOffset: map[UniqueID]int64{},
}
var spans []opentracing.Span
for _, msg := range dMsg.deleteMessages {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
@ -84,6 +80,24 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
msg.SetTraceCtx(ctx)
}
defer func() {
for _, sp := range spans {
sp.Finish()
}
}()
if dMsg.IsCloseMsg() {
return []Msg{
&serviceTimeMsg{BaseMsg: flowgraph.NewBaseMsg(true)},
}
}
delData := &deleteData{
deleteIDs: map[UniqueID][]primaryKey{},
deleteTimestamps: map[UniqueID][]Timestamp{},
deleteOffset: map[UniqueID]int64{},
}
// 1. filter segment by bloom filter
for i, delMsg := range dMsg.deleteMessages {
traceID, _, _ := trace.InfoFromSpan(spans[i])
@ -154,9 +168,6 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
var res Msg = &serviceTimeMsg{
timeRange: dMsg.timeRange,
}
for _, sp := range spans {
sp.Finish()
}
return []Msg{res}
}

View File

@ -43,23 +43,21 @@ func (fddNode *filterDeleteNode) Name() string {
return fmt.Sprintf("fdNode-%s", fddNode.vchannel)
}
// Operate handles input messages, to filter invalid delete messages
func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if in == nil {
log.Debug("type assertion failed for MsgStreamMsg because it's nil", zap.String("name", fddNode.Name()))
return []Msg{}
func (fddNode *filterDeleteNode) IsValidInMsg(in []Msg) bool {
if !fddNode.baseNode.IsValidInMsg(in) {
return false
}
if len(in) != 1 {
log.Warn("Invalid operate message input in filterDDNode", zap.Int("input length", len(in)), zap.String("name", fddNode.Name()))
return []Msg{}
}
msgStreamMsg, ok := in[0].(*MsgStreamMsg)
_, ok := in[0].(*MsgStreamMsg)
if !ok {
log.Warn("type assertion failed for MsgStreamMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", fddNode.Name()))
return []Msg{}
return false
}
return true
}
// Operate handles input messages, to filter invalid delete messages
func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
msgStreamMsg := in[0].(*MsgStreamMsg)
var spans []opentracing.Span
for _, msg := range msgStreamMsg.TsMessages() {
@ -68,6 +66,18 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
msg.SetTraceCtx(ctx)
}
defer func() {
for _, sp := range spans {
sp.Finish()
}
}()
if msgStreamMsg.IsCloseMsg() {
return []Msg{
&deleteMsg{BaseMsg: flowgraph.NewBaseMsg(true)},
}
}
var dMsg = deleteMsg{
deleteMessages: make([]*msgstream.DeleteMsg, 0),
timeRange: TimeRange{
@ -102,11 +112,8 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
zap.String("vchannel", fddNode.vchannel))
}
}
var res Msg = &dMsg
for _, sp := range spans {
sp.Finish()
}
return []Msg{res}
return []Msg{&dMsg}
}
// filterInvalidDeleteMessage would filter invalid delete messages
@ -142,7 +149,6 @@ func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.Delet
// newFilteredDeleteNode returns a new filterDeleteNode
func newFilteredDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, vchannel Channel) *filterDeleteNode {
maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength.GetAsInt32()
maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism.GetAsInt32()

View File

@ -48,22 +48,21 @@ func (fdmNode *filterDmNode) Name() string {
return fmt.Sprintf("fdmNode-%s", fdmNode.vchannel)
}
// Operate handles input messages, to filter invalid insert messages
func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if in == nil {
log.Debug("type assertion failed for MsgStreamMsg because it's nil", zap.String("name", fdmNode.Name()))
return []Msg{}
func (fdmNode *filterDmNode) IsValidInMsg(in []Msg) bool {
if !fdmNode.baseNode.IsValidInMsg(in) {
return false
}
if len(in) != 1 {
log.Warn("Invalid operate message input in filterDmNode", zap.Int("input length", len(in)), zap.String("name", fdmNode.Name()))
return []Msg{}
}
msgStreamMsg, ok := in[0].(*MsgStreamMsg)
_, ok := in[0].(*MsgStreamMsg)
if !ok {
log.Warn("type assertion failed for MsgStreamMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", fdmNode.Name()))
return []Msg{}
return false
}
return true
}
// Operate handles input messages, to filter invalid insert messages
func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
msgStreamMsg := in[0].(*MsgStreamMsg)
var spans []opentracing.Span
for _, msg := range msgStreamMsg.TsMessages() {
@ -71,6 +70,17 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
spans = append(spans, sp)
msg.SetTraceCtx(ctx)
}
defer func() {
for _, sp := range spans {
sp.Finish()
}
}()
if msgStreamMsg.IsCloseMsg() {
return []Msg{
&insertMsg{BaseMsg: flowgraph.NewBaseMsg(true)},
}
}
var iMsg = insertMsg{
insertMessages: make([]*msgstream.InsertMsg, 0),
@ -125,11 +135,7 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
}
}
var res Msg = &iMsg
for _, sp := range spans {
sp.Finish()
}
return []Msg{res}
return []Msg{&iMsg}
}
// filterInvalidDeleteMessage would filter out invalid delete messages
@ -231,7 +237,6 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
// newFilteredDmNode returns a new filterDmNode
func newFilteredDmNode(metaReplica ReplicaInterface, collectionID UniqueID, vchannel Channel) *filterDmNode {
maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength.GetAsInt32()
maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism.GetAsInt32()

View File

@ -71,22 +71,38 @@ func (iNode *insertNode) Name() string {
return fmt.Sprintf("iNode-%s", iNode.vchannel)
}
// Operate handles input messages, to execute insert operations
func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if in == nil {
log.Debug("type assertion failed for insertMsg because it's nil", zap.String("name", iNode.Name()))
return []Msg{}
func (iNode *insertNode) IsValidInMsg(in []Msg) bool {
if !iNode.baseNode.IsValidInMsg(in) {
return false
}
if len(in) != 1 {
log.Warn("Invalid operate message input in insertNode", zap.Int("input length", len(in)), zap.String("name", iNode.Name()))
return []Msg{}
}
iMsg, ok := in[0].(*insertMsg)
_, ok := in[0].(*insertMsg)
if !ok {
log.Warn("type assertion failed for insertMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", iNode.Name()))
return []Msg{}
return false
}
return true
}
// Operate handles input messages, to execute insert operations
func (iNode *insertNode) Operate(in []Msg) []Msg {
iMsg := in[0].(*insertMsg)
var spans []opentracing.Span
for _, msg := range iMsg.insertMessages {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
spans = append(spans, sp)
msg.SetTraceCtx(ctx)
}
defer func() {
for _, sp := range spans {
sp.Finish()
}
}()
if iMsg.IsCloseMsg() {
return []Msg{
&serviceTimeMsg{BaseMsg: flowgraph.NewBaseMsg(true)},
}
}
iData := insertData{
@ -97,13 +113,6 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
insertPKs: make(map[UniqueID][]primaryKey),
}
var spans []opentracing.Span
for _, msg := range iMsg.insertMessages {
sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
spans = append(spans, sp)
msg.SetTraceCtx(ctx)
}
collection, err := iNode.metaReplica.getCollectionByID(iNode.collectionID)
if err != nil {
// QueryNode should add collection before start flow graph
@ -295,9 +304,6 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
var res Msg = &serviceTimeMsg{
timeRange: iMsg.timeRange,
}
for _, sp := range spans {
sp.Finish()
}
return []Msg{res}
}

View File

@ -23,12 +23,14 @@ import (
// Msg is an interface which has a function named TimeTick
type Msg = flowgraph.Msg
type BaseMsg = flowgraph.BaseMsg
// MsgStreamMsg is an implementation of interface Msg
type MsgStreamMsg = flowgraph.MsgStreamMsg
// insertMsg is an implementation of interface Msg
type insertMsg struct {
BaseMsg
insertMessages []*msgstream.InsertMsg
deleteMessages []*msgstream.DeleteMsg
timeRange TimeRange
@ -36,12 +38,14 @@ type insertMsg struct {
// deleteMsg is an implementation of interface Msg
type deleteMsg struct {
BaseMsg
deleteMessages []*msgstream.DeleteMsg
timeRange TimeRange
}
// serviceTimeMsg is an implementation of interface Msg
type serviceTimeMsg struct {
BaseMsg
timeRange TimeRange
}
@ -50,12 +54,24 @@ func (iMsg *insertMsg) TimeTick() Timestamp {
return iMsg.timeRange.timestampMax
}
func (iMsg *insertMsg) IsClose() bool {
return iMsg.IsCloseMsg()
}
// TimeTick returns timestamp of deleteMsg
func (dMsg *deleteMsg) TimeTick() Timestamp {
return dMsg.timeRange.timestampMax
}
func (dMsg *deleteMsg) IsClose() bool {
return dMsg.IsCloseMsg()
}
// TimeTick returns timestamp of serviceTimeMsg
func (stMsg *serviceTimeMsg) TimeTick() Timestamp {
return stMsg.timeRange.timestampMax
}
func (stMsg *serviceTimeMsg) IsClose() bool {
return stMsg.IsCloseMsg()
}
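
The three IsClose methods above all follow one pattern: embed flowgraph.BaseMsg and forward to its IsCloseMsg. A self-contained sketch with hypothetical stand-in types:

	type baseMsg struct{ isCloseMsg bool }

	func (m baseMsg) IsCloseMsg() bool { return m.isCloseMsg }

	// sampleMsg embeds baseMsg, so the close flag travels with the message.
	type sampleMsg struct {
		baseMsg
		ts uint64
	}

	func (m *sampleMsg) TimeTick() uint64 { return m.ts }
	func (m *sampleMsg) IsClose() bool    { return m.IsCloseMsg() }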

View File

@ -41,22 +41,28 @@ func (stNode *serviceTimeNode) Name() string {
return fmt.Sprintf("stNode-%s", stNode.vChannel)
}
// Operate handles input messages, to execute insert operations
func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
if in == nil {
log.Debug("type assertion failed for serviceTimeMsg because it's nil", zap.String("name", stNode.Name()))
return []Msg{}
func (stNode *serviceTimeNode) IsValidInMsg(in []Msg) bool {
if !stNode.baseNode.IsValidInMsg(in) {
return false
}
if len(in) != 1 {
log.Warn("Invalid operate message input in serviceTimeNode, input length = ", zap.Int("input node", len(in)), zap.String("name", stNode.Name()))
return []Msg{}
}
serviceTimeMsg, ok := in[0].(*serviceTimeMsg)
_, ok := in[0].(*serviceTimeMsg)
if !ok {
log.Warn("type assertion failed for serviceTimeMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", stNode.Name()))
return []Msg{}
return false
}
return true
}
// Operate handles input messages, to execute insert operations
func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
serviceTimeMsg := in[0].(*serviceTimeMsg)
if serviceTimeMsg.IsCloseMsg() {
log.Info("service node hit close msg",
zap.Int64("collectionID", stNode.collectionID),
zap.Uint64("tSafe", serviceTimeMsg.timeRange.timestampMax),
zap.String("channel", stNode.vChannel),
)
return in
}
// update service time

View File

@ -58,12 +58,16 @@ func (m *numMsg) TimeTick() Timestamp {
return Timestamp(0)
}
func (m *numMsg) IsClose() bool {
return false
}
func (n *nodeA) Name() string {
return "NodeA"
}
func (n *nodeA) Operate(in []Msg) []Msg {
// ignore `in` because nodeA doesn't have any upstream node.
a := <-n.inputChan
var res Msg = &numMsg{
num: a,

View File

@ -37,6 +37,7 @@ import (
type InputNode struct {
BaseNode
inStream msgstream.MsgStream
lastMsg *msgstream.MsgPack
name string
role string
nodeID int64
@ -61,6 +62,10 @@ func (inNode *InputNode) Close() {
})
}
func (inNode *InputNode) IsValidInMsg(in []Msg) bool {
return true
}
// Name returns node name
func (inNode *InputNode) Name() string {
return inNode.name
@ -76,8 +81,19 @@ func (inNode *InputNode) Operate(in []Msg) []Msg {
msgPack, ok := <-inNode.inStream.Chan()
if !ok {
log.Warn("MsgStream closed", zap.Any("input node", inNode.Name()))
if inNode.lastMsg != nil {
log.Info("trigger force sync", zap.Int64("collection", inNode.collectionID), zap.Any("position", inNode.lastMsg))
return []Msg{&MsgStreamMsg{
BaseMsg: NewBaseMsg(true),
tsMessages: []msgstream.TsMsg{},
timestampMin: inNode.lastMsg.BeginTs,
timestampMax: inNode.lastMsg.EndTs,
startPositions: inNode.lastMsg.StartPositions,
endPositions: inNode.lastMsg.EndPositions,
}}
}
return []Msg{&MsgStreamMsg{
isCloseMsg: true,
BaseMsg: NewBaseMsg(true),
}}
}
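
A condensed sketch, with hypothetical simplified types, of the force-sync trigger added above: the input node remembers the last MsgPack it consumed, and when the stream channel closes it emits one final close-flagged message carrying that pack's positions, so downstream nodes can sync everything up to the last known checkpoint instead of losing buffered data.

	type msgPack struct {
		beginTs, endTs uint64
		endPositions   []string
	}

	type closeMessage struct {
		isClose        bool
		beginTs, endTs uint64
		positions      []string
	}

	type inputNode struct {
		last *msgPack // updated after every successful receive
	}

	func (n *inputNode) onStreamClosed() closeMessage {
		out := closeMessage{isClose: true}
		if n.last != nil { // positions exist only if something was consumed
			out.beginTs, out.endTs = n.last.beginTs, n.last.endTs
			out.positions = n.last.endPositions
		}
		return out
	}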
@ -86,6 +102,7 @@ func (inNode *InputNode) Operate(in []Msg) []Msg {
return []Msg{}
}
inNode.lastMsg = msgPack
sub := tsoutil.SubByNow(msgPack.EndTs)
if inNode.role == typeutil.QueryNodeRole {
metrics.QueryNodeConsumerMsgCount.

View File

@ -23,16 +23,31 @@ import (
// Msg is an abstract class that contains a method to get the time tick of this message
type Msg interface {
TimeTick() Timestamp
IsClose() bool
}
type BaseMsg struct {
isCloseMsg bool
}
func (msg BaseMsg) IsCloseMsg() bool {
return msg.isCloseMsg
}
func NewBaseMsg(isCloseMsg bool) BaseMsg {
return BaseMsg{
isCloseMsg: isCloseMsg,
}
}
// MsgStreamMsg is a wrapper of TsMsg in flowgraph
type MsgStreamMsg struct {
BaseMsg
tsMessages []msgstream.TsMsg
timestampMin Timestamp
timestampMax Timestamp
startPositions []*MsgPosition
endPositions []*MsgPosition
isCloseMsg bool
}
// GenerateMsgStreamMsg is used to create a new MsgStreamMsg object
@ -51,6 +66,10 @@ func (msMsg *MsgStreamMsg) TimeTick() Timestamp {
return msMsg.timestampMax
}
func (msMsg *MsgStreamMsg) IsClose() bool {
return msMsg.isCloseMsg
}
// DownStreamNodeIdx returns 0
func (msMsg *MsgStreamMsg) DownStreamNodeIdx() int {
return 0

View File

@ -38,6 +38,7 @@ type Node interface {
Name() string
MaxQueueLength() int32
MaxParallelism() int32
IsValidInMsg(in []Msg) bool
Operate(in []Msg) []Msg
IsInputNode() bool
Start()
@ -85,8 +86,7 @@ func (nodeCtx *nodeCtx) Unblock() {
func isCloseMsg(msgs []Msg) bool {
if len(msgs) == 1 {
msg, ok := msgs[0].(*MsgStreamMsg)
return ok && msg.isCloseMsg
return msgs[0].IsClose()
}
return false
}
@ -118,15 +118,14 @@ func (nodeCtx *nodeCtx) work() {
input = <-nodeCtx.inputChannel
}
// the input message decides whether the operate method is executed
if isCloseMsg(input) {
output = input
}
if len(output) == 0 {
n := nodeCtx.node
nodeCtx.blockMutex.RLock()
output = n.Operate(input)
n := nodeCtx.node
nodeCtx.blockMutex.RLock()
if !n.IsValidInMsg(input) {
nodeCtx.blockMutex.RUnlock()
continue
}
output = n.Operate(input)
nodeCtx.blockMutex.RUnlock()
// the output decides whether the node should be closed.
if isCloseMsg(output) {
close(nodeCtx.closeCh)
@ -186,3 +185,24 @@ func (node *BaseNode) Start() {}
// Close implementing Node, base node does nothing when stops
func (node *BaseNode) Close() {}
func (node *BaseNode) Name() string {
return "BaseNode"
}
func (node *BaseNode) Operate(in []Msg) []Msg {
return in
}
func (node *BaseNode) IsValidInMsg(in []Msg) bool {
if in == nil {
log.Info("type assertion failed because it's nil")
return false
}
if len(in) != 1 {
log.Warn("Invalid operate message input", zap.Int("input length", len(in)))
return false
}
return true
}
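
Putting the pieces together, a minimal custom node under the new contract (a sketch within the flowgraph package; printNode is hypothetical): BaseNode supplies the nil and length checks, the node adds only its own type assertion, and Operate may assert unconditionally because the framework calls IsValidInMsg first; a node passes a close message downstream so the framework can shut it down.

	type printNode struct {
		BaseNode
	}

	func (n *printNode) Name() string { return "printNode" }

	func (n *printNode) IsValidInMsg(in []Msg) bool {
		if !n.BaseNode.IsValidInMsg(in) { // nil / length checks
			return false
		}
		_, ok := in[0].(*MsgStreamMsg)
		return ok
	}

	func (n *printNode) Operate(in []Msg) []Msg {
		msg := in[0].(*MsgStreamMsg) // safe: IsValidInMsg ran first
		if msg.IsClose() {
			return in // propagate the close message; the framework closes the node
		}
		// ... normal processing ...
		return in
	}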