mirror of https://github.com/milvus-io/milvus.git
585 lines
17 KiB
Go
585 lines
17 KiB
Go
// Licensed to the LF AI & Data foundation under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package paramtable
|
|
|
|
import (
|
|
"fmt"
|
|
"strconv"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/backoff"
|
|
"google.golang.org/grpc/keepalive"
|
|
|
|
"github.com/milvus-io/milvus/pkg/log"
|
|
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
|
)
|
|
|
|
const (
|
|
// DefaultServerMaxSendSize defines the maximum size of data per grpc request can send by server side.
|
|
DefaultServerMaxSendSize = 512 * 1024 * 1024
|
|
|
|
// DefaultServerMaxRecvSize defines the maximum size of data per grpc request can receive by server side.
|
|
DefaultServerMaxRecvSize = 256 * 1024 * 1024
|
|
|
|
// DefaultClientMaxSendSize defines the maximum size of data per grpc request can send by client side.
|
|
DefaultClientMaxSendSize = 256 * 1024 * 1024
|
|
|
|
// DefaultClientMaxRecvSize defines the maximum size of data per grpc request can receive by client side.
|
|
DefaultClientMaxRecvSize = 512 * 1024 * 1024
|
|
|
|
// DefaultLogLevel defines the log level of grpc
|
|
DefaultLogLevel = "WARNING"
|
|
|
|
// Grpc Timeout related configs
|
|
DefaultDialTimeout = 200
|
|
DefaultKeepAliveTime = 10000
|
|
DefaultKeepAliveTimeout = 20000
|
|
|
|
// Grpc retry policy
|
|
DefaultMaxAttempts = 10
|
|
DefaultInitialBackoff float64 = 0.2
|
|
DefaultMaxBackoff float64 = 10
|
|
DefaultCompressionEnabled bool = false
|
|
|
|
ProxyInternalPort = 19529
|
|
ProxyExternalPort = 19530
|
|
)
|
|
|
|
// /////////////////////////////////////////////////////////////////////////////
|
|
// --- grpc ---
|
|
type grpcConfig struct {
|
|
Domain string `refreshable:"false"`
|
|
IP string `refreshable:"false"`
|
|
TLSMode ParamItem `refreshable:"false"`
|
|
IPItem ParamItem `refreshable:"false"`
|
|
Port ParamItem `refreshable:"false"`
|
|
InternalPort ParamItem `refreshable:"false"`
|
|
ServerPemPath ParamItem `refreshable:"false"`
|
|
ServerKeyPath ParamItem `refreshable:"false"`
|
|
CaPemPath ParamItem `refreshable:"false"`
|
|
}
|
|
|
|
func (p *grpcConfig) init(domain string, base *BaseTable) {
|
|
p.Domain = domain
|
|
p.IPItem = ParamItem{
|
|
Key: p.Domain + ".ip",
|
|
Version: "2.3.3",
|
|
Doc: "TCP/IP address of " + p.Domain + ". If not specified, use the first unicastable address",
|
|
Export: true,
|
|
}
|
|
p.IPItem.Init(base.mgr)
|
|
p.IP = funcutil.GetIP(p.IPItem.GetValue())
|
|
|
|
p.Port = ParamItem{
|
|
Key: p.Domain + ".port",
|
|
Version: "2.0.0",
|
|
DefaultValue: strconv.FormatInt(ProxyExternalPort, 10),
|
|
Doc: "TCP port of " + p.Domain,
|
|
Export: true,
|
|
}
|
|
p.Port.Init(base.mgr)
|
|
|
|
p.InternalPort = ParamItem{
|
|
Key: p.Domain + ".internalPort",
|
|
Version: "2.0.0",
|
|
DefaultValue: strconv.FormatInt(ProxyInternalPort, 10),
|
|
}
|
|
p.InternalPort.Init(base.mgr)
|
|
|
|
p.TLSMode = ParamItem{
|
|
Key: "common.security.tlsMode",
|
|
Version: "2.0.0",
|
|
DefaultValue: "0",
|
|
Export: true,
|
|
}
|
|
p.TLSMode.Init(base.mgr)
|
|
|
|
p.ServerPemPath = ParamItem{
|
|
Key: "tls.serverPemPath",
|
|
Version: "2.0.0",
|
|
Export: true,
|
|
}
|
|
p.ServerPemPath.Init(base.mgr)
|
|
|
|
p.ServerKeyPath = ParamItem{
|
|
Key: "tls.serverKeyPath",
|
|
Version: "2.0.0",
|
|
Export: true,
|
|
}
|
|
p.ServerKeyPath.Init(base.mgr)
|
|
|
|
p.CaPemPath = ParamItem{
|
|
Key: "tls.caPemPath",
|
|
Version: "2.0.0",
|
|
Export: true,
|
|
}
|
|
p.CaPemPath.Init(base.mgr)
|
|
}
|
|
|
|
// GetAddress return grpc address
|
|
func (p *grpcConfig) GetAddress() string {
|
|
return p.IP + ":" + p.Port.GetValue()
|
|
}
|
|
|
|
func (p *grpcConfig) GetInternalAddress() string {
|
|
return p.IP + ":" + p.InternalPort.GetValue()
|
|
}
|
|
|
|
// GrpcServerConfig is configuration for grpc server.
|
|
type GrpcServerConfig struct {
|
|
grpcConfig
|
|
|
|
ServerMaxSendSize ParamItem `refreshable:"false"`
|
|
ServerMaxRecvSize ParamItem `refreshable:"false"`
|
|
|
|
GracefulStopTimeout ParamItem `refreshable:"true"`
|
|
}
|
|
|
|
func (p *GrpcServerConfig) Init(domain string, base *BaseTable) {
|
|
p.grpcConfig.init(domain, base)
|
|
|
|
maxSendSize := strconv.FormatInt(DefaultServerMaxSendSize, 10)
|
|
p.ServerMaxSendSize = ParamItem{
|
|
Key: p.Domain + ".grpc.serverMaxSendSize",
|
|
DefaultValue: maxSendSize,
|
|
FallbackKeys: []string{"grpc.serverMaxSendSize"},
|
|
Formatter: func(v string) string {
|
|
if v == "" {
|
|
return maxSendSize
|
|
}
|
|
_, err := strconv.Atoi(v)
|
|
if err != nil {
|
|
log.Warn("Failed to parse grpc.serverMaxSendSize, set to default",
|
|
zap.String("role", p.Domain), zap.String("grpc.serverMaxSendSize", v),
|
|
zap.Error(err))
|
|
return maxSendSize
|
|
}
|
|
return v
|
|
},
|
|
Doc: "The maximum size of each RPC request that the " + domain + " can send, unit: byte",
|
|
Export: true,
|
|
}
|
|
p.ServerMaxSendSize.Init(base.mgr)
|
|
|
|
maxRecvSize := strconv.FormatInt(DefaultServerMaxRecvSize, 10)
|
|
p.ServerMaxRecvSize = ParamItem{
|
|
Key: p.Domain + ".grpc.serverMaxRecvSize",
|
|
DefaultValue: maxRecvSize,
|
|
FallbackKeys: []string{"grpc.serverMaxRecvSize"},
|
|
Formatter: func(v string) string {
|
|
if v == "" {
|
|
return maxRecvSize
|
|
}
|
|
_, err := strconv.Atoi(v)
|
|
if err != nil {
|
|
log.Warn("Failed to parse grpc.serverMaxRecvSize, set to default",
|
|
zap.String("role", p.Domain), zap.String("grpc.serverMaxRecvSize", v),
|
|
zap.Error(err))
|
|
return maxRecvSize
|
|
}
|
|
return v
|
|
},
|
|
Doc: "The maximum size of each RPC request that the " + domain + " can receive, unit: byte",
|
|
Export: true,
|
|
}
|
|
p.ServerMaxRecvSize.Init(base.mgr)
|
|
|
|
p.GracefulStopTimeout = ParamItem{
|
|
Key: "grpc.gracefulStopTimeout",
|
|
Version: "2.3.1",
|
|
DefaultValue: "10",
|
|
Doc: "second, time to wait graceful stop finish",
|
|
Export: true,
|
|
}
|
|
p.GracefulStopTimeout.Init(base.mgr)
|
|
}
|
|
|
|
// GrpcClientConfig is configuration for grpc client.
|
|
type GrpcClientConfig struct {
|
|
grpcConfig
|
|
|
|
CompressionEnabled ParamItem `refreshable:"false"`
|
|
|
|
ClientMaxSendSize ParamItem `refreshable:"false"`
|
|
ClientMaxRecvSize ParamItem `refreshable:"false"`
|
|
|
|
DialTimeout ParamItem `refreshable:"false"`
|
|
KeepAliveTime ParamItem `refreshable:"false"`
|
|
KeepAliveTimeout ParamItem `refreshable:"false"`
|
|
|
|
MaxAttempts ParamItem `refreshable:"false"`
|
|
InitialBackoff ParamItem `refreshable:"false"`
|
|
MaxBackoff ParamItem `refreshable:"false"`
|
|
BackoffMultiplier ParamItem `refreshable:"false"`
|
|
MinResetInterval ParamItem `refreshable:"false"`
|
|
MaxCancelError ParamItem `refreshable:"false"`
|
|
MinSessionCheckInterval ParamItem `refreshable:"false"`
|
|
}
|
|
|
|
func (p *GrpcClientConfig) Init(domain string, base *BaseTable) {
|
|
p.grpcConfig.init(domain, base)
|
|
|
|
maxSendSize := strconv.FormatInt(DefaultClientMaxSendSize, 10)
|
|
p.ClientMaxSendSize = ParamItem{
|
|
Key: p.Domain + ".grpc.clientMaxSendSize",
|
|
DefaultValue: maxSendSize,
|
|
FallbackKeys: []string{"grpc.clientMaxSendSize"},
|
|
Formatter: func(v string) string {
|
|
if v == "" {
|
|
return maxSendSize
|
|
}
|
|
_, err := strconv.Atoi(v)
|
|
if err != nil {
|
|
log.Warn("Failed to parse grpc.clientMaxSendSize, set to default",
|
|
zap.String("role", p.Domain), zap.String("grpc.clientMaxSendSize", v),
|
|
zap.Error(err))
|
|
return maxSendSize
|
|
}
|
|
return v
|
|
},
|
|
Doc: "The maximum size of each RPC request that the clients on " + domain + " can send, unit: byte",
|
|
Export: true,
|
|
}
|
|
p.ClientMaxSendSize.Init(base.mgr)
|
|
|
|
maxRecvSize := strconv.FormatInt(DefaultClientMaxRecvSize, 10)
|
|
p.ClientMaxRecvSize = ParamItem{
|
|
Key: p.Domain + ".grpc.clientMaxRecvSize",
|
|
DefaultValue: maxRecvSize,
|
|
FallbackKeys: []string{"grpc.clientMaxRecvSize"},
|
|
Formatter: func(v string) string {
|
|
if v == "" {
|
|
return maxRecvSize
|
|
}
|
|
_, err := strconv.Atoi(v)
|
|
if err != nil {
|
|
log.Warn("Failed to parse grpc.clientMaxRecvSize, set to default",
|
|
zap.String("role", p.Domain), zap.String("grpc.clientMaxRecvSize", v),
|
|
zap.Error(err))
|
|
return maxRecvSize
|
|
}
|
|
return v
|
|
},
|
|
Doc: "The maximum size of each RPC request that the clients on " + domain + " can receive, unit: byte",
|
|
Export: true,
|
|
}
|
|
p.ClientMaxRecvSize.Init(base.mgr)
|
|
|
|
dialTimeout := strconv.FormatInt(DefaultDialTimeout, 10)
|
|
p.DialTimeout = ParamItem{
|
|
Key: "grpc.client.dialTimeout",
|
|
Version: "2.0.0",
|
|
Formatter: func(v string) string {
|
|
if v == "" {
|
|
return dialTimeout
|
|
}
|
|
_, err := strconv.Atoi(v)
|
|
if err != nil {
|
|
log.Warn("Failed to convert int when parsing grpc.client.dialTimeout, set to default",
|
|
zap.String("role", p.Domain),
|
|
zap.String("grpc.client.dialTimeout", v))
|
|
return dialTimeout
|
|
}
|
|
return v
|
|
},
|
|
Export: true,
|
|
}
|
|
p.DialTimeout.Init(base.mgr)
|
|
|
|
keepAliveTimeout := strconv.FormatInt(DefaultKeepAliveTimeout, 10)
|
|
p.KeepAliveTimeout = ParamItem{
|
|
Key: "grpc.client.keepAliveTimeout",
|
|
Version: "2.0.0",
|
|
Formatter: func(v string) string {
|
|
if v == "" {
|
|
return keepAliveTimeout
|
|
}
|
|
_, err := strconv.Atoi(v)
|
|
if err != nil {
|
|
log.Warn("Failed to convert int when parsing grpc.client.keepAliveTimeout, set to default",
|
|
zap.String("role", p.Domain),
|
|
zap.String("grpc.client.keepAliveTimeout", v))
|
|
return keepAliveTimeout
|
|
}
|
|
return v
|
|
},
|
|
Export: true,
|
|
}
|
|
p.KeepAliveTimeout.Init(base.mgr)
|
|
|
|
keepAliveTime := strconv.FormatInt(DefaultKeepAliveTime, 10)
|
|
p.KeepAliveTime = ParamItem{
|
|
Key: "grpc.client.keepAliveTime",
|
|
Version: "2.0.0",
|
|
Formatter: func(v string) string {
|
|
if v == "" {
|
|
return keepAliveTime
|
|
}
|
|
_, err := strconv.Atoi(v)
|
|
if err != nil {
|
|
log.Warn("Failed to convert int when parsing grpc.client.keepAliveTime, set to default",
|
|
zap.String("role", p.Domain),
|
|
zap.String("grpc.client.keepAliveTime", v))
|
|
return keepAliveTime
|
|
}
|
|
return v
|
|
},
|
|
Export: true,
|
|
}
|
|
p.KeepAliveTime.Init(base.mgr)
|
|
|
|
maxAttempts := strconv.FormatInt(DefaultMaxAttempts, 10)
|
|
p.MaxAttempts = ParamItem{
|
|
Key: "grpc.client.maxMaxAttempts",
|
|
Version: "2.0.0",
|
|
Formatter: func(v string) string {
|
|
if v == "" {
|
|
return maxAttempts
|
|
}
|
|
_, err := strconv.Atoi(v)
|
|
if err != nil {
|
|
log.Warn("Failed to convert int when parsing grpc.client.maxMaxAttempts, set to default",
|
|
zap.String("role", p.Domain),
|
|
zap.String("grpc.client.maxMaxAttempts", v))
|
|
return maxAttempts
|
|
}
|
|
return v
|
|
},
|
|
Export: true,
|
|
}
|
|
p.MaxAttempts.Init(base.mgr)
|
|
|
|
initialBackoff := fmt.Sprintf("%f", DefaultInitialBackoff)
|
|
p.InitialBackoff = ParamItem{
|
|
Key: "grpc.client.initialBackoff",
|
|
Version: "2.0.0",
|
|
Formatter: func(v string) string {
|
|
if v == "" {
|
|
return initialBackoff
|
|
}
|
|
_, err := strconv.ParseFloat(v, 64)
|
|
if err != nil {
|
|
log.Warn("Failed to convert int when parsing grpc.client.initialBackoff, set to default",
|
|
zap.String("role", p.Domain),
|
|
zap.String("grpc.client.initialBackoff", v))
|
|
return initialBackoff
|
|
}
|
|
return v
|
|
},
|
|
Export: true,
|
|
}
|
|
p.InitialBackoff.Init(base.mgr)
|
|
|
|
maxBackoff := fmt.Sprintf("%f", DefaultMaxBackoff)
|
|
p.MaxBackoff = ParamItem{
|
|
Key: "grpc.client.maxBackoff",
|
|
Version: "2.0.0",
|
|
Formatter: func(v string) string {
|
|
if v == "" {
|
|
return maxBackoff
|
|
}
|
|
_, err := strconv.ParseFloat(v, 64)
|
|
if err != nil {
|
|
log.Warn("Failed to convert int when parsing grpc.client.maxBackoff, set to default",
|
|
zap.String("role", p.Domain),
|
|
zap.String("grpc.client.maxBackoff", v))
|
|
return maxBackoff
|
|
}
|
|
return v
|
|
},
|
|
Export: true,
|
|
}
|
|
p.MaxBackoff.Init(base.mgr)
|
|
|
|
p.BackoffMultiplier = ParamItem{
|
|
Key: "grpc.client.backoffMultiplier",
|
|
Version: "2.5.0",
|
|
DefaultValue: "2.0",
|
|
Export: true,
|
|
}
|
|
p.BackoffMultiplier.Init(base.mgr)
|
|
|
|
compressionEnabled := fmt.Sprintf("%t", DefaultCompressionEnabled)
|
|
p.CompressionEnabled = ParamItem{
|
|
Key: "grpc.client.compressionEnabled",
|
|
Version: "2.0.0",
|
|
Formatter: func(v string) string {
|
|
if v == "" {
|
|
return compressionEnabled
|
|
}
|
|
_, err := strconv.ParseBool(v)
|
|
if err != nil {
|
|
log.Warn("Failed to convert int when parsing grpc.client.compressionEnabled, set to default",
|
|
zap.String("role", p.Domain),
|
|
zap.String("grpc.client.compressionEnabled", v))
|
|
return compressionEnabled
|
|
}
|
|
return v
|
|
},
|
|
Export: true,
|
|
}
|
|
p.CompressionEnabled.Init(base.mgr)
|
|
|
|
p.MinResetInterval = ParamItem{
|
|
Key: "grpc.client.minResetInterval",
|
|
DefaultValue: "1000",
|
|
Formatter: func(v string) string {
|
|
if v == "" {
|
|
return "1000"
|
|
}
|
|
_, err := strconv.Atoi(v)
|
|
if err != nil {
|
|
log.Warn("Failed to parse grpc.client.minResetInterval, set to default",
|
|
zap.String("role", p.Domain), zap.String("grpc.client.minResetInterval", v),
|
|
zap.Error(err))
|
|
return "1000"
|
|
}
|
|
return v
|
|
},
|
|
Export: true,
|
|
}
|
|
p.MinResetInterval.Init(base.mgr)
|
|
|
|
p.MinSessionCheckInterval = ParamItem{
|
|
Key: "grpc.client.minSessionCheckInterval",
|
|
DefaultValue: "200",
|
|
Formatter: func(v string) string {
|
|
if v == "" {
|
|
return "200"
|
|
}
|
|
_, err := strconv.Atoi(v)
|
|
if err != nil {
|
|
log.Warn("Failed to parse grpc.client.minSessionCheckInterval, set to default",
|
|
zap.String("role", p.Domain), zap.String("grpc.client.minSessionCheckInterval", v),
|
|
zap.Error(err))
|
|
return "200"
|
|
}
|
|
return v
|
|
},
|
|
Export: true,
|
|
}
|
|
p.MinSessionCheckInterval.Init(base.mgr)
|
|
|
|
p.MaxCancelError = ParamItem{
|
|
Key: "grpc.client.maxCancelError",
|
|
DefaultValue: "32",
|
|
Formatter: func(v string) string {
|
|
if v == "" {
|
|
return "32"
|
|
}
|
|
_, err := strconv.Atoi(v)
|
|
if err != nil {
|
|
log.Warn("Failed to parse grpc.client.maxCancelError, set to default",
|
|
zap.String("role", p.Domain), zap.String("grpc.client.maxCancelError", v),
|
|
zap.Error(err))
|
|
return "32"
|
|
}
|
|
return v
|
|
},
|
|
Export: true,
|
|
}
|
|
p.MaxCancelError.Init(base.mgr)
|
|
}
|
|
|
|
// GetDialOptionsFromConfig returns grpc dial options from config.
|
|
func (p *GrpcClientConfig) GetDialOptionsFromConfig() []grpc.DialOption {
|
|
compress := ""
|
|
if p.CompressionEnabled.GetAsBool() {
|
|
compress = "zstd"
|
|
}
|
|
return []grpc.DialOption{
|
|
grpc.WithDefaultCallOptions(
|
|
grpc.MaxCallRecvMsgSize(p.ClientMaxRecvSize.GetAsInt()),
|
|
grpc.MaxCallSendMsgSize(p.ClientMaxSendSize.GetAsInt()),
|
|
grpc.UseCompressor(compress),
|
|
),
|
|
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
|
Time: p.KeepAliveTime.GetAsDuration(time.Millisecond),
|
|
Timeout: p.KeepAliveTimeout.GetAsDuration(time.Millisecond),
|
|
PermitWithoutStream: true,
|
|
}),
|
|
grpc.WithConnectParams(grpc.ConnectParams{
|
|
Backoff: backoff.Config{
|
|
BaseDelay: 100 * time.Millisecond,
|
|
Multiplier: 1.6,
|
|
Jitter: 0.2,
|
|
MaxDelay: 3 * time.Second,
|
|
},
|
|
MinConnectTimeout: p.DialTimeout.GetAsDuration(time.Millisecond),
|
|
}),
|
|
}
|
|
}
|
|
|
|
// GetDefaultRetryPolicy returns default grpc retry policy.
|
|
func (p *GrpcClientConfig) GetDefaultRetryPolicy() map[string]interface{} {
|
|
return map[string]interface{}{
|
|
"maxAttempts": p.MaxAttempts.GetAsInt(),
|
|
"initialBackoff": fmt.Sprintf("%fs", p.InitialBackoff.GetAsFloat()),
|
|
"maxBackoff": fmt.Sprintf("%fs", p.MaxBackoff.GetAsFloat()),
|
|
"backoffMultiplier": p.BackoffMultiplier.GetAsFloat(),
|
|
}
|
|
}
|
|
|
|
type InternalTLSConfig struct {
|
|
InternalTLSEnabled ParamItem `refreshable:"false"`
|
|
InternalTLSServerPemPath ParamItem `refreshable:"false"`
|
|
InternalTLSServerKeyPath ParamItem `refreshable:"false"`
|
|
InternalTLSCaPemPath ParamItem `refreshable:"false"`
|
|
InternalTLSSNI ParamItem `refreshable:"false"`
|
|
}
|
|
|
|
func (p *InternalTLSConfig) Init(base *BaseTable) {
|
|
p.InternalTLSEnabled = ParamItem{
|
|
Key: "common.security.internaltlsEnabled",
|
|
Version: "2.5.0",
|
|
DefaultValue: "false",
|
|
Export: true,
|
|
}
|
|
p.InternalTLSEnabled.Init(base.mgr)
|
|
|
|
p.InternalTLSServerPemPath = ParamItem{
|
|
Key: "internaltls.serverPemPath",
|
|
Version: "2.5.0",
|
|
Export: true,
|
|
}
|
|
p.InternalTLSServerPemPath.Init(base.mgr)
|
|
|
|
p.InternalTLSServerKeyPath = ParamItem{
|
|
Key: "internaltls.serverKeyPath",
|
|
Version: "2.5.0",
|
|
Export: true,
|
|
}
|
|
p.InternalTLSServerKeyPath.Init(base.mgr)
|
|
|
|
p.InternalTLSCaPemPath = ParamItem{
|
|
Key: "internaltls.caPemPath",
|
|
Version: "2.5.0",
|
|
Export: true,
|
|
}
|
|
p.InternalTLSCaPemPath.Init(base.mgr)
|
|
|
|
p.InternalTLSSNI = ParamItem{
|
|
Key: "internaltls.sni",
|
|
Version: "2.5.0",
|
|
Export: true,
|
|
Doc: "The server name indication (SNI) for internal TLS, should be the same as the name provided by the certificates ref: https://en.wikipedia.org/wiki/Server_Name_Indication",
|
|
}
|
|
p.InternalTLSSNI.Init(base.mgr)
|
|
}
|