mirror of https://github.com/milvus-io/milvus.git
Make all logs configurable for standalone/embedded Milvus. (#15926)
Resolves: #15919, #15708 Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>pull/15967/head
parent
4f1d462da3
commit
d5bae6710f
66
cmd/main.go
66
cmd/main.go
|
@ -21,13 +21,16 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
syslog "log"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
"go.uber.org/automaxprocs/maxprocs"
|
||||
|
||||
// use auto max procs to set container CPU quota
|
||||
_ "go.uber.org/automaxprocs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/cmd/roles"
|
||||
|
@ -48,19 +51,19 @@ var (
|
|||
GoVersion = "unknown"
|
||||
)
|
||||
|
||||
func printBanner() {
|
||||
fmt.Println()
|
||||
fmt.Println(" __ _________ _ ____ ______ ")
|
||||
fmt.Println(" / |/ / _/ /| | / / / / / __/ ")
|
||||
fmt.Println(" / /|_/ // // /_| |/ / /_/ /\\ \\ ")
|
||||
fmt.Println(" /_/ /_/___/____/___/\\____/___/ ")
|
||||
fmt.Println()
|
||||
fmt.Println("Welcome to use Milvus!")
|
||||
fmt.Println("Version: " + BuildTags)
|
||||
fmt.Println("Built: " + BuildTime)
|
||||
fmt.Println("GitCommit: " + GitCommit)
|
||||
fmt.Println("GoVersion: " + GoVersion)
|
||||
fmt.Println()
|
||||
func printBanner(w io.Writer) {
|
||||
fmt.Fprintln(w)
|
||||
fmt.Fprintln(w, " __ _________ _ ____ ______ ")
|
||||
fmt.Fprintln(w, " / |/ / _/ /| | / / / / / __/ ")
|
||||
fmt.Fprintln(w, " / /|_/ // // /_| |/ / /_/ /\\ \\ ")
|
||||
fmt.Fprintln(w, " /_/ /_/___/____/___/\\____/___/ ")
|
||||
fmt.Fprintln(w)
|
||||
fmt.Fprintln(w, "Welcome to use Milvus!")
|
||||
fmt.Fprintln(w, "Version: "+BuildTags)
|
||||
fmt.Fprintln(w, "Built: "+BuildTime)
|
||||
fmt.Fprintln(w, "GitCommit: "+GitCommit)
|
||||
fmt.Fprintln(w, "GoVersion: "+GoVersion)
|
||||
fmt.Fprintln(w)
|
||||
}
|
||||
|
||||
func injectVariablesToEnv() {
|
||||
|
@ -103,20 +106,20 @@ func getPidFileName(serverType string, alias string) string {
|
|||
return filename
|
||||
}
|
||||
|
||||
func createPidFile(filename string, runtimeDir string) (*os.File, error) {
|
||||
func createPidFile(w io.Writer, filename string, runtimeDir string) (*os.File, error) {
|
||||
fileFullName := path.Join(runtimeDir, filename)
|
||||
|
||||
fd, err := os.OpenFile(fileFullName, os.O_CREATE|os.O_RDWR, 0664)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("file %s is locked, error = %w", filename, err)
|
||||
}
|
||||
fmt.Println("open pid file:", fileFullName)
|
||||
fmt.Fprintln(w, "open pid file:", fileFullName)
|
||||
|
||||
err = syscall.Flock(int(fd.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("file %s is locked, error = %w", filename, err)
|
||||
}
|
||||
fmt.Println("lock pid file:", fileFullName)
|
||||
fmt.Fprintln(w, "lock pid file:", fileFullName)
|
||||
|
||||
fd.Truncate(0)
|
||||
_, err = fd.WriteString(fmt.Sprintf("%d", os.Getpid()))
|
||||
|
@ -213,6 +216,15 @@ func main() {
|
|||
flags.BoolVar(&enableIndexCoord, typeutil.IndexCoordRole, false, "enable index coordinator")
|
||||
flags.BoolVar(&enableDataCoord, typeutil.DataCoordRole, false, "enable data coordinator")
|
||||
|
||||
// Discard Milvus welcome logs, init logs and maxprocs logs in embedded Milvus.
|
||||
if serverType == typeutil.EmbeddedRole {
|
||||
flags.SetOutput(io.Discard)
|
||||
// Initialize maxprocs while discarding log.
|
||||
maxprocs.Set(maxprocs.Logger(nil))
|
||||
} else {
|
||||
// Initialize maxprocs.
|
||||
maxprocs.Set(maxprocs.Logger(syslog.Printf))
|
||||
}
|
||||
flags.Usage = func() {
|
||||
fmt.Fprintf(flags.Output(), "Usage of %s:\n", os.Args[0])
|
||||
switch {
|
||||
|
@ -253,7 +265,7 @@ func main() {
|
|||
role.EnableIndexCoord = true
|
||||
case typeutil.IndexNodeRole:
|
||||
role.EnableIndexNode = true
|
||||
case typeutil.StandaloneRole:
|
||||
case typeutil.StandaloneRole, typeutil.EmbeddedRole:
|
||||
role.EnableRootCoord = true
|
||||
role.EnableProxy = true
|
||||
role.EnableQueryCoord = true
|
||||
|
@ -273,6 +285,20 @@ func main() {
|
|||
os.Exit(-1)
|
||||
}
|
||||
|
||||
// Setup logger in advance for standalone and embedded Milvus.
|
||||
// Any log from this point on is under control.
|
||||
if serverType == typeutil.StandaloneRole || serverType == typeutil.EmbeddedRole {
|
||||
var params paramtable.BaseTable
|
||||
if serverType == typeutil.EmbeddedRole {
|
||||
params.GlobalInitWithYaml("embedded-milvus.yaml")
|
||||
} else {
|
||||
params.Init()
|
||||
}
|
||||
params.SetLogConfig()
|
||||
params.RoleName = serverType
|
||||
params.SetLogger(0)
|
||||
}
|
||||
|
||||
runtimeDir := "/run/milvus"
|
||||
if err := makeRuntimeDir(runtimeDir); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Set runtime dir at %s failed, set it to /tmp/milvus directory\n", runtimeDir)
|
||||
|
@ -286,9 +312,9 @@ func main() {
|
|||
filename := getPidFileName(serverType, svrAlias)
|
||||
switch command {
|
||||
case "run":
|
||||
printBanner()
|
||||
printBanner(flags.Output())
|
||||
injectVariablesToEnv()
|
||||
fd, err := createPidFile(filename, runtimeDir)
|
||||
fd, err := createPidFile(flags.Output(), filename, runtimeDir)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,258 @@
|
|||
# Licensed to the LF AI & Data foundation under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# Related configuration of etcd, used to store Milvus metadata.
|
||||
etcd:
|
||||
endpoints:
|
||||
- localhost:2379
|
||||
rootPath: by-dev # The root path where data is stored in etcd
|
||||
metaSubPath: meta # metaRootPath = rootPath + '/' + metaSubPath
|
||||
kvSubPath: kv # kvRootPath = rootPath + '/' + kvSubPath
|
||||
log:
|
||||
# path is one of:
|
||||
# - "default" as os.Stderr,
|
||||
# - "stderr" as os.Stderr,
|
||||
# - "stdout" as os.Stdout,
|
||||
# - file path to append server logs to.
|
||||
path: /tmp/milvus/logs/etcd.log
|
||||
level: info # Only supports debug, info, warn, error, panic, or fatal. Default 'info'.
|
||||
use:
|
||||
embed: true # Whether to enable embedded Etcd (an in-process EtcdServer).
|
||||
|
||||
# Related configuration of minio, which is responsible for data persistence for Milvus.
|
||||
minio:
|
||||
address: localhost # Address of MinIO/S3
|
||||
port: 9000 # Port of MinIO/S3
|
||||
accessKeyID: minioadmin # accessKeyID of MinIO/S3
|
||||
secretAccessKey: minioadmin # MinIO/S3 encryption string
|
||||
useSSL: false # Access to MinIO/S3 with SSL
|
||||
bucketName: "a-bucket" # Bucket name in MinIO/S3
|
||||
rootPath: files # The root path where the message is stored in MinIO/S3
|
||||
|
||||
# Related configuration of pulsar, used to manage Milvus logs of recent mutation operations, output streaming log, and provide log publish-subscribe services.
|
||||
pulsar:
|
||||
address: localhost # Address of pulsar
|
||||
port: 6650 # Port of pulsar
|
||||
maxMessageSize: 5242880 # 5 * 1024 * 1024 Bytes, Maximum size of each message in pulsar.
|
||||
|
||||
rocksmq:
|
||||
path: /var/lib/milvus/rdb_data # The path where the message is stored in rocksmq
|
||||
rocksmqPageSize: 2147483648 # 2 GB, 2 * 1024 * 1024 * 1024 bytes, The size of each page of messages in rocksmq
|
||||
retentionTimeInMinutes: 10080 # 7 days, 7 * 24 * 60 minutes, The retention time of the message in rocksmq.
|
||||
retentionSizeInMB: 8192 # 8 GB, 8 * 1024 MB, The retention size of the message in rocksmq.
|
||||
|
||||
# Related configuration of rootCoord, used to handle data definition language (DDL) and data control language (DCL) requests
|
||||
rootCoord:
|
||||
address: localhost
|
||||
port: 53100
|
||||
|
||||
grpc:
|
||||
serverMaxRecvSize: 2147483647 # math.MaxInt32, Maximum data size received by the server
|
||||
serverMaxSendSize: 2147483647 # math.MaxInt32, Maximum data size sent by the server
|
||||
clientMaxRecvSize: 104857600 # 100 MB, Maximum data size received by the client
|
||||
clientMaxSendSize: 104857600 # 100 MB, Maximum data size sent by the client
|
||||
|
||||
dmlChannelNum: 256 # The number of dml channels created at system startup
|
||||
maxPartitionNum: 4096 # Maximum number of partitions in a collection
|
||||
minSegmentSizeToEnableIndex: 1024 # It's a threshold. When the segment size is less than this value, the segment will not be indexed
|
||||
|
||||
# Related configuration of proxy, used to validate client requests and reduce the returned results.
|
||||
proxy:
|
||||
port: 19530
|
||||
|
||||
http:
|
||||
enabled: true # Whether to enable the http server
|
||||
port: 8080 # Whether to enable the http server
|
||||
readTimeout: 30000 # 30000 ms http read timeout
|
||||
writeTimeout: 30000 # 30000 ms http write timeout
|
||||
|
||||
grpc:
|
||||
serverMaxRecvSize: 536870912 # 512 MB, 512 * 1024 * 1024 Bytes
|
||||
serverMaxSendSize: 536870912 # 512 MB, 512 * 1024 * 1024 Bytes
|
||||
clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024
|
||||
clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024
|
||||
|
||||
timeTickInterval: 200 # ms, the interval that proxy synchronize the time tick
|
||||
msgStream:
|
||||
timeTick:
|
||||
bufSize: 512
|
||||
maxNameLength: 255 # Maximum length of name for a collection or alias
|
||||
maxFieldNum: 256 # Maximum number of fields in a collection
|
||||
maxDimension: 32768 # Maximum dimension of a vector
|
||||
maxShardNum: 256 # Maximum number of shards in a collection
|
||||
maxTaskNum: 1024 # max task number of proxy task queue
|
||||
bufFlagExpireTime: 3600 # second, the time to expire bufFlag from cache in collectResultLoop
|
||||
bufFlagCleanupInterval: 600 # second, the interval to clean bufFlag cache in collectResultLoop
|
||||
ginLogging: false # Whether to produce gin logs.
|
||||
|
||||
|
||||
# Related configuration of queryCoord, used to manage topology and load balancing for the query nodes, and handoff from growing segments to sealed segments.
|
||||
queryCoord:
|
||||
address: localhost
|
||||
port: 19531
|
||||
autoHandoff: true # Enable auto handoff
|
||||
autoBalance: true # Enable auto balance
|
||||
overloadedMemoryThresholdPercentage: 90 # The threshold percentage that memory overload
|
||||
balanceIntervalSeconds: 60
|
||||
memoryUsageMaxDifferencePercentage: 30
|
||||
|
||||
grpc:
|
||||
serverMaxRecvSize: 2147483647 # math.MaxInt32
|
||||
serverMaxSendSize: 2147483647 # math.MaxInt32
|
||||
clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024
|
||||
clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024
|
||||
|
||||
# Related configuration of queryNode, used to run hybrid search between vector and scalar data.
|
||||
queryNode:
|
||||
cacheSize: 32 # GB, default 32 GB, `cacheSize` is the memory used for caching data for faster query. The `cacheSize` must be less than system memory size.
|
||||
gracefulTime: 0 # Minimum time before the newly inserted data can be searched (in ms)
|
||||
port: 21123
|
||||
|
||||
grpc:
|
||||
serverMaxRecvSize: 2147483647 # math.MaxInt32
|
||||
serverMaxSendSize: 2147483647 # math.MaxInt32
|
||||
clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024
|
||||
clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024
|
||||
|
||||
stats:
|
||||
publishInterval: 1000 # Interval for querynode to report node information (milliseconds)
|
||||
dataSync:
|
||||
flowGraph:
|
||||
maxQueueLength: 1024 # Maximum length of task queue in flowgraph
|
||||
maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph
|
||||
msgStream:
|
||||
search:
|
||||
recvBufSize: 512 # msgPack channel buffer size
|
||||
pulsarBufSize: 512 # pulsar channel buffer size
|
||||
searchResult:
|
||||
recvBufSize: 64 # msgPack channel buffer size
|
||||
# Segcore will divide a segment into multiple chunks.
|
||||
segcore:
|
||||
chunkRows: 32768 # The number of vectors in a chunk.
|
||||
|
||||
|
||||
indexCoord:
|
||||
address: localhost
|
||||
port: 31000
|
||||
|
||||
grpc:
|
||||
serverMaxRecvSize: 2147483647 # math.MaxInt32
|
||||
serverMaxSendSize: 2147483647 # math.MaxInt32
|
||||
clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024
|
||||
clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024
|
||||
|
||||
indexNode:
|
||||
port: 21121
|
||||
|
||||
grpc:
|
||||
serverMaxRecvSize: 2147483647 # math.MaxInt32
|
||||
serverMaxSendSize: 2147483647 # math.MaxInt32
|
||||
clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024
|
||||
clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024
|
||||
|
||||
dataCoord:
|
||||
address: localhost
|
||||
port: 13333
|
||||
|
||||
grpc:
|
||||
serverMaxRecvSize: 2147483647 # math.MaxInt32
|
||||
serverMaxSendSize: 2147483647 # math.MaxInt32
|
||||
clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024
|
||||
clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024
|
||||
enableCompaction: true # Enable data segment compression
|
||||
enableGarbageCollection: false
|
||||
|
||||
segment:
|
||||
maxSize: 512 # Maximum size of a segment in MB
|
||||
sealProportion: 0.75 # It's the minimum proportion for a segment which can be sealed
|
||||
assignmentExpiration: 2000 # The time of the assignment expiration in ms
|
||||
|
||||
compaction:
|
||||
enableAutoCompaction: true
|
||||
entityExpiration: 9223372037 # Entity expiration in seconds, CAUTION make sure entityExpiration >= retentionDuration and 9223372037 is the maximum value of entityExpiration
|
||||
|
||||
gc:
|
||||
interval: 3600 # gc interval in seconds
|
||||
missingTolerance: 86400 # file meta missing tolerance duration in seconds, 60*24
|
||||
dropTolerance: 86400 # file belongs to dropped entity tolerance duration in seconds, 60*24
|
||||
|
||||
|
||||
dataNode:
|
||||
port: 21124
|
||||
|
||||
grpc:
|
||||
serverMaxRecvSize: 2147483647 # math.MaxInt32
|
||||
serverMaxSendSize: 2147483647 # math.MaxInt32
|
||||
clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024
|
||||
clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024
|
||||
|
||||
dataSync:
|
||||
flowGraph:
|
||||
maxQueueLength: 1024 # Maximum length of task queue in flowgraph
|
||||
maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph
|
||||
flush:
|
||||
# Max buffer size to flush for a single segment.
|
||||
insertBufSize: 16777216 # Bytes, 16 MB
|
||||
|
||||
# Configure whether to store the vector and the local path when querying/searching in Querynode.
|
||||
localStorage:
|
||||
path: /var/lib/milvus/data/
|
||||
enabled: true
|
||||
|
||||
# Configures the system log output.
|
||||
log:
|
||||
level: info # Only supports debug, info, warn, error, panic, or fatal. Default 'info'.
|
||||
file:
|
||||
rootPath: /tmp/milvus/logs # default to stdout, stderr
|
||||
maxSize: 300 # MB
|
||||
maxAge: 10 # Maximum time for log retention in day.
|
||||
maxBackups: 20
|
||||
format: text # text/json
|
||||
|
||||
common:
|
||||
# Channel name generation rule: ${namePrefix}-${ChannelIdx}
|
||||
chanNamePrefix:
|
||||
cluster: "by-dev"
|
||||
rootCoordTimeTick: "rootcoord-timetick"
|
||||
rootCoordStatistics: "rootcoord-statistics"
|
||||
rootCoordDml: "rootcoord-dml"
|
||||
rootCoordDelta: "rootcoord-delta"
|
||||
search: "search"
|
||||
searchResult: "searchResult"
|
||||
queryTimeTick: "queryTimeTick"
|
||||
queryNodeStats: "query-node-stats"
|
||||
# Cmd for loadIndex, flush, etc...
|
||||
cmd: "cmd"
|
||||
dataCoordStatistic: "datacoord-statistics-channel"
|
||||
dataCoordTimeTick: "datacoord-timetick-channel"
|
||||
dataCoordSegmentInfo: "segment-info-channel"
|
||||
|
||||
# Sub name generation rule: ${subNamePrefix}-${NodeID}
|
||||
subNamePrefix:
|
||||
rootCoordSubNamePrefix: "rootCoord"
|
||||
proxySubNamePrefix: "proxy"
|
||||
queryNodeSubNamePrefix: "queryNode"
|
||||
dataNodeSubNamePrefix: "dataNode"
|
||||
dataCoordSubNamePrefix: "dataCoord"
|
||||
|
||||
defaultPartitionName: "_default" # default partition name for a collection
|
||||
defaultIndexName: "_default_idx" # default index name
|
||||
retentionDuration: 432000 # 5 days in seconds
|
||||
|
||||
# Default value: auto
|
||||
# Valid values: [auto, avx512, avx2, avx, sse4_2]
|
||||
# This configuration is only used by querynode and indexnode, it selects CPU instruction set for Searching and Index-building.
|
||||
simdType: auto
|
|
@ -22,6 +22,12 @@ etcd:
|
|||
metaSubPath: meta # metaRootPath = rootPath + '/' + metaSubPath
|
||||
kvSubPath: kv # kvRootPath = rootPath + '/' + kvSubPath
|
||||
log:
|
||||
# path is one of:
|
||||
# - "default" as os.Stderr,
|
||||
# - "stderr" as os.Stderr,
|
||||
# - "stdout" as os.Stdout,
|
||||
# - file path to append server logs to.
|
||||
path: stdout
|
||||
level: info # Only supports debug, info, warn, error, panic, or fatal. Default 'info'.
|
||||
use:
|
||||
embed: false # Whether to enable embedded Etcd (an in-process EtcdServer).
|
||||
|
@ -90,6 +96,7 @@ proxy:
|
|||
maxTaskNum: 1024 # max task number of proxy task queue
|
||||
bufFlagExpireTime: 3600 # second, the time to expire bufFlag from cache in collectResultLoop
|
||||
bufFlagCleanupInterval: 600 # second, the interval to clean bufFlag cache in collectResultLoop
|
||||
ginLogging: true # Whether to produce gin logs.
|
||||
|
||||
|
||||
# Related configuration of queryCoord, used to manage topology and load balancing for the query nodes, and handoff from growing segments to sealed segments.
|
||||
|
|
|
@ -97,6 +97,13 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
|
|||
// startHTTPServer starts the http server, panic when failed
|
||||
func (s *Server) startHTTPServer(port int) {
|
||||
defer s.wg.Done()
|
||||
// (Embedded Milvus Only) Discard gin logs if logging is disabled.
|
||||
// We might need to put these logs in some files in the further.
|
||||
// But we don't care about these logs now, at least not in embedded Milvus.
|
||||
if !proxy.Params.ProxyCfg.GinLogging {
|
||||
gin.DefaultWriter = io.Discard
|
||||
gin.DefaultErrorWriter = io.Discard
|
||||
}
|
||||
ginHandler := gin.Default()
|
||||
apiv1 := ginHandler.Group("/api/v1")
|
||||
httpserver.NewHandlers(s.proxy).RegisterRoutesTo(apiv1)
|
||||
|
|
|
@ -20,8 +20,6 @@ import (
|
|||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
|
@ -30,7 +28,9 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/proxypb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
|
||||
"github.com/milvus-io/milvus/internal/proxy"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
|
||||
"github.com/stretchr/testify/assert"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
@ -899,6 +899,14 @@ func Test_NewServer(t *testing.T) {
|
|||
|
||||
err = server.Stop()
|
||||
assert.Nil(t, err)
|
||||
|
||||
// Update config and start server again to test with different config set.
|
||||
// This works as config will be initialized only once
|
||||
proxy.Params.ProxyCfg.GinLogging = false
|
||||
err = server.Run()
|
||||
assert.Nil(t, err)
|
||||
err = server.Stop()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func Test_NewServer_HTTPServerDisabled(t *testing.T) {
|
||||
|
|
|
@ -17,9 +17,10 @@
|
|||
package etcd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
|
@ -34,7 +35,7 @@ var EtcdServer *embed.Etcd
|
|||
func InitEtcdServer(etcdCfg *paramtable.EtcdConfig) error {
|
||||
if etcdCfg.UseEmbedEtcd {
|
||||
path := etcdCfg.ConfigPath
|
||||
fmt.Println("path", path, "data", etcdCfg.DataDir)
|
||||
log.Info("Setting Etcd config", zap.String("path", path), zap.String("data", etcdCfg.DataDir))
|
||||
var cfg *embed.Config
|
||||
if len(path) > 0 {
|
||||
cfgFromFile, err := embed.ConfigFromFile(path)
|
||||
|
@ -46,6 +47,7 @@ func InitEtcdServer(etcdCfg *paramtable.EtcdConfig) error {
|
|||
cfg = embed.NewConfig()
|
||||
}
|
||||
cfg.Dir = etcdCfg.DataDir
|
||||
cfg.LogOutputs = []string{etcdCfg.EtcdLogPath}
|
||||
cfg.LogLevel = etcdCfg.EtcdLogLevel
|
||||
e, err := embed.StartEtcd(cfg)
|
||||
if err != nil {
|
||||
|
|
|
@ -108,7 +108,7 @@ var once sync.Once
|
|||
// SetupLogger is used to initialize the log with config.
|
||||
func SetupLogger(cfg *log.Config) {
|
||||
once.Do(func() {
|
||||
// initialize logger
|
||||
// Initialize logger.
|
||||
logger, p, err := log.InitLogger(cfg, zap.AddStacktrace(zap.ErrorLevel))
|
||||
if err == nil {
|
||||
log.ReplaceGlobals(logger, p)
|
||||
|
@ -119,6 +119,9 @@ func SetupLogger(cfg *log.Config) {
|
|||
// initialize grpc and etcd logger
|
||||
wrapper := &zapWrapper{logger}
|
||||
grpclog.SetLoggerV2(wrapper)
|
||||
|
||||
log.Info("Log directory", zap.String("configDir", cfg.File.RootPath))
|
||||
log.Info("Set log file to ", zap.String("path", cfg.File.Filename))
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -18,10 +18,9 @@ import (
|
|||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
memkv "github.com/milvus-io/milvus/internal/kv/mem"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
|
@ -49,6 +48,8 @@ const (
|
|||
DefaultEnvPrefix = "milvus"
|
||||
)
|
||||
|
||||
var DefaultYaml = "milvus.yaml"
|
||||
|
||||
// Base abstracts BaseTable
|
||||
// TODO: it's never used, consider to substitute BaseTable or to remove it
|
||||
type Base interface {
|
||||
|
@ -62,6 +63,7 @@ type Base interface {
|
|||
|
||||
// BaseTable the basics of paramtable
|
||||
type BaseTable struct {
|
||||
once sync.Once
|
||||
params *memkv.MemoryKV
|
||||
configDir string
|
||||
|
||||
|
@ -70,17 +72,23 @@ type BaseTable struct {
|
|||
LogCfgFunc func(log.Config)
|
||||
}
|
||||
|
||||
// Init initializes the paramtable
|
||||
// GlobalInitWithYaml initializes the param table with the given yaml.
|
||||
// We will update the global DefaultYaml variable directly, once and for all.
|
||||
// GlobalInitWithYaml shall be called at the very beginning before initiating the base table.
|
||||
// GlobalInitWithYaml should be called only in standalone and embedded Milvus.
|
||||
func (gp *BaseTable) GlobalInitWithYaml(yaml string) {
|
||||
gp.once.Do(func() {
|
||||
DefaultYaml = yaml
|
||||
gp.Init()
|
||||
})
|
||||
}
|
||||
|
||||
// Init initializes the param table.
|
||||
func (gp *BaseTable) Init() {
|
||||
gp.params = memkv.NewMemoryKV()
|
||||
|
||||
gp.configDir = gp.initConfPath()
|
||||
log.Debug("config directory", zap.String("configDir", gp.configDir))
|
||||
|
||||
gp.loadFromMilvusYaml()
|
||||
|
||||
gp.loadFromYaml(DefaultYaml)
|
||||
gp.tryLoadFromEnv()
|
||||
|
||||
gp.InitLogCfg()
|
||||
}
|
||||
|
||||
|
@ -118,8 +126,8 @@ func (gp *BaseTable) initConfPath() string {
|
|||
return configDir
|
||||
}
|
||||
|
||||
func (gp *BaseTable) loadFromMilvusYaml() {
|
||||
if err := gp.LoadYaml("milvus.yaml"); err != nil {
|
||||
func (gp *BaseTable) loadFromYaml(file string) {
|
||||
if err := gp.LoadYaml(file); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
@ -378,7 +386,6 @@ func (gp *BaseTable) InitLogCfg() {
|
|||
// SetLogConfig set log config of the base table
|
||||
func (gp *BaseTable) SetLogConfig() {
|
||||
gp.LogCfgFunc = func(cfg log.Config) {
|
||||
log.Info("Set log file to ", zap.String("path", cfg.File.Filename))
|
||||
logutil.SetupLogger(&cfg)
|
||||
defer log.Sync()
|
||||
}
|
||||
|
@ -391,7 +398,6 @@ func (gp *BaseTable) SetLogger(id UniqueID) {
|
|||
panic(err)
|
||||
}
|
||||
if rootPath != "" {
|
||||
log.Debug("Set logger ", zap.Int64("id", id), zap.String("role", gp.RoleName))
|
||||
if id < 0 {
|
||||
gp.Log.File.Filename = path.Join(rootPath, gp.RoleName+".log")
|
||||
} else {
|
||||
|
|
|
@ -22,6 +22,8 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
const defaultYaml = "milvus.yaml"
|
||||
|
||||
var baseParams = BaseTable{}
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
|
@ -150,10 +152,11 @@ func TestBaseTable_ConfDir(t *testing.T) {
|
|||
// fake dir
|
||||
baseParams.configDir = "./"
|
||||
|
||||
assert.Panics(t, func() { baseParams.loadFromMilvusYaml() })
|
||||
assert.Panics(t, func() { baseParams.loadFromYaml(defaultYaml) })
|
||||
|
||||
baseParams.configDir = rightConfig
|
||||
baseParams.loadFromMilvusYaml()
|
||||
baseParams.loadFromYaml(defaultYaml)
|
||||
baseParams.GlobalInitWithYaml(defaultYaml)
|
||||
}
|
||||
|
||||
func TestBateTable_ConfPath(t *testing.T) {
|
||||
|
|
|
@ -373,6 +373,7 @@ type proxyConfig struct {
|
|||
MaxDimension int64
|
||||
BufFlagExpireTime time.Duration
|
||||
BufFlagCleanupInterval time.Duration
|
||||
GinLogging bool
|
||||
|
||||
// required from QueryCoord
|
||||
SearchResultChannelNames []string
|
||||
|
@ -398,6 +399,7 @@ func (p *proxyConfig) init(base *BaseTable) {
|
|||
p.initMaxTaskNum()
|
||||
p.initBufFlagExpireTime()
|
||||
p.initBufFlagCleanupInterval()
|
||||
p.initGinLogging()
|
||||
}
|
||||
|
||||
// InitAlias initialize Alias member.
|
||||
|
@ -464,6 +466,11 @@ func (p *proxyConfig) initBufFlagCleanupInterval() {
|
|||
p.BufFlagCleanupInterval = time.Duration(interval) * time.Second
|
||||
}
|
||||
|
||||
func (p *proxyConfig) initGinLogging() {
|
||||
// Gin logging is on by default.
|
||||
p.GinLogging = p.Base.ParseBool("proxy.ginLogging", true)
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// --- querycoord ---
|
||||
type queryCoordConfig struct {
|
||||
|
|
|
@ -29,6 +29,7 @@ const (
|
|||
// SuggestPulsarMaxMessageSize defines the maximum size of Pulsar message.
|
||||
SuggestPulsarMaxMessageSize = 5 * 1024 * 1024
|
||||
defaultEtcdLogLevel = "info"
|
||||
defaultEtcdLogPath = "stdout"
|
||||
)
|
||||
|
||||
// ServiceParam is used to quickly and easily access all basic service configurations.
|
||||
|
@ -60,6 +61,7 @@ type EtcdConfig struct {
|
|||
MetaRootPath string
|
||||
KvRootPath string
|
||||
EtcdLogLevel string
|
||||
EtcdLogPath string
|
||||
|
||||
// --- Embed ETCD ---
|
||||
UseEmbedEtcd bool
|
||||
|
@ -83,6 +85,7 @@ func (p *EtcdConfig) LoadCfgToMemory() {
|
|||
p.initMetaRootPath()
|
||||
p.initKvRootPath()
|
||||
p.initEtcdLogLevel()
|
||||
p.initEtcdLogPath()
|
||||
}
|
||||
|
||||
func (p *EtcdConfig) initUseEmbedEtcd() {
|
||||
|
@ -138,6 +141,10 @@ func (p *EtcdConfig) initEtcdLogLevel() {
|
|||
p.EtcdLogLevel = p.Base.LoadWithDefault("etcd.log.level", defaultEtcdLogLevel)
|
||||
}
|
||||
|
||||
func (p *EtcdConfig) initEtcdLogPath() {
|
||||
p.EtcdLogPath = p.Base.LoadWithDefault("etcd.log.path", defaultEtcdLogPath)
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// --- pulsar ---
|
||||
type PulsarConfig struct {
|
||||
|
|
|
@ -26,6 +26,8 @@ type IntPrimaryKey = int64
|
|||
type UniqueID = int64
|
||||
|
||||
const (
|
||||
// EmbeddedRole is for embedded Milvus.
|
||||
EmbeddedRole = "embedded"
|
||||
// StandaloneRole is a constant represent Standalone
|
||||
StandaloneRole = "standalone"
|
||||
// RootCoordRole is a constant represent RootCoord
|
||||
|
|
Loading…
Reference in New Issue