add access log for proxy (#19927)

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
pull/20488/head
aoiasd 2022-11-10 17:09:06 +08:00 committed by GitHub
parent c71c6378ff
commit fca2b71e28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1483 additions and 10 deletions

View File

@ -161,6 +161,9 @@ proxy:
maxTaskNum: 1024 # max task number of proxy task queue
# please adjust in embedded Milvus: false
ginLogging: true # Whether to produce gin logs.
accessLog:
localPath: /tmp/accesslog
filename: milvus_access_log.log
# Related configuration of queryCoord, used to manage topology and load balancing for the query nodes, and handoff from growing segments to sealed segments.

View File

@ -30,6 +30,7 @@ import (
"sync"
"time"
"github.com/milvus-io/milvus/internal/proxy/accesslog"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/gin-gonic/gin"
@ -181,6 +182,7 @@ func (s *Server) startExternalGrpc(grpcPort int, errChan chan error) {
proxy.UnaryServerInterceptor(proxy.PrivilegeInterceptor),
logutil.UnaryTraceLoggerInterceptor,
proxy.RateLimitInterceptor(limiter),
accesslog.UnaryAccessLoggerInterceptor,
)),
}

View File

@ -26,8 +26,6 @@ const (
// FileLogConfig serializes file log related config in toml/json.
type FileLogConfig struct {
// Log rootpath
RootPath string `toml:"rootpath" json:"rootpath"`
// Log filename, leave empty to disable file log.
Filename string `toml:"filename" json:"filename"`
// Max size for a single file, in MB.
@ -79,7 +77,7 @@ type ZapProperties struct {
}
func newZapTextEncoder(cfg *Config) zapcore.Encoder {
return NewTextEncoder(cfg)
return NewTextEncoderByConfig(cfg)
}
func (cfg *Config) buildOptions(errSink zapcore.WriteSyncer) []zap.Option {

View File

@ -83,7 +83,7 @@ func TestZapTextEncoder(t *testing.T) {
var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer)
encoder := NewTextEncoder(conf)
encoder := NewTextEncoderByConfig(conf)
logger := zap.New(zapcore.NewCore(encoder, zapcore.AddSync(writer), zapcore.InfoLevel)).Sugar()
logger.Info("this is a message from zap")

View File

@ -141,7 +141,7 @@ func TestTimeEncoder(t *testing.T) {
tt := time.Unix(sec, nsec).In(as)
conf := &Config{Level: "debug", File: FileLogConfig{}, DisableTimestamp: true}
enc := NewTextEncoder(conf).(*textEncoder)
enc := NewTextEncoderByConfig(conf).(*textEncoder)
DefaultTimeEncoder(tt, enc)
assert.Equal(t, "2019/01/11 15:45:41.165 +08:00", enc.buf.String())
@ -169,7 +169,7 @@ func TestZapCaller(t *testing.T) {
"undefined",
}
conf := &Config{Level: "deug", File: FileLogConfig{}, DisableTimestamp: true}
enc := NewTextEncoder(conf).(*textEncoder)
enc := NewTextEncoderByConfig(conf).(*textEncoder)
for i, d := range data {
ShortCallerEncoder(d, enc)

View File

@ -105,9 +105,18 @@ type textEncoder struct {
reflectEnc *json.Encoder
}
// NewTextEncoder creates a fast, low-allocation Text encoder. The encoder
func NewTextEncoder(encoderConfig *zapcore.EncoderConfig, spaced bool, disableErrorVerbose bool) zapcore.Encoder {
return &textEncoder{
EncoderConfig: encoderConfig,
buf: _pool.Get(),
spaced: spaced,
disableErrorVerbose: disableErrorVerbose,
}
}
// NewTextEncoderByConfig creates a fast, low-allocation Text encoder with config. The encoder
// appropriately escapes all field keys and values.
func NewTextEncoder(cfg *Config) zapcore.Encoder {
func NewTextEncoderByConfig(cfg *Config) zapcore.Encoder {
cc := zapcore.EncoderConfig{
// Keys can be anything except the empty string.
TimeKey: "time",

View File

@ -0,0 +1,156 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package accesslog
import (
"context"
"fmt"
"path"
"sync"
"sync/atomic"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/paramtable"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
)
const (
clientRequestIDKey = "client_request_id"
)
var _globalL, _globalW atomic.Value
var once sync.Once
func A() *zap.Logger {
return _globalL.Load().(*zap.Logger)
}
func W() *RotateLogger {
return _globalW.Load().(*RotateLogger)
}
func SetupAccseeLog(logCfg *paramtable.AccessLogConfig, minioCfg *paramtable.MinioConfig) {
once.Do(func() {
_, err := InitAccessLogger(logCfg, minioCfg)
if err != nil {
log.Fatal("initialize access logger error", zap.Error(err))
}
})
}
// InitAccessLogger initializes a zap access logger for proxy
func InitAccessLogger(logCfg *paramtable.AccessLogConfig, minioCfg *paramtable.MinioConfig) (*RotateLogger, error) {
var lg *RotateLogger
var err error
if !logCfg.Enable {
return nil, nil
}
var writeSyncer zapcore.WriteSyncer
if len(logCfg.Filename) > 0 {
lg, err = NewRotateLogger(logCfg, minioCfg)
if err != nil {
return nil, err
}
writeSyncer = zapcore.AddSync(lg)
} else {
stdout, _, err := zap.Open([]string{"stdout"}...)
if err != nil {
return nil, err
}
writeSyncer = stdout
}
encoder := NewAccessEncoder()
logger := zap.New(zapcore.NewCore(encoder, writeSyncer, zapcore.DebugLevel))
logger.Info("Access log start successful")
_globalL.Store(logger)
_globalW.Store(lg)
return lg, nil
}
func NewAccessEncoder() zapcore.Encoder {
encoderConfig := zapcore.EncoderConfig{
TimeKey: "ts",
NameKey: "logger",
FunctionKey: zapcore.OmitKey,
MessageKey: "msg",
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeTime: log.DefaultTimeEncoder,
EncodeDuration: zapcore.SecondsDurationEncoder,
}
return log.NewTextEncoder(&encoderConfig, false, false)
}
func PrintAccessInfo(ctx context.Context, resp interface{}, err error, rpcInfo *grpc.UnaryServerInfo, timeCost int64) bool {
if _globalL.Load() == nil {
return false
}
fields := []zap.Field{
//format time cost of task
zap.String("timeCost", fmt.Sprintf("%d ms", timeCost)),
}
//get trace ID of task
traceID, ok := getTraceID(ctx)
if !ok {
log.Warn("access log print failed: cloud not get trace ID")
return false
}
fields = append(fields, zap.String("traceId", traceID))
//get response size of task
responseSize, ok := getResponseSize(resp)
if !ok {
log.Warn("access log print failed: cloud not get response size")
return false
}
fields = append(fields, zap.Int("responseSize", responseSize))
//get err code of task
errCode, ok := getErrCode(resp)
if !ok {
log.Warn("access log print failed: cloud not get error code")
return false
}
fields = append(fields, zap.Int("errorCode", errCode))
//get status of grpc
Status := getGrpcStatus(err)
if Status == "OK" && errCode > 0 {
Status = "TaskFailed"
}
//get method name of grpc
_, methodName := path.Split(rpcInfo.FullMethod)
A().Info(fmt.Sprintf("%v: %s-%s", Status, getAccessAddr(ctx), methodName), fields...)
return true
}
func Rotate() error {
err := W().Rotate()
return err
}

View File

@ -0,0 +1,209 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package accesslog
import (
"context"
"net"
"os"
"testing"
"time"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
)
func TestAccessLogger_NotEnable(t *testing.T) {
var Params paramtable.ComponentParam
closer := trace.InitTracing("test-trace")
defer closer.Close()
Params.Init()
Params.ProxyCfg.AccessLog.Enable = false
InitAccessLogger(&Params.ProxyCfg.AccessLog, &Params.MinioCfg)
ctx := peer.NewContext(
context.Background(),
&peer.Peer{
Addr: &net.IPAddr{
IP: net.IPv4(0, 0, 0, 0),
Zone: "test",
},
})
ctx = metadata.AppendToOutgoingContext(ctx, clientRequestIDKey, "test")
resp := &milvuspb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "",
},
Value: false,
}
rpcInfo := &grpc.UnaryServerInfo{Server: nil, FullMethod: "testMethod"}
ok := PrintAccessInfo(ctx, resp, nil, rpcInfo, 0)
assert.False(t, ok)
}
func TestAccessLogger_Basic(t *testing.T) {
var Params paramtable.ComponentParam
closer := trace.InitTracing("test-trace")
defer closer.Close()
Params.Init()
testPath := "/tmp/accesstest"
Params.ProxyCfg.AccessLog.LocalPath = testPath
defer os.RemoveAll(testPath)
InitAccessLogger(&Params.ProxyCfg.AccessLog, &Params.MinioCfg)
ctx := peer.NewContext(
context.Background(),
&peer.Peer{
Addr: &net.IPAddr{
IP: net.IPv4(0, 0, 0, 0),
Zone: "test",
},
})
ctx = metadata.AppendToOutgoingContext(ctx, clientRequestIDKey, "test")
resp := &milvuspb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "",
},
Value: false,
}
rpcInfo := &grpc.UnaryServerInfo{Server: nil, FullMethod: "testMethod"}
ok := PrintAccessInfo(ctx, resp, nil, rpcInfo, 0)
assert.True(t, ok)
}
func TestAccessLogger_Stdout(t *testing.T) {
var Params paramtable.ComponentParam
closer := trace.InitTracing("test-trace")
defer closer.Close()
Params.Init()
Params.ProxyCfg.AccessLog.Filename = ""
InitAccessLogger(&Params.ProxyCfg.AccessLog, &Params.MinioCfg)
ctx := peer.NewContext(
context.Background(),
&peer.Peer{
Addr: &net.IPAddr{
IP: net.IPv4(0, 0, 0, 0),
Zone: "test",
},
})
ctx = metadata.AppendToOutgoingContext(ctx, clientRequestIDKey, "test")
resp := &milvuspb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "",
},
Value: false,
}
rpcInfo := &grpc.UnaryServerInfo{Server: nil, FullMethod: "testMethod"}
ok := PrintAccessInfo(ctx, resp, nil, rpcInfo, 0)
assert.True(t, ok)
}
func TestAccessLogger_WithMinio(t *testing.T) {
var Params paramtable.ComponentParam
closer := trace.InitTracing("test-trace")
defer closer.Close()
Params.Init()
testPath := "/tmp/accesstest"
Params.ProxyCfg.AccessLog.LocalPath = testPath
Params.ProxyCfg.AccessLog.MinioEnable = true
Params.ProxyCfg.AccessLog.RemotePath = "access_log/"
defer os.RemoveAll(testPath)
InitAccessLogger(&Params.ProxyCfg.AccessLog, &Params.MinioCfg)
ctx := peer.NewContext(
context.Background(),
&peer.Peer{
Addr: &net.IPAddr{
IP: net.IPv4(0, 0, 0, 0),
Zone: "test",
},
})
ctx = metadata.AppendToOutgoingContext(ctx, clientRequestIDKey, "test")
resp := &milvuspb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "",
},
Value: false,
}
rpcInfo := &grpc.UnaryServerInfo{Server: nil, FullMethod: "testMethod"}
ok := PrintAccessInfo(ctx, resp, nil, rpcInfo, 0)
assert.True(t, ok)
W().Rotate()
defer W().handler.Clean()
time.Sleep(time.Duration(1) * time.Second)
logfiles, err := W().handler.listAll()
assert.NoError(t, err)
assert.Equal(t, 1, len(logfiles))
}
func TestAccessLogger_Error(t *testing.T) {
var Params paramtable.ComponentParam
closer := trace.InitTracing("test-trace")
defer closer.Close()
Params.Init()
testPath := "/tmp/accesstest"
Params.ProxyCfg.AccessLog.LocalPath = testPath
defer os.RemoveAll(testPath)
InitAccessLogger(&Params.ProxyCfg.AccessLog, &Params.MinioCfg)
ctx := peer.NewContext(
context.Background(),
&peer.Peer{
Addr: &net.IPAddr{
IP: net.IPv4(0, 0, 0, 0),
Zone: "test",
},
})
rpcInfo := &grpc.UnaryServerInfo{Server: nil, FullMethod: "testMethod"}
ok := PrintAccessInfo(ctx, nil, nil, rpcInfo, 0)
assert.False(t, ok)
ctx = metadata.AppendToOutgoingContext(ctx, clientRequestIDKey, "test")
ok = PrintAccessInfo(ctx, nil, nil, rpcInfo, 0)
assert.False(t, ok)
}

View File

@ -0,0 +1,12 @@
//go:build !linux
// +build !linux
package accesslog
import (
"os"
)
func chown(_ string, _ os.FileInfo) error {
return nil
}

View File

@ -0,0 +1,19 @@
//go:build linux
// +build linux
package accesslog
import (
"os"
"syscall"
)
func chown(name string, info os.FileInfo) error {
f, err := os.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, info.Mode())
if err != nil {
return err
}
f.Close()
stat := info.Sys().(*syscall.Stat_t)
return os.Chown(name, int(stat.Uid), int(stat.Gid))
}

View File

@ -0,0 +1,353 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package accesslog
import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"path"
"strings"
"sync"
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
const megabyte = 1024 * 1024
var CheckBucketRetryAttempts uint = 20
var timeFormat = ".2006-01-02T15-04-05.000"
//a rotated file logger for zap.log and could upload sealed log file to minIO
type RotateLogger struct {
//local path is the path to save log before update to minIO
//use os.TempDir()/accesslog if empty
localPath string
fileName string
//the interval time of update log to minIO
rotatedTime int64
//the max size(Mb) of log file
//if local file large than maxSize will update immediately
//close if empty(zero)
maxSize int
//MaxBackups is the maximum number of old log files to retain
//close retention limit if empty(zero)
maxBackups int
handler *minioHandler
size int64
file *os.File
mu sync.Mutex
millCh chan bool
closeCh chan struct{}
closeWg sync.WaitGroup
closeOnce sync.Once
}
func NewRotateLogger(logCfg *paramtable.AccessLogConfig, minioCfg *paramtable.MinioConfig) (*RotateLogger, error) {
logger := &RotateLogger{
localPath: logCfg.LocalPath,
fileName: logCfg.Filename,
rotatedTime: logCfg.RotatedTime,
maxSize: logCfg.MaxSize,
maxBackups: logCfg.MaxBackups,
}
log.Info("Access log save to " + logger.dir())
if logCfg.MinioEnable {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
handler, err := NewMinioHandler(ctx, minioCfg, logCfg.RemotePath, logCfg.MaxBackups)
if err != nil {
return nil, err
}
logger.handler = handler
}
logger.start()
return logger, nil
}
func (l *RotateLogger) Write(p []byte) (n int, err error) {
l.mu.Lock()
defer l.mu.Unlock()
writeLen := int64(len(p))
if writeLen > l.max() {
return 0, fmt.Errorf(
"write length %d exceeds maximum file size %d", writeLen, l.max(),
)
}
if l.file == nil {
if err = l.openFileExistingOrNew(); err != nil {
return 0, err
}
}
if l.size+writeLen > l.max() {
if err := l.rotate(); err != nil {
return 0, err
}
}
n, err = l.file.Write(p)
l.size += int64(n)
return n, err
}
func (l *RotateLogger) Close() error {
l.mu.Lock()
defer l.mu.Unlock()
l.closeOnce.Do(func() {
close(l.closeCh)
if l.handler != nil {
l.handler.Close()
}
l.closeWg.Wait()
})
return l.closeFile()
}
func (l *RotateLogger) Rotate() error {
l.mu.Lock()
defer l.mu.Unlock()
return l.rotate()
}
func (l *RotateLogger) rotate() error {
if err := l.closeFile(); err != nil {
return err
}
if err := l.openNewFile(); err != nil {
return err
}
l.mill()
return nil
}
func (l *RotateLogger) openFileExistingOrNew() error {
l.mill()
filename := l.filename()
info, err := os.Stat(filename)
if os.IsNotExist(err) {
return l.openNewFile()
}
if err != nil {
return fmt.Errorf("file to get log file info: %s", err)
}
file, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
return l.openNewFile()
}
l.file = file
l.size = info.Size()
return nil
}
func (l *RotateLogger) openNewFile() error {
err := os.MkdirAll(l.dir(), 0744)
if err != nil {
return fmt.Errorf("make directories for new log file filed: %s", err)
}
name := l.filename()
mode := os.FileMode(0644)
info, err := os.Stat(name)
if err == nil {
mode = info.Mode()
newName := l.newBackupName()
if err := os.Rename(name, newName); err != nil {
return fmt.Errorf("can't rename log file: %s", err)
}
log.Info("seal old log to: " + newName)
if l.handler != nil {
l.handler.Update(newName, path.Base(newName))
}
// for linux
if err := chown(name, info); err != nil {
return err
}
}
f, err := os.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, mode)
if err != nil {
return fmt.Errorf("can't open new logfile: %s", err)
}
l.file = f
l.size = 0
return nil
}
func (l *RotateLogger) closeFile() error {
if l.file == nil {
return nil
}
err := l.file.Close()
l.file = nil
return err
}
func (l *RotateLogger) millRunOnce() error {
files, err := l.oldLogFiles()
if err != nil {
return err
}
if l.maxBackups > 0 && l.maxBackups < len(files) {
for _, f := range files[l.maxBackups:] {
errRemove := os.Remove(path.Join(l.dir(), f.fileName))
if err == nil && errRemove != nil {
err = errRemove
}
}
}
return err
}
// millRun runs in a goroutine to remove old log files out of limit.
func (l *RotateLogger) millRun() {
defer l.closeWg.Done()
for {
select {
case <-l.closeCh:
log.Warn("close Access log mill")
return
case <-l.millCh:
_ = l.millRunOnce()
}
}
}
func (l *RotateLogger) mill() {
select {
case l.millCh <- true:
default:
}
}
func (l *RotateLogger) timeRotating() {
ticker := time.NewTicker(time.Duration(l.rotatedTime * int64(time.Second)))
log.Info("start time rotating of access log")
defer ticker.Stop()
defer l.closeWg.Done()
for {
select {
case <-l.closeCh:
log.Warn("close Access file logger")
return
case <-ticker.C:
l.Rotate()
}
}
}
//start rotate log file by time
func (l *RotateLogger) start() {
l.closeCh = make(chan struct{})
l.closeWg = sync.WaitGroup{}
if l.rotatedTime > 0 {
l.closeWg.Add(1)
go l.timeRotating()
}
if l.maxBackups > 0 {
l.closeWg.Add(1)
l.millCh = make(chan bool, 1)
go l.millRun()
}
}
func (l *RotateLogger) max() int64 {
return int64(l.maxSize) * int64(megabyte)
}
func (l *RotateLogger) dir() string {
if l.localPath == "" {
l.localPath = path.Join(os.TempDir(), "accesslog")
}
return l.localPath
}
func (l *RotateLogger) filename() string {
return path.Join(l.dir(), l.fileName)
}
func (l *RotateLogger) prefixAndExt() (string, string) {
ext := path.Ext(l.fileName)
prefix := l.fileName[:len(l.fileName)-len(ext)]
return prefix, ext
}
func (l *RotateLogger) newBackupName() string {
t := time.Now()
timestamp := t.Format(timeFormat)
name, suffix := l.prefixAndExt()
return path.Join(l.dir(), name+timestamp+suffix)
}
func (l *RotateLogger) oldLogFiles() ([]logInfo, error) {
files, err := ioutil.ReadDir(l.dir())
if err != nil {
return nil, fmt.Errorf("can't read log file directory: %s", err)
}
logFiles := []logInfo{}
prefix, ext := l.prefixAndExt()
for _, f := range files {
if f.IsDir() {
continue
}
if t, err := l.timeFromName(f.Name(), prefix, ext); err == nil {
logFiles = append(logFiles, logInfo{t, f.Name()})
continue
}
}
return logFiles, nil
}
func (l *RotateLogger) timeFromName(filename, prefix, ext string) (time.Time, error) {
if !strings.HasPrefix(filename, prefix) {
return time.Time{}, errors.New("mismatched prefix")
}
if !strings.HasSuffix(filename, ext) {
return time.Time{}, errors.New("mismatched extension")
}
ts := filename[len(prefix) : len(filename)-len(ext)]
return time.Parse(timeFormat, ts)
}
type logInfo struct {
timestamp time.Time
fileName string
}

View File

@ -0,0 +1,183 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package accesslog
import (
"os"
"path"
"testing"
"time"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/stretchr/testify/assert"
)
func getText(size int) []byte {
var text = make([]byte, size)
for i := 0; i < size; i++ {
text[i] = byte('-')
}
return text
}
func TestRotateLogger_Basic(t *testing.T) {
var Params paramtable.ComponentParam
Params.Init()
testPath := "/tmp/accesstest"
Params.ProxyCfg.AccessLog.LocalPath = testPath
Params.ProxyCfg.AccessLog.MinioEnable = true
Params.ProxyCfg.AccessLog.RemotePath = "access_log/"
defer os.RemoveAll(testPath)
logger, err := NewRotateLogger(&Params.ProxyCfg.AccessLog, &Params.MinioCfg)
assert.NoError(t, err)
defer logger.handler.Clean()
defer logger.Close()
num := 100
text := getText(num)
n, err := logger.Write(text)
assert.Equal(t, num, n)
assert.NoError(t, err)
err = logger.Rotate()
assert.NoError(t, err)
time.Sleep(time.Duration(1) * time.Second)
logfiles, err := logger.handler.listAll()
assert.NoError(t, err)
assert.Equal(t, 1, len(logfiles))
}
func TestRotateLogger_TimeRotate(t *testing.T) {
var Params paramtable.ComponentParam
Params.Init()
testPath := "/tmp/accesstest"
Params.ProxyCfg.AccessLog.LocalPath = testPath
Params.ProxyCfg.AccessLog.MinioEnable = true
Params.ProxyCfg.AccessLog.RemotePath = "access_log/"
Params.ProxyCfg.AccessLog.RotatedTime = 2
//close file retention
Params.ProxyCfg.AccessLog.MaxBackups = 0
defer os.RemoveAll(testPath)
logger, err := NewRotateLogger(&Params.ProxyCfg.AccessLog, &Params.MinioCfg)
assert.NoError(t, err)
defer logger.handler.Clean()
defer logger.Close()
num := 100
text := getText(num)
n, err := logger.Write(text)
assert.Equal(t, num, n)
assert.NoError(t, err)
time.Sleep(time.Duration(4) * time.Second)
logfiles, err := logger.handler.listAll()
assert.NoError(t, err)
assert.GreaterOrEqual(t, len(logfiles), 1)
}
func TestRotateLogger_SizeRotate(t *testing.T) {
var Params paramtable.ComponentParam
Params.Init()
testPath := "/tmp/accesstest"
Params.ProxyCfg.AccessLog.LocalPath = testPath
Params.ProxyCfg.AccessLog.MinioEnable = true
Params.ProxyCfg.AccessLog.RemotePath = "access_log/"
Params.ProxyCfg.AccessLog.MaxSize = 1
defer os.RemoveAll(testPath)
logger, err := NewRotateLogger(&Params.ProxyCfg.AccessLog, &Params.MinioCfg)
assert.NoError(t, err)
defer logger.handler.Clean()
defer logger.Close()
num := 1024 * 1024
text := getText(num + 1)
_, err = logger.Write(text)
assert.Error(t, err)
for i := 1; i <= 2; i++ {
text = getText(num)
n, err := logger.Write(text)
assert.Equal(t, num, n)
assert.NoError(t, err)
}
time.Sleep(time.Duration(1) * time.Second)
logfiles, err := logger.handler.listAll()
assert.NoError(t, err)
assert.Equal(t, 1, len(logfiles))
}
func TestRotateLogger_LocalRetention(t *testing.T) {
var Params paramtable.ComponentParam
Params.Init()
testPath := "/tmp/accesstest"
Params.ProxyCfg.AccessLog.LocalPath = testPath
Params.ProxyCfg.AccessLog.MaxBackups = 1
defer os.RemoveAll(testPath)
logger, err := NewRotateLogger(&Params.ProxyCfg.AccessLog, &Params.MinioCfg)
assert.NoError(t, err)
defer logger.Close()
logger.Rotate()
logger.Rotate()
time.Sleep(time.Duration(1) * time.Second)
logFiles, err := logger.oldLogFiles()
assert.NoError(t, err)
assert.Equal(t, 1, len(logFiles))
}
func TestRotateLogger_BasicError(t *testing.T) {
var Params paramtable.ComponentParam
Params.Init()
testPath := ""
Params.ProxyCfg.AccessLog.LocalPath = testPath
logger, err := NewRotateLogger(&Params.ProxyCfg.AccessLog, &Params.MinioCfg)
assert.NoError(t, err)
defer os.RemoveAll(logger.dir())
defer logger.Close()
logger.openFileExistingOrNew()
os.Mkdir(path.Join(logger.dir(), "test"), 0744)
logfile, err := logger.oldLogFiles()
assert.NoError(t, err)
assert.Equal(t, 0, len(logfile))
_, err = logger.timeFromName("a.b", "a", "c")
assert.Error(t, err)
_, err = logger.timeFromName("a.b", "d", "c")
assert.Error(t, err)
}
func TestRotateLogger_InitError(t *testing.T) {
var Params paramtable.ComponentParam
Params.Init()
testPath := ""
Params.ProxyCfg.AccessLog.LocalPath = testPath
Params.ProxyCfg.AccessLog.MinioEnable = true
Params.MinioCfg.Address = ""
//init err with invalid minio address
_, err := NewRotateLogger(&Params.ProxyCfg.AccessLog, &Params.MinioCfg)
assert.Error(t, err)
}

View File

@ -0,0 +1,228 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package accesslog
import (
"context"
"fmt"
"strings"
"sync"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"go.uber.org/zap"
)
type config struct {
address string
bucketName string
accessKeyID string
secretAccessKeyID string
useSSL bool
createBucket bool
useIAM bool
iamEndpoint string
}
//minIO client for upload access log
//TODO file retention on minio
type task struct {
objectName string
filePath string
}
type minioHandler struct {
bucketName string
rootPath string
client *minio.Client
taskCh chan task
closeCh chan struct{}
closeWg sync.WaitGroup
closeOnce sync.Once
}
func NewMinioHandler(ctx context.Context, cfg *paramtable.MinioConfig, rootPath string, queueLen int) (*minioHandler, error) {
handlerCfg := config{
address: cfg.Address,
bucketName: cfg.BucketName,
accessKeyID: cfg.AccessKeyID,
secretAccessKeyID: cfg.SecretAccessKey,
useSSL: cfg.UseSSL,
createBucket: true,
useIAM: cfg.UseIAM,
iamEndpoint: cfg.IAMEndpoint,
}
client, err := newMinioClient(ctx, handlerCfg)
if err != nil {
return nil, err
}
handler := &minioHandler{
bucketName: handlerCfg.bucketName,
rootPath: rootPath,
client: client,
}
handler.start(queueLen)
return handler, nil
}
func newMinioClient(ctx context.Context, cfg config) (*minio.Client, error) {
var creds *credentials.Credentials
if cfg.useIAM {
creds = credentials.NewIAM(cfg.iamEndpoint)
} else {
creds = credentials.NewStaticV4(cfg.accessKeyID, cfg.secretAccessKeyID, "")
}
minioClient, err := minio.New(cfg.address, &minio.Options{
Creds: creds,
Secure: cfg.useSSL,
})
// options nil or invalid formatted endpoint, don't need to retry
if err != nil {
return nil, err
}
var bucketExists bool
// check valid in first query
checkBucketFn := func() error {
bucketExists, err = minioClient.BucketExists(ctx, cfg.bucketName)
if err != nil {
log.Warn("failed to check blob bucket exist", zap.String("bucket", cfg.bucketName), zap.Error(err))
return err
}
if !bucketExists {
if cfg.createBucket {
log.Info("blob bucket not exist, create bucket.", zap.Any("bucket name", cfg.bucketName))
err := minioClient.MakeBucket(ctx, cfg.bucketName, minio.MakeBucketOptions{})
if err != nil {
log.Warn("failed to create blob bucket", zap.String("bucket", cfg.bucketName), zap.Error(err))
return err
}
} else {
return fmt.Errorf("bucket %s not Existed", cfg.bucketName)
}
}
return nil
}
err = retry.Do(ctx, checkBucketFn, retry.Attempts(CheckBucketRetryAttempts))
if err != nil {
return nil, err
}
return minioClient, nil
}
func (c *minioHandler) scheduler() {
defer c.closeWg.Done()
for {
select {
case task := <-c.taskCh:
log.Info("Update access log file to minIO",
zap.String("object name", task.objectName),
zap.String("file path", task.filePath))
c.update(task.objectName, task.filePath)
case <-c.closeCh:
log.Warn("close minio logger handler")
return
}
}
}
func (c *minioHandler) start(queueLen int) error {
c.closeWg = sync.WaitGroup{}
c.closeCh = make(chan struct{})
c.taskCh = make(chan task, queueLen)
c.closeWg.Add(1)
go c.scheduler()
return nil
}
func (c *minioHandler) Update(objectName string, filePath string) {
c.taskCh <- task{
objectName: objectName,
filePath: filePath,
}
taskNum := len(c.taskCh)
if taskNum >= cap(c.taskCh)/2 {
log.Warn("Minio Access log file handler was busy", zap.Int("task num", taskNum))
}
}
//update log file to minio
func (c *minioHandler) update(objectName string, filePath string) error {
path := Join(c.rootPath, filePath)
_, err := c.client.FPutObject(context.Background(), c.bucketName, path, objectName, minio.PutObjectOptions{})
return err
}
func (c *minioHandler) removeWithPrefix(prefix string) error {
objects := c.client.ListObjects(context.Background(), c.bucketName, minio.ListObjectsOptions{Prefix: prefix, Recursive: true})
for rErr := range c.client.RemoveObjects(context.Background(), c.bucketName, objects, minio.RemoveObjectsOptions{GovernanceBypass: false}) {
if rErr.Err != nil {
log.Warn("failed to remove objects", zap.String("prefix", prefix), zap.Error(rErr.Err))
return rErr.Err
}
}
return nil
}
func (c *minioHandler) listAll() ([]string, error) {
var objectsKeys []string
log.Info(c.rootPath)
objects := c.client.ListObjects(context.Background(), c.bucketName, minio.ListObjectsOptions{Prefix: c.rootPath, Recursive: false})
for object := range objects {
if object.Err != nil {
log.Warn("failed to list with rootpath", zap.String("rootpath", c.rootPath), zap.Error(object.Err))
return nil, object.Err
}
log.Info(object.Key)
// with tailing "/", object is a "directory"
if strings.HasSuffix(object.Key, "/") {
continue
}
objectsKeys = append(objectsKeys, object.Key)
}
return objectsKeys, nil
}
func (c *minioHandler) Clean() error {
err := c.removeWithPrefix(c.rootPath)
return err
}
func (c *minioHandler) Close() error {
c.closeOnce.Do(func() {
close(c.closeCh)
c.closeWg.Wait()
})
return nil
}
func Join(path1, path2 string) string {
if strings.HasSuffix(path1, "/") {
return path1 + path2
}
return path1 + "/" + path2
}

View File

@ -0,0 +1,49 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package accesslog
import (
"context"
"os"
"testing"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/stretchr/testify/assert"
)
func TestMinioHandler_ConnectError(t *testing.T) {
var Params paramtable.ComponentParam
Params.Init()
testPath := "/tme/miniotest"
Params.ProxyCfg.AccessLog.LocalPath = testPath
Params.MinioCfg.UseIAM = true
Params.MinioCfg.Address = ""
defer os.RemoveAll(testPath)
_, err := NewMinioHandler(
context.Background(),
&Params.MinioCfg,
Params.ProxyCfg.AccessLog.RemotePath,
Params.ProxyCfg.AccessLog.MaxBackups,
)
assert.Error(t, err)
}
func TestMinioHandler_Join(t *testing.T) {
assert.Equal(t, "a/b", Join("a", "b"))
assert.Equal(t, "a/b", Join("a/", "b"))
}

View File

@ -0,0 +1,91 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package accesslog
import (
"context"
"fmt"
"time"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/util/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)
type BaseResponse interface {
GetStatus() *commonpb.Status
}
func UnaryAccessLoggerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
starttime := time.Now()
resp, err := handler(ctx, req)
PrintAccessInfo(ctx, resp, err, info, time.Since(starttime).Milliseconds())
return resp, err
}
func getAccessAddr(ctx context.Context) string {
ip, ok := peer.FromContext(ctx)
if !ok {
return "Unknown"
}
return fmt.Sprintf("%s-%s", ip.Addr.Network(), ip.Addr.String())
}
func getTraceID(ctx context.Context) (id string, ok bool) {
meta, ok := metadata.FromOutgoingContext(ctx)
if ok {
return meta.Get(clientRequestIDKey)[0], true
}
traceID, _, ok := trace.InfoFromContext(ctx)
if ok {
return traceID, true
}
return "", false
}
func getResponseSize(resq interface{}) (int, bool) {
message, ok := resq.(proto.Message)
if !ok {
return 0, false
}
return proto.Size(message), true
}
func getErrCode(resp interface{}) (int, bool) {
baseResp, ok := resp.(BaseResponse)
if !ok {
return 0, false
}
status := baseResp.GetStatus()
return int(status.ErrorCode), true
}
func getGrpcStatus(err error) string {
code := status.Code(err)
if code != codes.OK {
return fmt.Sprintf("Grpc%s", code.String())
}
return code.String()
}

View File

@ -0,0 +1,98 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package accesslog
import (
"context"
"net"
"testing"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
)
func TestGetAccessAddr(t *testing.T) {
ctx := context.Background()
addr := getAccessAddr(ctx)
assert.Equal(t, "Unknown", addr)
newctx := peer.NewContext(
ctx,
&peer.Peer{
Addr: &net.IPAddr{
IP: net.IPv4(0, 0, 0, 0),
Zone: "test",
},
})
addr = getAccessAddr(newctx)
assert.Equal(t, "ip-0.0.0.0%test", addr)
}
func TestGetTraceID(t *testing.T) {
ctx := context.Background()
_, ok := getTraceID(ctx)
assert.False(t, ok)
traceSpan, traceContext := trace.StartSpanFromContext(ctx)
trueTraceID, _, _ := trace.InfoFromSpan(traceSpan)
ID, ok := getTraceID(traceContext)
assert.True(t, ok)
assert.Equal(t, trueTraceID, ID)
ctx = metadata.AppendToOutgoingContext(ctx, clientRequestIDKey, "test")
ID, ok = getTraceID(ctx)
assert.True(t, ok)
assert.Equal(t, "test", ID)
}
func TestGetResponseSize(t *testing.T) {
resp := &milvuspb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "",
},
Value: false,
}
_, ok := getResponseSize(nil)
assert.False(t, ok)
_, ok = getResponseSize(resp)
assert.True(t, ok)
}
func TestGetErrCode(t *testing.T) {
resp := &milvuspb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "",
},
Value: false,
}
_, ok := getErrCode(nil)
assert.False(t, ok)
code, ok := getErrCode(resp)
assert.True(t, ok)
assert.Equal(t, int(commonpb.ErrorCode_UnexpectedError), code)
}

View File

@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proxy/accesslog"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/dependency"
@ -186,6 +187,9 @@ func (node *Proxy) Init() error {
node.factory.Init(Params)
log.Debug("init parameters for factory", zap.String("role", typeutil.ProxyRole), zap.Any("parameters", Params.ServiceParam))
accesslog.SetupAccseeLog(&Params.ProxyCfg.AccessLog, &Params.MinioCfg)
log.Debug("init access log for Proxy done")
err := node.initRateCollector()
if err != nil {
return err

View File

@ -162,7 +162,6 @@ func SetupLogger(cfg *log.Config) {
wrapper := &zapWrapper{logger, logLevel}
grpclog.SetLoggerV2(wrapper)
log.Info("Log directory", zap.String("configDir", cfg.File.RootPath))
log.Info("Set log file to ", zap.String("path", cfg.File.Filename))
})
}

View File

@ -487,6 +487,25 @@ func (p *rootCoordConfig) init(base *BaseTable) {
// /////////////////////////////////////////////////////////////////////////////
// --- proxy ---
type AccessLogConfig struct {
// if use access log
Enable bool
// if upload sealed access log file to minio
MinioEnable bool
// Log path
LocalPath string
// Log filename, leave empty to disable file log.
Filename string
// Max size for a single file, in MB.
MaxSize int
// Max time for single access log file in seconds
RotatedTime int64
// Maximum number of old log files to retain.
MaxBackups int
//File path in minIO
RemotePath string
}
type proxyConfig struct {
Base *BaseTable
@ -505,6 +524,7 @@ type proxyConfig struct {
GinLogging bool
MaxUserNum int
MaxRoleNum int
AccessLog AccessLogConfig
// required from QueryCoord
SearchResultChannelNames []string
@ -535,6 +555,7 @@ func (p *proxyConfig) init(base *BaseTable) {
p.initMaxRoleNum()
p.initSoPath()
p.initAccessLogConfig()
}
// InitAlias initialize Alias member.
@ -645,7 +666,38 @@ func (p *proxyConfig) initMaxRoleNum() {
p.MaxRoleNum = int(maxRoleNum)
}
// /////////////////////////////////////////////////////////////////////////////
func (p *proxyConfig) initAccessLogConfig() {
enable := p.Base.ParseBool("proxy.accessLog.enable", true)
minioEnable := p.Base.ParseBool("proxy.accessLog.minioEnable", false)
p.AccessLog = AccessLogConfig{
Enable: enable,
MinioEnable: minioEnable,
}
if enable {
p.initAccessLogFileConfig()
}
if minioEnable {
p.initAccessLogMinioConfig()
}
}
func (p *proxyConfig) initAccessLogFileConfig() {
//use os.TempDir() if localPath was empty
p.AccessLog.LocalPath = p.Base.LoadWithDefault("proxy.accessLog.localPath", "")
p.AccessLog.Filename = p.Base.LoadWithDefault("proxy.accessLog.filename", "milvus_access_log.log")
p.AccessLog.MaxSize = p.Base.ParseIntWithDefault("proxy.accessLog.maxSize", 64)
p.AccessLog.MaxBackups = p.Base.ParseIntWithDefault("proxy.accessLog.maxBackups", 8)
p.AccessLog.RotatedTime = p.Base.ParseInt64WithDefault("proxy.accessLog.rotatedTime", 3600)
}
func (p *proxyConfig) initAccessLogMinioConfig() {
p.AccessLog.RemotePath = p.Base.LoadWithDefault("proxy.accessLog.remotePath", "access_log/")
}
///////////////////////////////////////////////////////////////////////////////
// --- querycoord ---
type queryCoordConfig struct {
Base *BaseTable

View File

@ -155,6 +155,14 @@ func TestComponentParam(t *testing.T) {
t.Logf("MaxDimension: %d", Params.MaxDimension)
t.Logf("MaxTaskNum: %d", Params.MaxTaskNum)
t.Logf("AccessLog.Enable: %t", Params.AccessLog.Enable)
t.Logf("AccessLog.MaxSize: %d", Params.AccessLog.MaxSize)
t.Logf("AccessLog.MaxBackups: %d", Params.AccessLog.MaxBackups)
t.Logf("AccessLog.MaxDays: %d", Params.AccessLog.RotatedTime)
})
t.Run("test proxyConfig panic", func(t *testing.T) {