mirror of https://github.com/milvus-io/milvus.git
324 lines
9.3 KiB
Go
324 lines
9.3 KiB
Go
// Licensed to the LF AI & Data foundation under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// Package datanode implements data persistence logic.
|
|
//
|
|
// Data node persists insert logs into persistent storage like minIO/S3.
|
|
package datanode
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"math/rand"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/cockroachdb/errors"
|
|
"github.com/tidwall/gjson"
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
|
"github.com/milvus-io/milvus/internal/datanode/compactor"
|
|
"github.com/milvus-io/milvus/internal/datanode/importv2"
|
|
"github.com/milvus-io/milvus/internal/datanode/index"
|
|
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
|
|
"github.com/milvus-io/milvus/internal/types"
|
|
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
|
"github.com/milvus-io/milvus/pkg/v2/log"
|
|
"github.com/milvus-io/milvus/pkg/v2/metrics"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/conc"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/expr"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/lifetime"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
|
|
)
|
|
|
|
// ConnectEtcdMaxRetryTime is the upper limit on the max retry time for
// connecting to etcd.
const ConnectEtcdMaxRetryTime = 100
|
|
|
|
// makes sure DataNode implements types.DataNode
|
|
var _ types.DataNode = (*DataNode)(nil)
|
|
|
|
// Params from config.yaml
|
|
var Params *paramtable.ComponentParam = paramtable.Get()
|
|
|
|
// DataNode communicates with outside services and unioun all
|
|
// services in datanode package.
|
|
//
|
|
// DataNode implements `types.Component`, `types.DataNode` interfaces.
|
|
//
|
|
// `etcdCli` is a connection of etcd
|
|
// `rootCoord` is a grpc client of root coordinator.
|
|
// `dataCoord` is a grpc client of data service.
|
|
// `stateCode` is current statement of this data node, indicating whether it's healthy.
|
|
type DataNode struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
Role string
|
|
lifetime lifetime.Lifetime[commonpb.StateCode]
|
|
|
|
syncMgr syncmgr.SyncManager
|
|
importTaskMgr importv2.TaskManager
|
|
importScheduler importv2.Scheduler
|
|
|
|
// indexnode related
|
|
storageFactory StorageFactory
|
|
taskScheduler *index.TaskScheduler
|
|
taskManager *index.TaskManager
|
|
|
|
compactionExecutor compactor.Executor
|
|
|
|
etcdCli *clientv3.Client
|
|
address string
|
|
|
|
// call once
|
|
initOnce sync.Once
|
|
startOnce sync.Once
|
|
stopOnce sync.Once
|
|
sessionMu sync.Mutex // to fix data race
|
|
session *sessionutil.Session
|
|
|
|
closer io.Closer
|
|
|
|
reportImportRetryTimes uint // unitest set this value to 1 to save time, default is 10
|
|
pool *conc.Pool[any]
|
|
|
|
totalSlot int64
|
|
|
|
metricsRequest *metricsinfo.MetricsRequest
|
|
}
|
|
|
|
// NewDataNode will return a DataNode with abnormal state.
|
|
func NewDataNode(ctx context.Context) *DataNode {
|
|
rand.Seed(time.Now().UnixNano())
|
|
ctx2, cancel2 := context.WithCancel(ctx)
|
|
node := &DataNode{
|
|
ctx: ctx2,
|
|
cancel: cancel2,
|
|
Role: typeutil.DataNodeRole,
|
|
lifetime: lifetime.NewLifetime(commonpb.StateCode_Abnormal),
|
|
compactionExecutor: compactor.NewExecutor(),
|
|
reportImportRetryTimes: 10,
|
|
metricsRequest: metricsinfo.NewMetricsRequest(),
|
|
totalSlot: index.CalculateNodeSlots(),
|
|
}
|
|
sc := index.NewTaskScheduler(ctx2)
|
|
node.storageFactory = NewChunkMgrFactory()
|
|
node.taskScheduler = sc
|
|
node.taskManager = index.NewTaskManager(ctx2)
|
|
node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
|
expr.Register("datanode", node)
|
|
return node
|
|
}
|
|
|
|
func (node *DataNode) SetAddress(address string) {
|
|
node.address = address
|
|
}
|
|
|
|
func (node *DataNode) GetAddress() string {
|
|
return node.address
|
|
}
|
|
|
|
// SetEtcdClient sets etcd client for DataNode
|
|
func (node *DataNode) SetEtcdClient(etcdCli *clientv3.Client) {
|
|
node.etcdCli = etcdCli
|
|
}
|
|
|
|
// SetRootCoordClient sets RootCoord's grpc client, error is returned if repeatedly set.
|
|
func (node *DataNode) SetMixCoordClient(mixc types.MixCoordClient) error {
|
|
return nil
|
|
}
|
|
|
|
// Register register datanode to etcd
|
|
func (node *DataNode) Register() error {
|
|
log := log.Ctx(node.ctx)
|
|
log.Debug("node begin to register to etcd", zap.String("serverName", node.session.ServerName), zap.Int64("ServerID", node.session.ServerID))
|
|
node.session.Register()
|
|
|
|
metrics.NumNodes.WithLabelValues(fmt.Sprint(node.GetNodeID()), typeutil.DataNodeRole).Inc()
|
|
log.Info("DataNode Register Finished")
|
|
// Start liveness check
|
|
node.session.LivenessCheck(node.ctx, func() {
|
|
log.Error("Data Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.GetSession().ServerID))
|
|
os.Exit(1)
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
func (node *DataNode) initSession() error {
|
|
node.session = sessionutil.NewSession(node.ctx)
|
|
if node.session == nil {
|
|
return errors.New("failed to initialize session")
|
|
}
|
|
node.session.Init(typeutil.DataNodeRole, node.address, false, true)
|
|
sessionutil.SaveServerInfo(typeutil.DataNodeRole, node.session.ServerID)
|
|
return nil
|
|
}
|
|
|
|
func (node *DataNode) GetNodeID() int64 {
|
|
if node.session != nil {
|
|
return node.session.ServerID
|
|
}
|
|
return paramtable.GetNodeID()
|
|
}
|
|
|
|
func (node *DataNode) Init() error {
|
|
var initError error
|
|
node.initOnce.Do(func() {
|
|
node.registerMetricsRequest()
|
|
log.Ctx(node.ctx).Info("DataNode server initializing")
|
|
if err := node.initSession(); err != nil {
|
|
log.Error("DataNode server init session failed", zap.Error(err))
|
|
initError = err
|
|
return
|
|
}
|
|
|
|
serverID := node.GetNodeID()
|
|
log := log.Ctx(node.ctx).With(zap.String("role", typeutil.DataNodeRole), zap.Int64("nodeID", serverID))
|
|
log.Info("DataNode server init succeeded")
|
|
|
|
syncMgr := syncmgr.NewSyncManager(nil)
|
|
node.syncMgr = syncMgr
|
|
|
|
node.importTaskMgr = importv2.NewTaskManager()
|
|
node.importScheduler = importv2.NewScheduler(node.importTaskMgr)
|
|
|
|
index.InitSegcore()
|
|
|
|
log.Info("init datanode done", zap.String("Address", node.address))
|
|
})
|
|
return initError
|
|
}
|
|
|
|
func (node *DataNode) registerMetricsRequest() {
|
|
node.metricsRequest.RegisterMetricsRequest(metricsinfo.SystemInfoMetrics,
|
|
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
|
|
return node.getSystemInfoMetrics(ctx, req)
|
|
})
|
|
|
|
node.metricsRequest.RegisterMetricsRequest(metricsinfo.SyncTaskKey,
|
|
func(ctx context.Context, req *milvuspb.GetMetricsRequest, jsonReq gjson.Result) (string, error) {
|
|
return node.syncMgr.TaskStatsJSON(), nil
|
|
})
|
|
log.Ctx(node.ctx).Info("register metrics actions finished")
|
|
}
|
|
|
|
// Start will update DataNode state to HEALTHY
|
|
func (node *DataNode) Start() error {
|
|
log := log.Ctx(node.ctx)
|
|
var startErr error
|
|
node.startOnce.Do(func() {
|
|
go node.compactionExecutor.Start(node.ctx)
|
|
|
|
go node.importScheduler.Start()
|
|
|
|
err := node.taskScheduler.Start()
|
|
if err != nil {
|
|
startErr = err
|
|
return
|
|
}
|
|
|
|
node.UpdateStateCode(commonpb.StateCode_Healthy)
|
|
log.Info("datanode start successfully")
|
|
})
|
|
return startErr
|
|
}
|
|
|
|
// UpdateStateCode updates datanode's state code
|
|
func (node *DataNode) UpdateStateCode(code commonpb.StateCode) {
|
|
node.lifetime.SetState(code)
|
|
}
|
|
|
|
// GetStateCode return datanode's state code
|
|
func (node *DataNode) GetStateCode() commonpb.StateCode {
|
|
return node.lifetime.GetState()
|
|
}
|
|
|
|
func (node *DataNode) isHealthy() bool {
|
|
return node.GetStateCode() == commonpb.StateCode_Healthy
|
|
}
|
|
|
|
// ReadyToFlush tells whether DataNode is ready for flushing
|
|
func (node *DataNode) ReadyToFlush() error {
|
|
if !node.isHealthy() {
|
|
return errors.New("DataNode not in HEALTHY state")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Stop will release DataNode resources and shutdown datanode
|
|
func (node *DataNode) Stop() error {
|
|
node.stopOnce.Do(func() {
|
|
// https://github.com/milvus-io/milvus/issues/12282
|
|
node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
|
node.lifetime.Wait()
|
|
|
|
if node.syncMgr != nil {
|
|
err := node.syncMgr.Close()
|
|
if err != nil {
|
|
log.Error("sync manager close failed", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
if node.closer != nil {
|
|
node.closer.Close()
|
|
}
|
|
|
|
if node.session != nil {
|
|
node.session.Stop()
|
|
}
|
|
|
|
if node.importScheduler != nil {
|
|
node.importScheduler.Close()
|
|
}
|
|
|
|
// cleanup all running tasks
|
|
node.taskManager.DeleteAllTasks()
|
|
|
|
if node.taskScheduler != nil {
|
|
node.taskScheduler.Close()
|
|
}
|
|
|
|
index.CloseSegcore()
|
|
|
|
// Delay the cancellation of ctx to ensure that the session is automatically recycled after closed the flow graph
|
|
node.cancel()
|
|
})
|
|
return nil
|
|
}
|
|
|
|
// SetSession to fix data race
|
|
func (node *DataNode) SetSession(session *sessionutil.Session) {
|
|
node.sessionMu.Lock()
|
|
defer node.sessionMu.Unlock()
|
|
node.session = session
|
|
}
|
|
|
|
// GetSession to fix data race
|
|
func (node *DataNode) GetSession() *sessionutil.Session {
|
|
node.sessionMu.Lock()
|
|
defer node.sessionMu.Unlock()
|
|
return node.session
|
|
}
|