Refactor segment manager (#6441)

issue: #6440
Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/6450/head
sunby 2021-07-12 17:24:25 +08:00 committed by GitHub
parent 239e629e83
commit 83be910baa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 298 additions and 295 deletions

View File

@ -153,7 +153,7 @@ func (s *Server) GetVChanPositions(vchans []vchannel, isAccurate bool) ([]*datap
continue
}
unflushed = append(unflushed, s)
unflushed = append(unflushed, s.SegmentInfo)
if seekPosition == nil || !useUnflushedPosition || s.DmlPosition.Timestamp < seekPosition.Timestamp {
useUnflushedPosition = true

View File

@ -275,7 +275,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
resp.Status.Reason = fmt.Sprintf("Failed to get segment %d", id)
return resp, nil
}
infos = append(infos, info)
infos = append(infos, info.SegmentInfo)
}
resp.Status.ErrorCode = commonpb.ErrorCode_Success
resp.Infos = infos

View File

@ -58,7 +58,7 @@ func (m *meta) reloadFromKV() error {
if err != nil {
return fmt.Errorf("DataCoord reloadFromKV UnMarshalText datapb.SegmentInfo err:%w", err)
}
m.segments.SetSegment(segmentInfo.GetID(), segmentInfo)
m.segments.SetSegment(segmentInfo.GetID(), NewSegmentInfo(segmentInfo))
}
return nil
@ -93,7 +93,7 @@ func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 {
return ret
}
func (m *meta) AddSegment(segment *datapb.SegmentInfo) error {
func (m *meta) AddSegment(segment *SegmentInfo) error {
m.Lock()
defer m.Unlock()
m.segments.SetSegment(segment.GetID(), segment)
@ -134,7 +134,7 @@ func (m *meta) DropSegment(segmentID UniqueID) error {
return nil
}
func (m *meta) GetSegment(segID UniqueID) *datapb.SegmentInfo {
func (m *meta) GetSegment(segID UniqueID) *SegmentInfo {
m.RLock()
defer m.RUnlock()
return m.segments.GetSegment(segID)
@ -200,10 +200,10 @@ func (m *meta) SaveBinlogAndCheckPoints(segID UniqueID, flushed bool,
return nil
}
func (m *meta) GetSegmentsByChannel(dmlCh string) []*datapb.SegmentInfo {
func (m *meta) GetSegmentsByChannel(dmlCh string) []*SegmentInfo {
m.RLock()
defer m.RUnlock()
infos := make([]*datapb.SegmentInfo, 0)
infos := make([]*SegmentInfo, 0)
segments := m.segments.GetSegments()
for _, segment := range segments {
if segment.InsertChannel != dmlCh {
@ -253,10 +253,10 @@ func (m *meta) GetNumRowsOfPartition(collectionID UniqueID, partitionID UniqueID
return ret
}
func (m *meta) GetUnFlushedSegments() []*datapb.SegmentInfo {
func (m *meta) GetUnFlushedSegments() []*SegmentInfo {
m.RLock()
defer m.RUnlock()
ret := make([]*datapb.SegmentInfo, 0)
ret := make([]*SegmentInfo, 0)
segments := m.segments.GetSegments()
for _, info := range segments {
if info.State != commonpb.SegmentState_Flushing && info.State != commonpb.SegmentState_Flushed {
@ -266,10 +266,10 @@ func (m *meta) GetUnFlushedSegments() []*datapb.SegmentInfo {
return ret
}
func (m *meta) GetFlushingSegments() []*datapb.SegmentInfo {
func (m *meta) GetFlushingSegments() []*SegmentInfo {
m.RLock()
defer m.RUnlock()
ret := make([]*datapb.SegmentInfo, 0)
ret := make([]*SegmentInfo, 0)
segments := m.segments.GetSegments()
for _, info := range segments {
if info.State == commonpb.SegmentState_Flushing {
@ -279,14 +279,36 @@ func (m *meta) GetFlushingSegments() []*datapb.SegmentInfo {
return ret
}
func (m *meta) saveSegmentInfo(segment *datapb.SegmentInfo) error {
segBytes := proto.MarshalTextString(segment)
func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
m.Lock()
defer m.Unlock()
m.segments.AddAllocation(segmentID, allocation)
if segInfo := m.segments.GetSegment(segmentID); segInfo != nil {
return m.saveSegmentInfo(segInfo)
}
return nil
}
func (m *meta) SetAllocations(segmentID UniqueID, allocations []*Allocation) {
m.Lock()
defer m.Unlock()
m.segments.SetAllocations(segmentID, allocations)
}
func (m *meta) SetCurrentRows(segmentID UniqueID, rows int64) {
m.Lock()
defer m.Unlock()
m.segments.SetCurrentRows(segmentID, rows)
}
func (m *meta) saveSegmentInfo(segment *SegmentInfo) error {
segBytes := proto.MarshalTextString(segment.SegmentInfo)
key := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
return m.client.Save(key, segBytes)
}
func (m *meta) removeSegmentInfo(segment *datapb.SegmentInfo) error {
func (m *meta) removeSegmentInfo(segment *SegmentInfo) error {
key := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
return m.client.Remove(key)
}
@ -307,8 +329,8 @@ func buildPartitionPath(collectionID UniqueID, partitionID UniqueID) string {
return fmt.Sprintf("%s/%d/%d/", segmentPrefix, collectionID, partitionID)
}
func buildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelName string) *datapb.SegmentInfo {
return &datapb.SegmentInfo{
func buildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelName string) *SegmentInfo {
info := &datapb.SegmentInfo{
ID: segmentID,
CollectionID: collectionID,
PartitionID: partitionID,
@ -316,4 +338,5 @@ func buildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueI
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
}
return NewSegmentInfo(info)
}

View File

@ -154,19 +154,21 @@ func TestGetUnFlushedSegments(t *testing.T) {
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
err = meta.AddSegment(&datapb.SegmentInfo{
s1 := &datapb.SegmentInfo{
ID: 0,
CollectionID: 0,
PartitionID: 0,
State: commonpb.SegmentState_Growing,
})
}
err = meta.AddSegment(NewSegmentInfo(s1))
assert.Nil(t, err)
err = meta.AddSegment(&datapb.SegmentInfo{
s2 := &datapb.SegmentInfo{
ID: 1,
CollectionID: 0,
PartitionID: 0,
State: commonpb.SegmentState_Flushed,
})
}
err = meta.AddSegment(NewSegmentInfo(s2))
assert.Nil(t, err)
segments := meta.GetUnFlushedSegments()

View File

@ -15,20 +15,13 @@ import (
"sort"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
type calUpperLimitPolicy interface {
// apply accept collection schema and return max number of rows per segment
apply(schema *schemapb.CollectionSchema) (int, error)
}
type calUpperLimitPolicy func(schema *schemapb.CollectionSchema) (int, error)
type calBySchemaPolicy struct {
}
func (p *calBySchemaPolicy) apply(schema *schemapb.CollectionSchema) (int, error) {
func calBySchemaPolicy(schema *schemapb.CollectionSchema) (int, error) {
sizePerRecord, err := typeutil.EstimateSizePerRecord(schema)
if err != nil {
return -1, err
@ -37,59 +30,48 @@ func (p *calBySchemaPolicy) apply(schema *schemapb.CollectionSchema) (int, error
return int(threshold / float64(sizePerRecord)), nil
}
func newCalBySchemaPolicy() calUpperLimitPolicy {
return &calBySchemaPolicy{}
}
type allocatePolicy func(segment *SegmentInfo, count int64) bool
type allocatePolicy interface {
apply(maxCount, writtenCount, allocatedCount, count int64) bool
}
type allocatePolicyV1 struct {
}
func (p *allocatePolicyV1) apply(maxCount, writtenCount, allocatedCount, count int64) bool {
free := maxCount - writtenCount - allocatedCount
func allocatePolicyV1(segment *SegmentInfo, count int64) bool {
var allocSize int64
for _, allocation := range segment.allocations {
allocSize += allocation.numOfRows
}
free := segment.GetMaxRowNum() - segment.GetNumOfRows() - allocSize
return free >= count
}
func newAllocatePolicyV1() allocatePolicy {
return &allocatePolicyV1{}
}
type sealPolicy interface {
apply(maxCount, writtenCount, allocatedCount int64) bool
}
type sealPolicy func(maxCount, writtenCount, allocatedCount int64) bool
// segmentSealPolicy seal policy applies to segment
type segmentSealPolicy func(status *segmentStatus, info *datapb.SegmentInfo, ts Timestamp) bool
type segmentSealPolicy func(segment *SegmentInfo, ts Timestamp) bool
// channelSealPolicy seal policy applies to channel
type channelSealPolicy func(string, []*datapb.SegmentInfo, Timestamp) []*datapb.SegmentInfo
type channelSealPolicy func(string, []*SegmentInfo, Timestamp) []*SegmentInfo
// getSegmentCapacityPolicy get segmentSealPolicy with segment size factor policy
func getSegmentCapacityPolicy(sizeFactor float64) segmentSealPolicy {
return func(status *segmentStatus, info *datapb.SegmentInfo, ts Timestamp) bool {
return func(segment *SegmentInfo, ts Timestamp) bool {
var allocSize int64
for _, allocation := range status.allocations {
for _, allocation := range segment.allocations {
allocSize += allocation.numOfRows
}
return float64(status.currentRows) >= sizeFactor*float64(info.MaxRowNum)
return float64(segment.currRows) >= sizeFactor*float64(segment.GetMaxRowNum())
}
}
// getLastExpiresLifetimePolicy get segmentSealPolicy with lifetime limit compares ts - segment.lastExpireTime
func getLastExpiresLifetimePolicy(lifetime uint64) segmentSealPolicy {
return func(status *segmentStatus, info *datapb.SegmentInfo, ts Timestamp) bool {
return (ts - info.LastExpireTime) > lifetime
return func(segment *SegmentInfo, ts Timestamp) bool {
return (ts - segment.GetLastExpireTime()) > lifetime
}
}
// getChannelCapacityPolicy get channelSealPolicy with channel segment capacity policy
func getChannelOpenSegCapacityPolicy(limit int) channelSealPolicy {
return func(channel string, segs []*datapb.SegmentInfo, ts Timestamp) []*datapb.SegmentInfo {
return func(channel string, segs []*SegmentInfo, ts Timestamp) []*SegmentInfo {
if len(segs) <= limit {
return []*datapb.SegmentInfo{}
return []*SegmentInfo{}
}
sortSegmentsByLastExpires(segs)
offLen := len(segs) - limit
@ -98,34 +80,18 @@ func getChannelOpenSegCapacityPolicy(limit int) channelSealPolicy {
}
// sortSegStatusByLastExpires sort segmentStatus with lastExpireTime ascending order
func sortSegmentsByLastExpires(segs []*datapb.SegmentInfo) {
func sortSegmentsByLastExpires(segs []*SegmentInfo) {
sort.Slice(segs, func(i, j int) bool {
return segs[i].LastExpireTime < segs[j].LastExpireTime
})
}
type sealPolicyV1 struct {
}
func (p *sealPolicyV1) apply(maxCount, writtenCount, allocatedCount int64) bool {
func sealPolicyV1(maxCount, writtenCount, allocatedCount int64) bool {
return float64(writtenCount) >= Params.SegmentSealProportion*float64(maxCount)
}
func newSealPolicyV1() sealPolicy {
return &sealPolicyV1{}
}
type flushPolicy func(segment *SegmentInfo, t Timestamp) bool
type flushPolicy interface {
apply(info *datapb.SegmentInfo, t Timestamp) bool
}
type flushPolicyV1 struct {
}
func (p *flushPolicyV1) apply(info *datapb.SegmentInfo, t Timestamp) bool {
return info.State == commonpb.SegmentState_Sealed && info.LastExpireTime <= t
}
func newFlushPolicyV1() flushPolicy {
return &flushPolicyV1{}
func flushPolicyV1(segment *SegmentInfo, t Timestamp) bool {
return segment.GetState() == commonpb.SegmentState_Sealed && segment.GetLastExpireTime() <= t
}

View File

@ -8,14 +8,28 @@ import (
)
type SegmentsInfo struct {
segments map[UniqueID]*datapb.SegmentInfo
segments map[UniqueID]*SegmentInfo
}
type SegmentInfo struct {
*datapb.SegmentInfo
currRows int64
allocations []*Allocation
}
func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
return &SegmentInfo{
SegmentInfo: info,
currRows: 0,
allocations: make([]*Allocation, 0, 16),
}
}
func NewSegmentsInfo() *SegmentsInfo {
return &SegmentsInfo{segments: make(map[UniqueID]*datapb.SegmentInfo)}
return &SegmentsInfo{segments: make(map[UniqueID]*SegmentInfo)}
}
func (s *SegmentsInfo) GetSegment(segmentID UniqueID) *datapb.SegmentInfo {
func (s *SegmentsInfo) GetSegment(segmentID UniqueID) *SegmentInfo {
segment, ok := s.segments[segmentID]
if !ok {
return nil
@ -23,8 +37,8 @@ func (s *SegmentsInfo) GetSegment(segmentID UniqueID) *datapb.SegmentInfo {
return segment
}
func (s *SegmentsInfo) GetSegments() []*datapb.SegmentInfo {
segments := make([]*datapb.SegmentInfo, 0, len(s.segments))
func (s *SegmentsInfo) GetSegments() []*SegmentInfo {
segments := make([]*SegmentInfo, 0, len(s.segments))
for _, segment := range s.segments {
segments = append(segments, segment)
}
@ -35,7 +49,7 @@ func (s *SegmentsInfo) DropSegment(segmentID UniqueID) {
delete(s.segments, segmentID)
}
func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *datapb.SegmentInfo) {
func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) {
s.segments[segmentID] = segment
}
@ -69,20 +83,30 @@ func (s *SegmentsInfo) SetStartPosition(segmentID UniqueID, pos *internalpb.MsgP
}
}
func (s *SegmentsInfo) Clone(segment *datapb.SegmentInfo, opts ...SegmentInfoOption) *datapb.SegmentInfo {
dmlPos := proto.Clone(segment.DmlPosition).(*internalpb.MsgPosition)
startPos := proto.Clone(segment.StartPosition).(*internalpb.MsgPosition)
cloned := &datapb.SegmentInfo{
ID: segment.ID,
CollectionID: segment.CollectionID,
PartitionID: segment.PartitionID,
InsertChannel: segment.InsertChannel,
NumOfRows: segment.NumOfRows,
State: segment.State,
DmlPosition: dmlPos,
MaxRowNum: segment.MaxRowNum,
LastExpireTime: segment.LastExpireTime,
StartPosition: startPos,
func (s *SegmentsInfo) SetAllocations(segmentID UniqueID, allocations []*Allocation) {
if segment, ok := s.segments[segmentID]; ok {
s.segments[segmentID] = s.ShadowClone(segment, SetAllocations(allocations))
}
}
func (s *SegmentsInfo) AddAllocation(segmentID UniqueID, allocation *Allocation) {
if segment, ok := s.segments[segmentID]; ok {
s.segments[segmentID] = s.Clone(segment, AddAllocation(allocation))
}
}
func (s *SegmentsInfo) SetCurrentRows(segmentID UniqueID, rows int64) {
if segment, ok := s.segments[segmentID]; ok {
s.segments[segmentID] = s.ShadowClone(segment, SetCurrentRows(rows))
}
}
func (s *SegmentsInfo) Clone(segment *SegmentInfo, opts ...SegmentInfoOption) *SegmentInfo {
info := proto.Clone(segment.SegmentInfo).(*datapb.SegmentInfo)
cloned := &SegmentInfo{
SegmentInfo: info,
currRows: segment.currRows,
allocations: segment.allocations,
}
for _, opt := range opts {
opt(cloned)
@ -90,18 +114,11 @@ func (s *SegmentsInfo) Clone(segment *datapb.SegmentInfo, opts ...SegmentInfoOpt
return cloned
}
func (s *SegmentsInfo) ShadowClone(segment *datapb.SegmentInfo, opts ...SegmentInfoOption) *datapb.SegmentInfo {
cloned := &datapb.SegmentInfo{
ID: segment.ID,
CollectionID: segment.CollectionID,
PartitionID: segment.PartitionID,
InsertChannel: segment.InsertChannel,
NumOfRows: segment.NumOfRows,
State: segment.State,
DmlPosition: segment.DmlPosition,
MaxRowNum: segment.MaxRowNum,
LastExpireTime: segment.LastExpireTime,
StartPosition: segment.StartPosition,
func (s *SegmentsInfo) ShadowClone(segment *SegmentInfo, opts ...SegmentInfoOption) *SegmentInfo {
cloned := &SegmentInfo{
SegmentInfo: segment.SegmentInfo,
currRows: segment.currRows,
allocations: segment.allocations,
}
for _, opt := range opts {
@ -110,34 +127,53 @@ func (s *SegmentsInfo) ShadowClone(segment *datapb.SegmentInfo, opts ...SegmentI
return cloned
}
type SegmentInfoOption func(segment *datapb.SegmentInfo)
type SegmentInfoOption func(segment *SegmentInfo)
func SetRowCount(rowCount int64) SegmentInfoOption {
return func(segment *datapb.SegmentInfo) {
return func(segment *SegmentInfo) {
segment.NumOfRows = rowCount
}
}
func SetExpireTime(expireTs Timestamp) SegmentInfoOption {
return func(segment *datapb.SegmentInfo) {
return func(segment *SegmentInfo) {
segment.LastExpireTime = expireTs
}
}
func SetState(state commonpb.SegmentState) SegmentInfoOption {
return func(segment *datapb.SegmentInfo) {
return func(segment *SegmentInfo) {
segment.State = state
}
}
func SetDmlPositino(pos *internalpb.MsgPosition) SegmentInfoOption {
return func(segment *datapb.SegmentInfo) {
return func(segment *SegmentInfo) {
segment.DmlPosition = pos
}
}
func SetStartPosition(pos *internalpb.MsgPosition) SegmentInfoOption {
return func(segment *datapb.SegmentInfo) {
return func(segment *SegmentInfo) {
segment.StartPosition = pos
}
}
func SetAllocations(allocations []*Allocation) SegmentInfoOption {
return func(segment *SegmentInfo) {
segment.allocations = allocations
}
}
func AddAllocation(allocation *Allocation) SegmentInfoOption {
return func(segment *SegmentInfo) {
segment.allocations = append(segment.allocations, allocation)
segment.LastExpireTime = allocation.expireTime
}
}
func SetCurrentRows(rows int64) SegmentInfoOption {
return func(segment *SegmentInfo) {
segment.currRows = rows
}
}

View File

@ -43,41 +43,29 @@ type Manager interface {
SealAllSegments(ctx context.Context, collectionID UniqueID) ([]UniqueID, error)
// GetFlushableSegments return flushable segment ids
GetFlushableSegments(ctx context.Context, channel string, ts Timestamp) ([]UniqueID, error)
// UpdateSegmentStats update segment status
UpdateSegmentStats(stat *internalpb.SegmentStatisticsUpdates)
// ExpireAllocations notify segment status to expire old allocations
ExpireAllocations(channel string, ts Timestamp) error
}
// segmentStatus stores allocation entries and temporary row count
type segmentStatus struct {
id UniqueID
allocations []*allocation
currentRows int64
}
// allcation entry for segment allocation record
type allocation struct {
// allcation entry for segment Allocation record
type Allocation struct {
numOfRows int64
expireTime Timestamp
}
// SegmentManager handles segment related logic
type SegmentManager struct {
meta *meta
mu sync.RWMutex
allocator allocator
helper allocHelper
stats map[UniqueID]*segmentStatus //segment id -> status
estimatePolicy calUpperLimitPolicy
allocPolicy allocatePolicy
// sealPolicy sealPolicy
meta *meta
mu sync.RWMutex
allocator allocator
helper allocHelper
segments []UniqueID
estimatePolicy calUpperLimitPolicy
allocPolicy allocatePolicy
segmentSealPolicies []segmentSealPolicy
channelSealPolicies []channelSealPolicy
flushPolicy flushPolicy
allocPool sync.Pool
allocPool sync.Pool
}
type allocHelper struct {
@ -141,15 +129,15 @@ func withFlushPolicy(policy flushPolicy) allocOption {
}
func defaultCalUpperLimitPolicy() calUpperLimitPolicy {
return newCalBySchemaPolicy()
return calBySchemaPolicy
}
func defaultAlocatePolicy() allocatePolicy {
return newAllocatePolicyV1()
return allocatePolicyV1
}
func defaultSealPolicy() sealPolicy {
return newSealPolicyV1()
return sealPolicyV1
}
func defaultSegmentSealPolicy() segmentSealPolicy {
@ -157,26 +145,24 @@ func defaultSegmentSealPolicy() segmentSealPolicy {
}
func defaultFlushPolicy() flushPolicy {
return newFlushPolicyV1()
return flushPolicyV1
}
// newSegmentManager should be the only way to retrieve SegmentManager
func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) *SegmentManager {
manager := &SegmentManager{
meta: meta,
allocator: allocator,
helper: defaultAllocHelper(),
stats: make(map[UniqueID]*segmentStatus),
meta: meta,
allocator: allocator,
helper: defaultAllocHelper(),
segments: make([]UniqueID, 0),
estimatePolicy: defaultCalUpperLimitPolicy(),
allocPolicy: defaultAlocatePolicy(),
segmentSealPolicies: []segmentSealPolicy{defaultSegmentSealPolicy()}, // default only segment size policy
channelSealPolicies: []channelSealPolicy{}, // no default channel seal policy
flushPolicy: defaultFlushPolicy(),
allocPool: sync.Pool{
New: func() interface{} {
return &allocation{}
return &Allocation{}
},
},
}
@ -190,37 +176,31 @@ func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) *Se
// loadSegmentsFromMeta generate corresponding segment status for each segment from meta
func (s *SegmentManager) loadSegmentsFromMeta() {
segments := s.meta.GetUnFlushedSegments()
ids := make([]UniqueID, 0, len(segments))
for _, seg := range segments {
ids = append(ids, seg.ID)
stat := &segmentStatus{
id: seg.ID,
allocations: make([]*allocation, 0, 16),
}
s.stats[seg.ID] = stat
segmentsID := make([]UniqueID, 0, len(segments))
for _, segment := range segments {
segmentsID = append(segmentsID, segment.GetID())
}
log.Debug("Restore segment allocation", zap.Int64s("segments", ids))
s.segments = segmentsID
}
// getAllocation unified way to retrieve allocation struct
func (s *SegmentManager) getAllocation(numOfRows int64, expireTs uint64) *allocation {
func (s *SegmentManager) getAllocation(numOfRows int64) *Allocation {
v := s.allocPool.Get()
if v == nil {
return &allocation{
numOfRows: numOfRows,
expireTime: expireTs,
return &Allocation{
numOfRows: numOfRows,
}
}
a, ok := v.(*allocation)
a, ok := v.(*Allocation)
if !ok {
a = &allocation{}
a = &Allocation{}
}
a.numOfRows, a.expireTime = numOfRows, expireTs
a.numOfRows = numOfRows
return a
}
// putAllocation put allocation for recycling
func (s *SegmentManager) putAllocation(a *allocation) {
func (s *SegmentManager) putAllocation(a *Allocation) {
s.allocPool.Put(a)
}
@ -232,80 +212,73 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
s.mu.Lock()
defer s.mu.Unlock()
var success bool
var status *segmentStatus
var info *datapb.SegmentInfo
for _, segStatus := range s.stats {
info = s.meta.GetSegment(segStatus.id)
if info == nil {
log.Warn("Failed to get seginfo from meta", zap.Int64("id", segStatus.id), zap.Error(err))
var segment *SegmentInfo
var allocation *Allocation
for _, segmentID := range s.segments {
segment = s.meta.GetSegment(segmentID)
if segment == nil {
log.Warn("Failed to get seginfo from meta", zap.Int64("id", segmentID), zap.Error(err))
continue
}
if info.State == commonpb.SegmentState_Sealed || info.CollectionID != collectionID ||
info.PartitionID != partitionID || info.InsertChannel != channelName {
if segment.State == commonpb.SegmentState_Sealed || segment.CollectionID != collectionID ||
segment.PartitionID != partitionID || segment.InsertChannel != channelName {
continue
}
success, err = s.alloc(segStatus, info, requestRows)
allocation, err = s.alloc(segment, requestRows)
if err != nil {
return
}
if success {
status = segStatus
if allocation != nil {
break
}
}
if !success {
status, err = s.openNewSegment(ctx, collectionID, partitionID, channelName)
if allocation == nil {
segment, err = s.openNewSegment(ctx, collectionID, partitionID, channelName)
if err != nil {
return
}
info = s.meta.GetSegment(status.id)
if info == nil {
log.Warn("Failed to get seg into from meta", zap.Int64("id", status.id), zap.Error(err))
segment = s.meta.GetSegment(segment.GetID())
if segment == nil {
log.Warn("Failed to get seg into from meta", zap.Int64("id", segment.GetID()), zap.Error(err))
return
}
success, err = s.alloc(status, info, requestRows)
allocation, err = s.alloc(segment, requestRows)
if err != nil {
return
}
if !success {
if allocation == nil {
err = errRemainInSufficient(requestRows)
return
}
}
segID = status.id
retCount = requestRows
expireTime = info.LastExpireTime
segID = segment.GetID()
retCount = allocation.numOfRows
expireTime = allocation.expireTime
return
}
func (s *SegmentManager) alloc(status *segmentStatus, info *datapb.SegmentInfo, numOfRows int64) (bool, error) {
func (s *SegmentManager) alloc(segment *SegmentInfo, numOfRows int64) (*Allocation, error) {
var allocSize int64
for _, allocItem := range status.allocations {
for _, allocItem := range segment.allocations {
allocSize += allocItem.numOfRows
}
if !s.allocPolicy.apply(info.MaxRowNum, status.currentRows, allocSize, numOfRows) {
return false, nil
if !s.allocPolicy(segment, numOfRows) {
return nil, nil
}
alloc := s.getAllocation(numOfRows)
expireTs, err := s.genExpireTs()
if err != nil {
return false, err
return nil, err
}
alloc := s.getAllocation(numOfRows, expireTs)
alloc.expireTime = expireTs
//safe here since info is a clone, used to pass expireTs out
info.LastExpireTime = expireTs
status.allocations = append(status.allocations, alloc)
if err := s.meta.SetLastExpireTime(status.id, expireTs); err != nil {
return false, err
}
return true, nil
s.meta.AddAllocation(segment.GetID(), alloc)
return alloc, nil
}
func (s *SegmentManager) genExpireTs() (Timestamp, error) {
@ -319,7 +292,7 @@ func (s *SegmentManager) genExpireTs() (Timestamp, error) {
return expireTs, nil
}
func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*segmentStatus, error) {
func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*SegmentInfo, error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
id, err := s.allocator.allocID()
@ -330,12 +303,6 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
if err != nil {
return nil, err
}
status := &segmentStatus{
id: id,
allocations: make([]*allocation, 0, 16),
currentRows: 0,
}
s.stats[id] = status
segmentInfo := &datapb.SegmentInfo{
ID: id,
@ -353,10 +320,11 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
Timestamp: 0,
},
}
if err := s.meta.AddSegment(segmentInfo); err != nil {
segment := NewSegmentInfo(segmentInfo)
if err := s.meta.AddSegment(segment); err != nil {
return nil, err
}
s.segments = append(s.segments, id)
log.Debug("datacoord: estimateTotalRows: ",
zap.Int64("CollectionID", segmentInfo.CollectionID),
zap.Int64("SegmentID", segmentInfo.ID),
@ -364,7 +332,7 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
zap.String("Channel", segmentInfo.InsertChannel))
s.helper.afterCreateSegment(segmentInfo)
return status, nil
return segment, nil
}
func (s *SegmentManager) estimateMaxNumOfRows(collectionID UniqueID) (int, error) {
@ -372,7 +340,7 @@ func (s *SegmentManager) estimateMaxNumOfRows(collectionID UniqueID) (int, error
if collMeta == nil {
return -1, fmt.Errorf("Failed to get collection %d", collectionID)
}
return s.estimatePolicy.apply(collMeta.Schema)
return s.estimatePolicy(collMeta.Schema)
}
func (s *SegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
@ -380,13 +348,20 @@ func (s *SegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
stat, ok := s.stats[segmentID]
if ok && stat != nil {
for _, allocation := range stat.allocations {
s.putAllocation(allocation)
for i, id := range s.segments {
if id == segmentID {
s.segments = append(s.segments[:i], s.segments[i+1:]...)
break
}
}
delete(s.stats, segmentID)
segment := s.meta.GetSegment(segmentID)
if segment == nil {
log.Warn("failed to get segment", zap.Int64("id", segmentID))
}
s.meta.SetAllocations(segmentID, []*Allocation{})
for _, allocation := range segment.allocations {
s.putAllocation(allocation)
}
}
func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID UniqueID) ([]UniqueID, error) {
@ -395,23 +370,23 @@ func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID Uniqu
s.mu.Lock()
defer s.mu.Unlock()
ret := make([]UniqueID, 0)
for _, status := range s.stats {
info := s.meta.GetSegment(status.id)
for _, id := range s.segments {
info := s.meta.GetSegment(id)
if info == nil {
log.Warn("Failed to get seg info from meta", zap.Int64("id", status.id))
log.Warn("Failed to get seg info from meta", zap.Int64("id", id))
continue
}
if info.CollectionID != collectionID {
continue
}
if info.State == commonpb.SegmentState_Sealed {
ret = append(ret, status.id)
ret = append(ret, id)
continue
}
if err := s.meta.SetState(status.id, commonpb.SegmentState_Sealed); err != nil {
if err := s.meta.SetState(id, commonpb.SegmentState_Sealed); err != nil {
return nil, err
}
ret = append(ret, status.id)
ret = append(ret, id)
}
return ret, nil
}
@ -426,80 +401,58 @@ func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel strin
return nil, err
}
segments := s.meta.GetSegmentsByChannel(channel)
mIDSegment := make(map[UniqueID]*datapb.SegmentInfo)
for _, segment := range segments {
mIDSegment[segment.ID] = segment
}
ret := make([]UniqueID, 0, len(segments))
for _, status := range s.stats {
info, has := mIDSegment[status.id]
if !has {
ret := make([]UniqueID, 0, len(s.segments))
for _, id := range s.segments {
info := s.meta.GetSegment(id)
if info == nil {
continue
}
if s.flushPolicy.apply(info, t) {
ret = append(ret, status.id)
if s.flushPolicy(info, t) {
ret = append(ret, id)
}
}
return ret, nil
}
// UpdateSegmentStats update number of rows in memory
func (s *SegmentManager) UpdateSegmentStats(stat *internalpb.SegmentStatisticsUpdates) {
s.mu.Lock()
defer s.mu.Unlock()
segment, ok := s.stats[stat.SegmentID]
if !ok {
return
}
segment.currentRows = stat.NumRows
}
// ExpireAllocations notify segment status to expire old allocations
func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) error {
s.mu.Lock()
defer s.mu.Unlock()
segments := s.meta.GetSegmentsByChannel(channel)
mIDSeg := make(map[UniqueID]struct{})
for _, segment := range segments {
mIDSeg[segment.ID] = struct{}{}
}
for _, status := range s.stats {
_, ok := mIDSeg[status.id]
if !ok {
for _, id := range s.segments {
segment := s.meta.GetSegment(id)
if segment == nil {
continue
}
for i := 0; i < len(status.allocations); i++ {
if status.allocations[i].expireTime <= ts {
a := status.allocations[i]
status.allocations = append(status.allocations[:i], status.allocations[i+1:]...)
for i := 0; i < len(segment.allocations); i++ {
if segment.allocations[i].expireTime <= ts {
a := segment.allocations[i]
segment.allocations = append(segment.allocations[:i], segment.allocations[i+1:]...)
s.putAllocation(a)
}
}
s.meta.SetAllocations(segment.GetID(), segment.allocations)
}
return nil
}
// tryToSealSegment applies segment & channel seal policies
func (s *SegmentManager) tryToSealSegment(ts Timestamp) error {
channelInfo := make(map[string][]*datapb.SegmentInfo)
mIDSegment := make(map[UniqueID]*datapb.SegmentInfo)
for _, status := range s.stats {
info := s.meta.GetSegment(status.id)
channelInfo := make(map[string][]*SegmentInfo)
for _, id := range s.segments {
info := s.meta.GetSegment(id)
if info == nil {
log.Warn("Failed to get seg info from meta", zap.Int64("id", status.id))
log.Warn("Failed to get seg info from meta", zap.Int64("id", id))
continue
}
mIDSegment[status.id] = info
channelInfo[info.InsertChannel] = append(channelInfo[info.InsertChannel], info)
if info.State == commonpb.SegmentState_Sealed {
continue
}
// change shouldSeal to segment seal policy logic
for _, policy := range s.segmentSealPolicies {
if policy(status, info, ts) {
if err := s.meta.SetState(status.id, commonpb.SegmentState_Sealed); err != nil {
if policy(info, ts) {
if err := s.meta.SetState(id, commonpb.SegmentState_Sealed); err != nil {
return err
}
break

View File

@ -93,18 +93,16 @@ func TestLoadSegmentsFromMeta(t *testing.T) {
MaxRowNum: 100,
LastExpireTime: 1000,
}
err = meta.AddSegment(sealedSegment)
err = meta.AddSegment(NewSegmentInfo(sealedSegment))
assert.Nil(t, err)
err = meta.AddSegment(growingSegment)
err = meta.AddSegment(NewSegmentInfo(growingSegment))
assert.Nil(t, err)
err = meta.AddSegment(flushedSegment)
err = meta.AddSegment(NewSegmentInfo(flushedSegment))
assert.Nil(t, err)
segmentManager := newSegmentManager(meta, mockAllocator)
segments := segmentManager.stats
segments := segmentManager.segments
assert.EqualValues(t, 2, len(segments))
assert.NotNil(t, segments[1])
assert.NotNil(t, segments[2])
}
func TestSaveSegmentsToMeta(t *testing.T) {
@ -120,8 +118,6 @@ func TestSaveSegmentsToMeta(t *testing.T) {
segmentManager := newSegmentManager(meta, mockAllocator)
segID, _, expireTs, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000)
assert.Nil(t, err)
segStatus := segmentManager.stats[segID]
assert.NotNil(t, segStatus)
_, err = segmentManager.SealAllSegments(context.Background(), collID)
assert.Nil(t, err)
segment := meta.GetSegment(segID)
@ -129,3 +125,24 @@ func TestSaveSegmentsToMeta(t *testing.T) {
assert.EqualValues(t, segment.LastExpireTime, expireTs)
assert.EqualValues(t, commonpb.SegmentState_Sealed, segment.State)
}
func TestDropSegment(t *testing.T) {
Params.Init()
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
schema := newTestSchema()
collID, err := mockAllocator.allocID()
assert.Nil(t, err)
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator)
segID, _, _, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000)
assert.Nil(t, err)
segment := meta.GetSegment(segID)
assert.NotNil(t, segment)
segmentManager.DropSegment(context.Background(), segID)
segment = meta.GetSegment(segID)
assert.NotNil(t, segment)
}

View File

@ -280,7 +280,7 @@ func (s *Server) startStatsChannel(ctx context.Context) {
log.Debug("Receive DataNode segment statistics update")
ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
for _, stat := range ssMsg.SegStats {
s.segmentManager.UpdateSegmentStats(stat)
s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
}
}
}
@ -341,7 +341,7 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
zap.Error(err))
continue
}
segmentInfos = append(segmentInfos, sInfo)
segmentInfos = append(segmentInfos, sInfo.SegmentInfo)
}
if len(segmentInfos) > 0 {
s.cluster.Flush(segmentInfos)
@ -427,7 +427,7 @@ func (s *Server) startFlushLoop(ctx context.Context) {
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentFlushDone,
},
Segment: segment,
Segment: segment.SegmentInfo,
}
resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, req)
if err = VerifyResponse(resp, err); err != nil {

View File

@ -112,7 +112,7 @@ func TestFlush(t *testing.T) {
svr.meta.AddCollection(&datapb.CollectionInfo{ID: 0, Schema: schema, Partitions: []int64{}})
segID, _, expireTs, err := svr.segmentManager.AllocSegment(context.TODO(), 0, 1, "channel-1", 1)
assert.Nil(t, err)
resp, err := svr.Flush(context.TODO(), &datapb.FlushRequest{
req := &datapb.FlushRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Flush,
MsgID: 0,
@ -121,7 +121,8 @@ func TestFlush(t *testing.T) {
},
DbID: 0,
CollectionID: 0,
})
}
resp, err := svr.Flush(context.TODO(), req)
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
ids, err := svr.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs)
@ -182,7 +183,7 @@ func TestGetStatisticsChannel(t *testing.T) {
func TestGetSegmentStates(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
err := svr.meta.AddSegment(&datapb.SegmentInfo{
segment := &datapb.SegmentInfo{
ID: 1000,
CollectionID: 100,
PartitionID: 0,
@ -195,7 +196,8 @@ func TestGetSegmentStates(t *testing.T) {
MsgGroup: "",
Timestamp: 0,
},
})
}
err := svr.meta.AddSegment(NewSegmentInfo(segment))
assert.Nil(t, err)
cases := []struct {
@ -261,7 +263,7 @@ func TestGetSegmentInfo(t *testing.T) {
segInfo := &datapb.SegmentInfo{
ID: 0,
}
svr.meta.AddSegment(segInfo)
svr.meta.AddSegment(NewSegmentInfo(segInfo))
req := &datapb.GetSegmentInfoRequest{
SegmentIDs: []int64{0},
@ -282,7 +284,7 @@ func TestChannel(t *testing.T) {
segInfo := &datapb.SegmentInfo{
ID: segID,
}
svr.meta.AddSegment(segInfo)
svr.meta.AddSegment(NewSegmentInfo(segInfo))
stats := &internalpb.SegmentStatisticsUpdates{
SegmentID: segID,
@ -382,11 +384,12 @@ func TestSaveBinlogPaths(t *testing.T) {
{3, 1, 1},
}
for _, segment := range segments {
err := svr.meta.AddSegment(&datapb.SegmentInfo{
s := &datapb.SegmentInfo{
ID: segment.id,
CollectionID: segment.collectionID,
PartitionID: segment.partitionID,
})
}
err := svr.meta.AddSegment(NewSegmentInfo(s))
assert.Nil(t, err)
}
t.Run("Normal SaveRequest", func(t *testing.T) {
@ -566,15 +569,16 @@ func TestGetVChannelPos(t *testing.T) {
ID: 0,
Schema: schema,
})
err := svr.meta.AddSegment(&datapb.SegmentInfo{
s1 := &datapb.SegmentInfo{
ID: 1,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
})
}
err := svr.meta.AddSegment(NewSegmentInfo(s1))
assert.Nil(t, err)
err = svr.meta.AddSegment(&datapb.SegmentInfo{
s2 := &datapb.SegmentInfo{
ID: 2,
CollectionID: 0,
PartitionID: 0,
@ -586,15 +590,17 @@ func TestGetVChannelPos(t *testing.T) {
MsgGroup: "",
Timestamp: 0,
},
})
}
err = svr.meta.AddSegment(NewSegmentInfo(s2))
assert.Nil(t, err)
err = svr.meta.AddSegment(&datapb.SegmentInfo{
s3 := &datapb.SegmentInfo{
ID: 3,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Growing,
})
}
err = svr.meta.AddSegment(NewSegmentInfo(s3))
assert.Nil(t, err)
t.Run("get unexisted channel", func(t *testing.T) {
@ -675,9 +681,9 @@ func TestGetRecoveryInfo(t *testing.T) {
t.Run("test get largest position of flushed segments as seek position", func(t *testing.T) {
seg1 := createSegment(0, 0, 0, 100, 10, "vchan1", commonpb.SegmentState_Flushed)
seg2 := createSegment(1, 0, 0, 100, 20, "vchan1", commonpb.SegmentState_Flushed)
err := svr.meta.AddSegment(seg1)
err := svr.meta.AddSegment(NewSegmentInfo(seg1))
assert.Nil(t, err)
err = svr.meta.AddSegment(seg2)
err = svr.meta.AddSegment(NewSegmentInfo(seg2))
assert.Nil(t, err)
req := &datapb.GetRecoveryInfoRequest{
@ -696,9 +702,9 @@ func TestGetRecoveryInfo(t *testing.T) {
t.Run("test get recovery of unflushed segments ", func(t *testing.T) {
seg1 := createSegment(3, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Growing)
seg2 := createSegment(4, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Growing)
err := svr.meta.AddSegment(seg1)
err := svr.meta.AddSegment(NewSegmentInfo(seg1))
assert.Nil(t, err)
err = svr.meta.AddSegment(seg2)
err = svr.meta.AddSegment(NewSegmentInfo(seg2))
assert.Nil(t, err)
req := &datapb.GetRecoveryInfoRequest{