
630 lines
20 KiB
Raw Normal View History

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package datanode implements data persistence logic.
// Data node persists insert logs into persistent storage like minIO/S3.
package datanode
import (
v3rpc ""
clientv3 ""
etcdkv ""
const (
	// RPCConnectionTimeout is used to set the timeout for rpc request.
	RPCConnectionTimeout = 30 * time.Second

	// ConnectEtcdMaxRetryTime is used to limit the max retry time for
	// connecting to etcd (it is passed to retry.Attempts in Start).
	ConnectEtcdMaxRetryTime = 100

	// ImportCallTimeout is the timeout used in Import() method calls.
	// This value is equal to RootCoord's task expire time.
	ImportCallTimeout = 15 * 60 * time.Second
)
// getFlowGraphServiceAttempts is a package-level attempt count (50).
// NOTE(review): presumably the retry budget used when looking up a flow graph
// service; its consumers are outside this view — confirm.
var getFlowGraphServiceAttempts uint = 50

// Compile-time check that *DataNode implements types.DataNode.
var _ types.DataNode = (*DataNode)(nil)

// Params holds the component parameters (from config.yaml).
var Params *paramtable.ComponentParam = paramtable.Get()
// DataNode communicates with outside services and unioun all
// services in datanode package.
// DataNode implements `types.Component`, `types.DataNode` interfaces.
// `etcdCli` is a connection of etcd
// `rootCoord` is a grpc client of root coordinator.
// `dataCoord` is a grpc client of data service.
// `NodeID` is unique to each datanode.
// `State` is current statement of this data node, indicating whether it's healthy.
// `clearSignal` is a signal channel for releasing the flowgraph resources.
// `segmentCache` stores all flushing and flushed segments.
type DataNode struct {
ctx context.Context
cancel context.CancelFunc
Role string
stateCode atomic.Value // commonpb.StateCode_Initializing
flowgraphManager *flowgraphManager
eventManagerMap *typeutil.ConcurrentMap[string, *channelEventManager]
clearSignal chan string // vchannel name
segmentCache *Cache
compactionExecutor *compactionExecutor
timeTickSender *timeTickSender
etcdCli *clientv3.Client
address string
rootCoord types.RootCoord
dataCoord types.DataCoord
//call once
initOnce sync.Once
startOnce sync.Once
wg sync.WaitGroup
sessionMu sync.Mutex // to fix data race
session *sessionutil.Session
watchKv kv.WatchKV
chunkManager storage.ChunkManager
allocator allocator.Allocator
closer io.Closer
dispClient msgdispatcher.Client
factory dependency.Factory
reportImportRetryTimes uint // unitest set this value to 1 to save time, default is 10
// NewDataNode will return a DataNode with abnormal state.
func NewDataNode(ctx context.Context, factory dependency.Factory) *DataNode {
ctx2, cancel2 := context.WithCancel(ctx)
node := &DataNode{
ctx: ctx2,
cancel: cancel2,
Role: typeutil.DataNodeRole,
rootCoord: nil,
dataCoord: nil,
factory: factory,
segmentCache: newCache(),
compactionExecutor: newCompactionExecutor(),
eventManagerMap: typeutil.NewConcurrentMap[string, *channelEventManager](),
flowgraphManager: newFlowgraphManager(),
clearSignal: make(chan string, 100),
reportImportRetryTimes: 10,
return node
func (node *DataNode) SetAddress(address string) {
node.address = address
func (node *DataNode) GetAddress() string {
return node.address
// SetEtcdClient sets etcd client for DataNode
func (node *DataNode) SetEtcdClient(etcdCli *clientv3.Client) {
node.etcdCli = etcdCli
// SetRootCoord sets RootCoord's grpc client, error is returned if repeatedly set.
func (node *DataNode) SetRootCoord(rc types.RootCoord) error {
switch {
2021-06-21 09:28:03 +00:00
case rc == nil, node.rootCoord != nil:
return errors.New("nil parameter or repeatedly set")
2021-06-21 09:28:03 +00:00
node.rootCoord = rc
return nil
// SetDataCoord sets data service's grpc client, error is returned if repeatedly set.
func (node *DataNode) SetDataCoord(ds types.DataCoord) error {
switch {
case ds == nil, node.dataCoord != nil:
return errors.New("nil parameter or repeatedly set")
node.dataCoord = ds
return nil
// Register increments the DataNode count metric, logs completion and starts
// the session liveness check. In the code visible here it always returns nil.
//
// The liveness callback logs the etcd disconnect, stops the node (a failing
// Stop is fatal), decrements the DataNode count metric and, when the session
// has TriggerKill set, looks up the current process.
//
// NOTE(review): this block looks truncated by extraction — closing braces are
// absent, no call that actually registers the session with etcd is visible,
// and the FindProcess branch has no body (presumably it sends a signal to the
// process) — confirm against the upstream file before relying on it.
func (node *DataNode) Register() error {
metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.DataNodeRole).Inc()
log.Info("DataNode Register Finished")
// Start liveness check
node.session.LivenessCheck(node.ctx, func() {
log.Error("Data Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.GetSession().ServerID))
if err := node.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err))
metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.DataNodeRole).Dec()
// manually send signal to starter goroutine
if node.session.TriggerKill {
if p, err := os.FindProcess(os.Getpid()); err == nil {
return nil
func (node *DataNode) initSession() error {
node.session = sessionutil.NewSession(node.ctx, Params.EtcdCfg.MetaRootPath.GetValue(), node.etcdCli)
if node.session == nil {
return errors.New("failed to initialize session")
node.session.Init(typeutil.DataNodeRole, node.address, false, true)
return nil
// initRateCollector creates and starts rateCollector in QueryNode.
func (node *DataNode) initRateCollector() error {
err := initGlobalRateCollector()
if err != nil {
return err
return nil
func (node *DataNode) Init() error {
var initError error
node.initOnce.Do(func() {
logutil.Logger(node.ctx).Info("DataNode server initializing",
zap.String("TimeTickChannelName", Params.CommonCfg.DataCoordTimeTick.GetValue()),
if err := node.initSession(); err != nil {
log.Error("DataNode server init session failed", zap.Error(err))
initError = err
err := node.initRateCollector()
if err != nil {
log.Error("DataNode server init rateCollector failed", zap.Int64("node ID", paramtable.GetNodeID()), zap.Error(err))
initError = err
log.Info("DataNode server init rateCollector done", zap.Int64("node ID", paramtable.GetNodeID()))
node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.DataNodeRole, paramtable.GetNodeID())
log.Info("DataNode server init dispatcher client done", zap.Int64("node ID", paramtable.GetNodeID()))
alloc, err := allocator.New(context.Background(), node.rootCoord, paramtable.GetNodeID())
if err != nil {
log.Error("failed to create id allocator",
zap.String("role", typeutil.DataNodeRole), zap.Int64("DataNode ID", paramtable.GetNodeID()))
initError = err
node.allocator = alloc
log.Info("DataNode server init succeeded",
zap.String("MsgChannelSubName", Params.CommonCfg.DataNodeSubName.GetValue()))
return initError
// StartWatchChannels start loop to watch channel allocation status via kv(etcd for now)
func (node *DataNode) StartWatchChannels(ctx context.Context) {
defer node.wg.Done()
defer logutil.LogPanic()
// REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name}
// TODO, this is risky, we'd better watch etcd with revision rather simply a path
watchPrefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetSession().ServerID))
log.Info("Start watch channel", zap.String("prefix", watchPrefix))
evtChan := node.watchKv.WatchWithPrefix(watchPrefix)
// after watch, first check all exists nodes first
err := node.checkWatchedList()
if err != nil {
log.Warn("StartWatchChannels failed", zap.Error(err))
for {
select {
case <-ctx.Done():
log.Info("watch etcd loop quit")
case event, ok := <-evtChan:
if !ok {
log.Warn("datanode failed to watch channel, return")
go node.StartWatchChannels(ctx)
if err := event.Err(); err != nil {
log.Warn("datanode watch channel canceled", zap.Error(event.Err()))
if event.Err() == v3rpc.ErrCompacted {
go node.StartWatchChannels(ctx)
// if watch loop return due to event canceled, the datanode is not functional anymore
log.Panic("datanode is not functional for event canceled", zap.Error(err))
for _, evt := range event.Events {
// We need to stay in order until events enqueued
// checkWatchedList list all nodes under [prefix]/channel/{node_id} and make sure all nodeds are watched
// serves the corner case for etcd connection lost and missing some events
func (node *DataNode) checkWatchedList() error {
// REF MEP#7 watch path should be [prefix]/channel/{node_id}/{channel_name}
prefix := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", paramtable.GetNodeID()))
keys, values, err := node.watchKv.LoadWithPrefix(prefix)
if err != nil {
return err
for i, val := range values {
node.handleWatchInfo(&event{eventType: putEventType}, keys[i], []byte(val))
return nil
// handleChannelEvt handles event from kv watch event
func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
var e *event
switch evt.Type {
case clientv3.EventTypePut: // datacoord shall put channels needs to be watched here
e = &event{
eventType: putEventType,
version: evt.Kv.Version,
case clientv3.EventTypeDelete:
e = &event{
eventType: deleteEventType,
version: evt.Kv.Version,
node.handleWatchInfo(e, string(evt.Kv.Key), evt.Kv.Value)
// handleWatchInfo prepares a watch event and routes it to the per-channel
// event manager stored in eventManagerMap.
//
// For a PUT event the payload is parsed with parsePutEventData, and events
// with an end state or a non-zero Progress are only logged; otherwise the
// channel name is taken from the parsed watch info. For a DELETE event the
// channel name is taken from the last segment of the key. A manager for the
// channel is then fetched or inserted, and on DELETE it is removed from the
// map again.
//
// NOTE(review): this block is visibly truncated by extraction — closing
// braces, the statements after the three guard conditions (presumably
// returns), the left-hand side of the "= watchInfo" assignment, and the calls
// made on the event manager are not visible here. Confirm against the
// upstream file before relying on this description.
func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
switch e.eventType {
case putEventType:
watchInfo, err := parsePutEventData(data)
if err != nil {
log.Warn("fail to handle watchInfo", zap.Int("event type", e.eventType), zap.String("key", key), zap.Error(err))
if isEndWatchState(watchInfo.State) {
log.Info("DataNode received a PUT event with an end State", zap.String("state", watchInfo.State.String()))
if watchInfo.Progress != 0 {
log.Info("DataNode received a PUT event with tickler update progress", zap.String("channel", watchInfo.Vchan.ChannelName), zap.Int64("version", e.version))
} = watchInfo
e.vChanName = watchInfo.GetVchan().GetChannelName()
log.Info("DataNode is handling watchInfo PUT event", zap.String("key", key), zap.Any("watch state", watchInfo.GetState().String()))
case deleteEventType:
e.vChanName = parseDeleteEventKey(key)
log.Info("DataNode is handling watchInfo DELETE event", zap.String("key", key))
// Fetch the channel's event manager, creating one on first use.
actualManager, loaded := node.eventManagerMap.GetOrInsert(e.vChanName, newChannelEventManager(
node.handlePutEvent, node.handleDeleteEvent, retryWatchInterval,
if !loaded {
// Whenever a delete event comes, this eventManager will be removed from map
if e.eventType == deleteEventType {
if m, loaded := node.eventManagerMap.GetAndRemove(e.vChanName); loaded {
func parsePutEventData(data []byte) (*datapb.ChannelWatchInfo, error) {
watchInfo := datapb.ChannelWatchInfo{}
err := proto.Unmarshal(data, &watchInfo)
if err != nil {
return nil, fmt.Errorf("invalid event data: fail to parse ChannelWatchInfo, err: %v", err)
if watchInfo.Vchan == nil {
return nil, fmt.Errorf("invalid event: ChannelWatchInfo with nil VChannelInfo")
return &watchInfo, nil
// parseDeleteEventKey returns the vchannel name encoded in a watch key, i.e.
// the text after the last "/". A key without "/" is returned unchanged and a
// key ending in "/" yields the empty string.
func parseDeleteEventKey(key string) string {
	// LastIndexByte returns -1 when there is no separator, so the slice then
	// starts at 0 and covers the whole key.
	return key[strings.LastIndexByte(key, '/')+1:]
}
func (node *DataNode) handlePutEvent(watchInfo *datapb.ChannelWatchInfo, version int64) (err error) {
vChanName := watchInfo.GetVchan().GetChannelName()
key := path.Join(Params.CommonCfg.DataCoordWatchSubPath.GetValue(), fmt.Sprintf("%d", node.GetSession().ServerID), vChanName)
tickler := newTickler(version, key, watchInfo, node.watchKv, Params.DataNodeCfg.WatchEventTicklerInterval.GetAsDuration(time.Second))
switch watchInfo.State {
case datapb.ChannelWatchState_Uncomplete, datapb.ChannelWatchState_ToWatch:
if err := node.flowgraphManager.addAndStart(node, watchInfo.GetVchan(), watchInfo.GetSchema(), tickler); err != nil {
log.Warn("handle put event: new data sync service failed", zap.String("vChanName", vChanName), zap.Error(err))
watchInfo.State = datapb.ChannelWatchState_WatchFailure
} else {
log.Info("handle put event: new data sync service success", zap.String("vChanName", vChanName))
watchInfo.State = datapb.ChannelWatchState_WatchSuccess
case datapb.ChannelWatchState_ToRelease:
// there is no reason why we release fail
watchInfo.State = datapb.ChannelWatchState_ReleaseSuccess
v, err := proto.Marshal(watchInfo)
if err != nil {
return fmt.Errorf("fail to marshal watchInfo with state, vChanName: %s, state: %s ,err: %w", vChanName, watchInfo.State.String(), err)
success, err := node.watchKv.CompareVersionAndSwap(key, tickler.version, string(v))
// etcd error
if err != nil {
// flow graph will leak if not release, causing new datanode failed to subscribe
log.Warn("fail to update watch state to etcd", zap.String("vChanName", vChanName),
zap.String("state", watchInfo.State.String()), zap.Error(err))
return err
// etcd valid but the states updated.
if !success {
log.Info("handle put event: failed to compare version and swap, release flowgraph",
zap.String("key", key), zap.String("state", watchInfo.State.String()),
zap.String("vChanName", vChanName))
// flow graph will leak if not release, causing new datanode failed to subscribe
return nil
log.Info("handle put event success", zap.String("key", key),
zap.String("state", watchInfo.State.String()), zap.String("vChanName", vChanName))
return nil
func (node *DataNode) handleDeleteEvent(vChanName string) {
// tryToReleaseFlowgraph tries to release a flowgraph
func (node *DataNode) tryToReleaseFlowgraph(vChanName string) {
log.Info("try to release flowgraph", zap.String("vChanName", vChanName))
// BackGroundGC runs in background to release datanode resources
// GOOSE TODO: remove background GC, using ToRelease for drop-collection after #15846
func (node *DataNode) BackGroundGC(vChannelCh <-chan string) {
defer node.wg.Done()
log.Info("DataNode Background GC Start")
for {
select {
case vchanName := <-vChannelCh:
case <-node.ctx.Done():
log.Warn("DataNode context done, exiting background GC")
// Start launches the DataNode's background services. The body runs at most
// once (guarded by startOnce) and the error captured inside it is returned.
//
// Visible steps, in order: start the id allocator, request one timestamp from
// RootCoord, create the etcd-backed watchKv (with retry), create the
// persistent-storage chunk manager, then spawn BackGroundGC, the compaction
// executor, optionally the RPC time-tick sender, StartWatchChannels and the
// flowgraph manager.
//
// NOTE(review): this block is visibly truncated by extraction — closing
// braces, the arguments of NewMsgBase, any early returns after a failed step,
// any wg.Add calls matching the wg.Done in the spawned goroutines, and the
// transition to the HEALTHY state mentioned by the original comment are not
// visible here. Confirm against the upstream file.
func (node *DataNode) Start() error {
var startErr error
node.startOnce.Do(func() {
if err := node.allocator.Start(); err != nil {
log.Error("failed to start id allocator", zap.Error(err), zap.String("role", typeutil.DataNodeRole))
startErr = err
log.Info("start id allocator done", zap.String("role", typeutil.DataNodeRole))
// Ask RootCoord for a single timestamp; a non-success status is treated as an error.
rep, err := node.rootCoord.AllocTimestamp(node.ctx, &rootcoordpb.AllocTimestampRequest{
Base: commonpbutil.NewMsgBase(
Count: 1,
if err != nil || rep.Status.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("fail to alloc timestamp", zap.Any("rep", rep), zap.Error(err))
startErr = errors.New("DataNode fail to alloc timestamp")
// Build the etcd KV rooted at the meta root path and keep it as watchKv.
connectEtcdFn := func() error {
etcdKV := etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
node.watchKv = etcdKV
return nil
err = retry.Do(node.ctx, connectEtcdFn, retry.Attempts(ConnectEtcdMaxRetryTime))
if err != nil {
startErr = errors.New("DataNode fail to connect etcd")
chunkManager, err := node.factory.NewPersistentStorageChunkManager(node.ctx)
if err != nil {
startErr = err
node.chunkManager = chunkManager
go node.BackGroundGC(node.clearSignal)
go node.compactionExecutor.start(node.ctx)
// The time-tick sender is only started when time ticks are configured to go over RPC.
if Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() {
node.timeTickSender = newTimeTickSender(node.dataCoord, node.session.ServerID)
go node.timeTickSender.start(node.ctx)
// Start node watch node
go node.StartWatchChannels(node.ctx)
go node.flowgraphManager.start()
return startErr
// UpdateStateCode updates datanode's state code
func (node *DataNode) UpdateStateCode(code commonpb.StateCode) {
// GetStateCode return datanode's state code
func (node *DataNode) GetStateCode() commonpb.StateCode {
return node.stateCode.Load().(commonpb.StateCode)
func (node *DataNode) isHealthy() bool {
return node.GetStateCode() == commonpb.StateCode_Healthy
// ReadyToFlush tells whether DataNode is ready for flushing
func (node *DataNode) ReadyToFlush() error {
if !node.isHealthy() {
return errors.New("DataNode not in HEALTHY state")
return nil
// Stop will release DataNode resources and shutdown datanode.
//
// Visible steps: iterate over every channel event manager, handle the id
// allocator when one exists, close node.closer when set (its error is
// returned), and handle the session when one exists; otherwise nil is
// returned.
//
// NOTE(review): this block is visibly truncated by extraction — closing
// braces and the statements inside the Range callback, the allocator branch
// and the session branch (presumably close/stop calls), as well as any
// context cancellation or wait on background goroutines, are not visible
// here. Confirm against the upstream file.
func (node *DataNode) Stop() error {
node.eventManagerMap.Range(func(_ string, m *channelEventManager) bool {
return true
if node.allocator != nil {
log.Info("close id allocator", zap.String("role", typeutil.DataNodeRole))
if node.closer != nil {
err := node.closer.Close()
if err != nil {
return err
if node.session != nil {
return nil
// to fix data race
func (node *DataNode) SetSession(session *sessionutil.Session) {
defer node.sessionMu.Unlock()
node.session = session
// to fix data race
func (node *DataNode) GetSession() *sessionutil.Session {
defer node.sessionMu.Unlock()
return node.session