Rename SegmentAllocator to SegmentManager (#5559)

Add numRows to segmentStatus, rename SegmentAllocator to SegmentManager, and
remove segAllocStats.

Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/5779/head
sunby 2021-06-03 19:06:33 +08:00 committed by zhenshan.cao
parent 275881dbdb
commit aa8a038305
8 changed files with 167 additions and 394 deletions

View File

@ -46,7 +46,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb
resp.Reason = "server is closed"
return resp, nil
}
if err := s.segAllocator.SealAllSegments(ctx, req.CollectionID); err != nil {
if err := s.segmentManager.SealAllSegments(ctx, req.CollectionID); err != nil {
resp.Reason = fmt.Sprintf("Seal all segments error %s", err)
return resp, nil
}
@ -94,7 +94,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
s.cluster.watchIfNeeded(r.ChannelName, r.CollectionID)
segmentID, retCount, expireTs, err := s.segAllocator.AllocSegment(ctx,
segmentID, retCount, expireTs, err := s.segmentManager.AllocSegment(ctx,
r.CollectionID, r.PartitionID, r.ChannelName, int64(r.Count))
if err != nil {
errMsg := fmt.Sprintf("allocation of collection %d, partition %d, channel %s, count %d error: %s",
@ -325,7 +325,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
log.Debug("flush segment with meta", zap.Int64("id", req.SegmentID),
zap.Any("meta", meta))
s.segAllocator.DropSegment(ctx, req.SegmentID)
s.segmentManager.DropSegment(ctx, req.SegmentID)
s.flushCh <- req.SegmentID
resp.ErrorCode = commonpb.ErrorCode_Success
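
For reference, the caller-side contract is unchanged apart from the renamed field: AllocSegment still returns the target segment, the granted row count, and an expiry timestamp. A minimal sketch in package dataservice (the helper name and error wrapping are illustrative, not part of this commit):

```go
// Sketch only: how a handler such as AssignSegmentID consumes AllocSegment.
func (s *Server) allocForRequest(ctx context.Context, collID, partID UniqueID,
	channel string, count int64) (UniqueID, int64, Timestamp, error) {
	// AllocSegment reuses an open segment on this channel if the allocate
	// policy permits, otherwise opens a new one, and records the allocation.
	segID, granted, expireTs, err := s.segmentManager.AllocSegment(ctx, collID, partID, channel, count)
	if err != nil {
		return 0, 0, 0, fmt.Errorf("alloc %d rows on channel %s: %w", count, channel, err)
	}
	// Writers should re-request after expireTs; GetFlushableSegments uses the
	// recorded lastExpireTime to decide when a sealed segment can be flushed.
	return segID, granted, expireTs, nil
}
```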

View File

@ -16,7 +16,6 @@ import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/stretchr/testify/assert"
)
@ -209,23 +208,6 @@ func TestMeta_Basic(t *testing.T) {
err = meta.AddSegment(segInfo0)
assert.Nil(t, err)
// update seg1 to 300 rows
stat := &internalpb.SegmentStatisticsUpdates{}
stat.SegmentID = segInfo0.ID
stat.NumRows = rowCount1
err = meta.UpdateSegmentStatistic(stat)
assert.Nil(t, err)
nums, err = meta.GetNumRowsOfCollection(collID)
assert.Nil(t, err)
assert.EqualValues(t, rowCount1, nums)
// check update non-exist segment
stat.SegmentID, err = mockAllocator.allocID()
assert.Nil(t, err)
err = meta.UpdateSegmentStatistic(stat)
assert.NotNil(t, err)
// add seg2 with 300 rows
segID1, err := mockAllocator.allocID()
assert.Nil(t, err)
@ -238,10 +220,10 @@ func TestMeta_Basic(t *testing.T) {
// check partition/collection statistics
nums, err = meta.GetNumRowsOfPartition(collID, partID0)
assert.Nil(t, err)
assert.EqualValues(t, (rowCount1 + rowCount1), nums)
assert.EqualValues(t, (rowCount0 + rowCount1), nums)
nums, err = meta.GetNumRowsOfCollection(collID)
assert.Nil(t, err)
assert.EqualValues(t, (rowCount1 + rowCount1), nums)
assert.EqualValues(t, (rowCount0 + rowCount1), nums)
})
t.Run("Test Invalid", func(t *testing.T) {

View File

@ -58,13 +58,13 @@ func newSealPolicyV1() sealPolicy {
}
type flushPolicy interface {
apply(status *segAllocStatus, t Timestamp) bool
apply(status *segmentStatus, t Timestamp) bool
}
type flushPolicyV1 struct {
}
func (p *flushPolicyV1) apply(status *segAllocStatus, t Timestamp) bool {
func (p *flushPolicyV1) apply(status *segmentStatus, t Timestamp) bool {
return status.sealed && status.lastExpireTime <= t
}
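
The flush rule is small enough to state directly: a segment is flushable once it is sealed and its last allocation has expired. A minimal sketch of the decision, assuming it sits in package dataservice next to the policy (the segmentStatus literals set only the fields the policy reads; fmt is imported for printing):

```go
// Sketch: flushPolicyV1 marks a segment flushable only when it is sealed and
// lastExpireTime <= t, i.e. every outstanding allocation has expired.
func exampleFlushDecision() {
	p := &flushPolicyV1{}
	now := Timestamp(100)

	sealedExpired := &segmentStatus{sealed: true, lastExpireTime: 90}
	sealedPending := &segmentStatus{sealed: true, lastExpireTime: 110}
	openExpired := &segmentStatus{sealed: false, lastExpireTime: 90}

	fmt.Println(p.apply(sealedExpired, now)) // true:  sealed and allocations expired
	fmt.Println(p.apply(sealedPending, now)) // false: an allocation is still live
	fmt.Println(p.apply(openExpired, now))   // false: not sealed yet
}
```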

View File

@ -1,153 +0,0 @@
package dataservice
import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"go.uber.org/zap"
)
type segAllocStatus struct {
id UniqueID
collectionID UniqueID
partitionID UniqueID
sealed bool
total int64
insertChannel string
allocations []*allocation
lastExpireTime Timestamp
}
type allocation struct {
rowNums int64
expireTime Timestamp
}
func (s *segAllocStatus) getAllocationSize() int64 {
var totalOfAllocations int64
for _, allocation := range s.allocations {
totalOfAllocations += allocation.rowNums
}
return totalOfAllocations
}
func (s *segAllocStatus) appendAllocation(rowNums int64, expireTime Timestamp) {
alloc := &allocation{
rowNums: rowNums,
expireTime: expireTime,
}
s.lastExpireTime = expireTime
s.allocations = append(s.allocations, alloc)
}
type segAllocStats struct {
meta *meta
stats map[UniqueID]*segAllocStatus //segment id -> status
}
func newAllocStats(meta *meta) *segAllocStats {
s := &segAllocStats{
meta: meta,
stats: make(map[UniqueID]*segAllocStatus),
}
s.loadSegmentsFromMeta()
return s
}
func (s *segAllocStats) loadSegmentsFromMeta() {
// load unflushed segments from meta
segments := s.meta.GetUnFlushedSegments()
for _, seg := range segments {
stat := &segAllocStatus{
id: seg.ID,
collectionID: seg.CollectionID,
partitionID: seg.PartitionID,
total: seg.MaxRowNum,
allocations: []*allocation{},
insertChannel: seg.InsertChannel,
lastExpireTime: seg.LastExpireTime,
sealed: seg.State == commonpb.SegmentState_Sealed,
}
s.stats[seg.ID] = stat
}
}
func (s *segAllocStats) getSegments(collectionID UniqueID, partitionID UniqueID, channelName string) []*segAllocStatus {
ret := make([]*segAllocStatus, 0)
for _, segment := range s.stats {
if segment.sealed || segment.collectionID != collectionID || segment.partitionID != partitionID || segment.insertChannel != channelName {
continue
}
ret = append(ret, segment)
}
return ret
}
func (s *segAllocStats) appendAllocation(segmentID UniqueID, numRows int64, expireTime Timestamp) error {
segStatus := s.stats[segmentID]
segStatus.appendAllocation(numRows, expireTime)
return s.meta.SetLastExpireTime(segStatus.id, expireTime)
}
func (s *segAllocStats) sealSegment(id UniqueID) error {
s.stats[id].sealed = true
return s.meta.SealSegment(id)
}
func (s *segAllocStats) sealSegmentsBy(collectionID UniqueID) error {
for _, status := range s.stats {
if status.collectionID == collectionID {
if status.sealed {
continue
}
if err := s.meta.SealSegment(status.id); err != nil {
return err
}
status.sealed = true
}
}
return nil
}
func (s *segAllocStats) dropSegment(id UniqueID) {
delete(s.stats, id)
}
func (s *segAllocStats) expire(t Timestamp) {
for _, segStatus := range s.stats {
for i := 0; i < len(segStatus.allocations); i++ {
if t < segStatus.allocations[i].expireTime {
continue
}
log.Debug("dataservice::ExpireAllocations: ",
zap.Any("segStatus.id", segStatus.id),
zap.Any("segStatus.allocations.rowNums", segStatus.allocations[i].rowNums))
segStatus.allocations = append(segStatus.allocations[:i], segStatus.allocations[i+1:]...)
i--
}
}
}
func (s *segAllocStats) getAllSegments() []*segAllocStatus {
ret := make([]*segAllocStatus, 0)
for _, status := range s.stats {
ret = append(ret, status)
}
return ret
}
func (s *segAllocStats) getSegmentBy(id UniqueID) *segAllocStatus {
return s.stats[id]
}
func (s *segAllocStats) addSegment(segment *datapb.SegmentInfo) error {
s.stats[segment.ID] = &segAllocStatus{
id: segment.ID,
collectionID: segment.CollectionID,
partitionID: segment.PartitionID,
sealed: false,
total: segment.MaxRowNum,
insertChannel: segment.InsertChannel,
allocations: []*allocation{},
lastExpireTime: segment.LastExpireTime,
}
return s.meta.AddSegment(segment)
}

View File

@ -20,6 +20,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/tsoutil"
@ -39,8 +40,8 @@ func (err errRemainInSufficient) Error() string {
return fmt.Sprintf("segment remaining is insufficient for %d", err.requestRows)
}
// segmentAllocator is used to allocate rows for segments and record the allocations.
type segmentAllocator interface {
// Manager manages segment-related operations.
type Manager interface {
// AllocSegment allocate rows and record the allocation.
AllocSegment(ctx context.Context, collectionID, partitionID UniqueID, channelName string, requestRows int64) (UniqueID, int64, Timestamp, error)
// DropSegment drop the segment from allocator.
@ -49,14 +50,33 @@ type segmentAllocator interface {
SealAllSegments(ctx context.Context, collectionID UniqueID) error
// GetFlushableSegments return flushable segment ids
GetFlushableSegments(ctx context.Context, channel string, ts Timestamp) ([]UniqueID, error)
// UpdateSegmentStats updates cached segment statistics
UpdateSegmentStats(stat *internalpb.SegmentStatisticsUpdates)
}
type channelSegmentAllocator struct {
mt *meta
mu sync.RWMutex
allocator allocator
helper allocHelper
allocStats *segAllocStats
type segmentStatus struct {
id UniqueID
collectionID UniqueID
partitionID UniqueID
sealed bool
total int64
insertChannel string
allocations []*allocation
lastExpireTime Timestamp
currentRows int64
}
type allocation struct {
rowNums int64
expireTime Timestamp
}
type SegmentManager struct {
meta *meta
mu sync.RWMutex
allocator allocator
helper allocHelper
stats map[UniqueID]*segmentStatus //segment id -> status
estimatePolicy calUpperLimitPolicy
allocPolicy allocatePolicy
@ -69,12 +89,12 @@ type allocHelper struct {
}
type allocOption struct {
apply func(alloc *channelSegmentAllocator)
apply func(manager *SegmentManager)
}
func withAllocHelper(helper allocHelper) allocOption {
return allocOption{
apply: func(alloc *channelSegmentAllocator) { alloc.helper = helper },
apply: func(manager *SegmentManager) { manager.helper = helper },
}
}
@ -86,25 +106,25 @@ func defaultAllocHelper() allocHelper {
func withCalUpperLimitPolicy(policy calUpperLimitPolicy) allocOption {
return allocOption{
apply: func(alloc *channelSegmentAllocator) { alloc.estimatePolicy = policy },
apply: func(manager *SegmentManager) { manager.estimatePolicy = policy },
}
}
func withAllocPolicy(policy allocatePolicy) allocOption {
return allocOption{
apply: func(alloc *channelSegmentAllocator) { alloc.allocPolicy = policy },
apply: func(manager *SegmentManager) { manager.allocPolicy = policy },
}
}
func withSealPolicy(policy sealPolicy) allocOption {
return allocOption{
apply: func(alloc *channelSegmentAllocator) { alloc.sealPolicy = policy },
apply: func(manager *SegmentManager) { manager.sealPolicy = policy },
}
}
func withFlushPolicy(policy flushPolicy) allocOption {
return allocOption{
apply: func(alloc *channelSegmentAllocator) { alloc.flushPolicy = policy },
apply: func(manager *SegmentManager) { manager.flushPolicy = policy },
}
}
@ -124,12 +144,12 @@ func defaultFlushPolicy() flushPolicy {
return newFlushPolicyV1()
}
func newSegmentAllocator(meta *meta, allocator allocator, opts ...allocOption) *channelSegmentAllocator {
alloc := &channelSegmentAllocator{
mt: meta,
allocator: allocator,
helper: defaultAllocHelper(),
allocStats: newAllocStats(meta),
func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) *SegmentManager {
manager := &SegmentManager{
meta: meta,
allocator: allocator,
helper: defaultAllocHelper(),
stats: make(map[UniqueID]*segmentStatus),
estimatePolicy: defaultCalUpperLimitPolicy(),
allocPolicy: defaultAlocatePolicy(),
@ -137,12 +157,30 @@ func newSegmentAllocator(meta *meta, allocator allocator, opts ...allocOption) *
flushPolicy: defaultFlushPolicy(),
}
for _, opt := range opts {
opt.apply(alloc)
opt.apply(manager)
}
return alloc
manager.loadSegmentsFromMeta()
return manager
}
func (s *channelSegmentAllocator) AllocSegment(ctx context.Context, collectionID UniqueID,
func (s *SegmentManager) loadSegmentsFromMeta() {
// load unflushed segments from meta
segments := s.meta.GetUnFlushedSegments()
for _, seg := range segments {
stat := &segmentStatus{
id: seg.ID,
collectionID: seg.CollectionID,
partitionID: seg.PartitionID,
total: seg.MaxRowNum,
allocations: []*allocation{},
insertChannel: seg.InsertChannel,
lastExpireTime: seg.LastExpireTime,
sealed: seg.State == commonpb.SegmentState_Sealed,
}
s.stats[seg.ID] = stat
}
}
func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID,
partitionID UniqueID, channelName string, requestRows int64) (segID UniqueID, retCount int64, expireTime Timestamp, err error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
@ -150,9 +188,12 @@ func (s *channelSegmentAllocator) AllocSegment(ctx context.Context, collectionID
defer s.mu.Unlock()
var success bool
var status *segAllocStatus
segments := s.allocStats.getSegments(collectionID, partitionID, channelName)
for _, segStatus := range segments {
var status *segmentStatus
for _, segStatus := range s.stats {
if segStatus.sealed || segStatus.collectionID != collectionID ||
segStatus.partitionID != partitionID || segStatus.insertChannel != channelName {
continue
}
success, err = s.alloc(segStatus, requestRows)
if err != nil {
return
@ -185,13 +226,12 @@ func (s *channelSegmentAllocator) AllocSegment(ctx context.Context, collectionID
return
}
func (s *channelSegmentAllocator) alloc(segStatus *segAllocStatus, numRows int64) (bool, error) {
info, err := s.mt.GetSegment(segStatus.id)
if err != nil {
return false, err
func (s *SegmentManager) alloc(segStatus *segmentStatus, numRows int64) (bool, error) {
var allocSize int64
for _, allocation := range segStatus.allocations {
allocSize += allocation.rowNums
}
allocSize := segStatus.getAllocationSize()
if !s.allocPolicy.apply(segStatus.total, info.NumRows, allocSize, numRows) {
if !s.allocPolicy.apply(segStatus.total, segStatus.currentRows, allocSize, numRows) {
return false, nil
}
@ -199,14 +239,21 @@ func (s *channelSegmentAllocator) alloc(segStatus *segAllocStatus, numRows int64
if err != nil {
return false, err
}
if err := s.allocStats.appendAllocation(segStatus.id, numRows, expireTs); err != nil {
alloc := &allocation{
rowNums: numRows,
expireTime: expireTs,
}
segStatus.lastExpireTime = expireTs
segStatus.allocations = append(segStatus.allocations, alloc)
if err := s.meta.SetLastExpireTime(segStatus.id, expireTs); err != nil {
return false, err
}
return true, nil
}
func (s *channelSegmentAllocator) genExpireTs() (Timestamp, error) {
func (s *SegmentManager) genExpireTs() (Timestamp, error) {
ts, err := s.allocator.allocTimestamp()
if err != nil {
return 0, err
@ -217,7 +264,7 @@ func (s *channelSegmentAllocator) genExpireTs() (Timestamp, error) {
return expireTs, nil
}
func (s *channelSegmentAllocator) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*segAllocStatus, error) {
func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*segmentStatus, error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
id, err := s.allocator.allocID()
@ -228,6 +275,19 @@ func (s *channelSegmentAllocator) openNewSegment(ctx context.Context, collection
if err != nil {
return nil, err
}
segStatus := &segmentStatus{
id: id,
collectionID: collectionID,
partitionID: partitionID,
sealed: false,
total: int64(totalRows),
insertChannel: channelName,
allocations: []*allocation{},
lastExpireTime: 0,
currentRows: 0,
}
s.stats[id] = segStatus
segmentInfo := &datapb.SegmentInfo{
ID: id,
CollectionID: collectionID,
@ -239,7 +299,7 @@ func (s *channelSegmentAllocator) openNewSegment(ctx context.Context, collection
LastExpireTime: 0,
}
if err := s.allocStats.addSegment(segmentInfo); err != nil {
if err := s.meta.AddSegment(segmentInfo); err != nil {
return nil, err
}
@ -250,35 +310,45 @@ func (s *channelSegmentAllocator) openNewSegment(ctx context.Context, collection
s.helper.afterCreateSegment(segmentInfo)
return s.allocStats.getSegmentBy(segmentInfo.ID), nil
return segStatus, nil
}
func (s *channelSegmentAllocator) estimateTotalRows(collectionID UniqueID) (int, error) {
collMeta, err := s.mt.GetCollection(collectionID)
func (s *SegmentManager) estimateTotalRows(collectionID UniqueID) (int, error) {
collMeta, err := s.meta.GetCollection(collectionID)
if err != nil {
return -1, err
}
return s.estimatePolicy.apply(collMeta.Schema)
}
func (s *channelSegmentAllocator) DropSegment(ctx context.Context, segmentID UniqueID) {
func (s *SegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
s.allocStats.dropSegment(segmentID)
delete(s.stats, segmentID)
}
func (s *channelSegmentAllocator) SealAllSegments(ctx context.Context, collectionID UniqueID) error {
func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID UniqueID) error {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
s.allocStats.sealSegmentsBy(collectionID)
for _, status := range s.stats {
if status.collectionID == collectionID {
if status.sealed {
continue
}
if err := s.meta.SealSegment(status.id); err != nil {
return err
}
status.sealed = true
}
}
return nil
}
func (s *channelSegmentAllocator) GetFlushableSegments(ctx context.Context, channel string,
func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel string,
t Timestamp) ([]UniqueID, error) {
s.mu.Lock()
defer s.mu.Unlock()
@ -289,8 +359,7 @@ func (s *channelSegmentAllocator) GetFlushableSegments(ctx context.Context, chan
}
ret := make([]UniqueID, 0)
segments := s.allocStats.getAllSegments()
for _, segStatus := range segments {
for _, segStatus := range s.stats {
if segStatus.insertChannel != channel {
continue
}
@ -302,9 +371,18 @@ func (s *channelSegmentAllocator) GetFlushableSegments(ctx context.Context, chan
return ret, nil
}
func (s *channelSegmentAllocator) tryToSealSegment() error {
segments := s.allocStats.getAllSegments()
for _, segStatus := range segments {
func (s *SegmentManager) UpdateSegmentStats(stat *internalpb.SegmentStatisticsUpdates) {
s.mu.Lock()
defer s.mu.Unlock()
segment, ok := s.stats[stat.SegmentID]
if !ok {
return
}
segment.currentRows = stat.NumRows
}
func (s *SegmentManager) tryToSealSegment() error {
for _, segStatus := range s.stats {
if segStatus.sealed {
continue
}
@ -315,31 +393,34 @@ func (s *channelSegmentAllocator) tryToSealSegment() error {
if !sealed {
continue
}
if err := s.allocStats.sealSegment(segStatus.id); err != nil {
if err := s.meta.SealSegment(segStatus.id); err != nil {
return err
}
segStatus.sealed = true
}
return nil
}
func (s *channelSegmentAllocator) checkSegmentSealed(segStatus *segAllocStatus) (bool, error) {
segMeta, err := s.mt.GetSegment(segStatus.id)
if err != nil {
return false, err
func (s *SegmentManager) checkSegmentSealed(segStatus *segmentStatus) (bool, error) {
var allocSize int64
for _, allocation := range segStatus.allocations {
allocSize += allocation.rowNums
}
ret := s.sealPolicy.apply(segStatus.total, segMeta.NumRows, segStatus.getAllocationSize())
ret := s.sealPolicy.apply(segStatus.total, segStatus.currentRows, allocSize)
return ret, nil
}
// only for test
func (s *channelSegmentAllocator) SealSegment(ctx context.Context, segmentID UniqueID) error {
func (s *SegmentManager) SealSegment(ctx context.Context, segmentID UniqueID) error {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
s.allocStats.sealSegment(segmentID)
if err := s.meta.SealSegment(segmentID); err != nil {
return err
}
s.stats[segmentID].sealed = true
return nil
}
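
Taken together, the new SegmentManager owns both the allocation bookkeeping that previously lived in segAllocStats and the row counts that were previously written through meta. A minimal end-to-end sketch of the lifecycle, assuming a collection already registered in meta and the meta/allocator stand-ins used by the tests (error handling trimmed):

```go
// Sketch (package dataservice): the SegmentManager lifecycle as wired up by
// this change. Assumes collection 1 is already registered in meta, as the
// tests do before allocating.
func exampleSegmentLifecycle(ctx context.Context, meta *meta, alloc allocator) error {
	m := newSegmentManager(meta, alloc) // also reloads unflushed segments from meta

	// 1. Allocate rows; this may open a new segment on the channel.
	segID, granted, expireTs, err := m.AllocSegment(ctx, 1, 0, "ch-1", 1000)
	if err != nil {
		return err
	}
	_ = granted // rows granted for this allocation

	// 2. DataNodes report actual row counts; the manager caches them in
	//    currentRows instead of updating meta on every statistics message.
	m.UpdateSegmentStats(&internalpb.SegmentStatisticsUpdates{SegmentID: segID, NumRows: 1000})

	// 3. Seal explicitly on a Flush request (the seal policy can also seal
	//    segments in tryToSealSegment when they fill up).
	if err := m.SealAllSegments(ctx, 1); err != nil {
		return err
	}

	// 4. Once the last allocation has expired, the segment becomes flushable.
	flushable, err := m.GetFlushableSegments(ctx, "ch-1", expireTs)
	if err != nil {
		return err
	}
	log.Debug("flushable segments", zap.Any("ids", flushable))
	return nil
}
```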

View File

@ -26,7 +26,7 @@ func TestAllocSegment(t *testing.T) {
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
segAllocator := newSegmentAllocator(meta, mockAllocator)
segmentManager := newSegmentManager(meta, mockAllocator)
schema := newTestSchema()
collID, err := mockAllocator.allocID()
@ -47,7 +47,7 @@ func TestAllocSegment(t *testing.T) {
{collID, 100, "c1", math.MaxInt64, false},
}
for _, c := range cases {
id, count, expireTime, err := segAllocator.AllocSegment(ctx, c.collectionID, c.partitionID, c.channelName, c.requestRows)
id, count, expireTime, err := segmentManager.AllocSegment(ctx, c.collectionID, c.partitionID, c.channelName, c.requestRows)
if c.expectResult {
assert.Nil(t, err)
assert.EqualValues(t, c.requestRows, count)
@ -108,11 +108,11 @@ func TestLoadSegmentsFromMeta(t *testing.T) {
err = meta.AddSegment(flushedSegment)
assert.Nil(t, err)
segAllocator := newSegmentAllocator(meta, mockAllocator)
segments := segAllocator.allocStats.getAllSegments()
segmentManager := newSegmentManager(meta, mockAllocator)
segments := segmentManager.stats
assert.EqualValues(t, 2, len(segments))
assert.NotNil(t, segments[0])
assert.NotNil(t, segments[1])
assert.NotNil(t, segments[2])
}
func TestSaveSegmentsToMeta(t *testing.T) {
@ -130,12 +130,12 @@ func TestSaveSegmentsToMeta(t *testing.T) {
})
assert.Nil(t, err)
allocator := newSegmentAllocator(meta, mockAllocator)
segID, _, expireTs, err := allocator.AllocSegment(context.Background(), collID, 0, "c1", 1000)
segmentManager := newSegmentManager(meta, mockAllocator)
segID, _, expireTs, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000)
assert.Nil(t, err)
segStatus := allocator.allocStats.getSegmentBy(segID)
segStatus := segmentManager.stats[segID]
assert.NotNil(t, segStatus)
err = allocator.SealAllSegments(context.Background(), collID)
err = segmentManager.SealAllSegments(context.Background(), collID)
assert.Nil(t, err)
segment, err := meta.GetSegment(segID)
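
The new UpdateSegmentStats path is not exercised directly by the tests in this hunk; a minimal sketch of such a test, reusing the white-box style above (the seeded segmentStatus entry stands in for what AllocSegment would normally create):

```go
// Sketch of a test for UpdateSegmentStats, in the style of the tests above.
func TestUpdateSegmentStatsSketch(t *testing.T) {
	mockAllocator := newMockAllocator()
	meta, err := newMemoryMeta(mockAllocator)
	assert.Nil(t, err)
	segmentManager := newSegmentManager(meta, mockAllocator)

	segID, err := mockAllocator.allocID()
	assert.Nil(t, err)
	// Seed an in-memory status directly; AllocSegment or loadSegmentsFromMeta
	// would normally create this entry.
	segmentManager.stats[segID] = &segmentStatus{id: segID}

	segmentManager.UpdateSegmentStats(&internalpb.SegmentStatisticsUpdates{
		SegmentID: segID,
		NumRows:   100,
	})
	assert.EqualValues(t, 100, segmentManager.stats[segID].currentRows)

	// Updates for unknown segments are silently ignored.
	segmentManager.UpdateSegmentStats(&internalpb.SegmentStatisticsUpdates{
		SegmentID: segID + 1,
		NumRows:   1,
	})
	assert.Nil(t, segmentManager.stats[segID+1])
}
```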

View File

@ -55,7 +55,7 @@ type Server struct {
kvClient *etcdkv.EtcdKV
meta *meta
segmentInfoStream msgstream.MsgStream
segAllocator segmentAllocator
segmentManager Manager
allocator allocator
cluster *cluster
masterClient types.MasterService
@ -135,7 +135,7 @@ func (s *Server) Start() error {
s.allocator = newAllocator(s.masterClient)
s.startSegmentAllocator()
s.startSegmentManager()
if err = s.initFlushMsgStream(); err != nil {
return err
}
@ -187,9 +187,9 @@ func (s *Server) initServiceDiscovery() error {
return nil
}
func (s *Server) startSegmentAllocator() {
func (s *Server) startSegmentManager() {
helper := createNewSegmentHelper(s.segmentInfoStream)
s.segAllocator = newSegmentAllocator(s.meta, s.allocator, withAllocHelper(helper))
s.segmentManager = newSegmentManager(s.meta, s.allocator, withAllocHelper(helper))
}
func (s *Server) initSegmentInfoChannel() error {
@ -260,19 +260,9 @@ func (s *Server) startStatsChannel(ctx context.Context) {
defer s.serverLoopWg.Done()
statsStream, _ := s.msFactory.NewMsgStream(ctx)
statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName)
log.Debug("dataservice AsConsumer: " + Params.StatisticsChannelName + " : " + Params.DataServiceSubscriptionName)
// try to restore last processed pos
pos, err := s.loadStreamLastPos(streamTypeStats)
log.Debug("load last pos of stats channel", zap.Any("pos", pos), zap.Error(err))
if err == nil {
err = statsStream.Seek([]*internalpb.MsgPosition{pos})
if err != nil {
log.Error("Failed to seek to last pos for statsStream",
zap.String("StatisticsChanName", Params.StatisticsChannelName),
zap.String("DataServiceSubscriptionName", Params.DataServiceSubscriptionName),
zap.Error(err))
}
}
log.Debug("dataservce stats stream",
zap.String("channelName", Params.StatisticsChannelName),
zap.String("descriptionName", Params.DataServiceSubscriptionName))
statsStream.Start()
defer statsStream.Close()
for {
@ -293,22 +283,7 @@ func (s *Server) startStatsChannel(ctx context.Context) {
}
ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
for _, stat := range ssMsg.SegStats {
if err := s.meta.UpdateSegmentStatistic(stat); err != nil {
log.Error("handle segment stat error",
zap.Int64("segmentID", stat.SegmentID),
zap.Error(err))
continue
}
}
if ssMsg.MsgPosition != nil {
err := s.storeStreamPos(streamTypeStats, ssMsg.MsgPosition)
if err != nil {
log.Error("Fail to store current success pos for Stats stream",
zap.Stringer("pos", ssMsg.MsgPosition),
zap.Error(err))
}
} else {
log.Warn("Empty Msg Pos found ", zap.Int64("msgid", msg.ID()))
s.segmentManager.UpdateSegmentStats(stat)
}
}
}
@ -349,7 +324,7 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
ch := ttMsg.ChannelName
ts := ttMsg.Timestamp
segments, err := s.segAllocator.GetFlushableSegments(ctx, ch, ts)
segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
if err != nil {
log.Warn("get flushable segments failed", zap.Error(err))
continue
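
The server now feeds the manager from two loops: the stats stream updates cached row counts, and the DataNode timetick loop asks for flushable segments. A condensed sketch of that interaction (the helper names and the msgstream.DataNodeTtMsg type are assumptions drawn from the surrounding loops; the real code keeps this logic inline):

```go
// Sketch only: the two server-loop touch points with SegmentManager.
func (s *Server) handleStatsMsg(ssMsg *msgstream.SegmentStatisticsMsg) {
	// Row counts now live in SegmentManager's in-memory status; the previous
	// per-message meta update and stream-position bookkeeping were removed.
	for _, stat := range ssMsg.SegStats {
		s.segmentManager.UpdateSegmentStats(stat)
	}
}

func (s *Server) handleDataNodeTt(ctx context.Context, ttMsg *msgstream.DataNodeTtMsg) {
	ch, ts := ttMsg.ChannelName, ttMsg.Timestamp
	segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
	if err != nil {
		log.Warn("get flushable segments failed", zap.Error(err))
		return
	}
	// Triggering the actual flush on the DataNodes from these IDs is outside
	// the hunk above and unchanged by this commit.
	log.Debug("flushable segments", zap.String("channel", ch), zap.Any("segments", segments))
}
```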

View File

@ -13,12 +13,10 @@ package dataservice
import (
"context"
"math"
"math/rand"
"path"
"testing"
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -29,7 +27,6 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)
func TestGetSegmentInfoChannel(t *testing.T) {
@ -167,7 +164,7 @@ func TestFlush(t *testing.T) {
Partitions: []int64{},
})
assert.Nil(t, err)
segID, _, expireTs, err := svr.segAllocator.AllocSegment(context.TODO(), 0, 1, "channel-1", 1)
segID, _, expireTs, err := svr.segmentManager.AllocSegment(context.TODO(), 0, 1, "channel-1", 1)
assert.Nil(t, err)
resp, err := svr.Flush(context.TODO(), &datapb.FlushRequest{
Base: &commonpb.MsgBase{
@ -181,7 +178,7 @@ func TestFlush(t *testing.T) {
})
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.ErrorCode)
ids, err := svr.segAllocator.GetFlushableSegments(context.TODO(), "channel-1", expireTs)
ids, err := svr.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(ids))
assert.EqualValues(t, segID, ids[0])
@ -649,115 +646,6 @@ func TestDataNodeTtChannel(t *testing.T) {
}
func TestResumeChannel(t *testing.T) {
Params.Init()
segmentIDs := make([]int64, 0, 1000)
t.Run("Prepare Resume test set", func(t *testing.T) {
svr := newTestServer(t, nil)
defer svr.Stop()
i := int64(-1)
cnt := 0
for ; cnt < 1000; i-- {
svr.meta.RLock()
_, has := svr.meta.segments[i]
svr.meta.RUnlock()
if has {
continue
}
err := svr.meta.AddSegment(&datapb.SegmentInfo{
ID: i,
CollectionID: -1,
})
assert.Nil(t, err)
segmentIDs = append(segmentIDs, i)
cnt++
}
})
t.Run("Test ResumeSegmentStatsChannel", func(t *testing.T) {
svr := newTestServer(t, nil)
segRows := rand.Int63n(1000)
statsStream, _ := svr.msFactory.NewMsgStream(svr.ctx)
statsStream.AsProducer([]string{Params.StatisticsChannelName})
statsStream.Start()
defer statsStream.Close()
genMsg := func(msgType commonpb.MsgType, t Timestamp, stats *internalpb.SegmentStatisticsUpdates) *msgstream.SegmentStatisticsMsg {
return &msgstream.SegmentStatisticsMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{0},
},
SegmentStatistics: internalpb.SegmentStatistics{
Base: &commonpb.MsgBase{
MsgType: msgType,
MsgID: 0,
Timestamp: t,
SourceID: 0,
},
SegStats: []*internalpb.SegmentStatisticsUpdates{stats},
},
}
}
ch := make(chan struct{})
go func() {
for _, segID := range segmentIDs {
stats := &internalpb.SegmentStatisticsUpdates{
SegmentID: segID,
NumRows: segRows,
}
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentStatistics, uint64(time.Now().Unix()), stats))
err := statsStream.Produce(&msgPack)
assert.Nil(t, err)
time.Sleep(time.Millisecond * 5)
}
ch <- struct{}{}
}()
time.Sleep(time.Second)
svr.Stop()
time.Sleep(time.Millisecond * 50)
svr = newTestServer(t, nil)
defer svr.Stop()
<-ch
//wait for Server processing last messages
time.Sleep(time.Second)
svr.meta.RLock()
defer svr.meta.RUnlock()
for _, segID := range segmentIDs {
seg, has := svr.meta.segments[segID]
log.Debug("check segment in meta", zap.Any("id", seg.ID), zap.Any("has", has))
assert.True(t, has)
if has {
log.Debug("compare num rows", zap.Any("id", seg.ID), zap.Any("expected", segRows), zap.Any("actual", seg.NumRows))
assert.Equal(t, segRows, seg.NumRows)
}
}
})
t.Run("Clean up test segments", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
var err error
for _, segID := range segmentIDs {
err = svr.meta.DropSegment(segID)
assert.Nil(t, err)
}
})
}
func TestGetVChannelPos(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)