mirror of https://github.com/milvus-io/milvus.git
enhance: Use pre-built logger for write buffer frequent ops (#33273)
See also #33266 Each `WriteBuffer` shall have same channel/collection id attribute, so use same logger will do and reduce logger allocation & frequent name composition Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/33267/head
parent
33144a43d4
commit
e1bafd7105
|
@ -14,7 +14,6 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/retry"
|
||||
|
@ -143,6 +142,7 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
|
|||
}
|
||||
|
||||
func (wb *l0WriteBuffer) getL0SegmentID(partitionID int64, startPos *msgpb.MsgPosition) int64 {
|
||||
log := wb.logger
|
||||
segmentID, ok := wb.l0Segments[partitionID]
|
||||
if !ok {
|
||||
err := retry.Do(context.Background(), func() error {
|
||||
|
@ -168,7 +168,6 @@ func (wb *l0WriteBuffer) getL0SegmentID(partitionID int64, startPos *msgpb.MsgPo
|
|||
log.Info("Add a new level zero segment",
|
||||
zap.Int64("segmentID", segmentID),
|
||||
zap.String("level", datapb.SegmentLevel_L0.String()),
|
||||
zap.String("channel", wb.channelName),
|
||||
zap.Any("start position", startPos),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -96,6 +96,10 @@ type writeBufferBase struct {
|
|||
flushTimestamp *atomic.Uint64
|
||||
|
||||
storagev2Cache *metacache.StorageV2Cache
|
||||
|
||||
// pre build logger
|
||||
logger *log.MLogger
|
||||
cpRatedLogger *log.MLogger
|
||||
}
|
||||
|
||||
func newWriteBufferBase(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, option *writeBufferOption) (*writeBufferBase, error) {
|
||||
|
@ -127,7 +131,7 @@ func newWriteBufferBase(channel string, metacache metacache.MetaCache, storageV2
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return &writeBufferBase{
|
||||
wb := &writeBufferBase{
|
||||
channelName: channel,
|
||||
collectionID: metacache.Collection(),
|
||||
collSchema: schema,
|
||||
|
@ -140,7 +144,13 @@ func newWriteBufferBase(channel string, metacache metacache.MetaCache, storageV2
|
|||
syncPolicies: option.syncPolicies,
|
||||
flushTimestamp: flushTs,
|
||||
storagev2Cache: storageV2Cache,
|
||||
}, nil
|
||||
}
|
||||
|
||||
wb.logger = log.With(zap.Int64("collectionID", wb.collectionID),
|
||||
zap.String("channel", wb.channelName))
|
||||
wb.cpRatedLogger = wb.logger.WithRateGroup(fmt.Sprintf("writebuffer_cp_%s", wb.channelName), 1, 60)
|
||||
|
||||
return wb, nil
|
||||
}
|
||||
|
||||
func (wb *writeBufferBase) HasSegment(segmentID int64) bool {
|
||||
|
@ -178,13 +188,10 @@ func (wb *writeBufferBase) MemorySize() int64 {
|
|||
}
|
||||
|
||||
func (wb *writeBufferBase) EvictBuffer(policies ...SyncPolicy) {
|
||||
log := wb.logger
|
||||
wb.mut.Lock()
|
||||
defer wb.mut.Unlock()
|
||||
|
||||
log := log.Ctx(context.Background()).With(
|
||||
zap.Int64("collectionID", wb.collectionID),
|
||||
zap.String("channel", wb.channelName),
|
||||
)
|
||||
// need valid checkpoint before triggering syncing
|
||||
if wb.checkpoint == nil {
|
||||
log.Warn("evict buffer before buffering data")
|
||||
|
@ -201,9 +208,7 @@ func (wb *writeBufferBase) EvictBuffer(policies ...SyncPolicy) {
|
|||
}
|
||||
|
||||
func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition {
|
||||
log := log.Ctx(context.Background()).
|
||||
With(zap.String("channel", wb.channelName)).
|
||||
WithRateGroup(fmt.Sprintf("writebuffer_cp_%s", wb.channelName), 1, 60)
|
||||
log := wb.cpRatedLogger
|
||||
wb.mut.RLock()
|
||||
defer wb.mut.RUnlock()
|
||||
|
||||
|
@ -556,6 +561,7 @@ func (wb *writeBufferBase) getEstBatchSize() uint {
|
|||
}
|
||||
|
||||
func (wb *writeBufferBase) Close(drop bool) {
|
||||
log := wb.logger
|
||||
// sink all data and call Drop for meta writer
|
||||
wb.mut.Lock()
|
||||
defer wb.mut.Unlock()
|
||||
|
@ -583,13 +589,13 @@ func (wb *writeBufferBase) Close(drop bool) {
|
|||
|
||||
err := conc.AwaitAll(futures...)
|
||||
if err != nil {
|
||||
log.Error("failed to sink write buffer data", zap.String("channel", wb.channelName), zap.Error(err))
|
||||
log.Error("failed to sink write buffer data", zap.Error(err))
|
||||
// TODO change to remove channel in the future
|
||||
panic(err)
|
||||
}
|
||||
err = wb.metaWriter.DropChannel(wb.channelName)
|
||||
if err != nil {
|
||||
log.Error("failed to drop channel", zap.String("channel", wb.channelName), zap.Error(err))
|
||||
log.Error("failed to drop channel", zap.Error(err))
|
||||
// TODO change to remove channel in the future
|
||||
panic(err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue