// Licensed to the LF AI & Data foundation under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package paramtable import ( "fmt" "strconv" "time" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/backoff" "google.golang.org/grpc/keepalive" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/funcutil" ) const ( // DefaultServerMaxSendSize defines the maximum size of data per grpc request can send by server side. DefaultServerMaxSendSize = 512 * 1024 * 1024 // DefaultServerMaxRecvSize defines the maximum size of data per grpc request can receive by server side. DefaultServerMaxRecvSize = 256 * 1024 * 1024 // DefaultClientMaxSendSize defines the maximum size of data per grpc request can send by client side. DefaultClientMaxSendSize = 256 * 1024 * 1024 // DefaultClientMaxRecvSize defines the maximum size of data per grpc request can receive by client side. DefaultClientMaxRecvSize = 512 * 1024 * 1024 // DefaultLogLevel defines the log level of grpc DefaultLogLevel = "WARNING" // Grpc Timeout related configs DefaultDialTimeout = 200 DefaultKeepAliveTime = 10000 DefaultKeepAliveTimeout = 20000 // Grpc retry policy DefaultMaxAttempts = 10 DefaultInitialBackoff float64 = 0.2 DefaultMaxBackoff float64 = 10 DefaultCompressionEnabled bool = false ProxyInternalPort = 19529 ProxyExternalPort = 19530 ) // ///////////////////////////////////////////////////////////////////////////// // --- grpc --- type grpcConfig struct { Domain string `refreshable:"false"` IP string `refreshable:"false"` TLSMode ParamItem `refreshable:"false"` IPItem ParamItem `refreshable:"false"` Port ParamItem `refreshable:"false"` InternalPort ParamItem `refreshable:"false"` ServerPemPath ParamItem `refreshable:"false"` ServerKeyPath ParamItem `refreshable:"false"` CaPemPath ParamItem `refreshable:"false"` } func (p *grpcConfig) init(domain string, base *BaseTable) { p.Domain = domain p.IPItem = ParamItem{ Key: p.Domain + ".ip", Version: "2.3.3", Doc: "TCP/IP address of " + p.Domain + ". If not specified, use the first unicastable address", Export: true, } p.IPItem.Init(base.mgr) p.IP = funcutil.GetIP(p.IPItem.GetValue()) p.Port = ParamItem{ Key: p.Domain + ".port", Version: "2.0.0", DefaultValue: strconv.FormatInt(ProxyExternalPort, 10), Doc: "TCP port of " + p.Domain, Export: true, } p.Port.Init(base.mgr) p.InternalPort = ParamItem{ Key: p.Domain + ".internalPort", Version: "2.0.0", DefaultValue: strconv.FormatInt(ProxyInternalPort, 10), } p.InternalPort.Init(base.mgr) p.TLSMode = ParamItem{ Key: "common.security.tlsMode", Version: "2.0.0", DefaultValue: "0", Export: true, } p.TLSMode.Init(base.mgr) p.ServerPemPath = ParamItem{ Key: "tls.serverPemPath", Version: "2.0.0", Export: true, } p.ServerPemPath.Init(base.mgr) p.ServerKeyPath = ParamItem{ Key: "tls.serverKeyPath", Version: "2.0.0", Export: true, } p.ServerKeyPath.Init(base.mgr) p.CaPemPath = ParamItem{ Key: "tls.caPemPath", Version: "2.0.0", Export: true, } p.CaPemPath.Init(base.mgr) } // GetAddress return grpc address func (p *grpcConfig) GetAddress() string { return p.IP + ":" + p.Port.GetValue() } func (p *grpcConfig) GetInternalAddress() string { return p.IP + ":" + p.InternalPort.GetValue() } // GrpcServerConfig is configuration for grpc server. type GrpcServerConfig struct { grpcConfig ServerMaxSendSize ParamItem `refreshable:"false"` ServerMaxRecvSize ParamItem `refreshable:"false"` GracefulStopTimeout ParamItem `refreshable:"true"` } func (p *GrpcServerConfig) Init(domain string, base *BaseTable) { p.grpcConfig.init(domain, base) maxSendSize := strconv.FormatInt(DefaultServerMaxSendSize, 10) p.ServerMaxSendSize = ParamItem{ Key: p.Domain + ".grpc.serverMaxSendSize", DefaultValue: maxSendSize, FallbackKeys: []string{"grpc.serverMaxSendSize"}, Formatter: func(v string) string { if v == "" { return maxSendSize } _, err := strconv.Atoi(v) if err != nil { log.Warn("Failed to parse grpc.serverMaxSendSize, set to default", zap.String("role", p.Domain), zap.String("grpc.serverMaxSendSize", v), zap.Error(err)) return maxSendSize } return v }, Doc: "The maximum size of each RPC request that the " + domain + " can send, unit: byte", Export: true, } p.ServerMaxSendSize.Init(base.mgr) maxRecvSize := strconv.FormatInt(DefaultServerMaxRecvSize, 10) p.ServerMaxRecvSize = ParamItem{ Key: p.Domain + ".grpc.serverMaxRecvSize", DefaultValue: maxRecvSize, FallbackKeys: []string{"grpc.serverMaxRecvSize"}, Formatter: func(v string) string { if v == "" { return maxRecvSize } _, err := strconv.Atoi(v) if err != nil { log.Warn("Failed to parse grpc.serverMaxRecvSize, set to default", zap.String("role", p.Domain), zap.String("grpc.serverMaxRecvSize", v), zap.Error(err)) return maxRecvSize } return v }, Doc: "The maximum size of each RPC request that the " + domain + " can receive, unit: byte", Export: true, } p.ServerMaxRecvSize.Init(base.mgr) p.GracefulStopTimeout = ParamItem{ Key: "grpc.gracefulStopTimeout", Version: "2.3.1", DefaultValue: "10", Doc: "second, time to wait graceful stop finish", Export: true, } p.GracefulStopTimeout.Init(base.mgr) } // GrpcClientConfig is configuration for grpc client. type GrpcClientConfig struct { grpcConfig CompressionEnabled ParamItem `refreshable:"false"` ClientMaxSendSize ParamItem `refreshable:"false"` ClientMaxRecvSize ParamItem `refreshable:"false"` DialTimeout ParamItem `refreshable:"false"` KeepAliveTime ParamItem `refreshable:"false"` KeepAliveTimeout ParamItem `refreshable:"false"` MaxAttempts ParamItem `refreshable:"false"` InitialBackoff ParamItem `refreshable:"false"` MaxBackoff ParamItem `refreshable:"false"` BackoffMultiplier ParamItem `refreshable:"false"` MinResetInterval ParamItem `refreshable:"false"` MaxCancelError ParamItem `refreshable:"false"` MinSessionCheckInterval ParamItem `refreshable:"false"` } func (p *GrpcClientConfig) Init(domain string, base *BaseTable) { p.grpcConfig.init(domain, base) maxSendSize := strconv.FormatInt(DefaultClientMaxSendSize, 10) p.ClientMaxSendSize = ParamItem{ Key: p.Domain + ".grpc.clientMaxSendSize", DefaultValue: maxSendSize, FallbackKeys: []string{"grpc.clientMaxSendSize"}, Formatter: func(v string) string { if v == "" { return maxSendSize } _, err := strconv.Atoi(v) if err != nil { log.Warn("Failed to parse grpc.clientMaxSendSize, set to default", zap.String("role", p.Domain), zap.String("grpc.clientMaxSendSize", v), zap.Error(err)) return maxSendSize } return v }, Doc: "The maximum size of each RPC request that the clients on " + domain + " can send, unit: byte", Export: true, } p.ClientMaxSendSize.Init(base.mgr) maxRecvSize := strconv.FormatInt(DefaultClientMaxRecvSize, 10) p.ClientMaxRecvSize = ParamItem{ Key: p.Domain + ".grpc.clientMaxRecvSize", DefaultValue: maxRecvSize, FallbackKeys: []string{"grpc.clientMaxRecvSize"}, Formatter: func(v string) string { if v == "" { return maxRecvSize } _, err := strconv.Atoi(v) if err != nil { log.Warn("Failed to parse grpc.clientMaxRecvSize, set to default", zap.String("role", p.Domain), zap.String("grpc.clientMaxRecvSize", v), zap.Error(err)) return maxRecvSize } return v }, Doc: "The maximum size of each RPC request that the clients on " + domain + " can receive, unit: byte", Export: true, } p.ClientMaxRecvSize.Init(base.mgr) dialTimeout := strconv.FormatInt(DefaultDialTimeout, 10) p.DialTimeout = ParamItem{ Key: "grpc.client.dialTimeout", Version: "2.0.0", Formatter: func(v string) string { if v == "" { return dialTimeout } _, err := strconv.Atoi(v) if err != nil { log.Warn("Failed to convert int when parsing grpc.client.dialTimeout, set to default", zap.String("role", p.Domain), zap.String("grpc.client.dialTimeout", v)) return dialTimeout } return v }, Export: true, } p.DialTimeout.Init(base.mgr) keepAliveTimeout := strconv.FormatInt(DefaultKeepAliveTimeout, 10) p.KeepAliveTimeout = ParamItem{ Key: "grpc.client.keepAliveTimeout", Version: "2.0.0", Formatter: func(v string) string { if v == "" { return keepAliveTimeout } _, err := strconv.Atoi(v) if err != nil { log.Warn("Failed to convert int when parsing grpc.client.keepAliveTimeout, set to default", zap.String("role", p.Domain), zap.String("grpc.client.keepAliveTimeout", v)) return keepAliveTimeout } return v }, Export: true, } p.KeepAliveTimeout.Init(base.mgr) keepAliveTime := strconv.FormatInt(DefaultKeepAliveTime, 10) p.KeepAliveTime = ParamItem{ Key: "grpc.client.keepAliveTime", Version: "2.0.0", Formatter: func(v string) string { if v == "" { return keepAliveTime } _, err := strconv.Atoi(v) if err != nil { log.Warn("Failed to convert int when parsing grpc.client.keepAliveTime, set to default", zap.String("role", p.Domain), zap.String("grpc.client.keepAliveTime", v)) return keepAliveTime } return v }, Export: true, } p.KeepAliveTime.Init(base.mgr) maxAttempts := strconv.FormatInt(DefaultMaxAttempts, 10) p.MaxAttempts = ParamItem{ Key: "grpc.client.maxMaxAttempts", Version: "2.0.0", Formatter: func(v string) string { if v == "" { return maxAttempts } _, err := strconv.Atoi(v) if err != nil { log.Warn("Failed to convert int when parsing grpc.client.maxMaxAttempts, set to default", zap.String("role", p.Domain), zap.String("grpc.client.maxMaxAttempts", v)) return maxAttempts } return v }, Export: true, } p.MaxAttempts.Init(base.mgr) initialBackoff := fmt.Sprintf("%f", DefaultInitialBackoff) p.InitialBackoff = ParamItem{ Key: "grpc.client.initialBackoff", Version: "2.0.0", Formatter: func(v string) string { if v == "" { return initialBackoff } _, err := strconv.ParseFloat(v, 64) if err != nil { log.Warn("Failed to convert int when parsing grpc.client.initialBackoff, set to default", zap.String("role", p.Domain), zap.String("grpc.client.initialBackoff", v)) return initialBackoff } return v }, Export: true, } p.InitialBackoff.Init(base.mgr) maxBackoff := fmt.Sprintf("%f", DefaultMaxBackoff) p.MaxBackoff = ParamItem{ Key: "grpc.client.maxBackoff", Version: "2.0.0", Formatter: func(v string) string { if v == "" { return maxBackoff } _, err := strconv.ParseFloat(v, 64) if err != nil { log.Warn("Failed to convert int when parsing grpc.client.maxBackoff, set to default", zap.String("role", p.Domain), zap.String("grpc.client.maxBackoff", v)) return maxBackoff } return v }, Export: true, } p.MaxBackoff.Init(base.mgr) p.BackoffMultiplier = ParamItem{ Key: "grpc.client.backoffMultiplier", Version: "2.5.0", DefaultValue: "2.0", Export: true, } p.BackoffMultiplier.Init(base.mgr) compressionEnabled := fmt.Sprintf("%t", DefaultCompressionEnabled) p.CompressionEnabled = ParamItem{ Key: "grpc.client.compressionEnabled", Version: "2.0.0", Formatter: func(v string) string { if v == "" { return compressionEnabled } _, err := strconv.ParseBool(v) if err != nil { log.Warn("Failed to convert int when parsing grpc.client.compressionEnabled, set to default", zap.String("role", p.Domain), zap.String("grpc.client.compressionEnabled", v)) return compressionEnabled } return v }, Export: true, } p.CompressionEnabled.Init(base.mgr) p.MinResetInterval = ParamItem{ Key: "grpc.client.minResetInterval", DefaultValue: "1000", Formatter: func(v string) string { if v == "" { return "1000" } _, err := strconv.Atoi(v) if err != nil { log.Warn("Failed to parse grpc.client.minResetInterval, set to default", zap.String("role", p.Domain), zap.String("grpc.client.minResetInterval", v), zap.Error(err)) return "1000" } return v }, Export: true, } p.MinResetInterval.Init(base.mgr) p.MinSessionCheckInterval = ParamItem{ Key: "grpc.client.minSessionCheckInterval", DefaultValue: "200", Formatter: func(v string) string { if v == "" { return "200" } _, err := strconv.Atoi(v) if err != nil { log.Warn("Failed to parse grpc.client.minSessionCheckInterval, set to default", zap.String("role", p.Domain), zap.String("grpc.client.minSessionCheckInterval", v), zap.Error(err)) return "200" } return v }, Export: true, } p.MinSessionCheckInterval.Init(base.mgr) p.MaxCancelError = ParamItem{ Key: "grpc.client.maxCancelError", DefaultValue: "32", Formatter: func(v string) string { if v == "" { return "32" } _, err := strconv.Atoi(v) if err != nil { log.Warn("Failed to parse grpc.client.maxCancelError, set to default", zap.String("role", p.Domain), zap.String("grpc.client.maxCancelError", v), zap.Error(err)) return "32" } return v }, Export: true, } p.MaxCancelError.Init(base.mgr) } // GetDialOptionsFromConfig returns grpc dial options from config. func (p *GrpcClientConfig) GetDialOptionsFromConfig() []grpc.DialOption { compress := "" if p.CompressionEnabled.GetAsBool() { compress = "zstd" } return []grpc.DialOption{ grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(p.ClientMaxRecvSize.GetAsInt()), grpc.MaxCallSendMsgSize(p.ClientMaxSendSize.GetAsInt()), grpc.UseCompressor(compress), ), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: p.KeepAliveTime.GetAsDuration(time.Millisecond), Timeout: p.KeepAliveTimeout.GetAsDuration(time.Millisecond), PermitWithoutStream: true, }), grpc.WithConnectParams(grpc.ConnectParams{ Backoff: backoff.Config{ BaseDelay: 100 * time.Millisecond, Multiplier: 1.6, Jitter: 0.2, MaxDelay: 3 * time.Second, }, MinConnectTimeout: p.DialTimeout.GetAsDuration(time.Millisecond), }), } } // GetDefaultRetryPolicy returns default grpc retry policy. func (p *GrpcClientConfig) GetDefaultRetryPolicy() map[string]interface{} { return map[string]interface{}{ "maxAttempts": p.MaxAttempts.GetAsInt(), "initialBackoff": fmt.Sprintf("%fs", p.InitialBackoff.GetAsFloat()), "maxBackoff": fmt.Sprintf("%fs", p.MaxBackoff.GetAsFloat()), "backoffMultiplier": p.BackoffMultiplier.GetAsFloat(), } } type InternalTLSConfig struct { InternalTLSEnabled ParamItem `refreshable:"false"` InternalTLSServerPemPath ParamItem `refreshable:"false"` InternalTLSServerKeyPath ParamItem `refreshable:"false"` InternalTLSCaPemPath ParamItem `refreshable:"false"` InternalTLSSNI ParamItem `refreshable:"false"` } func (p *InternalTLSConfig) Init(base *BaseTable) { p.InternalTLSEnabled = ParamItem{ Key: "common.security.internaltlsEnabled", Version: "2.5.0", DefaultValue: "false", Export: true, } p.InternalTLSEnabled.Init(base.mgr) p.InternalTLSServerPemPath = ParamItem{ Key: "internaltls.serverPemPath", Version: "2.5.0", Export: true, } p.InternalTLSServerPemPath.Init(base.mgr) p.InternalTLSServerKeyPath = ParamItem{ Key: "internaltls.serverKeyPath", Version: "2.5.0", Export: true, } p.InternalTLSServerKeyPath.Init(base.mgr) p.InternalTLSCaPemPath = ParamItem{ Key: "internaltls.caPemPath", Version: "2.5.0", Export: true, } p.InternalTLSCaPemPath.Init(base.mgr) p.InternalTLSSNI = ParamItem{ Key: "internaltls.sni", Version: "2.5.0", Export: true, Doc: "The server name indication (SNI) for internal TLS, should be the same as the name provided by the certificates ref: https://en.wikipedia.org/wiki/Server_Name_Indication", } p.InternalTLSSNI.Init(base.mgr) }