mirror of https://github.com/milvus-io/milvus.git
123 lines
2.8 KiB
Go
123 lines
2.8 KiB
Go
package writebuffer
|
|
|
|
import (
|
|
"math"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
|
"github.com/milvus-io/milvus/internal/storage"
|
|
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
|
)
|
|
|
|
type segmentBuffer struct {
|
|
segmentID int64
|
|
|
|
insertBuffer *InsertBuffer
|
|
deltaBuffer *DeltaBuffer
|
|
}
|
|
|
|
func newSegmentBuffer(segmentID int64, collSchema *schemapb.CollectionSchema) (*segmentBuffer, error) {
|
|
insertBuffer, err := NewInsertBuffer(collSchema)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &segmentBuffer{
|
|
segmentID: segmentID,
|
|
insertBuffer: insertBuffer,
|
|
deltaBuffer: NewDeltaBuffer(),
|
|
}, nil
|
|
}
|
|
|
|
func (buf *segmentBuffer) IsFull() bool {
|
|
return buf.insertBuffer.IsFull() || buf.deltaBuffer.IsFull()
|
|
}
|
|
|
|
func (buf *segmentBuffer) Yield() (insert []*storage.InsertData, bm25stats map[int64]*storage.BM25Stats, delete *storage.DeleteData) {
|
|
insert = buf.insertBuffer.Yield()
|
|
bm25stats = buf.insertBuffer.YieldStats()
|
|
delete = buf.deltaBuffer.Yield()
|
|
return
|
|
}
|
|
|
|
func (buf *segmentBuffer) MinTimestamp() typeutil.Timestamp {
|
|
insertTs := buf.insertBuffer.MinTimestamp()
|
|
deltaTs := buf.deltaBuffer.MinTimestamp()
|
|
|
|
if insertTs < deltaTs {
|
|
return insertTs
|
|
}
|
|
return deltaTs
|
|
}
|
|
|
|
func (buf *segmentBuffer) EarliestPosition() *msgpb.MsgPosition {
|
|
return getEarliestCheckpoint(buf.insertBuffer.startPos, buf.deltaBuffer.startPos)
|
|
}
|
|
|
|
func (buf *segmentBuffer) GetTimeRange() *TimeRange {
|
|
result := &TimeRange{
|
|
timestampMin: math.MaxUint64,
|
|
timestampMax: 0,
|
|
}
|
|
if buf.insertBuffer != nil {
|
|
result.Merge(buf.insertBuffer.GetTimeRange())
|
|
}
|
|
if buf.deltaBuffer != nil {
|
|
result.Merge(buf.deltaBuffer.GetTimeRange())
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// MemorySize returns total memory size of insert buffer & delta buffer.
|
|
func (buf *segmentBuffer) MemorySize() int64 {
|
|
return buf.insertBuffer.size + buf.deltaBuffer.size
|
|
}
|
|
|
|
// TimeRange is a range of timestamp contains the min-timestamp and max-timestamp
|
|
type TimeRange struct {
|
|
timestampMin typeutil.Timestamp
|
|
timestampMax typeutil.Timestamp
|
|
}
|
|
|
|
func NewTimeRange(min, max typeutil.Timestamp) *TimeRange {
|
|
return &TimeRange{
|
|
timestampMin: min,
|
|
timestampMax: max,
|
|
}
|
|
}
|
|
|
|
func (tr *TimeRange) GetMinTimestamp() typeutil.Timestamp {
|
|
return tr.timestampMin
|
|
}
|
|
|
|
func (tr *TimeRange) GetMaxTimestamp() typeutil.Timestamp {
|
|
return tr.timestampMax
|
|
}
|
|
|
|
func (tr *TimeRange) Merge(other *TimeRange) {
|
|
if other.timestampMin < tr.timestampMin {
|
|
tr.timestampMin = other.timestampMin
|
|
}
|
|
if other.timestampMax > tr.timestampMax {
|
|
tr.timestampMax = other.timestampMax
|
|
}
|
|
}
|
|
|
|
func getEarliestCheckpoint(cps ...*msgpb.MsgPosition) *msgpb.MsgPosition {
|
|
var result *msgpb.MsgPosition
|
|
for _, cp := range cps {
|
|
if cp == nil {
|
|
continue
|
|
}
|
|
if result == nil {
|
|
result = cp
|
|
continue
|
|
}
|
|
|
|
if cp.GetTimestamp() < result.GetTimestamp() {
|
|
result = cp
|
|
}
|
|
}
|
|
return result
|
|
}
|