mirror of https://github.com/milvus-io/milvus.git
fix: stats log lost after disable stats log loading on flush (#36592)
issue: #36555 Signed-off-by: chyezh <chyezh@outlook.com>pull/36571/head
parent
a6545b2e29
commit
a47abb2f2b
|
@ -132,6 +132,10 @@ func (dsService *DataSyncService) GetMetaCache() metacache.MetaCache {
|
|||
return dsService.metacache
|
||||
}
|
||||
|
||||
func getMetaCacheForStreaming(initCtx context.Context, params *util.PipelineParams, info *datapb.ChannelWatchInfo, unflushed, flushed []*datapb.SegmentInfo) (metacache.MetaCache, error) {
|
||||
return initMetaCache(initCtx, params.ChunkManager, info, nil, unflushed, flushed)
|
||||
}
|
||||
|
||||
func getMetaCacheWithTickler(initCtx context.Context, params *util.PipelineParams, info *datapb.ChannelWatchInfo, tickler *util.Tickler, unflushed, flushed []*datapb.SegmentInfo) (metacache.MetaCache, error) {
|
||||
tickler.SetTotal(int32(len(unflushed) + len(flushed)))
|
||||
return initMetaCache(initCtx, params.ChunkManager, info, tickler, unflushed, flushed)
|
||||
|
@ -161,7 +165,7 @@ func initMetaCache(initCtx context.Context, chunkManager storage.ChunkManager, i
|
|||
return nil, err
|
||||
}
|
||||
segmentPks.Insert(segment.GetID(), pkoracle.NewBloomFilterSet(stats...))
|
||||
if !streamingutil.IsStreamingServiceEnabled() {
|
||||
if tickler != nil {
|
||||
tickler.Inc()
|
||||
}
|
||||
|
||||
|
@ -180,8 +184,11 @@ func initMetaCache(initCtx context.Context, chunkManager storage.ChunkManager, i
|
|||
}
|
||||
}
|
||||
|
||||
// growing segments's stats should always be loaded, for generating merged pk bf.
|
||||
loadSegmentStats("growing", unflushed)
|
||||
loadSegmentStats("sealed", flushed)
|
||||
if !(streamingutil.IsStreamingServiceEnabled() || paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool()) {
|
||||
loadSegmentStats("sealed", flushed)
|
||||
}
|
||||
|
||||
// use fetched segment info
|
||||
info.Vchan.FlushedSegments = flushed
|
||||
|
@ -344,22 +351,10 @@ func NewDataSyncService(initCtx context.Context, pipelineParams *util.PipelinePa
|
|||
}
|
||||
}
|
||||
|
||||
if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() {
|
||||
// In SkipBFStatsLoad mode, flushed segments no longer maintain a bloom filter.
|
||||
// So, here we skip loading the bloom filter for flushed segments.
|
||||
info.Vchan.FlushedSegments = flushedSegmentInfos
|
||||
info.Vchan.UnflushedSegments = unflushedSegmentInfos
|
||||
metaCache = metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) pkoracle.PkStat {
|
||||
return pkoracle.NewBloomFilterSet()
|
||||
}, metacache.NoneBm25StatsFactory)
|
||||
} else {
|
||||
// init metaCache meta
|
||||
metaCache, err = getMetaCacheWithTickler(initCtx, pipelineParams, info, tickler, unflushedSegmentInfos, flushedSegmentInfos)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// init metaCache meta
|
||||
if metaCache, err = getMetaCacheWithTickler(initCtx, pipelineParams, info, tickler, unflushedSegmentInfos, flushedSegmentInfos); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, nil)
|
||||
}
|
||||
|
||||
|
@ -367,6 +362,7 @@ func NewStreamingNodeDataSyncService(initCtx context.Context, pipelineParams *ut
|
|||
// recover segment checkpoints
|
||||
var (
|
||||
err error
|
||||
metaCache metacache.MetaCache
|
||||
unflushedSegmentInfos []*datapb.SegmentInfo
|
||||
flushedSegmentInfos []*datapb.SegmentInfo
|
||||
)
|
||||
|
@ -383,13 +379,10 @@ func NewStreamingNodeDataSyncService(initCtx context.Context, pipelineParams *ut
|
|||
}
|
||||
}
|
||||
|
||||
// In streaming service mode, flushed segments no longer maintain a bloom filter.
|
||||
// So, here we skip loading the bloom filter for flushed segments.
|
||||
info.Vchan.UnflushedSegments = unflushedSegmentInfos
|
||||
metaCache := metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) pkoracle.PkStat {
|
||||
return pkoracle.NewBloomFilterSet()
|
||||
}, metacache.NoneBm25StatsFactory)
|
||||
|
||||
// init metaCache meta
|
||||
if metaCache, err = getMetaCacheForStreaming(initCtx, pipelineParams, info, unflushedSegmentInfos, flushedSegmentInfos); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, input)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue