mirror of https://github.com/milvus-io/milvus.git
972 lines
27 KiB
972 lines
27 KiB
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
type (
primaryKey = storage.PrimaryKey
int64PrimaryKey = storage.Int64PrimaryKey
varCharPrimaryKey = storage.VarCharPrimaryKey
var (
newInt64PrimaryKey = storage.NewInt64PrimaryKey
newVarCharPrimaryKey = storage.NewVarCharPrimaryKey
// Channel is DataNode unique replication
type Channel interface {
getCollectionID() UniqueID
getCollectionSchema(collectionID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error)
getCollectionAndPartitionID(segID UniqueID) (collID, partitionID UniqueID, err error)
getChannelName(segID UniqueID) string
addSegment(req addSegmentReq) error
getSegment(segID UniqueID) *Segment
removeSegments(segID ...UniqueID)
hasSegment(segID UniqueID, countFlushed bool) bool
InitPKstats(ctx context.Context, s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error
RollPKstats(segID UniqueID, stats *storage.PrimaryKeyStats)
listAllSegmentIDs() []UniqueID
listNotFlushedSegmentIDs() []UniqueID
listPartitionSegments(partID UniqueID) []UniqueID
filterSegments(partitionID UniqueID) []*Segment
listNewSegmentsStartPositions() []*datapb.SegmentStartPosition
transferNewSegments(segmentIDs []UniqueID)
updateSegmentPKRange(segID UniqueID, ids storage.FieldData)
mergeFlushedSegments(ctx context.Context, seg *Segment, planID UniqueID, compactedFrom []UniqueID) error
listCompactedSegmentIDs() map[UniqueID][]UniqueID
listSegmentIDsToSync(ts Timestamp) []UniqueID
updateSegmentRowNumber(segID UniqueID, numRows int64)
updateSingleSegmentMemorySize(segID UniqueID)
getSegmentStatisticsUpdates(segID UniqueID) (*commonpb.SegmentStats, error)
segmentFlushed(segID UniqueID)
getChannelCheckpoint(ttPos *msgpb.MsgPosition) *msgpb.MsgPosition
getCurInsertBuffer(segmentID UniqueID) (*BufferData, bool)
setCurInsertBuffer(segmentID UniqueID, buf *BufferData)
rollInsertBuffer(segmentID UniqueID)
evictHistoryInsertBuffer(segmentID UniqueID, endPos *msgpb.MsgPosition)
getCurDeleteBuffer(segmentID UniqueID) (*DelDataBuf, bool)
setCurDeleteBuffer(segmentID UniqueID, buf *DelDataBuf)
rollDeleteBuffer(segmentID UniqueID)
evictHistoryDeleteBuffer(segmentID UniqueID, endPos *msgpb.MsgPosition)
// getTotalMemorySize returns the sum of memory sizes of segments.
getTotalMemorySize() int64
setIsHighMemory(b bool)
getIsHighMemory() bool
getFlushTs() Timestamp
setFlushTs(ts Timestamp)
// ChannelMeta contains channel meta and the latest segments infos of the channel.
type ChannelMeta struct {
collectionID UniqueID
channelName string
collSchema *schemapb.CollectionSchema
schemaMut sync.RWMutex
segMu sync.RWMutex
segments map[UniqueID]*Segment
// isHighMemory is intended to trigger the syncing of segments
// when segment's buffer consumes a significant amount of memory.
isHighMemory *atomic.Bool
// flushTs is intended to trigger:
// 1. the syncing of segments when consumed ts exceeds flushTs;
// 2. the updating of channelCP when channelCP exceeds flushTs.
flushTs *atomic.Uint64
syncPolicies []segmentSyncPolicy
metaService *metaService
chunkManager storage.ChunkManager
workerPool *conc.Pool[any]
closed *atomic.Bool
type addSegmentReq struct {
segType datapb.SegmentType
segID, collID, partitionID UniqueID
numOfRows int64
startPos, endPos *msgpb.MsgPosition
statsBinLogs []*datapb.FieldBinlog
binLogs []*datapb.FieldBinlog
recoverTs Timestamp
importing bool
var _ Channel = &ChannelMeta{}
func newChannel(channelName string, collID UniqueID, schema *schemapb.CollectionSchema, rc types.RootCoord, cm storage.ChunkManager) *ChannelMeta {
metaService := newMetaService(rc, collID)
channel := ChannelMeta{
collectionID: collID,
collSchema: schema,
channelName: channelName,
segments: make(map[UniqueID]*Segment),
isHighMemory: atomic.NewBool(false),
flushTs: atomic.NewUint64(math.MaxUint64),
syncPolicies: []segmentSyncPolicy{
metaService: metaService,
chunkManager: cm,
workerPool: getOrCreateStatsPool(),
closed: atomic.NewBool(false),
return &channel
// segmentFlushed transfers a segment from *New* or *Normal* into *Flushed*.
func (c *ChannelMeta) segmentFlushed(segID UniqueID) {
defer c.segMu.Unlock()
if seg, ok := c.segments[segID]; ok {
// new2NormalSegment transfers a segment from *New* to *Normal*.
// make sure the segID is in the channel before call this func
func (c *ChannelMeta) new2NormalSegment(segID UniqueID) {
if seg, ok := c.segments[segID]; ok && seg.getType() == datapb.SegmentType_New {
func (c *ChannelMeta) getCollectionAndPartitionID(segID UniqueID) (collID, partitionID UniqueID, err error) {
defer c.segMu.RUnlock()
if seg, ok := c.segments[segID]; ok && seg.isValid() {
return seg.collectionID, seg.partitionID, nil
return 0, 0, fmt.Errorf("cannot find segment, id = %d", segID)
func (c *ChannelMeta) getChannelName(segID UniqueID) string {
return c.channelName
// addSegment adds the segment to current channel. Segments can be added as *new*, *normal* or *flushed*.
// Make sure to verify `channel.hasSegment(segID)` == false before calling `channel.addSegment()`.
func (c *ChannelMeta) addSegment(req addSegmentReq) error {
if req.collID != c.collectionID {
log.Warn("failed to addSegment, collection mismatch",
zap.Int64("current collection ID", req.collID),
zap.Int64("expected collection ID", c.collectionID))
return merr.WrapErrParameterInvalid(c.collectionID, req.collID, "collection not match")
log.Info("adding segment",
zap.String("type", req.segType.String()),
zap.Int64("segmentID", req.segID),
zap.Int64("collectionID", req.collID),
zap.Int64("partitionID", req.partitionID),
zap.String("channel", c.channelName),
zap.Any("startPosition", req.startPos),
zap.Any("endPosition", req.endPos),
zap.Uint64("recoverTs", req.recoverTs),
zap.Bool("importing", req.importing),
seg := &Segment{
collectionID: req.collID,
partitionID: req.partitionID,
segmentID: req.segID,
numRows: req.numOfRows, // 0 if segType == NEW
historyInsertBuf: make([]*BufferData, 0),
historyDeleteBuf: make([]*DelDataBuf, 0),
startPos: req.startPos,
// Set up pk stats
err := c.InitPKstats(context.TODO(), seg, req.statsBinLogs, req.recoverTs)
if err != nil {
log.Error("failed to init bloom filter",
zap.Int64("segmentID", req.segID),
return err
c.segments[req.segID] = seg
return nil
func (c *ChannelMeta) getSegment(segID UniqueID) *Segment {
defer c.segMu.RUnlock()
seg, ok := c.segments[segID]
if !ok {
return nil
return seg
func (c *ChannelMeta) listCompactedSegmentIDs() map[UniqueID][]UniqueID {
defer c.segMu.RUnlock()
compactedTo2From := make(map[UniqueID][]UniqueID)
for segID, seg := range c.segments {
if !seg.isValid() {
compactedTo2From[seg.compactedTo] = append(compactedTo2From[seg.compactedTo], segID)
return compactedTo2From
func (c *ChannelMeta) listSegmentIDsToSync(ts Timestamp) []UniqueID {
defer c.segMu.RUnlock()
validSegs := make([]*Segment, 0)
for _, seg := range c.segments {
// flushed segments also need to update cp
if !seg.isValid() {
validSegs = append(validSegs, seg)
segIDsToSync := typeutil.NewUniqueSet()
for _, policy := range c.syncPolicies {
segments := policy(validSegs, c, ts)
for _, segID := range segments {
return segIDsToSync.Collect()
// filterSegments return segments with same partitionID for all segments
// get all segments
func (c *ChannelMeta) filterSegments(partitionID UniqueID) []*Segment {
defer c.segMu.RUnlock()
var results []*Segment
for _, seg := range c.segments {
if seg.isValid() &&
partitionID == common.InvalidPartitionID || seg.partitionID == partitionID {
results = append(results, seg)
return results
func (c *ChannelMeta) InitPKstats(ctx context.Context, s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error {
if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() {
// mark segment lazy loading
c.submitLoadStatsTask(s, statsBinlogs, ts)
return nil
return c.initPKstats(ctx, s, statsBinlogs, ts)
func (c *ChannelMeta) submitLoadStatsTask(s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) {
log := log.Ctx(context.TODO()).With(
zap.Int64("segmentID", s.segmentID),
zap.Int64("collectionID", s.collectionID),
if c.closed.Load() {
// stop retry and resubmit if channel meta closed
// do submitting in a goroutine in case of task pool is full
go func() {
c.workerPool.Submit(func() (any, error) {
stats, err := c.loadStats(context.Background(), s.segmentID, s.collectionID, statsBinlogs, ts)
if err != nil {
// TODO if not retryable, add rebuild statslog logic
log.Warn("failed to lazy load statslog for segment", zap.Int64("segment", s.segmentID), zap.Error(err))
if c.retryableLoadError(err) && !s.isReleased() {
time.Sleep(100 * time.Millisecond)
log.Warn("failed to lazy load statslog for segment, retrying...", zap.Int64("segment", s.segmentID), zap.Error(err))
c.submitLoadStatsTask(s, statsBinlogs, ts)
return nil, err
// get segment lock here
// it's ok that segment is dropped here
defer c.segMu.Unlock()
s.historyStats = append(s.historyStats, stats...)
log.Info("lazy loading segment statslog complete")
return nil, nil
func (c *ChannelMeta) retryableLoadError(err error) bool {
switch {
case errors.Is(err, merr.ErrParameterInvalid):
// statslog corrupted
return false
case errors.Is(err, storage.ErrNoSuchKey):
// statslog not found
return false
return true
func (c *ChannelMeta) loadStats(ctx context.Context, segmentID int64, collectionID int64, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) ([]*storage.PkStatistics, error) {
startTs := time.Now()
log := log.With(zap.Int64("segmentID", segmentID))
log.Info("begin to init pk bloom filter", zap.Int("statsBinLogsLen", len(statsBinlogs)))
schema, err := c.getCollectionSchema(collectionID, ts)
if err != nil {
log.Warn("failed to initPKBloomFilter, get schema return error", zap.Error(err))
return nil, err
// get pkfield id
pkField := int64(-1)
for _, field := range schema.Fields {
if field.IsPrimaryKey {
pkField = field.FieldID
// filter stats binlog files which is pk field stats log
bloomFilterFiles := []string{}
logType := storage.DefaultStatsType
for _, binlog := range statsBinlogs {
if binlog.FieldID != pkField {
for _, log := range binlog.GetBinlogs() {
_, logidx := path.Split(log.GetLogPath())
// if special status log exist
// only load one file
switch logidx {
case storage.CompoundStatsType.LogIdx():
bloomFilterFiles = []string{log.GetLogPath()}
logType = storage.CompoundStatsType
break Loop
bloomFilterFiles = append(bloomFilterFiles, log.GetLogPath())
// no stats log to parse, initialize a new BF
if len(bloomFilterFiles) == 0 {
log.Warn("no stats files to load")
return nil, nil
// read historical PK filter
values, err := c.chunkManager.MultiRead(ctx, bloomFilterFiles)
if err != nil {
log.Warn("failed to load bloom filter files", zap.Error(err))
return nil, err
blobs := make([]*Blob, 0)
for i := 0; i < len(values); i++ {
blobs = append(blobs, &Blob{Value: values[i]})
var stats []*storage.PrimaryKeyStats
if logType == storage.CompoundStatsType {
stats, err = storage.DeserializeStatsList(blobs[0])
if err != nil {
log.Warn("failed to deserialize stats list", zap.Error(err))
return nil, err
} else {
stats, err = storage.DeserializeStats(blobs)
if err != nil {
log.Warn("failed to deserialize stats", zap.Error(err))
return nil, err
var size uint
result := make([]*storage.PkStatistics, 0, len(stats))
for _, stat := range stats {
pkStat := &storage.PkStatistics{
PkFilter: stat.BF,
MinPK: stat.MinPk,
MaxPK: stat.MaxPk,
size += stat.BF.Cap()
result = append(result, pkStat)
log.Info("Successfully load pk stats", zap.Any("time", time.Since(startTs)), zap.Uint("size", size))
return result, nil
func (c *ChannelMeta) initPKstats(ctx context.Context, s *Segment, statsBinlogs []*datapb.FieldBinlog, ts Timestamp) error {
stats, err := c.loadStats(ctx, s.segmentID, s.collectionID, statsBinlogs, ts)
if err != nil {
return err
s.historyStats = stats
return nil
func (c *ChannelMeta) RollPKstats(segID UniqueID, stat *storage.PrimaryKeyStats) {
if stat == nil {
log.Warn("sync but no any pk stats", zap.Int64("segmentID", segID))
defer c.segMu.Unlock()
seg, ok := c.segments[segID]
log.Info("roll pk stats", zap.Int64("segmentID", segID))
if ok && seg.notFlushed() {
pkStat := &storage.PkStatistics{
PkFilter: stat.BF,
MinPK: stat.MinPk,
MaxPK: stat.MaxPk,
seg.historyStats = append(seg.historyStats, pkStat)
seg.currentStat = nil
// should not happen at all
if ok {
log.Warn("only growing segment should roll PK stats", zap.Int64("segment", segID), zap.Any("type", seg.sType))
} else {
log.Warn("can not find segment", zap.Int64("segment", segID))
// listNewSegmentsStartPositions gets all *New Segments* start positions and
// transfer segments states from *New* to *Normal*.
func (c *ChannelMeta) listNewSegmentsStartPositions() []*datapb.SegmentStartPosition {
defer c.segMu.Unlock()
var result []*datapb.SegmentStartPosition
for id, seg := range c.segments {
if seg.getType() == datapb.SegmentType_New {
result = append(result, &datapb.SegmentStartPosition{
SegmentID: id,
StartPosition: seg.startPos,
return result
// transferNewSegments make new segment transfer to normal segments.
func (c *ChannelMeta) transferNewSegments(segmentIDs []UniqueID) {
defer c.segMu.Unlock()
for _, segmentID := range segmentIDs {
func (c *ChannelMeta) updateSegmentPKRange(segID UniqueID, ids storage.FieldData) {
defer c.segMu.Unlock()
seg, ok := c.segments[segID]
if ok && seg.isValid() {
log.Warn("No match segment to update PK range", zap.Int64("ID", segID))
func (c *ChannelMeta) removeSegments(segIDs ...UniqueID) {
defer c.segMu.Unlock()
log.Info("remove segments if exist", zap.Int64s("segmentIDs", segIDs))
cnt := 0
for _, segID := range segIDs {
if seg, ok := c.segments[segID]; ok {
if seg.notFlushed() {
// free memory
seg.curInsertBuf = nil
seg.curDeleteBuf = nil
seg.historyInsertBuf = nil
seg.historyDeleteBuf = nil
delete(c.segments, segID)
// hasSegment checks whether this channel has a segment according to segment ID.
func (c *ChannelMeta) hasSegment(segID UniqueID, countFlushed bool) bool {
defer c.segMu.RUnlock()
seg, ok := c.segments[segID]
if !ok {
return false
if !seg.isValid() ||
(!countFlushed && seg.getType() == datapb.SegmentType_Flushed) {
return false
return true
// updateStatistics updates the number of rows of a segment in channel.
func (c *ChannelMeta) updateSegmentRowNumber(segID UniqueID, numRows int64) {
defer c.segMu.Unlock()
seg, ok := c.segments[segID]
if ok && seg.notFlushed() {
seg.numRows += numRows
log.Info("updated segment num row", zap.Int64("segmentID", segID),
zap.Int64("addedNumRows", numRows), zap.Int64("numRowsSum", seg.numRows))
log.Warn("update segment num row not exist", zap.Int64("segmentID", segID))
func (c *ChannelMeta) calculateSegmentMemorySize(segID UniqueID) int64 {
var memorySize int64
if ibf, ok := c.getCurInsertBuffer(segID); ok {
memorySize += ibf.memorySize()
if dbf, ok := c.getCurDeleteBuffer(segID); ok {
memorySize += dbf.GetMemorySize()
return memorySize
func (c *ChannelMeta) updateSingleSegmentMemorySize(segID UniqueID) {
memorySize := c.calculateSegmentMemorySize(segID)
defer c.segMu.Unlock()
seg, ok := c.segments[segID]
if ok {
seg.memorySize = memorySize
log.Info("updated segment memorySize", zap.Int64("segmentID", segID),
zap.Int64("memorySize", seg.memorySize))
log.Warn("update segment memorySize not exist", zap.Int64("segmentID", segID))
// getSegmentStatisticsUpdates gives current segment's statistics updates.
func (c *ChannelMeta) getSegmentStatisticsUpdates(segID UniqueID) (*commonpb.SegmentStats, error) {
defer c.segMu.RUnlock()
if seg, ok := c.segments[segID]; ok && seg.isValid() {
return &commonpb.SegmentStats{SegmentID: segID, NumRows: seg.numRows}, nil
return nil, fmt.Errorf("error, there's no segment %d", segID)
func (c *ChannelMeta) getCollectionID() UniqueID {
return c.collectionID
// getCollectionSchema gets collection schema from rootcoord for a certain timestamp.
// If you want the latest collection schema, ts should be 0.
func (c *ChannelMeta) getCollectionSchema(collID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error) {
if collID != c.collectionID {
log.Warn("failed to getCollectionSchema, collection mismatch",
zap.Int64("current collection ID", collID),
zap.Int64("expected collection ID", c.collectionID))
return nil, merr.WrapErrParameterInvalid(c.collectionID, collID, "collection not match")
if c.collSchema == nil {
defer c.schemaMut.Unlock()
if c.collSchema == nil {
sch, err := c.metaService.getCollectionSchema(context.Background(), collID, ts)
if err != nil {
return nil, err
c.collSchema = sch
} else {
defer c.schemaMut.RUnlock()
return c.collSchema, nil
func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, planID UniqueID, compactedFrom []UniqueID) error {
log := log.Ctx(ctx).With(
zap.Int64("segmentID", seg.segmentID),
zap.Int64("collectionID", seg.collectionID),
zap.Int64("partitionID", seg.partitionID),
zap.Int64s("compacted from", compactedFrom),
zap.Int64("planID", planID),
zap.String("channelName", c.channelName))
if seg.collectionID != c.collectionID {
log.Warn("failed to mergeFlushedSegments, collection mismatch",
zap.Int64("current collection ID", seg.collectionID),
zap.Int64("expected collection ID", c.collectionID))
return merr.WrapErrParameterInvalid(c.collectionID, seg.collectionID, "collection not match")
var inValidSegments []UniqueID
for _, ID := range compactedFrom {
// no such segments in channel or the segments are unflushed.
if !c.hasSegment(ID, true) || c.hasSegment(ID, false) {
inValidSegments = append(inValidSegments, ID)
if len(inValidSegments) > 0 {
log.Warn("no match flushed segments to merge from", zap.Int64s("invalid segmentIDs", inValidSegments))
compactedFrom = lo.Without(compactedFrom, inValidSegments...)
log.Info("merge flushed segments")
defer c.segMu.Unlock()
select {
case <-ctx.Done():
log.Warn("the context has been closed", zap.Error(ctx.Err()))
return errors.New("invalid context")
for _, ID := range compactedFrom {
// the existent of the segments are already checked
s := c.segments[ID]
s.compactedTo = seg.segmentID
// release bloom filter
s.currentStat = nil
s.historyStats = nil
// only store segments with numRows > 0
if seg.numRows > 0 {
c.segments[seg.segmentID] = seg
return nil
// for tests only
func (c *ChannelMeta) addFlushedSegmentWithPKs(segID, collID, partID UniqueID, numOfRows int64, ids storage.FieldData) error {
if collID != c.collectionID {
log.Warn("failed to addFlushedSegmentWithPKs, collection mismatch",
zap.Int64("current collection ID", collID),
zap.Int64("expected collection ID", c.collectionID))
return merr.WrapErrParameterInvalid(c.collectionID, collID, "collection not match")
log.Info("Add Flushed segment",
zap.Int64("segmentID", segID),
zap.Int64("collectionID", collID),
zap.Int64("partitionID", partID),
zap.String("channelName", c.channelName),
seg := &Segment{
collectionID: collID,
partitionID: partID,
segmentID: segID,
numRows: numOfRows,
c.segments[segID] = seg
return nil
func (c *ChannelMeta) listAllSegmentIDs() []UniqueID {
defer c.segMu.RUnlock()
var segIDs []UniqueID
for _, seg := range c.segments {
if seg.isValid() {
segIDs = append(segIDs, seg.segmentID)
return segIDs
func (c *ChannelMeta) listPartitionSegments(partID UniqueID) []UniqueID {
defer c.segMu.RUnlock()
var segIDs []UniqueID
for _, seg := range c.segments {
if seg.isValid() && seg.partitionID == partID {
segIDs = append(segIDs, seg.segmentID)
return segIDs
func (c *ChannelMeta) listNotFlushedSegmentIDs() []UniqueID {
defer c.segMu.RUnlock()
var segIDs []UniqueID
for sID, seg := range c.segments {
if seg.notFlushed() {
segIDs = append(segIDs, sID)
return segIDs
func (c *ChannelMeta) getChannelCheckpoint(ttPos *msgpb.MsgPosition) *msgpb.MsgPosition {
log := log.With().WithRateGroup("ChannelMeta", 1, 60)
defer c.segMu.RUnlock()
channelCP := &msgpb.MsgPosition{Timestamp: math.MaxUint64}
// 1. find the earliest startPos in current buffer and history buffer
for _, seg := range c.segments {
if seg.curInsertBuf != nil && seg.curInsertBuf.startPos != nil && seg.curInsertBuf.startPos.Timestamp < channelCP.Timestamp {
channelCP = seg.curInsertBuf.startPos
if seg.curDeleteBuf != nil && seg.curDeleteBuf.startPos != nil && seg.curDeleteBuf.startPos.Timestamp < channelCP.Timestamp {
channelCP = seg.curDeleteBuf.startPos
for _, ib := range seg.historyInsertBuf {
if ib != nil && ib.startPos != nil && ib.startPos.Timestamp < channelCP.Timestamp {
channelCP = ib.startPos
for _, db := range seg.historyDeleteBuf {
if db != nil && db.startPos != nil && db.startPos.Timestamp < channelCP.Timestamp {
channelCP = db.startPos
log.RatedDebug(10, "getChannelCheckpoint for segment", zap.Int64("segmentID", seg.segmentID),
zap.Bool("isCurIBEmpty", seg.curInsertBuf == nil),
zap.Bool("isCurDBEmpty", seg.curDeleteBuf == nil),
zap.Int("len(hisIB)", len(seg.historyInsertBuf)),
zap.Int("len(hisDB)", len(seg.historyDeleteBuf)),
zap.Any("newChannelCpTs", channelCP.GetTimestamp()))
// 2. if no data in buffer, use the current tt as channelCP
if channelCP.MsgID == nil {
channelCP = ttPos
return channelCP
func (c *ChannelMeta) getCurInsertBuffer(segmentID UniqueID) (*BufferData, bool) {
defer c.segMu.RUnlock()
seg, ok := c.segments[segmentID]
if ok {
return seg.curInsertBuf, seg.curInsertBuf != nil
return nil, false
func (c *ChannelMeta) setCurInsertBuffer(segmentID UniqueID, buf *BufferData) {
defer c.segMu.Unlock()
seg, ok := c.segments[segmentID]
if ok {
log.Warn("cannot find segment when setCurInsertBuffer", zap.Int64("segmentID", segmentID))
func (c *ChannelMeta) rollInsertBuffer(segmentID UniqueID) {
defer c.segMu.Unlock()
seg, ok := c.segments[segmentID]
if ok {
log.Warn("cannot find segment when rollInsertBuffer", zap.Int64("segmentID", segmentID))
func (c *ChannelMeta) evictHistoryInsertBuffer(segmentID UniqueID, endPos *msgpb.MsgPosition) {
defer c.segMu.Unlock()
seg, ok := c.segments[segmentID]
if ok {
log.Warn("cannot find segment when evictHistoryInsertBuffer", zap.Int64("segmentID", segmentID))
func (c *ChannelMeta) getCurDeleteBuffer(segmentID UniqueID) (*DelDataBuf, bool) {
defer c.segMu.RUnlock()
seg, ok := c.segments[segmentID]
if ok {
return seg.curDeleteBuf, seg.curDeleteBuf != nil
return nil, false
func (c *ChannelMeta) setCurDeleteBuffer(segmentID UniqueID, buf *DelDataBuf) {
defer c.segMu.Unlock()
seg, ok := c.segments[segmentID]
if ok {
seg.curDeleteBuf = buf
log.Warn("cannot find segment when setCurDeleteBuffer", zap.Int64("segmentID", segmentID))
func (c *ChannelMeta) rollDeleteBuffer(segmentID UniqueID) {
defer c.segMu.Unlock()
seg, ok := c.segments[segmentID]
if ok {
log.Warn("cannot find segment when rollDeleteBuffer", zap.Int64("segmentID", segmentID))
func (c *ChannelMeta) evictHistoryDeleteBuffer(segmentID UniqueID, endPos *msgpb.MsgPosition) {
defer c.segMu.Unlock()
seg, ok := c.segments[segmentID]
if ok {
log.Warn("cannot find segment when evictHistoryDeleteBuffer", zap.Int64("segmentID", segmentID))
func (c *ChannelMeta) setIsHighMemory(b bool) {
func (c *ChannelMeta) getIsHighMemory() bool {
return c.isHighMemory.Load()
func (c *ChannelMeta) getTotalMemorySize() int64 {
defer c.segMu.RUnlock()
var res int64
for _, segment := range c.segments {
res += segment.memorySize
return res
func (c *ChannelMeta) getFlushTs() Timestamp {
return c.flushTs.Load()
func (c *ChannelMeta) setFlushTs(ts Timestamp) {
func (c *ChannelMeta) close() {