mirror of https://github.com/milvus-io/milvus.git
Add more logs on write path (#14600)
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>pull/14774/head
parent
841c5af42e
commit
7abebe576c
|
@ -262,7 +262,15 @@ func (c *ChannelManager) Watch(ch *channel) error {
|
|||
c.fillChannelPosition(v)
|
||||
}
|
||||
}
|
||||
return c.store.Update(updates)
|
||||
err := c.store.Update(updates)
|
||||
if err != nil {
|
||||
log.Error("ChannelManager RWChannelStore update failed", zap.Int64("collectionID", ch.CollectionID),
|
||||
zap.String("channelName", ch.Name), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("ChannelManager RWChannelStore update success", zap.Int64("collectionID", ch.CollectionID),
|
||||
zap.String("channelName", ch.Name))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ChannelManager) fillChannelPosition(update *ChannelOp) {
|
||||
|
|
|
@ -286,12 +286,16 @@ func (node *DataNode) checkWatchedList() error {
|
|||
func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
|
||||
switch evt.Type {
|
||||
case clientv3.EventTypePut: // datacoord shall put channels needs to be watched here
|
||||
log.Debug("DataNode handleChannelEvt EventTypePut", zap.String("key", string(evt.Kv.Key)))
|
||||
node.handleWatchInfo(string(evt.Kv.Key), evt.Kv.Value)
|
||||
case clientv3.EventTypeDelete:
|
||||
// guaranteed there is no "/" in channel name
|
||||
parts := strings.Split(string(evt.Kv.Key), "/")
|
||||
vchanName := parts[len(parts)-1]
|
||||
log.Warn("handle channel delete event", zap.Int64("node id", Params.DataNodeCfg.NodeID), zap.String("vchannel", vchanName))
|
||||
log.Debug("DataNode handleChannelEvt EventTypeDelete",
|
||||
zap.String("key", string(evt.Kv.Key)),
|
||||
zap.String("vChanName", vchanName),
|
||||
zap.Int64("node id", Params.DataNodeCfg.NodeID))
|
||||
node.ReleaseDataSyncService(vchanName)
|
||||
}
|
||||
}
|
||||
|
@ -303,28 +307,37 @@ func (node *DataNode) handleWatchInfo(key string, data []byte) {
|
|||
log.Warn("fail to parse ChannelWatchInfo", zap.String("key", key), zap.Error(err))
|
||||
return
|
||||
}
|
||||
log.Debug("DataNode handleWatchInfo Unmarshal success")
|
||||
if watchInfo.State == datapb.ChannelWatchState_Complete {
|
||||
log.Warn("DataNode handleWatchInfo State is already ChannelWatchState_Complete")
|
||||
return
|
||||
}
|
||||
if watchInfo.Vchan == nil {
|
||||
log.Warn("found ChannelWatchInfo with nil VChannelInfo", zap.String("key", key))
|
||||
return
|
||||
}
|
||||
log.Warn("DataNode handleWatchInfo try to NewDataSyncService", zap.String("key", key))
|
||||
err = node.NewDataSyncService(watchInfo.Vchan)
|
||||
if err != nil {
|
||||
log.Warn("fail to create DataSyncService", zap.String("key", key), zap.Error(err))
|
||||
return
|
||||
}
|
||||
log.Warn("DataNode handleWatchInfo NewDataSyncService success", zap.String("key", key))
|
||||
|
||||
watchInfo.State = datapb.ChannelWatchState_Complete
|
||||
v, err := proto.Marshal(&watchInfo)
|
||||
if err != nil {
|
||||
log.Warn("fail to Marshal watchInfo", zap.String("key", key), zap.Error(err))
|
||||
log.Warn("DataNode handleWatchInfo fail to Marshal watchInfo", zap.String("key", key), zap.Error(err))
|
||||
return
|
||||
}
|
||||
k := path.Join(Params.DataNodeCfg.ChannelWatchSubPath, fmt.Sprintf("%d", node.NodeID), watchInfo.GetVchan().GetChannelName())
|
||||
log.Warn("DataNode handleWatchInfo try to Save", zap.String("key", key),
|
||||
zap.String("k", k),
|
||||
zap.String("v", string(v)))
|
||||
|
||||
err = node.watchKv.Save(k, string(v))
|
||||
if err != nil {
|
||||
log.Warn("fail to change WatchState to complete", zap.String("key", key), zap.Error(err))
|
||||
log.Warn("DataNode handleWatchInfo fail to change WatchState to complete", zap.String("key", key), zap.Error(err))
|
||||
node.ReleaseDataSyncService(key)
|
||||
}
|
||||
}
|
||||
|
@ -344,7 +357,8 @@ func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error {
|
|||
|
||||
var alloc allocatorInterface = newAllocator(node.rootCoord)
|
||||
|
||||
log.Debug("Received Vchannel Info",
|
||||
log.Debug("DataNode NewDataSyncService received Vchannel Info",
|
||||
zap.Int64("collectionID", vchan.CollectionID),
|
||||
zap.Int("Unflushed Segment Number", len(vchan.GetUnflushedSegments())),
|
||||
zap.Int("Flushed Segment Number", len(vchan.GetFlushedSegments())),
|
||||
)
|
||||
|
@ -353,13 +367,16 @@ func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error {
|
|||
|
||||
dataSyncService, err := newDataSyncService(node.ctx, flushCh, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataCoord, node.segmentCache, node.blobKv, node.compactionExecutor)
|
||||
if err != nil {
|
||||
log.Error("DataNode NewDataSyncService newDataSyncService failed",
|
||||
zap.Error(err),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
node.vchan2SyncService[vchan.GetChannelName()] = dataSyncService
|
||||
node.vchan2FlushChs[vchan.GetChannelName()] = flushCh
|
||||
|
||||
log.Info("Start New dataSyncService",
|
||||
log.Info("DataNode NewDataSyncService success",
|
||||
zap.Int64("Collection ID", vchan.GetCollectionID()),
|
||||
zap.String("Vchannel name", vchan.GetChannelName()),
|
||||
)
|
||||
|
|
|
@ -115,16 +115,19 @@ func newParallelConfig() parallelConfig {
|
|||
// start starts the flowgraph in datasyncservice
|
||||
func (dsService *dataSyncService) start() {
|
||||
if dsService.fg != nil {
|
||||
log.Debug("Data Sync Service starting flowgraph")
|
||||
log.Debug("dataSyncService starting flowgraph", zap.Int64("collectionID", dsService.collectionID),
|
||||
zap.String("vChanName", dsService.vchannelName))
|
||||
dsService.fg.Start()
|
||||
} else {
|
||||
log.Debug("Data Sync Service flowgraph nil")
|
||||
log.Warn("dataSyncService starting flowgraph is nil", zap.Int64("collectionID", dsService.collectionID),
|
||||
zap.String("vChanName", dsService.vchannelName))
|
||||
}
|
||||
}
|
||||
|
||||
func (dsService *dataSyncService) close() {
|
||||
if dsService.fg != nil {
|
||||
log.Debug("Data Sync Service closing flowgraph")
|
||||
log.Debug("dataSyncService closing flowgraph", zap.Int64("collectionID", dsService.collectionID),
|
||||
zap.String("vChanName", dsService.vchannelName))
|
||||
dsService.fg.Close()
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue