milvus/internal/flushcommon/writebuffer/insert_buffer.go

164 lines
3.8 KiB
Go

package writebuffer
import (
"math"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
noLimit int64 = -1
)
type BufferBase struct {
rows int64
rowLimit int64
size int64
sizeLimit int64
TimestampFrom typeutil.Timestamp
TimestampTo typeutil.Timestamp
startPos *msgpb.MsgPosition
endPos *msgpb.MsgPosition
}
func (b *BufferBase) UpdateStatistics(entryNum, size int64, tr TimeRange, startPos, endPos *msgpb.MsgPosition) {
b.rows += entryNum
b.size += size
if tr.timestampMin < b.TimestampFrom {
b.TimestampFrom = tr.timestampMin
}
if tr.timestampMax > b.TimestampTo {
b.TimestampTo = tr.timestampMax
}
if b.startPos == nil || startPos.Timestamp < b.startPos.Timestamp {
b.startPos = startPos
}
if b.endPos == nil || endPos.Timestamp > b.endPos.Timestamp {
b.endPos = endPos
}
}
func (b *BufferBase) IsFull() bool {
return (b.rowLimit != noLimit && b.rows >= b.rowLimit) ||
(b.sizeLimit != noLimit && b.size >= b.sizeLimit)
}
func (b *BufferBase) IsEmpty() bool {
return b.rows == 0 && b.size == 0
}
func (b *BufferBase) MinTimestamp() typeutil.Timestamp {
if b.startPos == nil {
return math.MaxUint64
}
return b.startPos.GetTimestamp()
}
func (b *BufferBase) GetTimeRange() *TimeRange {
return NewTimeRange(b.TimestampFrom, b.TimestampTo)
}
type InsertBuffer struct {
BufferBase
collSchema *schemapb.CollectionSchema
buffers []*storage.InsertData
statsBuffer *statsBuffer
}
func NewInsertBuffer(sch *schemapb.CollectionSchema) (*InsertBuffer, error) {
estSize, err := typeutil.EstimateSizePerRecord(sch)
if err != nil {
log.Warn("failed to estimate size per record", zap.Error(err))
return nil, err
}
if estSize == 0 {
return nil, errors.New("Invalid schema")
}
sizeLimit := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt64()
ib := &InsertBuffer{
BufferBase: BufferBase{
rowLimit: noLimit,
sizeLimit: sizeLimit,
TimestampFrom: math.MaxUint64,
TimestampTo: 0,
},
collSchema: sch,
}
if len(sch.GetFunctions()) > 0 {
ib.statsBuffer = newStatsBuffer()
}
return ib, nil
}
func (ib *InsertBuffer) buffer(inData *storage.InsertData, tr TimeRange, startPos, endPos *msgpb.MsgPosition) {
// buffer := ib.currentBuffer()
// storage.MergeInsertData(buffer.buffer, inData)
ib.buffers = append(ib.buffers, inData)
}
func (ib *InsertBuffer) Yield() []*storage.InsertData {
result := ib.buffers
// set buffer nil to so that fragmented buffer could get GCed
ib.buffers = nil
return result
}
func (ib *InsertBuffer) YieldStats() map[int64]*storage.BM25Stats {
if ib.statsBuffer == nil {
return nil
}
return ib.statsBuffer.yieldBuffer()
}
func (ib *InsertBuffer) Buffer(inData *InsertData, startPos, endPos *msgpb.MsgPosition) int64 {
bufferedSize := int64(0)
for idx, data := range inData.data {
tsData := inData.tsField[idx]
tr := ib.getTimestampRange(tsData)
ib.buffer(data, tr, startPos, endPos)
// update buffer size
ib.UpdateStatistics(int64(data.GetRowNum()), int64(data.GetMemorySize()), tr, startPos, endPos)
bufferedSize += int64(data.GetMemorySize())
}
if inData.bm25Stats != nil {
ib.statsBuffer.Buffer(inData.bm25Stats)
}
return bufferedSize
}
func (ib *InsertBuffer) getTimestampRange(tsData *storage.Int64FieldData) TimeRange {
tr := TimeRange{
timestampMin: math.MaxUint64,
timestampMax: 0,
}
for _, data := range tsData.Data {
if uint64(data) < tr.timestampMin {
tr.timestampMin = typeutil.Timestamp(data)
}
if uint64(data) > tr.timestampMax {
tr.timestampMax = typeutil.Timestamp(data)
}
}
return tr
}