diff --git a/internal/datanode/buffer.go b/internal/datanode/buffer.go index da1f46bde0..19cc4016b6 100644 --- a/internal/datanode/buffer.go +++ b/internal/datanode/buffer.go @@ -46,6 +46,8 @@ type DelBufferManager struct { } func (bm *DelBufferManager) GetSegDelBufMemSize(segID UniqueID) int64 { + bm.mu.Lock() + defer bm.mu.Unlock() if delDataBuf, ok := bm.channel.getCurDeleteBuffer(segID); ok { return delDataBuf.item.memorySize } @@ -53,6 +55,8 @@ func (bm *DelBufferManager) GetSegDelBufMemSize(segID UniqueID) int64 { } func (bm *DelBufferManager) GetEntriesNum(segID UniqueID) int64 { + bm.mu.Lock() + defer bm.mu.Unlock() if delDataBuf, ok := bm.channel.getCurDeleteBuffer(segID); ok { return delDataBuf.GetEntriesNum() } diff --git a/internal/datanode/channel_meta.go b/internal/datanode/channel_meta.go index 147abd2572..38efab5979 100644 --- a/internal/datanode/channel_meta.go +++ b/internal/datanode/channel_meta.go @@ -180,10 +180,10 @@ func (c *ChannelMeta) maxRowCountPerSegment(ts Timestamp) (int64, error) { // Make sure to verify `channel.hasSegment(segID)` == false before calling `channel.addSegment()`. func (c *ChannelMeta) addSegment(req addSegmentReq) error { if req.collID != c.collectionID { - log.Warn("collection mismatch", + log.Warn("failed to addSegment, collection mismatch", zap.Int64("current collection ID", req.collID), zap.Int64("expected collection ID", c.collectionID)) - return fmt.Errorf("mismatch collection, ID=%d", req.collID) + return fmt.Errorf("failed to addSegment, mismatch collection, ID=%d", req.collID) } log.Info("adding segment", zap.String("type", req.segType.String()), @@ -489,8 +489,11 @@ func (c *ChannelMeta) getCollectionID() UniqueID { // // If you want the latest collection schema, ts should be 0. func (c *ChannelMeta) getCollectionSchema(collID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error) { - if !c.validCollection(collID) { - return nil, fmt.Errorf("mismatch collection, want %d, actual %d", c.collectionID, collID) + if collID != c.collectionID { + log.Warn("failed to getCollectionSchema, collection mismatch", + zap.Int64("current collection ID", collID), + zap.Int64("expected collection ID", c.collectionID)) + return nil, fmt.Errorf("failed to getCollectionSchema, mismatch collection, want %d, actual %d", c.collectionID, collID) } c.schemaMut.RLock() @@ -513,12 +516,7 @@ func (c *ChannelMeta) getCollectionSchema(collID UniqueID, ts Timestamp) (*schem return c.collSchema, nil } -func (c *ChannelMeta) validCollection(collID UniqueID) bool { - return collID == c.collectionID -} - func (c *ChannelMeta) mergeFlushedSegments(seg *Segment, planID UniqueID, compactedFrom []UniqueID) error { - log := log.With( zap.Int64("segment ID", seg.segmentID), zap.Int64("collection ID", seg.collectionID), @@ -528,9 +526,10 @@ func (c *ChannelMeta) mergeFlushedSegments(seg *Segment, planID UniqueID, compac zap.String("channel name", c.channelName)) if seg.collectionID != c.collectionID { - log.Warn("Mismatch collection", - zap.Int64("expected collectionID", c.collectionID)) - return fmt.Errorf("mismatch collection, ID=%d", seg.collectionID) + log.Warn("failed to mergeFlushedSegments, collection mismatch", + zap.Int64("current collection ID", seg.collectionID), + zap.Int64("expected collection ID", c.collectionID)) + return fmt.Errorf("failed to mergeFlushedSegments, mismatch collection, ID=%d", seg.collectionID) } var inValidSegments []UniqueID @@ -571,10 +570,10 @@ func (c *ChannelMeta) mergeFlushedSegments(seg *Segment, planID UniqueID, compac // for 
tests only func (c *ChannelMeta) addFlushedSegmentWithPKs(segID, collID, partID UniqueID, numOfRows int64, ids storage.FieldData) error { if collID != c.collectionID { - log.Warn("Mismatch collection", - zap.Int64("input ID", collID), - zap.Int64("expected ID", c.collectionID)) - return fmt.Errorf("mismatch collection, ID=%d", collID) + log.Warn("failed to addFlushedSegmentWithPKs, collection mismatch", + zap.Int64("current collection ID", collID), + zap.Int64("expected collection ID", c.collectionID)) + return fmt.Errorf("failed to addFlushedSegmentWithPKs, mismatch collection, ID=%d", collID) } log.Info("Add Flushed segment", diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index e267af6c3f..ee2e420103 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "sync" "go.uber.org/zap" @@ -43,10 +44,10 @@ type dataSyncService struct { ctx context.Context cancelFn context.CancelFunc fg *flowgraph.TimeTickedFlowGraph // internal flowgraph processes insert/delta messages - flushCh chan flushMsg // chan to notify flush - resendTTCh chan resendTTMsg // chan to ask for resending DataNode time tick message. - channel Channel // channel stores meta of channel - idAllocator allocatorInterface // id/timestamp allocator + flushCh chan flushMsg + resendTTCh chan resendTTMsg // chan to ask for resending DataNode time tick message. + channel Channel // channel stores meta of channel + idAllocator allocatorInterface // id/timestamp allocator msFactory msgstream.Factory collectionID UniqueID // collection id of vchan for which this data sync service serves vchannelName string @@ -58,6 +59,9 @@ type dataSyncService struct { flushManager flushManager // flush manager handles flush process chunkManager storage.ChunkManager compactor *compactionExecutor // reference to compaction executor + + stopOnce sync.Once + flushListener chan *segmentFlushPack // chan to listen flush event } func newDataSyncService(ctx context.Context, @@ -131,7 +135,7 @@ func newParallelConfig() parallelConfig { return parallelConfig{Params.DataNodeCfg.FlowGraphMaxQueueLength, Params.DataNodeCfg.FlowGraphMaxParallelism} } -// start starts the flow graph in datasyncservice +// start the flow graph in datasyncservice func (dsService *dataSyncService) start() { if dsService.fg != nil { log.Info("dataSyncService starting flow graph", zap.Int64("collectionID", dsService.collectionID), @@ -144,18 +148,20 @@ func (dsService *dataSyncService) start() { } func (dsService *dataSyncService) close() { - if dsService.fg != nil { - log.Info("dataSyncService closing flowgraph", zap.Int64("collectionID", dsService.collectionID), - zap.String("vChanName", dsService.vchannelName)) - dsService.fg.Close() - metrics.DataNodeNumConsumers.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Dec() - metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Sub(2) // timeTickChannel + deltaChannel - } + dsService.stopOnce.Do(func() { + if dsService.fg != nil { + log.Info("dataSyncService closing flowgraph", zap.Int64("collectionID", dsService.collectionID), + zap.String("vChanName", dsService.vchannelName)) + dsService.fg.Close() + metrics.DataNodeNumConsumers.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Dec() + metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Sub(2) // timeTickChannel + deltaChannel + } - 
dsService.clearGlobalFlushingCache() - - dsService.cancelFn() - dsService.flushManager.close() + dsService.clearGlobalFlushingCache() + close(dsService.flushCh) + dsService.flushManager.close() + dsService.cancelFn() + }) } func (dsService *dataSyncService) clearGlobalFlushingCache() { @@ -170,6 +176,11 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, dsService.chunkManager, dsService.channel, flushNotifyFunc(dsService, retry.Attempts(50)), dropVirtualChannelFunc(dsService)) + log.Info("begin to init data sync service", zap.Int64("collection", vchanInfo.CollectionID), + zap.String("Chan", vchanInfo.ChannelName), + zap.Int64s("unflushed", vchanInfo.GetUnflushedSegmentIds()), + zap.Int64s("flushed", vchanInfo.GetFlushedSegmentIds()), + ) var err error // recover segment checkpoints unflushedSegmentInfos, err := dsService.getSegmentInfos(vchanInfo.GetUnflushedSegmentIds()) @@ -186,7 +197,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro for _, us := range unflushedSegmentInfos { if us.CollectionID != dsService.collectionID || us.GetInsertChannel() != vchanInfo.ChannelName { - log.Warn("Collection ID or ChannelName not compact", + log.Warn("Collection ID or ChannelName not match", zap.Int64("Wanted ID", dsService.collectionID), zap.Int64("Actual ID", us.CollectionID), zap.String("Wanted Channel Name", vchanInfo.ChannelName), @@ -223,7 +234,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro for _, fs := range flushedSegmentInfos { if fs.CollectionID != dsService.collectionID || fs.GetInsertChannel() != vchanInfo.ChannelName { - log.Warn("Collection ID or ChannelName not compact", + log.Warn("Collection ID or ChannelName not match", zap.Int64("Wanted ID", dsService.collectionID), zap.Int64("Actual ID", fs.CollectionID), zap.String("Wanted Channel Name", vchanInfo.ChannelName), diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 7507bdaee0..06587294c6 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -21,6 +21,7 @@ import ( "context" "encoding/binary" "math" + "os" "testing" "time" @@ -36,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/tsoutil" ) var dataSyncServiceTestDir = "/tmp/milvus_test/data_sync_service" @@ -189,38 +191,36 @@ func TestDataSyncService_newDataSyncService(te *testing.T) { // NOTE: start pulsar before test func TestDataSyncService_Start(t *testing.T) { - t.Skip() - const ctxTimeInMillisecond = 2000 + const ctxTimeInMillisecond = 10000 delay := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) ctx, cancel := context.WithDeadline(context.Background(), delay) defer cancel() // init data node - - insertChannelName := "data_sync_service_test_dml" - ddlChannelName := "data_sync_service_test_ddl" + insertChannelName := "by-dev-rootcoord-dml" + ddlChannelName := "by-dev-rootcoord-ddl" Factory := &MetaFactory{} collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64) mockRootCoord := &RootCoordFactory{ pkType: schemapb.DataType_Int64, } - collectionID := UniqueID(1) flushChan := make(chan flushMsg, 100) resendTTChan := make(chan resendTTMsg, 100) cm := 
storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir)) defer cm.RemoveWithPrefix(ctx, cm.RootPath()) - channel := newChannel(insertChannelName, collectionID, collMeta.GetSchema(), mockRootCoord, cm) + channel := newChannel(insertChannelName, collMeta.ID, collMeta.GetSchema(), mockRootCoord, cm) allocFactory := NewAllocatorFactory(1) factory := dependency.NewDefaultFactory(true) - + defer os.RemoveAll("/tmp/milvus") Params.DataNodeCfg.FlushInsertBufferSize = 1 ufs := []*datapb.SegmentInfo{{ CollectionID: collMeta.ID, + PartitionID: 1, InsertChannel: insertChannelName, ID: 0, NumOfRows: 0, @@ -250,18 +250,38 @@ func TestDataSyncService_Start(t *testing.T) { } signalCh := make(chan string, 100) - sync, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, allocFactory, factory, vchan, signalCh, &DataCoordFactory{}, newCache(), cm, newCompactionExecutor()) + dataCoord := &DataCoordFactory{} + dataCoord.UserSegmentInfo = map[int64]*datapb.SegmentInfo{ + 0: { + ID: 0, + CollectionID: collMeta.ID, + PartitionID: 1, + InsertChannel: insertChannelName, + }, + + 1: { + ID: 1, + CollectionID: collMeta.ID, + PartitionID: 1, + InsertChannel: insertChannelName, + }, + } + + sync, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, allocFactory, factory, vchan, signalCh, dataCoord, newCache(), cm, newCompactionExecutor()) assert.Nil(t, err) - // sync.channel.addCollection(collMeta.ID, collMeta.Schema) + + sync.flushListener = make(chan *segmentFlushPack) + defer close(sync.flushListener) sync.start() + defer sync.close() timeRange := TimeRange{ timestampMin: 0, timestampMax: math.MaxUint64, } dataFactory := NewDataFactory() - insertMessages := dataFactory.GetMsgStreamTsInsertMsgs(2, insertChannelName) + insertMessages := dataFactory.GetMsgStreamTsInsertMsgs(2, insertChannelName, tsoutil.GetCurrentTime()) msgPack := msgstream.MsgPack{ BeginTs: timeRange.timestampMin, @@ -317,10 +337,213 @@ func TestDataSyncService_Start(t *testing.T) { err = ddMsgStream.Broadcast(&timeTickMsgPack) assert.NoError(t, err) - // dataSync - <-sync.ctx.Done() + select { + case flushPack := <-sync.flushListener: + assert.True(t, flushPack.segmentID == 1) + return + case <-sync.ctx.Done(): + assert.Fail(t, "test timeout") + } +} +func TestDataSyncService_Close(t *testing.T) { + const ctxTimeInMillisecond = 1000000 + + delay := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) + ctx, cancel := context.WithDeadline(context.Background(), delay) + defer cancel() + + os.RemoveAll("/tmp/milvus") + + // init data node + insertChannelName := "by-dev-rootcoord-dml2" + ddlChannelName := "by-dev-rootcoord-ddl2" + + Factory := &MetaFactory{} + collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64) + mockRootCoord := &RootCoordFactory{ + pkType: schemapb.DataType_Int64, + } + + flushChan := make(chan flushMsg, 100) + resendTTChan := make(chan resendTTMsg, 100) + cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir)) + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) + channel := newChannel(insertChannelName, collMeta.ID, collMeta.GetSchema(), mockRootCoord, cm) + + allocFactory := NewAllocatorFactory(1) + factory := dependency.NewDefaultFactory(true) + defer os.RemoveAll("/tmp/milvus") + + Params.DataNodeCfg.FlushInsertBufferSize = 0 + ufs := []*datapb.SegmentInfo{{ + CollectionID: collMeta.ID, + PartitionID: 1, + InsertChannel: insertChannelName, + ID: 0, + NumOfRows: 0, + DmlPosition: &internalpb.MsgPosition{}, + }} + fs := []*datapb.SegmentInfo{{ 
+ CollectionID: collMeta.ID, + PartitionID: 1, + InsertChannel: insertChannelName, + ID: 1, + NumOfRows: 0, + DmlPosition: &internalpb.MsgPosition{}, + }} + var ufsIds []int64 + var fsIds []int64 + for _, segmentInfo := range ufs { + ufsIds = append(ufsIds, segmentInfo.ID) + } + for _, segmentInfo := range fs { + fsIds = append(fsIds, segmentInfo.ID) + } + vchan := &datapb.VchannelInfo{ + CollectionID: collMeta.ID, + ChannelName: insertChannelName, + UnflushedSegmentIds: ufsIds, + FlushedSegmentIds: fsIds, + } + + signalCh := make(chan string, 100) + + dataCoord := &DataCoordFactory{} + dataCoord.UserSegmentInfo = map[int64]*datapb.SegmentInfo{ + 0: { + ID: 0, + CollectionID: collMeta.ID, + PartitionID: 1, + InsertChannel: insertChannelName, + }, + + 1: { + ID: 1, + CollectionID: collMeta.ID, + PartitionID: 1, + InsertChannel: insertChannelName, + }, + } + + sync, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, allocFactory, factory, vchan, signalCh, dataCoord, newCache(), cm, newCompactionExecutor()) + assert.Nil(t, err) + + sync.flushListener = make(chan *segmentFlushPack, 10) + defer close(sync.flushListener) + sync.start() + + dataFactory := NewDataFactory() + ts := tsoutil.GetCurrentTime() + insertMessages := dataFactory.GetMsgStreamTsInsertMsgs(2, insertChannelName, ts) + msgPack := msgstream.MsgPack{ + BeginTs: ts, + EndTs: ts, + Msgs: insertMessages, + StartPositions: []*internalpb.MsgPosition{{ + ChannelName: insertChannelName, + }}, + EndPositions: []*internalpb.MsgPosition{{ + ChannelName: insertChannelName, + }}, + } + + // 400 is the actual data + int64Pks := []primaryKey{ + newInt64PrimaryKey(400), + } + deleteMessages := dataFactory.GenMsgStreamDeleteMsgWithTs(0, int64Pks, insertChannelName, ts+1) + inMsgs := make([]msgstream.TsMsg, 0) + inMsgs = append(inMsgs, deleteMessages) + + msgPackDelete := msgstream.MsgPack{ + BeginTs: ts + 1, + EndTs: ts + 1, + Msgs: inMsgs, + StartPositions: []*internalpb.MsgPosition{{ + ChannelName: insertChannelName, + }}, + EndPositions: []*internalpb.MsgPosition{{ + ChannelName: insertChannelName, + }}, + } + + // generate timeTick + timeTickMsg := &msgstream.TimeTickMsg{ + BaseMsg: msgstream.BaseMsg{ + BeginTimestamp: ts, + EndTimestamp: ts + 2, + HashValues: []uint32{0}, + }, + TimeTickMsg: internalpb.TimeTickMsg{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_TimeTick, + MsgID: UniqueID(2), + Timestamp: ts + 2, + SourceID: 0, + }, + }, + } + + timeTickMsgPack := msgstream.MsgPack{ + BeginTs: ts + 2, + EndTs: ts + 2, + StartPositions: []*internalpb.MsgPosition{{ + ChannelName: insertChannelName, + }}, + EndPositions: []*internalpb.MsgPosition{{ + ChannelName: insertChannelName, + }}, + } + timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg) + + // pulsar produce + assert.NoError(t, err) + insertStream, _ := factory.NewMsgStream(ctx) + insertStream.AsProducer([]string{insertChannelName}) + + ddStream, _ := factory.NewMsgStream(ctx) + ddStream.AsProducer([]string{ddlChannelName}) + + var insertMsgStream msgstream.MsgStream = insertStream + var ddMsgStream msgstream.MsgStream = ddStream + + err = insertMsgStream.Produce(&msgPack) + assert.NoError(t, err) + + err = insertMsgStream.Produce(&msgPackDelete) + assert.NoError(t, err) + + err = insertMsgStream.Broadcast(&timeTickMsgPack) + assert.NoError(t, err) + err = ddMsgStream.Broadcast(&timeTickMsgPack) + assert.NoError(t, err) + + // wait for delete + for sync.delBufferManager.GetEntriesNum(1) == 0 { + time.Sleep(100) + } + + // close and wait for flush 
sync.close() + for { + select { + case flushPack, ok := <-sync.flushListener: + assert.True(t, ok) + if flushPack.segmentID == 1 { + assert.True(t, len(flushPack.insertLogs) == 12) + assert.True(t, len(flushPack.statsLogs) == 1) + assert.True(t, len(flushPack.deltaLogs) == 1) + return + } + if flushPack.segmentID == 0 { + assert.True(t, len(flushPack.insertLogs) == 0) + assert.True(t, len(flushPack.statsLogs) == 0) + assert.True(t, len(flushPack.deltaLogs) == 0) + } + case <-sync.ctx.Done(): + } + } } func genBytes() (rawData []byte) { @@ -407,6 +630,22 @@ func TestGetSegmentInfos(t *testing.T) { segmentInfos3, err := dsService.getSegmentInfos([]int64{1}) assert.Error(t, err) assert.Empty(t, segmentInfos3) + + dataCoord.GetSegmentInfosError = false + dataCoord.GetSegmentInfosNotSuccess = false + dataCoord.UserSegmentInfo = map[int64]*datapb.SegmentInfo{ + 5: { + ID: 100, + CollectionID: 101, + PartitionID: 102, + InsertChannel: "by-dev-rootcoord-dml-test_v1", + }, + } + + segmentInfos, err = dsService.getSegmentInfos([]int64{5}) + assert.NoError(t, err) + assert.Equal(t, 1, len(segmentInfos)) + assert.Equal(t, int64(100), segmentInfos[0].ID) } func TestClearGlobalFlushingCache(t *testing.T) { diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index d2d3bbaf8f..b57244a134 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -85,37 +85,60 @@ func (ddn *ddNode) Name() string { return fmt.Sprintf("ddNode-%d-%s", ddn.collectionID, ddn.vChannelName) } +func (ddn *ddNode) IsValidInMsg(in []Msg) bool { + if !ddn.BaseNode.IsValidInMsg(in) { + return false + } + _, ok := in[0].(*MsgStreamMsg) + if !ok { + log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) + return false + } + return true +} + // Operate handles input messages, implementing flowgrpah.Node func (ddn *ddNode) Operate(in []Msg) []Msg { - if in == nil { - log.Debug("type assertion failed for MsgStreamMsg because it's nil") - return []Msg{} - } - - if len(in) != 1 { - log.Warn("Invalid operate message input in ddNode", zap.Int("input length", len(in))) - return []Msg{} - } - msMsg, ok := in[0].(*MsgStreamMsg) if !ok { log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) return []Msg{} } + if msMsg.IsCloseMsg() { + var fgMsg = flowGraphMsg{ + BaseMsg: flowgraph.NewBaseMsg(true), + insertMessages: make([]*msgstream.InsertMsg, 0), + timeRange: TimeRange{ + timestampMin: msMsg.TimestampMin(), + timestampMax: msMsg.TimestampMax(), + }, + startPositions: msMsg.StartPositions(), + endPositions: msMsg.EndPositions(), + dropCollection: false, + } + log.Warn("MsgStream closed", zap.Any("ddNode node", ddn.Name()), zap.Int64("collection", ddn.collectionID), zap.String("channel", ddn.vChannelName)) + return []Msg{&fgMsg} + } + + if load := ddn.dropMode.Load(); load != nil && load.(bool) { + log.Debug("ddNode in dropMode", + zap.String("vChannelName", ddn.vChannelName), + zap.Int64("collection ID", ddn.collectionID)) + return []Msg{} + } + var spans []opentracing.Span for _, msg := range msMsg.TsMessages() { sp, ctx := trace.StartSpanFromContext(msg.TraceCtx()) spans = append(spans, sp) msg.SetTraceCtx(ctx) } - - if load := ddn.dropMode.Load(); load != nil && load.(bool) { - log.Debug("ddNode in dropMode", - zap.String("vChannelName", ddn.vChannelName), - zap.Int64("collection ID", ddn.collectionID)) - return []Msg{} - } + defer func() { + for _, sp := range spans 
{ + sp.Finish() + } + }() var fgMsg = flowGraphMsg{ insertMessages: make([]*msgstream.InsertMsg, 0), @@ -192,7 +215,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { case commonpb.MsgType_Delete: dmsg := msg.(*msgstream.DeleteMsg) log.Debug("DDNode receive delete messages", - zap.Int64("num", dmsg.NumRows), + zap.Int64("numRows", dmsg.NumRows), zap.String("vChannelName", ddn.vChannelName)) for i := int64(0); i < dmsg.NumRows; i++ { dmsg.HashValues = append(dmsg.HashValues, uint32(0)) @@ -230,10 +253,6 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { fgMsg.startPositions = append(fgMsg.startPositions, msMsg.StartPositions()...) fgMsg.endPositions = append(fgMsg.endPositions, msMsg.EndPositions()...) - for _, sp := range spans { - sp.Finish() - } - return []Msg{&fgMsg} } diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go index 01df072fee..82dbedaac7 100644 --- a/internal/datanode/flow_graph_dd_node_test.go +++ b/internal/datanode/flow_graph_dd_node_test.go @@ -110,11 +110,9 @@ func TestFlowGraph_DDNode_Operate(t *testing.T) { for _, test := range invalidInTests { t.Run(test.description, func(t *testing.T) { ddn := ddNode{} - rt := ddn.Operate(test.in) - assert.Empty(t, rt) + assert.False(t, ddn.IsValidInMsg(test.in)) }) } - // valid inputs tests := []struct { ddnCollID UniqueID diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go index fa63699b82..580aece05c 100644 --- a/internal/datanode/flow_graph_delete_node.go +++ b/internal/datanode/flow_graph_delete_node.go @@ -67,23 +67,21 @@ func (dn *deleteNode) showDelBuf(segIDs []UniqueID, ts Timestamp) { } } -// Operate implementing flowgraph.Node, performs delete data process -func (dn *deleteNode) Operate(in []Msg) []Msg { - if in == nil { - log.Debug("type assertion failed for flowGraphMsg because it's nil") - return []Msg{} +func (dn *deleteNode) IsValidInMsg(in []Msg) bool { + if !dn.BaseNode.IsValidInMsg(in) { + return false } - - if len(in) != 1 { - log.Warn("Invalid operate message input in deleteNode", zap.Int("input length", len(in))) - return []Msg{} - } - - fgMsg, ok := in[0].(*flowGraphMsg) + _, ok := in[0].(*flowGraphMsg) if !ok { log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) - return []Msg{} + return false } + return true +} + +// Operate implementing flowgraph.Node, performs delete data process +func (dn *deleteNode) Operate(in []Msg) []Msg { + fgMsg := in[0].(*flowGraphMsg) var spans []opentracing.Span for _, msg := range fgMsg.deleteMessages { @@ -127,6 +125,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg { // no related delta data to flush, send empty buf to complete flush life-cycle dn.flushManager.flushDelData(nil, segmentToFlush, fgMsg.endPositions[0]) } else { + // TODO, this has to be async, no need to block here err := retry.Do(dn.ctx, func() error { return dn.flushManager.flushDelData(buf, segmentToFlush, fgMsg.endPositions[0]) }, getFlowGraphRetryOpt()) diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go index bb3d7e8ae9..b6923dae76 100644 --- a/internal/datanode/flow_graph_delete_node_test.go +++ b/internal/datanode/flow_graph_delete_node_test.go @@ -150,8 +150,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { for _, test := range invalidInTests { te.Run(test.desc, func(t *testing.T) { dn := deleteNode{} - rt := dn.Operate(test.in) - assert.Empty(t, rt) + assert.False(t, 
dn.IsValidInMsg(test.in)) }) } }) @@ -439,6 +438,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { //2. here we set flushing segments inside fgmsg to empty //in order to verify the validity of auto flush function msg := genFlowGraphDeleteMsg(int64Pks, chanName) + + // delete has to match segment partition ID + for _, msg := range msg.deleteMessages { + msg.PartitionID = 0 + } msg.segmentsToSync = []UniqueID{} var fgMsg flowgraph.Msg = &msg diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 8d80de93aa..d511cc9b62 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/commonpbutil" + "github.com/milvus-io/milvus/internal/util/flowgraph" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/trace" @@ -102,10 +103,36 @@ func (ibNode *insertBufferNode) Close() { } } -func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { - fgMsg, ok := ibNode.verifyInMsg(in) +func (ibNode *insertBufferNode) IsValidInMsg(in []Msg) bool { + if !ibNode.BaseNode.IsValidInMsg(in) { + return false + } + _, ok := in[0].(*flowGraphMsg) if !ok { - return []Msg{} + log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) + return false + } + return true +} + +func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { + fgMsg := in[0].(*flowGraphMsg) + if fgMsg.IsCloseMsg() { + if len(fgMsg.endPositions) != 0 { + // try to sync all segments + segmentsToSync := ibNode.Sync(fgMsg, make([]UniqueID, 0), fgMsg.endPositions[0]) + res := flowGraphMsg{ + deleteMessages: []*msgstream.DeleteMsg{}, + timeRange: fgMsg.timeRange, + startPositions: fgMsg.startPositions, + endPositions: fgMsg.endPositions, + segmentsToSync: segmentsToSync, + dropCollection: fgMsg.dropCollection, + BaseMsg: flowgraph.NewBaseMsg(true), + } + return []Msg{&res} + } + return in } if fgMsg.dropCollection { @@ -119,6 +146,12 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { msg.SetTraceCtx(ctx) } + defer func() { + for _, sp := range spans { + sp.Finish() + } + }() + // replace pchannel with vchannel startPositions := make([]*internalpb.MsgPosition, 0, len(fgMsg.startPositions)) for idx := range fgMsg.startPositions { @@ -180,33 +213,10 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg { dropCollection: fgMsg.dropCollection, } - for _, sp := range spans { - sp.Finish() - } - // send delete msg to DeleteNode return []Msg{&res} } -func (ibNode *insertBufferNode) verifyInMsg(in []Msg) (*flowGraphMsg, bool) { - // while closing - if in == nil { - log.Warn("type assertion failed for flowGraphMsg because it's nil") - return nil, false - } - - if len(in) != 1 { - log.Warn("Invalid operate message input in insertBufferNode", zap.Int("input length", len(in))) - return nil, false - } - - fgMsg, ok := in[0].(*flowGraphMsg) - if !ok { - log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) - } - return fgMsg, ok -} - func (ibNode *insertBufferNode) GetBufferIfFull(segID UniqueID) (*BufferData, bool) { if bd, ok := ibNode.channel.getCurInsertBuffer(segID); ok && bd.effectiveCap() <= 0 { return bd, true @@ -305,7 +315,6 @@ func (ibNode *insertBufferNode) 
FillInSyncTasks(fgMsg *flowGraphMsg, seg2Upload for _, segID := range segmentIDs { buf := ibNode.GetBuffer(segID) - syncTasks[segID] = &syncTask{ buffer: buf, // nil is valid segmentID: segID, @@ -316,6 +325,32 @@ func (ibNode *insertBufferNode) FillInSyncTasks(fgMsg *flowGraphMsg, seg2Upload return syncTasks } + if fgMsg.IsCloseMsg() { + // All segments in the collection will be synced, no matter whether their buffers are empty + segmentIDs := ibNode.channel.listAllSegmentIDs() + log.Info("Receive close request and syncing all segments", + zap.Int64s("segments", segmentIDs), + zap.String("channel", ibNode.channelName), + ) + + for _, segID := range segmentIDs { + // if segment has insert or delete data then force sync + insertBuf, hasInsert := ibNode.channel.getCurInsertBuffer(segID) + deleteEntry := ibNode.delBufferManager.GetEntriesNum(segID) + // if insert buf or delete buf is not empty, trigger sync + if (hasInsert && insertBuf.size > 0) || (deleteEntry > 0) { + syncTasks[segID] = &syncTask{ + buffer: insertBuf, // nil is valid + segmentID: segID, + flushed: false, + dropped: false, + auto: true, + } + } + } + return syncTasks + } + // Auto Sync // TODO: move to segment_sync_policy for _, segID := range seg2Upload { if ibuffer, ok := ibNode.GetBufferIfFull(segID); ok { @@ -357,7 +392,7 @@ func (ibNode *insertBufferNode) FillInSyncTasks(fgMsg *flowGraphMsg, seg2Upload } } if len(syncSegmentIDs) > 0 { - log.Debug("sync segments", zap.String("vChannel", ibNode.channelName), + log.Info("sync segments", zap.String("vChannel", ibNode.channelName), zap.Int64s("segIDs", syncSegmentIDs)) // TODO: maybe too many prints here } @@ -423,6 +458,7 @@ func (ibNode *insertBufferNode) Sync(fgMsg *flowGraphMsg, seg2Upload []UniqueID, ) // use the flushed pk stats to take current stat var pkStats []*storage.PrimaryKeyStats + // TODO: this has to be an async flush, no need to block here. 
err := retry.Do(ibNode.ctx, func() error { statBlobs, err := ibNode.flushManager.flushBufferData(task.buffer, task.segmentID, diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index c7f8123699..dba63651bb 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -127,12 +127,18 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) { assert.Error(t, err) } -type mockMsg struct{} +type mockMsg struct { + BaseMsg +} func (*mockMsg) TimeTick() Timestamp { return 0 } +func (*mockMsg) IsClose() bool { + return false +} + func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { t.Run("Test iBNode Operate invalid Msg", func(te *testing.T) { invalidInTests := []struct { @@ -152,8 +158,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { ibn := &insertBufferNode{ ttMerger: newMergedTimeTickerSender(func(Timestamp, []int64) error { return nil }), } - rt := ibn.Operate(test.in) - assert.Empty(t0, rt) + assert.False(t0, ibn.IsValidInMsg(test.in)) }) } }) @@ -712,16 +717,15 @@ func (s *InsertBufferNodeSuit) SetupSuite() { pkType: schemapb.DataType_Int64, } - delBufManager := &DelBufferManager{ + s.collID = 1 + s.partID = 10 + s.channel = newChannel("channel", s.collID, nil, rc, s.cm) + + s.delBufManager = &DelBufferManager{ channel: s.channel, delMemorySize: 0, delBufHeap: &PriorityQueue{}, } - - s.collID = 1 - s.partID = 10 - s.channel = newChannel("channel", s.collID, nil, rc, s.cm) - s.delBufManager = delBufManager s.cm = storage.NewLocalChunkManager(storage.RootPath(insertBufferNodeTestDir)) s.originalConfig = Params.DataNodeCfg.FlushInsertBufferSize @@ -903,6 +907,27 @@ func (s *InsertBufferNodeSuit) TestFillInSyncTasks() { s.Assert().False(task.dropped) } }) + + s.Run("test close", func() { + fgMsg := &flowGraphMsg{BaseMsg: flowgraph.NewBaseMsg(true)} + + node := &insertBufferNode{ + channelName: s.channel.channelName, + channel: s.channel, + delBufferManager: s.delBufManager, + flushChan: make(chan flushMsg, 100), + } + + syncTasks := node.FillInSyncTasks(fgMsg, nil) + s.Assert().Equal(1, len(syncTasks)) + for _, task := range syncTasks { + s.Assert().Equal(task.segmentID, int64(1)) + s.Assert().False(task.dropped) + s.Assert().False(task.flushed) + s.Assert().True(task.auto) + } + }) + } func TestInsertBufferNodeSuite(t *testing.T) { diff --git a/internal/datanode/flow_graph_message.go b/internal/datanode/flow_graph_message.go index f3663a228e..ab04f1ceee 100644 --- a/internal/datanode/flow_graph_message.go +++ b/internal/datanode/flow_graph_message.go @@ -27,6 +27,8 @@ type ( // Msg is flowgraph.Msg Msg = flowgraph.Msg + BaseMsg = flowgraph.BaseMsg + // MsgStreamMsg is flowgraph.MsgStreamMsg MsgStreamMsg = flowgraph.MsgStreamMsg @@ -41,6 +43,7 @@ type ( ) type flowGraphMsg struct { + BaseMsg insertMessages []*msgstream.InsertMsg deleteMessages []*msgstream.DeleteMsg timeRange TimeRange @@ -56,6 +59,10 @@ func (fgMsg *flowGraphMsg) TimeTick() Timestamp { return fgMsg.timeRange.timestampMax } +func (fgMsg *flowGraphMsg) IsClose() bool { + return fgMsg.BaseMsg.IsCloseMsg() +} + // flush Msg is used in flowgraph insertBufferNode to flush the given segment type flushMsg struct { msgID UniqueID @@ -63,6 +70,8 @@ type flushMsg struct { segmentID UniqueID collectionID UniqueID flushed bool + //isFlush illustrates if this is a flush or normal sync + isFlush bool } type resendTTMsg struct { diff --git 
a/internal/datanode/flow_graph_time_tick_node.go b/internal/datanode/flow_graph_time_tick_node.go index 824cc0153f..f08e9bfb35 100644 --- a/internal/datanode/flow_graph_time_tick_node.go +++ b/internal/datanode/flow_graph_time_tick_node.go @@ -55,22 +55,23 @@ func (ttn *ttNode) Name() string { return fmt.Sprintf("ttNode-%s", ttn.vChannelName) } -// Operate handles input messages, implementing flowgraph.Node -func (ttn *ttNode) Operate(in []Msg) []Msg { - if in == nil { - log.Warn("type assertion failed for flowGraphMsg because it's nil") - return []Msg{} +func (ttn *ttNode) IsValidInMsg(in []Msg) bool { + if !ttn.BaseNode.IsValidInMsg(in) { + return false } - - if len(in) != 1 { - log.Warn("Invalid operate message input in ttNode", zap.Int("input length", len(in))) - return []Msg{} - } - - fgMsg, ok := in[0].(*flowGraphMsg) + _, ok := in[0].(*flowGraphMsg) if !ok { log.Warn("type assertion failed for flowGraphMsg", zap.String("name", reflect.TypeOf(in[0]).Name())) - return []Msg{} + return false + } + return true +} + +// Operate handles input messages, implementing flowgraph.Node +func (ttn *ttNode) Operate(in []Msg) []Msg { + fgMsg := in[0].(*flowGraphMsg) + if fgMsg.IsCloseMsg() { + return in } curTs, _ := tsoutil.ParseTS(fgMsg.timeRange.timestampMax) diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go index ca680bdcd9..aa6c9bcedb 100644 --- a/internal/datanode/flush_manager.go +++ b/internal/datanode/flush_manager.go @@ -311,6 +311,7 @@ func (m *rendezvousFlushManager) handleInsertTask(segmentID UniqueID, task flush } func (m *rendezvousFlushManager) handleDeleteTask(segmentID UniqueID, task flushDeleteTask, deltaLogs *DelDataBuf, pos *internalpb.MsgPosition) { + log.Info("handling delete task", zap.Int64("segment ID", segmentID)) // in dropping mode if m.dropping.Load() { // preventing separate delete, check position exists in queue first @@ -566,6 +567,7 @@ func (m *rendezvousFlushManager) close() { queue.injectMut.Unlock() return true }) + m.waitForAllFlushQueue() } type flushBufferInsertTask struct { @@ -788,6 +790,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet zap.Int64("SegmentID", pack.segmentID), zap.Int64("CollectionID", dsService.collectionID), zap.Any("startPos", startPos), + zap.Any("checkPoints", checkPoints), zap.Int("Length of Field2BinlogPaths", len(fieldInsert)), zap.Int("Length of Field2Stats", len(fieldStats)), zap.Int("Length of Field2Deltalogs", len(deltaInfos[0].GetBinlogs())), @@ -817,7 +820,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet rsp, err := dsService.dataCoord.SaveBinlogPaths(context.Background(), req) // should be network issue, return error and retry if err != nil { - return fmt.Errorf(err.Error()) + return err } // Segment not found during stale segment flush. Segment might get compacted already. 
@@ -855,6 +858,10 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet if pack.flushed || pack.dropped { dsService.channel.segmentFlushed(pack.segmentID) } + + if dsService.flushListener != nil { + dsService.flushListener <- pack + } dsService.flushingSegCache.Remove(req.GetSegmentID()) dsService.channel.evictHistoryInsertBuffer(req.GetSegmentID(), pack.pos) dsService.channel.evictHistoryDeleteBuffer(req.GetSegmentID(), pack.pos) diff --git a/internal/datanode/flush_task.go b/internal/datanode/flush_task.go index 9ded3cfa96..2da6f44a07 100644 --- a/internal/datanode/flush_task.go +++ b/internal/datanode/flush_task.go @@ -218,7 +218,14 @@ func (t *flushTaskRunner) getFlushPack() *segmentFlushPack { dropped: t.dropped, } log.Debug("flush pack composed", - zap.Any("pack", pack)) + zap.Int64("segmentID", t.segmentID), + zap.Int("insertLogs", len(t.insertLogs)), + zap.Int("statsLogs", len(t.statsLogs)), + zap.Int("deleteLogs", len(t.deltaLogs)), + zap.Bool("flushed", t.flushed), + zap.Bool("dropped", t.dropped), + ) + if t.insertErr != nil || t.deleteErr != nil { log.Warn("flush task error detected", zap.Error(t.insertErr), zap.Error(t.deleteErr)) pack.err = errors.New("execution failed") diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index 78c76c3afe..18bc58ba99 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -207,6 +207,7 @@ type DataCoordFactory struct { GetSegmentInfosError bool GetSegmentInfosNotSuccess bool + UserSegmentInfo map[int64]*datapb.SegmentInfo AddSegmentError bool AddSegmentNotSuccess bool @@ -310,7 +311,9 @@ func (ds *DataCoordFactory) GetSegmentInfo(ctx context.Context, req *datapb.GetS } var segmentInfos []*datapb.SegmentInfo for _, segmentID := range req.SegmentIDs { - if segInfo, ok := segID2SegInfo[segmentID]; ok { + if segInfo, ok := ds.UserSegmentInfo[segmentID]; ok { + segmentInfos = append(segmentInfos, segInfo) + } else if segInfo, ok := segID2SegInfo[segmentID]; ok { segmentInfos = append(segmentInfos, segInfo) } else { segmentInfos = append(segmentInfos, &datapb.SegmentInfo{ @@ -780,9 +783,39 @@ func (df *DataFactory) GenMsgStreamInsertMsg(idx int, chanName string) *msgstrea return msg } -func (df *DataFactory) GetMsgStreamTsInsertMsgs(n int, chanName string) (inMsgs []msgstream.TsMsg) { +func (df *DataFactory) GenMsgStreamInsertMsgWithTs(idx int, chanName string, ts Timestamp) *msgstream.InsertMsg { + var msg = &msgstream.InsertMsg{ + BaseMsg: msgstream.BaseMsg{ + HashValues: []uint32{uint32(idx)}, + BeginTimestamp: ts, + EndTimestamp: ts, + }, + InsertRequest: internalpb.InsertRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Insert, + MsgID: 0, + Timestamp: ts, + SourceID: 0, + }, + CollectionName: "col1", + PartitionName: "default", + SegmentID: 1, + CollectionID: UniqueID(0), + ShardName: chanName, + Timestamps: []Timestamp{ts}, + RowIDs: []UniqueID{UniqueID(idx)}, + // RowData: []*commonpb.Blob{{Value: df.rawData}}, + FieldsData: df.columnData, + Version: internalpb.InsertDataVersion_ColumnBased, + NumRows: 1, + }, + } + return msg +} + +func (df *DataFactory) GetMsgStreamTsInsertMsgs(n int, chanName string, ts Timestamp) (inMsgs []msgstream.TsMsg) { for i := 0; i < n; i++ { - var msg = df.GenMsgStreamInsertMsg(i, chanName) + var msg = df.GenMsgStreamInsertMsgWithTs(i, chanName, ts) var tsMsg msgstream.TsMsg = msg inMsgs = append(inMsgs, tsMsg) } @@ -816,6 +849,7 @@ func (df *DataFactory) GenMsgStreamDeleteMsg(pks []primaryKey, chanName string) }, 
CollectionName: "col1", PartitionName: "default", + PartitionID: 1, ShardName: chanName, PrimaryKeys: s.ParsePrimaryKeys2IDs(pks), Timestamps: timestamps, @@ -825,6 +859,33 @@ func (df *DataFactory) GenMsgStreamDeleteMsg(pks []primaryKey, chanName string) return msg } +func (df *DataFactory) GenMsgStreamDeleteMsgWithTs(idx int, pks []primaryKey, chanName string, ts Timestamp) *msgstream.DeleteMsg { + var msg = &msgstream.DeleteMsg{ + BaseMsg: msgstream.BaseMsg{ + HashValues: []uint32{uint32(idx)}, + BeginTimestamp: ts, + EndTimestamp: ts, + }, + DeleteRequest: internalpb.DeleteRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Delete, + MsgID: 1, + Timestamp: ts, + SourceID: 0, + }, + CollectionName: "col1", + PartitionName: "default", + PartitionID: 1, + CollectionID: UniqueID(0), + ShardName: chanName, + PrimaryKeys: s.ParsePrimaryKeys2IDs(pks), + Timestamps: []Timestamp{ts}, + NumRows: int64(len(pks)), + }, + } + return msg +} + func genFlowGraphInsertMsg(chanName string) flowGraphMsg { timeRange := TimeRange{ timestampMin: 0, diff --git a/internal/datanode/segment_sync_policy.go b/internal/datanode/segment_sync_policy.go index b512b3a3ac..b336148bdd 100644 --- a/internal/datanode/segment_sync_policy.go +++ b/internal/datanode/segment_sync_policy.go @@ -17,7 +17,9 @@ package datanode import ( + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/tsoutil" + "go.uber.org/zap" ) // segmentSyncPolicy sync policy applies to segment @@ -28,7 +30,10 @@ func syncPeriodically() segmentSyncPolicy { return func(segment *Segment, ts Timestamp) bool { endTime := tsoutil.PhysicalTime(ts) lastSyncTime := tsoutil.PhysicalTime(segment.lastSyncTs) - return endTime.Sub(lastSyncTime) >= Params.DataNodeCfg.SyncPeriod && - !segment.isBufferEmpty() + shouldSync := endTime.Sub(lastSyncTime) >= Params.DataNodeCfg.SyncPeriod && !segment.isBufferEmpty() + if shouldSync { + log.Info("sync segment periodically ", zap.Time("now", endTime), zap.Time("last sync", lastSyncTime)) + } + return shouldSync } } diff --git a/internal/querynode/flow_graph_delete_node.go b/internal/querynode/flow_graph_delete_node.go index 8d09bb6f4e..1f85ff5dce 100644 --- a/internal/querynode/flow_graph_delete_node.go +++ b/internal/querynode/flow_graph_delete_node.go @@ -53,30 +53,26 @@ func (dNode *deleteNode) Name() string { return fmt.Sprintf("dNode-%s", dNode.deltaVchannel) } +func (dNode *deleteNode) IsValidInMsg(in []Msg) bool { + if !dNode.baseNode.IsValidInMsg(in) { + return false + } + _, ok := in[0].(*deleteMsg) + if !ok { + log.Warn("type assertion failed for deleteMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", dNode.Name())) + return false + } + return true +} + // Operate handles input messages, do delete operations func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { - if in == nil { - log.Debug("type assertion failed for deleteMsg because it's nil", zap.String("name", dNode.Name())) - return []Msg{} - } - - if len(in) != 1 { - log.Warn("Invalid operate message input in deleteNode", zap.Int("input length", len(in)), zap.String("name", dNode.Name())) - return []Msg{} - } - dMsg, ok := in[0].(*deleteMsg) if !ok { log.Warn("type assertion failed for deleteMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", dNode.Name())) return []Msg{} } - delData := &deleteData{ - deleteIDs: map[UniqueID][]primaryKey{}, - deleteTimestamps: map[UniqueID][]Timestamp{}, - deleteOffset: map[UniqueID]int64{}, - } - var spans 
[]opentracing.Span for _, msg := range dMsg.deleteMessages { sp, ctx := trace.StartSpanFromContext(msg.TraceCtx()) @@ -84,6 +80,24 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { msg.SetTraceCtx(ctx) } + defer func() { + for _, sp := range spans { + sp.Finish() + } + }() + + if dMsg.IsCloseMsg() { + return []Msg{ + &serviceTimeMsg{BaseMsg: flowgraph.NewBaseMsg(true)}, + } + } + + delData := &deleteData{ + deleteIDs: map[UniqueID][]primaryKey{}, + deleteTimestamps: map[UniqueID][]Timestamp{}, + deleteOffset: map[UniqueID]int64{}, + } + // 1. filter segment by bloom filter for i, delMsg := range dMsg.deleteMessages { traceID, _, _ := trace.InfoFromSpan(spans[i]) @@ -154,9 +168,6 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { var res Msg = &serviceTimeMsg{ timeRange: dMsg.timeRange, } - for _, sp := range spans { - sp.Finish() - } return []Msg{res} } diff --git a/internal/querynode/flow_graph_filter_delete_node.go b/internal/querynode/flow_graph_filter_delete_node.go index 9a74c0aa5f..8763879909 100644 --- a/internal/querynode/flow_graph_filter_delete_node.go +++ b/internal/querynode/flow_graph_filter_delete_node.go @@ -43,23 +43,21 @@ func (fddNode *filterDeleteNode) Name() string { return fmt.Sprintf("fdNode-%s", fddNode.vchannel) } -// Operate handles input messages, to filter invalid delete messages -func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { - if in == nil { - log.Debug("type assertion failed for MsgStreamMsg because it's nil", zap.String("name", fddNode.Name())) - return []Msg{} +func (fddNode *filterDeleteNode) IsValidInMsg(in []Msg) bool { + if !fddNode.baseNode.IsValidInMsg(in) { + return false } - - if len(in) != 1 { - log.Warn("Invalid operate message input in filterDDNode", zap.Int("input length", len(in)), zap.String("name", fddNode.Name())) - return []Msg{} - } - - msgStreamMsg, ok := in[0].(*MsgStreamMsg) + _, ok := in[0].(*MsgStreamMsg) if !ok { log.Warn("type assertion failed for MsgStreamMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", fddNode.Name())) - return []Msg{} + return false } + return true +} + +// Operate handles input messages, to filter invalid delete messages +func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { + msgStreamMsg := in[0].(*MsgStreamMsg) var spans []opentracing.Span for _, msg := range msgStreamMsg.TsMessages() { @@ -68,6 +66,18 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { msg.SetTraceCtx(ctx) } + defer func() { + for _, sp := range spans { + sp.Finish() + } + }() + + if msgStreamMsg.IsCloseMsg() { + return []Msg{ + &deleteMsg{BaseMsg: flowgraph.NewBaseMsg(true)}, + } + } + var dMsg = deleteMsg{ deleteMessages: make([]*msgstream.DeleteMsg, 0), timeRange: TimeRange{ @@ -102,11 +112,8 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { zap.String("vchannel", fddNode.vchannel)) } } - var res Msg = &dMsg - for _, sp := range spans { - sp.Finish() - } - return []Msg{res} + + return []Msg{&dMsg} } // filterInvalidDeleteMessage would filter invalid delete messages @@ -142,7 +149,6 @@ func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.Delet // newFilteredDeleteNode returns a new filterDeleteNode func newFilteredDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID, vchannel Channel) *filterDeleteNode { - maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength maxParallelism := 
Params.QueryNodeCfg.FlowGraphMaxParallelism diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index 78a55724dc..c0b585c28f 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -47,22 +47,21 @@ func (fdmNode *filterDmNode) Name() string { return fmt.Sprintf("fdmNode-%s", fdmNode.vchannel) } -// Operate handles input messages, to filter invalid insert messages -func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { - if in == nil { - log.Debug("type assertion failed for MsgStreamMsg because it's nil", zap.String("name", fdmNode.Name())) - return []Msg{} +func (fdmNode *filterDmNode) IsValidInMsg(in []Msg) bool { + if !fdmNode.baseNode.IsValidInMsg(in) { + return false } - if len(in) != 1 { - log.Warn("Invalid operate message input in filterDmNode", zap.Int("input length", len(in)), zap.String("name", fdmNode.Name())) - return []Msg{} - } - - msgStreamMsg, ok := in[0].(*MsgStreamMsg) + _, ok := in[0].(*MsgStreamMsg) if !ok { log.Warn("type assertion failed for MsgStreamMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", fdmNode.Name())) - return []Msg{} + return false } + return true +} + +// Operate handles input messages, to filter invalid insert messages +func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { + msgStreamMsg := in[0].(*MsgStreamMsg) var spans []opentracing.Span for _, msg := range msgStreamMsg.TsMessages() { @@ -70,6 +69,17 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { spans = append(spans, sp) msg.SetTraceCtx(ctx) } + defer func() { + for _, sp := range spans { + sp.Finish() + } + }() + + if msgStreamMsg.IsCloseMsg() { + return []Msg{ + &insertMsg{BaseMsg: flowgraph.NewBaseMsg(true)}, + } + } var iMsg = insertMsg{ insertMessages: make([]*msgstream.InsertMsg, 0), @@ -124,11 +134,7 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } } - var res Msg = &iMsg - for _, sp := range spans { - sp.Finish() - } - return []Msg{res} + return []Msg{&iMsg} } // filterInvalidDeleteMessage would filter out invalid delete messages @@ -230,7 +236,6 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg // newFilteredDmNode returns a new filterDmNode func newFilteredDmNode(metaReplica ReplicaInterface, collectionID UniqueID, vchannel Channel) *filterDmNode { - maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index f8cac99f7c..01da03f428 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -71,22 +71,38 @@ func (iNode *insertNode) Name() string { return fmt.Sprintf("iNode-%s", iNode.vchannel) } -// Operate handles input messages, to execute insert operations -func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { - if in == nil { - log.Debug("type assertion failed for insertMsg because it's nil", zap.String("name", iNode.Name())) - return []Msg{} +func (iNode *insertNode) IsValidInMsg(in []Msg) bool { + if !iNode.baseNode.IsValidInMsg(in) { + return false } - - if len(in) != 1 { - log.Warn("Invalid operate message input in insertNode", zap.Int("input length", len(in)), zap.String("name", iNode.Name())) - return []Msg{} - } - - iMsg, ok := in[0].(*insertMsg) + _, ok := 
in[0].(*insertMsg) if !ok { log.Warn("type assertion failed for insertMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", iNode.Name())) - return []Msg{} + return false + } + return true +} + +// Operate handles input messages, to execute insert operations +func (iNode *insertNode) Operate(in []Msg) []Msg { + iMsg := in[0].(*insertMsg) + + var spans []opentracing.Span + for _, msg := range iMsg.insertMessages { + sp, ctx := trace.StartSpanFromContext(msg.TraceCtx()) + spans = append(spans, sp) + msg.SetTraceCtx(ctx) + } + defer func() { + for _, sp := range spans { + sp.Finish() + } + }() + + if iMsg.IsCloseMsg() { + return []Msg{ + &serviceTimeMsg{BaseMsg: flowgraph.NewBaseMsg(true)}, + } } iData := insertData{ @@ -97,13 +113,6 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { insertPKs: make(map[UniqueID][]primaryKey), } - var spans []opentracing.Span - for _, msg := range iMsg.insertMessages { - sp, ctx := trace.StartSpanFromContext(msg.TraceCtx()) - spans = append(spans, sp) - msg.SetTraceCtx(ctx) - } - collection, err := iNode.metaReplica.getCollectionByID(iNode.collectionID) if err != nil { // QueryNode should add collection before start flow graph @@ -295,9 +304,6 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { var res Msg = &serviceTimeMsg{ timeRange: iMsg.timeRange, } - for _, sp := range spans { - sp.Finish() - } return []Msg{res} } diff --git a/internal/querynode/flow_graph_message.go b/internal/querynode/flow_graph_message.go index c9ee444830..c1b514d851 100644 --- a/internal/querynode/flow_graph_message.go +++ b/internal/querynode/flow_graph_message.go @@ -23,12 +23,14 @@ import ( // Msg is an interface which has a function named TimeTick type Msg = flowgraph.Msg +type BaseMsg = flowgraph.BaseMsg // MsgStreamMsg is an implementation of interface Msg type MsgStreamMsg = flowgraph.MsgStreamMsg // insertMsg is an implementation of interface Msg type insertMsg struct { + BaseMsg insertMessages []*msgstream.InsertMsg deleteMessages []*msgstream.DeleteMsg timeRange TimeRange @@ -36,12 +38,14 @@ type insertMsg struct { // deleteMsg is an implementation of interface Msg type deleteMsg struct { + BaseMsg deleteMessages []*msgstream.DeleteMsg timeRange TimeRange } // serviceTimeMsg is an implementation of interface Msg type serviceTimeMsg struct { + BaseMsg timeRange TimeRange } @@ -50,12 +54,24 @@ func (iMsg *insertMsg) TimeTick() Timestamp { return iMsg.timeRange.timestampMax } +func (iMsg *insertMsg) IsClose() bool { + return iMsg.IsCloseMsg() +} + // TimeTick returns timestamp of deleteMsg func (dMsg *deleteMsg) TimeTick() Timestamp { return dMsg.timeRange.timestampMax } +func (dMsg *deleteMsg) IsClose() bool { + return dMsg.IsCloseMsg() +} + // TimeTick returns timestamp of serviceTimeMsg func (stMsg *serviceTimeMsg) TimeTick() Timestamp { return stMsg.timeRange.timestampMax } + +func (stMsg *serviceTimeMsg) IsClose() bool { + return stMsg.IsCloseMsg() +} diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go index 36bb174257..6ceeb8cabb 100644 --- a/internal/querynode/flow_graph_service_time_node.go +++ b/internal/querynode/flow_graph_service_time_node.go @@ -41,22 +41,28 @@ func (stNode *serviceTimeNode) Name() string { return fmt.Sprintf("stNode-%s", stNode.vChannel) } -// Operate handles input messages, to execute insert operations -func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { - if in == nil { - log.Debug("type 
assertion failed for serviceTimeMsg because it's nil", zap.String("name", stNode.Name())) - return []Msg{} +func (stNode *serviceTimeNode) IsValidInMsg(in []Msg) bool { + if !stNode.baseNode.IsValidInMsg(in) { + return false } - - if len(in) != 1 { - log.Warn("Invalid operate message input in serviceTimeNode, input length = ", zap.Int("input node", len(in)), zap.String("name", stNode.Name())) - return []Msg{} - } - - serviceTimeMsg, ok := in[0].(*serviceTimeMsg) + _, ok := in[0].(*serviceTimeMsg) if !ok { log.Warn("type assertion failed for serviceTimeMsg", zap.String("msgType", reflect.TypeOf(in[0]).Name()), zap.String("name", stNode.Name())) - return []Msg{} + return false + } + return true +} + +// Operate handles input messages, to execute insert operations +func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { + serviceTimeMsg := in[0].(*serviceTimeMsg) + if serviceTimeMsg.IsCloseMsg() { + log.Info("service node hit close msg", + zap.Int64("collectionID", stNode.collectionID), + zap.Uint64("tSafe", serviceTimeMsg.timeRange.timestampMax), + zap.String("channel", stNode.vChannel), + ) + return in } // update service time diff --git a/internal/util/flowgraph/flow_graph_test.go b/internal/util/flowgraph/flow_graph_test.go index bb6d28eeed..a8a9fb4a22 100644 --- a/internal/util/flowgraph/flow_graph_test.go +++ b/internal/util/flowgraph/flow_graph_test.go @@ -56,12 +56,16 @@ func (m *numMsg) TimeTick() Timestamp { return Timestamp(0) } +func (m *numMsg) IsClose() bool { + return false +} + func (n *nodeA) Name() string { return "NodeA" } func (n *nodeA) Operate(in []Msg) []Msg { - // ignore `in` because nodeA doesn't have any upstream node. + // ignore `in` because nodeA doesn't have any upstream node.git s a := <-n.inputChan var res Msg = &numMsg{ num: a, diff --git a/internal/util/flowgraph/input_node.go b/internal/util/flowgraph/input_node.go index e62a331110..85dd4f1ac2 100644 --- a/internal/util/flowgraph/input_node.go +++ b/internal/util/flowgraph/input_node.go @@ -37,6 +37,7 @@ import ( type InputNode struct { BaseNode inStream msgstream.MsgStream + lastMsg *msgstream.MsgPack name string role string nodeID int64 @@ -62,6 +63,10 @@ func (inNode *InputNode) Close() { }) } +func (inNode *InputNode) IsValidInMsg(in []Msg) bool { + return true +} + // Name returns node name func (inNode *InputNode) Name() string { return inNode.name @@ -77,8 +82,19 @@ func (inNode *InputNode) Operate(in []Msg) []Msg { msgPack, ok := <-inNode.inStream.Chan() if !ok { log.Warn("MsgStream closed", zap.Any("input node", inNode.Name())) + if inNode.lastMsg != nil { + log.Info("trigger force sync", zap.Int64("collection", inNode.collectionID), zap.Any("position", inNode.lastMsg)) + return []Msg{&MsgStreamMsg{ + BaseMsg: NewBaseMsg(true), + tsMessages: []msgstream.TsMsg{}, + timestampMin: inNode.lastMsg.BeginTs, + timestampMax: inNode.lastMsg.EndTs, + startPositions: inNode.lastMsg.StartPositions, + endPositions: inNode.lastMsg.EndPositions, + }} + } return []Msg{&MsgStreamMsg{ - isCloseMsg: true, + BaseMsg: NewBaseMsg(true), }} } @@ -87,6 +103,7 @@ func (inNode *InputNode) Operate(in []Msg) []Msg { return []Msg{} } + inNode.lastMsg = msgPack sub := tsoutil.SubByNow(msgPack.EndTs) if inNode.role == typeutil.QueryNodeRole { metrics.QueryNodeConsumerMsgCount. 
diff --git a/internal/util/flowgraph/message.go b/internal/util/flowgraph/message.go index f2e8bcd563..90d144aab1 100644 --- a/internal/util/flowgraph/message.go +++ b/internal/util/flowgraph/message.go @@ -23,16 +23,31 @@ import ( // Msg is an abstract class that contains a method to get the time tick of this message type Msg interface { TimeTick() Timestamp + IsClose() bool +} + +type BaseMsg struct { + isCloseMsg bool +} + +func (msg BaseMsg) IsCloseMsg() bool { + return msg.isCloseMsg +} + +func NewBaseMsg(isCloseMsg bool) BaseMsg { + return BaseMsg{ + isCloseMsg: isCloseMsg, + } } // MsgStreamMsg is a wrapper of TsMsg in flowgraph type MsgStreamMsg struct { + BaseMsg tsMessages []msgstream.TsMsg timestampMin Timestamp timestampMax Timestamp startPositions []*MsgPosition endPositions []*MsgPosition - isCloseMsg bool } // GenerateMsgStreamMsg is used to create a new MsgStreamMsg object @@ -51,6 +66,10 @@ func (msMsg *MsgStreamMsg) TimeTick() Timestamp { return msMsg.timestampMax } +func (msMsg *MsgStreamMsg) IsClose() bool { + return msMsg.isCloseMsg +} + // DownStreamNodeIdx returns 0 func (msMsg *MsgStreamMsg) DownStreamNodeIdx() int { return 0 diff --git a/internal/util/flowgraph/node.go b/internal/util/flowgraph/node.go index d2d22c6032..2e977888b3 100644 --- a/internal/util/flowgraph/node.go +++ b/internal/util/flowgraph/node.go @@ -38,6 +38,7 @@ type Node interface { Name() string MaxQueueLength() int32 MaxParallelism() int32 + IsValidInMsg(in []Msg) bool Operate(in []Msg) []Msg IsInputNode() bool Start() @@ -85,8 +86,7 @@ func (nodeCtx *nodeCtx) Unblock() { func isCloseMsg(msgs []Msg) bool { if len(msgs) == 1 { - msg, ok := msgs[0].(*MsgStreamMsg) - return ok && msg.isCloseMsg + return msgs[0].IsClose() } return false } @@ -118,15 +118,14 @@ func (nodeCtx *nodeCtx) work() { input = <-nodeCtx.inputChannel } // the input message decides whether the operate method is executed - if isCloseMsg(input) { - output = input - } - if len(output) == 0 { - n := nodeCtx.node - nodeCtx.blockMutex.RLock() - output = n.Operate(input) + n := nodeCtx.node + nodeCtx.blockMutex.RLock() + if !n.IsValidInMsg(input) { nodeCtx.blockMutex.RUnlock() + continue } + output = n.Operate(input) + nodeCtx.blockMutex.RUnlock() // the output decide whether the node should be closed. if isCloseMsg(output) { close(nodeCtx.closeCh) @@ -186,3 +185,24 @@ func (node *BaseNode) Start() {} // Close implementing Node, base node does nothing when stops func (node *BaseNode) Close() {} + +func (node *BaseNode) Name() string { + return "BaseNode" +} + +func (node *BaseNode) Operate(in []Msg) []Msg { + return in +} + +func (node *BaseNode) IsValidInMsg(in []Msg) bool { + if in == nil { + log.Info("type assertion failed because it's nil") + return false + } + + if len(in) != 1 { + log.Warn("Invalid operate message input", zap.Int("input length", len(in))) + return false + } + return true +}
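
The flowgraph changes above (message.go, node.go, input_node.go, and the per-node IsValidInMsg methods) all implement one pattern: when the upstream MsgStream channel closes, the input node emits a message flagged as a close message carrying the last known positions, every downstream node does its final sync work and forwards the flag, and the graph shuts itself down once a close message is produced. The following is a minimal standalone sketch of that pattern, not the actual milvus flowgraph API; all type and function names below are simplified stand-ins.

// sketch: close-message propagation through a chain of flowgraph-style nodes
package main

import "fmt"

// Msg mirrors the extended flowgraph.Msg contract: a time tick plus a close flag.
type Msg interface {
	TimeTick() uint64
	IsClose() bool
}

// BaseMsg is the embeddable close flag, analogous to flowgraph.BaseMsg in the patch.
type BaseMsg struct{ isCloseMsg bool }

func (m BaseMsg) IsCloseMsg() bool { return m.isCloseMsg }

// dataMsg stands in for flowGraphMsg / insertMsg / deleteMsg.
type dataMsg struct {
	BaseMsg
	ts uint64
}

func (m *dataMsg) TimeTick() uint64 { return m.ts }
func (m *dataMsg) IsClose() bool    { return m.IsCloseMsg() }

// node mimics the split between IsValidInMsg (pure validation) and Operate.
type node struct{ name string }

func (n *node) IsValidInMsg(in []Msg) bool {
	if in == nil || len(in) != 1 {
		return false
	}
	_, ok := in[0].(*dataMsg)
	return ok
}

func (n *node) Operate(in []Msg) []Msg {
	msg := in[0].(*dataMsg)
	if msg.IsCloseMsg() {
		// On close, do any final sync work, then forward the close marker downstream.
		fmt.Println(n.name, "received close msg, flushing and forwarding")
		return in
	}
	fmt.Println(n.name, "processing tick", msg.TimeTick())
	return in
}

func main() {
	pipeline := []*node{{name: "ddNode"}, {name: "insertBufferNode"}, {name: "deleteNode"}}
	msgs := [][]Msg{
		{&dataMsg{ts: 1}},
		{&dataMsg{ts: 2}},
		{&dataMsg{BaseMsg: BaseMsg{isCloseMsg: true}}}, // emitted when the MsgStream channel closes
	}
	for _, in := range msgs {
		out := in
		for _, n := range pipeline {
			if !n.IsValidInMsg(out) {
				break
			}
			out = n.Operate(out)
		}
		if len(out) == 1 && out[0].IsClose() {
			fmt.Println("graph drained, stopping")
			return
		}
	}
}

In the real patch the same idea appears as flowgraph.BaseMsg embedded in flowGraphMsg, insertMsg, deleteMsg and serviceTimeMsg, the default checks in BaseNode.IsValidInMsg, the InputNode emitting a close MsgStreamMsg built from its lastMsg positions, and nodeCtx.work() closing closeCh once a close message reaches the end of the graph.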