Remove syncing segmentLastExpire on every assignment (#25271) (#25316) (#25557)

Signed-off-by: MrPresent-Han <chun.han@zilliz.com>
pull/25718/head
MrPresent-Han 2023-07-24 14:11:07 +08:00 committed by GitHub
parent fc19b85a40
commit f4e72cb170
7 changed files with 192 additions and 63 deletions

View File

@ -316,7 +316,12 @@ dataCoord:
maxSize: 512 # Maximum size of a segment in MB
diskSegmentMaxSize: 2048 # Maximum size of a segment in MB for collections with disk index
sealProportion: 0.23
assignmentExpiration: 2000 # The time of the assignment expiration in ms
# The time of the assignment expiration in ms
# Warning! This parameter is an expert variable and is closely related to data integrity. Do not change
# it without a specific goal and a solid understanding of the scenarios involved. If it must be altered,
# make sure the new value is larger than the value that was in use before the restart;
# otherwise there is a high risk of data loss.
assignmentExpiration: 2000
maxLife: 86400 # The max lifetime of segment in seconds, 24*60*60
# If a segment didn't accept dml records in maxIdleTime and the size of segment is greater than
# minSizeFromIdleToSealed, Milvus will automatically seal it.
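For intuition behind the warning above, here is a minimal, hypothetical sketch (not part of this change) of the flush check that lastExpire feeds into; the helper name isFlushable is assumed for illustration:

```go
package main

import "fmt"

// Hypothetical illustration: a segment is treated as flushable once the
// channel time tick has passed its lastExpire, because no outstanding
// allocation can still write into it after that point. If the expiration
// window shrank across a restart, a rebuilt lastExpire could land before
// writes that were already assigned, hence the "only increase" rule above.
func isFlushable(lastExpire, channelTimeTick uint64) bool {
	return lastExpire <= channelTimeTick
}

func main() {
	fmt.Println(isFlushable(100, 150)) // true: the time tick has passed lastExpire
	fmt.Println(isFlushable(200, 150)) // false: allocations may still arrive
}
```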

View File

@ -913,24 +913,13 @@ func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
curSegInfo := m.segments.GetSegment(segmentID)
if curSegInfo == nil {
// TODO: Error handling.
log.Warn("meta update: add allocation failed - segment not found",
zap.Int64("segmentID", segmentID))
log.Warn("meta update: add allocation failed - segment not found", zap.Int64("segmentID", segmentID))
return nil
}
// Persist segment updates first.
clonedSegment := curSegInfo.Clone(AddAllocation(allocation))
if clonedSegment != nil && isSegmentHealthy(clonedSegment) {
if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{clonedSegment.SegmentInfo}); err != nil {
log.Error("meta update: add allocation failed",
zap.Int64("segmentID", segmentID),
zap.Error(err))
return err
}
}
// Update in-memory meta.
// Since the global segment lastExpire is used to guarantee data correctness after a restart,
// there is no need to persist the allocation to the meta store; only the in-memory allocation is updated.
m.segments.AddAllocation(segmentID, allocation)
log.Info("meta update: add allocation - complete",
zap.Int64("segmentID", segmentID))
log.Info("meta update: add allocation - complete", zap.Int64("segmentID", segmentID))
return nil
}
@ -950,6 +939,16 @@ func (m *meta) SetCurrentRows(segmentID UniqueID, rows int64) {
m.segments.SetCurrentRows(segmentID, rows)
}
// SetLastExpire sets lastExpire time for the segment with the provided `segmentID`
// Note that lastExpire is not persisted in KV store
func (m *meta) SetLastExpire(segmentID UniqueID, lastExpire uint64) {
m.Lock()
defer m.Unlock()
clonedSegment := m.segments.GetSegment(segmentID).Clone()
clonedSegment.LastExpireTime = lastExpire
m.segments.SetSegment(segmentID, clonedSegment)
}
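As a side note, a minimal standalone sketch of the copy-on-write, in-memory-only update pattern that SetLastExpire follows; segmentStore and segInfo are hypothetical stand-ins for the real meta structures:

```go
package main

import (
	"fmt"
	"sync"
)

// segInfo stands in for SegmentInfo; only the field relevant here is kept.
type segInfo struct {
	ID             int64
	LastExpireTime uint64
}

// segmentStore stands in for meta: a lock plus an in-memory map, no KV writes.
type segmentStore struct {
	sync.Mutex
	segments map[int64]*segInfo
}

// setLastExpire clones the entry, bumps LastExpireTime, and swaps it back in.
func (s *segmentStore) setLastExpire(id int64, ts uint64) {
	s.Lock()
	defer s.Unlock()
	if old, ok := s.segments[id]; ok {
		cloned := *old
		cloned.LastExpireTime = ts
		s.segments[id] = &cloned
	}
}

func main() {
	s := &segmentStore{segments: map[int64]*segInfo{1: {ID: 1, LastExpireTime: 100}}}
	s.setLastExpire(1, 200)
	fmt.Println(s.segments[1].LastExpireTime) // 200
}
```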
// SetLastFlushTime set LastFlushTime for segment with provided `segmentID`
// Note that lastFlushTime is not persisted in KV store
func (m *meta) SetLastFlushTime(segmentID UniqueID, t time.Time) {

View File

@ -29,8 +29,8 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -113,7 +113,6 @@ type SegmentManager struct {
segmentSealPolicies []segmentSealPolicy
channelSealPolicies []channelSealPolicy
flushPolicy flushPolicy
rcc types.RootCoord
}
type allocHelper struct {
@ -198,7 +197,7 @@ func defaultFlushPolicy() flushPolicy {
}
// newSegmentManager should be the only way to retrieve SegmentManager.
func newSegmentManager(meta *meta, allocator allocator, rcc types.RootCoord, opts ...allocOption) *SegmentManager {
func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) (*SegmentManager, error) {
manager := &SegmentManager{
meta: meta,
allocator: allocator,
@ -209,13 +208,15 @@ func newSegmentManager(meta *meta, allocator allocator, rcc types.RootCoord, opt
segmentSealPolicies: defaultSegmentSealPolicy(), // default only segment size policy
channelSealPolicies: []channelSealPolicy{}, // no default channel seal policy
flushPolicy: defaultFlushPolicy(),
rcc: rcc,
}
for _, opt := range opts {
opt.apply(manager)
}
manager.loadSegmentsFromMeta()
return manager
if err := manager.maybeResetLastExpireForSegments(); err != nil {
return nil, err
}
return manager, nil
}
// loadSegmentsFromMeta generates the corresponding segment status for each segment in meta
@ -228,6 +229,32 @@ func (s *SegmentManager) loadSegmentsFromMeta() {
s.segments = segmentsID
}
func (s *SegmentManager) maybeResetLastExpireForSegments() error {
// after a restart, reset lastExpire for all growing segments to a freshly allocated ts
if len(s.segments) > 0 {
var latestTs uint64
allocateErr := retry.Do(context.Background(), func() error {
ts, tryErr := s.genExpireTs(context.Background(), false)
if tryErr != nil {
log.Warn("failed to get ts from rootCoord for globalLastExpire", zap.Error(tryErr))
return tryErr
}
latestTs = ts
return nil
}, retry.Attempts(Params.DataCoordCfg.AllocLatestExpireAttempt.GetAsUint()), retry.Sleep(200*time.Millisecond))
if allocateErr != nil {
log.Warn("cannot allocate latest lastExpire from rootCoord", zap.Error(allocateErr))
return errors.New("global max expire ts is unavailable for segment manager")
}
for _, sID := range s.segments {
if segment := s.meta.GetSegment(sID); segment != nil && segment.GetState() == commonpb.SegmentState_Growing {
s.meta.SetLastExpire(sID, latestTs)
}
}
}
return nil
}
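For reference, a minimal standalone sketch of the retry pattern used above, assuming the pkg/util/retry helpers behave as their usage in this diff suggests; fetchTs is a hypothetical stand-in for genExpireTs:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/milvus-io/milvus/pkg/util/retry"
)

func main() {
	// fetchTs stands in for genExpireTs: it fails a couple of times before
	// returning a timestamp, much like rootCoord might right after a restart.
	attempt := 0
	fetchTs := func() (uint64, error) {
		attempt++
		if attempt < 3 {
			return 0, errors.New("rootCoord not ready")
		}
		return 42, nil
	}

	var latestTs uint64
	err := retry.Do(context.Background(), func() error {
		ts, tryErr := fetchTs()
		if tryErr != nil {
			return tryErr // retried until the attempt budget is exhausted
		}
		latestTs = ts
		return nil
	}, retry.Attempts(5), retry.Sleep(200*time.Millisecond))

	fmt.Println(latestTs, err) // 42 <nil> once a call succeeds within 5 attempts
}
```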
// AllocSegment allocates segments for the requested collection, partition, channel and row count
func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID,
partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error) {
@ -308,12 +335,6 @@ func (s *SegmentManager) allocSegmentForImport(ctx context.Context, collectionID
if err != nil {
return nil, err
}
// ReportImport with the new segment so RootCoord can add segment ref lock onto it.
// TODO: This is a hack and will be removed once the whole ImportManager is migrated from RootCoord to DataCoord.
if s.rcc == nil {
log.Error("RootCoord client not set")
return nil, errors.New("RootCoord client not set")
}
allocation.ExpireTime = expireTs
allocation.SegmentID = segment.GetID()

View File

@ -29,9 +29,11 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
mockkv "github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/metautil"
)
@ -41,8 +43,7 @@ func TestManagerOptions(t *testing.T) {
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta()
assert.NoError(t, err)
segmentManager := newSegmentManager(meta, mockAllocator, nil)
segmentManager, _ := newSegmentManager(meta, mockAllocator)
t.Run("test with alloc helper", func(t *testing.T) {
opt := withAllocHelper(allocHelper{})
opt.apply(segmentManager)
@ -99,10 +100,11 @@ func TestManagerOptions(t *testing.T) {
func TestAllocSegment(t *testing.T) {
ctx := context.Background()
Params.Init()
Params.Save(Params.DataCoordCfg.AllocLatestExpireAttempt.Key, "1")
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta()
assert.NoError(t, err)
segmentManager := newSegmentManager(meta, mockAllocator, nil)
segmentManager, _ := newSegmentManager(meta, mockAllocator)
schema := newTestSchema()
collID, err := mockAllocator.allocID(ctx)
@ -121,30 +123,117 @@ func TestAllocSegment(t *testing.T) {
t.Run("allocation fails 1", func(t *testing.T) {
failsAllocator := &FailsAllocator{
allocTsSucceed: true,
allocIDSucceed: false,
}
segmentManager := newSegmentManager(meta, failsAllocator, nil)
_, err := segmentManager.AllocSegment(ctx, collID, 100, "c2", 100)
segmentManager, err := newSegmentManager(meta, failsAllocator)
assert.NoError(t, err)
_, err = segmentManager.AllocSegment(ctx, collID, 100, "c2", 100)
assert.Error(t, err)
})
t.Run("allocation fails 2", func(t *testing.T) {
failsAllocator := &FailsAllocator{
allocTsSucceed: false,
allocIDSucceed: true,
}
segmentManager := newSegmentManager(meta, failsAllocator, nil)
_, err := segmentManager.AllocSegment(ctx, collID, 100, "c1", 100)
segmentManager, err := newSegmentManager(meta, failsAllocator)
assert.Error(t, err)
assert.Nil(t, segmentManager)
})
}
func TestLastExpireReset(t *testing.T) {
// set up meta on dataCoord
ctx := context.Background()
Params.Init()
Params.Save(Params.DataCoordCfg.AllocLatestExpireAttempt.Key, "1")
Params.Save(Params.DataCoordCfg.SegmentMaxSize.Key, "1")
mockAllocator := newRootCoordAllocator(newMockRootCoordService())
etcdCli, _ := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
Params.EtcdCfg.Endpoints.GetAsStrings(),
Params.EtcdCfg.EtcdTLSCert.GetValue(),
Params.EtcdCfg.EtcdTLSKey.GetValue(),
Params.EtcdCfg.EtcdTLSCACert.GetValue(),
Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
rootPath := "/test/segment/last/expire"
metaKV := etcdkv.NewEtcdKV(etcdCli, rootPath)
metaKV.RemoveWithPrefix("")
catalog := datacoord.NewCatalog(metaKV, "", "")
meta, err := newMeta(context.TODO(), catalog, nil)
assert.Nil(t, err)
// add collection
channelName := "c1"
schema := newTestSchema()
collID, err := mockAllocator.allocID(ctx)
assert.Nil(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
// assign segments; max segment size is set to 1MB, which equals roughly 10485 rows
var bigRows, smallRows int64 = 10000, 1000
segmentManager, _ := newSegmentManager(meta, mockAllocator)
allocs, _ := segmentManager.AllocSegment(context.Background(), collID, 0, channelName, bigRows)
segmentID1, expire1 := allocs[0].SegmentID, allocs[0].ExpireTime
time.Sleep(100 * time.Millisecond)
allocs, _ = segmentManager.AllocSegment(context.Background(), collID, 0, channelName, bigRows)
segmentID2, expire2 := allocs[0].SegmentID, allocs[0].ExpireTime
time.Sleep(100 * time.Millisecond)
allocs, _ = segmentManager.AllocSegment(context.Background(), collID, 0, channelName, smallRows)
segmentID3, expire3 := allocs[0].SegmentID, allocs[0].ExpireTime
//simulate handleTimeTick op on dataCoord
meta.SetCurrentRows(segmentID1, bigRows)
meta.SetCurrentRows(segmentID2, bigRows)
meta.SetCurrentRows(segmentID3, smallRows)
segmentManager.tryToSealSegment(expire1, channelName)
assert.Equal(t, commonpb.SegmentState_Sealed, meta.GetSegment(segmentID1).GetState())
assert.Equal(t, commonpb.SegmentState_Sealed, meta.GetSegment(segmentID2).GetState())
assert.Equal(t, commonpb.SegmentState_Growing, meta.GetSegment(segmentID3).GetState())
// pretend that dataCoord breaks down
metaKV.Close()
etcdCli.Close()
//dataCoord restart
newEtcdCli, _ := etcd.GetEtcdClient(Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
Params.EtcdCfg.Endpoints.GetAsStrings(), Params.EtcdCfg.EtcdTLSCert.GetValue(),
Params.EtcdCfg.EtcdTLSKey.GetValue(), Params.EtcdCfg.EtcdTLSCACert.GetValue(), Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
newMetaKV := etcdkv.NewEtcdKV(newEtcdCli, rootPath)
defer newMetaKV.RemoveWithPrefix("")
newCatalog := datacoord.NewCatalog(newMetaKV, "", "")
restartedMeta, err := newMeta(context.TODO(), newCatalog, nil)
restartedMeta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
assert.Nil(t, err)
newSegmentManager, _ := newSegmentManager(restartedMeta, mockAllocator)
// reset row numbers to avoid the segments being cleaned up as empty segments
restartedMeta.SetCurrentRows(segmentID1, bigRows)
restartedMeta.SetCurrentRows(segmentID2, bigRows)
restartedMeta.SetCurrentRows(segmentID3, smallRows)
//verify lastExpire of growing and sealed segments
segment1, segment2, segment3 := restartedMeta.GetSegment(segmentID1), restartedMeta.GetSegment(segmentID2), restartedMeta.GetSegment(segmentID3)
// segment states should not be altered, but the growing segment's lastExpire has been reset to the latest ts
assert.Equal(t, commonpb.SegmentState_Sealed, segment1.GetState())
assert.Equal(t, commonpb.SegmentState_Sealed, segment2.GetState())
assert.Equal(t, commonpb.SegmentState_Growing, segment3.GetState())
assert.Equal(t, expire1, segment1.GetLastExpireTime())
assert.Equal(t, expire2, segment2.GetLastExpireTime())
assert.True(t, segment3.GetLastExpireTime() > expire3)
flushableSegIds, _ := newSegmentManager.GetFlushableSegments(context.Background(), channelName, expire3)
assert.ElementsMatch(t, []UniqueID{segmentID1, segmentID2}, flushableSegIds) // segment1 and segment2 can be flushed
newAlloc, err := newSegmentManager.AllocSegment(context.Background(), collID, 0, channelName, 2000)
assert.Nil(t, err)
assert.Equal(t, segmentID3, newAlloc[0].SegmentID) // segment3 still can be used to allocate
}
func TestAllocSegmentForImport(t *testing.T) {
ctx := context.Background()
Params.Init()
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta()
assert.NoError(t, err)
ms := newMockRootCoordService()
segmentManager := newSegmentManager(meta, mockAllocator, ms)
segmentManager, _ := newSegmentManager(meta, mockAllocator)
schema := newTestSchema()
collID, err := mockAllocator.allocID(ctx)
@ -164,7 +253,7 @@ func TestAllocSegmentForImport(t *testing.T) {
failsAllocator := &FailsAllocator{
allocTsSucceed: true,
}
segmentManager := newSegmentManager(meta, failsAllocator, ms)
segmentManager, _ := newSegmentManager(meta, failsAllocator)
_, err := segmentManager.allocSegmentForImport(ctx, collID, 100, "c1", 100, 0)
assert.Error(t, err)
})
@ -173,13 +262,7 @@ func TestAllocSegmentForImport(t *testing.T) {
failsAllocator := &FailsAllocator{
allocIDSucceed: true,
}
segmentManager := newSegmentManager(meta, failsAllocator, ms)
_, err := segmentManager.allocSegmentForImport(ctx, collID, 100, "c1", 100, 0)
assert.Error(t, err)
})
t.Run("nil RootCoord", func(t *testing.T) {
segmentManager := newSegmentManager(meta, mockAllocator, nil)
segmentManager, _ := newSegmentManager(meta, failsAllocator)
_, err := segmentManager.allocSegmentForImport(ctx, collID, 100, "c1", 100, 0)
assert.Error(t, err)
})
@ -231,7 +314,7 @@ func TestLoadSegmentsFromMeta(t *testing.T) {
err = meta.AddSegment(NewSegmentInfo(flushedSegment))
assert.NoError(t, err)
segmentManager := newSegmentManager(meta, mockAllocator, nil)
segmentManager, _ := newSegmentManager(meta, mockAllocator)
segments := segmentManager.segments
assert.EqualValues(t, 2, len(segments))
}
@ -246,7 +329,7 @@ func TestSaveSegmentsToMeta(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.NoError(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, nil)
segmentManager, _ := newSegmentManager(meta, mockAllocator)
allocations, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations))
@ -268,7 +351,7 @@ func TestSaveSegmentsToMetaWithSpecificSegments(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.NoError(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, nil)
segmentManager, _ := newSegmentManager(meta, mockAllocator)
allocations, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations))
@ -290,7 +373,7 @@ func TestDropSegment(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.NoError(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, nil)
segmentManager, _ := newSegmentManager(meta, mockAllocator)
allocations, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations))
@ -317,7 +400,7 @@ func TestAllocRowsLargerThanOneSegment(t *testing.T) {
var mockPolicy = func(schema *schemapb.CollectionSchema) (int, error) {
return 1, nil
}
segmentManager := newSegmentManager(meta, mockAllocator, nil, withCalUpperLimitPolicy(mockPolicy))
segmentManager, _ := newSegmentManager(meta, mockAllocator, withCalUpperLimitPolicy(mockPolicy))
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.NoError(t, err)
assert.EqualValues(t, 2, len(allocations))
@ -339,7 +422,7 @@ func TestExpireAllocation(t *testing.T) {
var mockPolicy = func(schema *schemapb.CollectionSchema) (int, error) {
return 10000000, nil
}
segmentManager := newSegmentManager(meta, mockAllocator, nil, withCalUpperLimitPolicy(mockPolicy))
segmentManager, _ := newSegmentManager(meta, mockAllocator, withCalUpperLimitPolicy(mockPolicy))
// alloc 100 times and expire
var maxts Timestamp
var id int64 = -1
@ -378,8 +461,7 @@ func TestCleanExpiredBulkloadSegment(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.NoError(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
ms := newMockRootCoordService()
segmentManager := newSegmentManager(meta, mockAllocator, ms)
segmentManager, _ := newSegmentManager(meta, mockAllocator)
allocation, err := segmentManager.allocSegmentForImport(context.TODO(), collID, 0, "c1", 2, 1)
assert.NoError(t, err)
@ -409,7 +491,7 @@ func TestGetFlushableSegments(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.NoError(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, nil)
segmentManager, _ := newSegmentManager(meta, mockAllocator)
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations))
@ -455,7 +537,7 @@ func TestTryToSealSegment(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.NoError(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, nil, withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64))) //always seal
segmentManager, _ := newSegmentManager(meta, mockAllocator, withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64))) //always seal
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations))
@ -480,7 +562,7 @@ func TestTryToSealSegment(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.NoError(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, nil, withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal
segmentManager, _ := newSegmentManager(meta, mockAllocator, withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations))
@ -505,7 +587,7 @@ func TestTryToSealSegment(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.NoError(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, nil,
segmentManager, _ := newSegmentManager(meta, mockAllocator,
withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64)),
withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
@ -532,7 +614,7 @@ func TestTryToSealSegment(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.NoError(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, nil)
segmentManager, _ := newSegmentManager(meta, mockAllocator)
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations))
@ -619,7 +701,7 @@ func TestTryToSealSegment(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.NoError(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, nil, withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64))) //always seal
segmentManager, _ := newSegmentManager(meta, mockAllocator, withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64))) //always seal
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations))
@ -648,7 +730,7 @@ func TestTryToSealSegment(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.NoError(t, err)
meta.AddCollection(&collectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, nil, withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal
segmentManager, _ := newSegmentManager(meta, mockAllocator, withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations))

View File

@ -354,7 +354,10 @@ func (s *Server) initDataCoord() error {
s.createCompactionHandler()
s.createCompactionTrigger()
}
s.initSegmentManager()
if err = s.initSegmentManager(); err != nil {
return err
}
s.initGarbageCollection(storageCli)
s.initIndexBuilder(storageCli)
@ -516,10 +519,15 @@ func (s *Server) initServiceDiscovery() error {
return nil
}
func (s *Server) initSegmentManager() {
func (s *Server) initSegmentManager() error {
if s.segmentManager == nil {
s.segmentManager = newSegmentManager(s.meta, s.allocator, s.rootCoordClient)
manager, err := newSegmentManager(s.meta, s.allocator)
if err != nil {
return err
}
s.segmentManager = manager
}
return nil
}
func (s *Server) initMeta(chunkManager storage.ChunkManager) error {

View File

@ -1919,7 +1919,8 @@ type dataCoordConfig struct {
SegmentMaxSize ParamItem `refreshable:"false"`
DiskSegmentMaxSize ParamItem `refreshable:"true"`
SegmentSealProportion ParamItem `refreshable:"false"`
SegAssignmentExpiration ParamItem `refreshable:"true"`
SegAssignmentExpiration ParamItem `refreshable:"false"`
AllocLatestExpireAttempt ParamItem `refreshable:"true"`
SegmentMaxLifetime ParamItem `refreshable:"false"`
SegmentMaxIdleTime ParamItem `refreshable:"false"`
SegmentMinSizeFromIdleToSealed ParamItem `refreshable:"false"`
@ -2025,6 +2026,15 @@ func (p *dataCoordConfig) init(base *BaseTable) {
}
p.SegAssignmentExpiration.Init(base.mgr)
p.AllocLatestExpireAttempt = ParamItem{
Key: "dataCoord.segment.allocLatestExpireAttempt",
Version: "2.2.0",
DefaultValue: "200",
Doc: "The number of attempts to allocate the latest lastExpire from rootCoord after restart",
Export: true,
}
p.AllocLatestExpireAttempt.Init(base.mgr)
p.SegmentMaxLifetime = ParamItem{
Key: "dataCoord.segment.maxLife",
Version: "2.0.0",

View File

@ -109,6 +109,10 @@ func (pi *ParamItem) GetAsInt32() int32 {
return int32(getAsInt64(pi.GetValue()))
}
func (pi *ParamItem) GetAsUint() uint {
return uint(getAsInt64(pi.GetValue()))
}
func (pi *ParamItem) GetAsUint32() uint32 {
return uint32(getAsInt64(pi.GetValue()))
}
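For completeness, a minimal sketch of the narrowing these accessors perform, assuming getAsInt64 parses the string form of the value; paramToUint is a hypothetical standalone equivalent of GetAsUint:

```go
package main

import (
	"fmt"
	"strconv"
)

// paramToUint mirrors GetAsUint: parse the parameter's string value as an
// integer and narrow it to uint, falling back to 0 on parse failure.
func paramToUint(value string) uint {
	v, err := strconv.ParseInt(value, 10, 64)
	if err != nil {
		return 0
	}
	return uint(v)
}

func main() {
	// e.g. the default of dataCoord.segment.allocLatestExpireAttempt
	fmt.Println(paramToUint("200")) // 200
}
```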