enhance: Support access log use stdout with write cache and auto flush as time (#32213)

relate: https://github.com/milvus-io/milvus/issues/28948

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
pull/33083/head
aoiasd 2024-05-16 10:05:34 +08:00 committed by GitHub
parent 9a56cba7af
commit 875ad88d84
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 178 additions and 14 deletions

View File

@ -20,6 +20,7 @@ import (
"io"
"strconv"
"sync"
"time"
"go.uber.org/atomic"
"go.uber.org/zap"
@ -175,18 +176,15 @@ func initFormatter(logCfg *paramtable.AccessLogConfig) (*FormatterManger, error)
// initAccessLogger initializes a zap access logger for proxy
func initWriter(logCfg *paramtable.AccessLogConfig, minioCfg *paramtable.MinioConfig) (io.Writer, error) {
var lg *RotateWriter
var err error
if len(logCfg.Filename.GetValue()) > 0 {
lg, err = NewRotateWriter(logCfg, minioCfg)
lg, err := NewRotateWriter(logCfg, minioCfg)
if err != nil {
return nil, err
}
if logCfg.CacheSize.GetAsInt() > 0 {
blg := NewCacheWriter(lg, logCfg.CacheSize.GetAsInt())
return blg, nil
clg := NewCacheWriterWithCloser(lg, lg, logCfg.CacheSize.GetAsInt(), logCfg.CacheFlushInterval.GetAsDuration(time.Second))
return clg, nil
}
return lg, nil
}
@ -197,5 +195,10 @@ func initWriter(logCfg *paramtable.AccessLogConfig, minioCfg *paramtable.MinioCo
return nil, err
}
if logCfg.CacheSize.GetAsInt() > 0 {
lg := NewCacheWriter(stdout, logCfg.CacheSize.GetAsInt(), logCfg.CacheFlushInterval.GetAsDuration(time.Second))
return lg, nil
}
return stdout, nil
}

View File

@ -131,6 +131,7 @@ func TestAccessLogger_Basic(t *testing.T) {
Params.Init(paramtable.NewBaseTable(paramtable.SkipRemote(true)))
testPath := "/tmp/accesstest"
Params.Save(Params.ProxyCfg.AccessLog.Enable.Key, "true")
Params.Save(Params.ProxyCfg.AccessLog.CacheSize.Key, "1024")
Params.Save(Params.ProxyCfg.AccessLog.LocalPath.Key, testPath)
defer os.RemoveAll(testPath)

View File

@ -41,22 +41,93 @@ var (
type CacheWriter struct {
mu sync.Mutex
writer io.Writer
writer *bufio.Writer
closer io.Closer
// interval of auto flush
flushInterval time.Duration
closed bool
closeOnce sync.Once
closeCh chan struct{}
closeWg sync.WaitGroup
}
func NewCacheWriter(writer io.Writer, cacheSize int) *CacheWriter {
return &CacheWriter{
writer: bufio.NewWriterSize(writer, cacheSize),
func NewCacheWriter(writer io.Writer, cacheSize int, flushInterval time.Duration) *CacheWriter {
c := &CacheWriter{
writer: bufio.NewWriterSize(writer, cacheSize),
flushInterval: flushInterval,
closeCh: make(chan struct{}),
}
c.Start()
return c
}
func NewCacheWriterWithCloser(writer io.Writer, closer io.Closer, cacheSize int, flushInterval time.Duration) *CacheWriter {
c := &CacheWriter{
writer: bufio.NewWriterSize(writer, cacheSize),
flushInterval: flushInterval,
closer: closer,
closeCh: make(chan struct{}),
}
c.Start()
return c
}
func (l *CacheWriter) Write(p []byte) (n int, err error) {
l.mu.Lock()
defer l.mu.Unlock()
if l.closed {
return 0, fmt.Errorf("write to closed writer")
}
return l.writer.Write(p)
}
func (l *CacheWriter) Flush() error {
l.mu.Lock()
defer l.mu.Unlock()
return l.writer.Flush()
}
func (l *CacheWriter) Start() {
l.closeWg.Add(1)
go func() {
defer l.closeWg.Done()
if l.flushInterval == 0 {
return
}
ticker := time.NewTicker(l.flushInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
l.Flush()
case <-l.closeCh:
return
}
}
}()
}
func (l *CacheWriter) Close() {
l.mu.Lock()
defer l.mu.Unlock()
l.closeOnce.Do(func() {
l.closed = true
close(l.closeCh)
l.closeWg.Wait()
// flush remaining bytes
l.writer.Flush()
if l.closer != nil {
l.closer.Close()
}
})
}
// a rotated file writer
type RotateWriter struct {
// local path is the path to save log before update to minIO
@ -94,6 +165,7 @@ func NewRotateWriter(logCfg *paramtable.AccessLogConfig, minioCfg *paramtable.Mi
rotatedTime: logCfg.RotatedTime.GetAsInt64(),
maxSize: logCfg.MaxSize.GetAsInt(),
maxBackups: logCfg.MaxBackups.GetAsInt(),
closeCh: make(chan struct{}),
}
log.Info("Access log save to " + logger.dir())
if logCfg.MinioEnable.GetAsBool() {
@ -306,8 +378,6 @@ func (l *RotateWriter) timeRotating() {
// start rotate log file by time
func (l *RotateWriter) start() {
l.closeCh = make(chan struct{})
l.closeWg = sync.WaitGroup{}
if l.rotatedTime > 0 {
l.closeWg.Add(1)
go l.timeRotating()

View File

@ -17,8 +17,12 @@
package accesslog
import (
"bytes"
"io"
"os"
"path"
"strings"
"sync"
"testing"
"time"
@ -216,3 +220,79 @@ func TestRotateWriter_Close(t *testing.T) {
_, err = logger.Write([]byte("test"))
assert.Error(t, err)
}
func TestCacheWriter_Normal(t *testing.T) {
buffer := bytes.NewBuffer(make([]byte, 0))
writer := NewCacheWriter(buffer, 512, 0)
writer.Write([]byte("111\n"))
_, err := buffer.ReadByte()
assert.Error(t, err, io.EOF)
writer.Flush()
b, err := buffer.ReadBytes('\n')
assert.Equal(t, 4, len(b))
assert.NoError(t, err)
writer.Write([]byte(strings.Repeat("1", 512) + "\n"))
b, err = buffer.ReadBytes('\n')
assert.Equal(t, 513, len(b))
assert.NoError(t, err)
writer.Close()
// writer to closed writer
_, err = writer.Write([]byte(strings.Repeat("1", 512) + "\n"))
assert.Error(t, err)
}
type TestWriter struct {
closed bool
buffer *bytes.Buffer
mu sync.Mutex
}
func (w *TestWriter) Write(p []byte) (n int, err error) {
w.mu.Lock()
defer w.mu.Unlock()
return w.buffer.Write(p)
}
func (w *TestWriter) ReadBytes(delim byte) (line []byte, err error) {
w.mu.Lock()
defer w.mu.Unlock()
return w.buffer.ReadBytes(delim)
}
func (w *TestWriter) ReadByte() (byte, error) {
w.mu.Lock()
defer w.mu.Unlock()
return w.buffer.ReadByte()
}
func (w *TestWriter) Close() error {
w.closed = true
return nil
}
func TestCacheWriter_WithAutoFlush(t *testing.T) {
buffer := &TestWriter{buffer: bytes.NewBuffer(make([]byte, 0))}
writer := NewCacheWriterWithCloser(buffer, buffer, 512, 1*time.Second)
writer.Write([]byte("111\n"))
_, err := buffer.ReadByte()
assert.Error(t, err, io.EOF)
assert.Eventually(t, func() bool {
b, err := buffer.ReadBytes('\n')
if err != nil {
return false
}
assert.Equal(t, 4, len(b))
return true
}, 3*time.Second, 1*time.Second)
writer.Close()
assert.True(t, buffer.closed)
}

View File

@ -994,12 +994,14 @@ type AccessLogConfig struct {
LocalPath ParamItem `refreshable:"false"`
Filename ParamItem `refreshable:"false"`
MaxSize ParamItem `refreshable:"false"`
CacheSize ParamItem `refreshable:"false"`
RotatedTime ParamItem `refreshable:"false"`
MaxBackups ParamItem `refreshable:"false"`
RemotePath ParamItem `refreshable:"false"`
RemoteMaxTime ParamItem `refreshable:"false"`
Formatter ParamGroup `refreshable:"false"`
CacheSize ParamItem `refreshable:"false"`
CacheFlushInterval ParamItem `refreshable:"false"`
}
type proxyConfig struct {
@ -1255,11 +1257,19 @@ please adjust in embedded Milvus: false`,
Key: "proxy.accessLog.cacheSize",
Version: "2.3.2",
DefaultValue: "10240",
Doc: "Size of log of memory cache, in B",
Doc: "Size of log of memory cache, in B. (Close write cache if szie was 0",
Export: true,
}
p.AccessLog.CacheSize.Init(base.mgr)
p.AccessLog.CacheFlushInterval = ParamItem{
Key: "proxy.accessLog.cacheSize",
Version: "2.4.0",
DefaultValue: "3",
Doc: "time interval of auto flush memory cache, in Seconds. (Close auto flush if interval was 0)",
}
p.AccessLog.CacheFlushInterval.Init(base.mgr)
p.AccessLog.MaxBackups = ParamItem{
Key: "proxy.accessLog.maxBackups",
Version: "2.2.0",