Add a seal policy which restricts the lifetime of a segment (#7172)

issue: #7164
Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/7197/head
sunby 2021-08-20 15:42:12 +08:00 committed by GitHub
parent 9b7c782016
commit 6e34f4c7f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 53 additions and 23 deletions

View File

@ -21,12 +21,6 @@ import (
"go.uber.org/zap"
)
// clusterDeltaChange bundles three string lists describing a change in the
// cluster: newNodes, offlines and restarts.
// NOTE(review): the entries are presumably node identifiers/addresses — confirm
// against the code that populates this struct.
type clusterDeltaChange struct {
	newNodes, offlines, restarts []string
}
// dataNodeRegisterPolicy is the function type of a data node register policy.
// It receives the current cluster node list, the session of the node being
// registered and a buffer of channel statuses, and returns a node list and a
// channel status list.
// NOTE(review): the returned values are presumably the nodes to update and the
// remaining buffer — confirm against the policy implementations.
type dataNodeRegisterPolicy func(cluster []*NodeInfo, session *NodeInfo, buffer []*datapb.ChannelStatus) ([]*NodeInfo, []*datapb.ChannelStatus)

View File

@ -17,6 +17,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
@ -81,9 +82,6 @@ type sealPolicy func(maxCount, writtenCount, allocatedCount int64) bool
// segmentSealPolicy is a seal policy that applies to a single segment.
// Given the segment and a timestamp, it returns true when the segment should
// be sealed.
type segmentSealPolicy func(segment *SegmentInfo, ts Timestamp) bool
// channelSealPolicy is a seal policy that applies to a channel.
// It takes a string (the channel), a list of segments and a timestamp, and
// returns a list of segments.
// NOTE(review): the result is presumably the subset of segments to seal —
// confirm against the caller.
type channelSealPolicy func(string, []*SegmentInfo, Timestamp) []*SegmentInfo
// getSegmentCapacityPolicy get segmentSealPolicy with segment size factor policy
func getSegmentCapacityPolicy(sizeFactor float64) segmentSealPolicy {
return func(segment *SegmentInfo, ts Timestamp) bool {
@ -96,12 +94,18 @@ func getSegmentCapacityPolicy(sizeFactor float64) segmentSealPolicy {
}
// sealByLifetimePolicy returns a segmentSealPolicy that seals a segment once the time elapsed between segment.lastExpireTime and ts reaches lifetime
func getLastExpiresLifetimePolicy(lifetime uint64) segmentSealPolicy {
func sealByLifetimePolicy(lifetime time.Duration) segmentSealPolicy {
return func(segment *SegmentInfo, ts Timestamp) bool {
return (ts - segment.GetLastExpireTime()) > lifetime
pts, _ := tsoutil.ParseTS(ts)
epts, _ := tsoutil.ParseTS(segment.GetLastExpireTime())
d := pts.Sub(epts)
return d >= lifetime
}
}
// channelSealPolicy is a seal policy that applies to a channel.
// It takes a string (the channel), a list of segments and a timestamp, and
// returns a list of segments.
// NOTE(review): the result is presumably the subset of segments to seal —
// confirm against the caller.
type channelSealPolicy func(string, []*SegmentInfo, Timestamp) []*SegmentInfo
// getChannelOpenSegCapacityPolicy gets a channelSealPolicy with the channel open-segment capacity policy
func getChannelOpenSegCapacityPolicy(limit int) channelSealPolicy {
return func(channel string, segs []*SegmentInfo, ts Timestamp) []*SegmentInfo {
@ -121,10 +125,6 @@ func sortSegmentsByLastExpires(segs []*SegmentInfo) {
})
}
// sealPolicyV1 reports whether writtenCount has reached the seal threshold,
// i.e. Params.SegmentSealProportion of maxCount.
// allocatedCount is accepted to satisfy the sealPolicy signature but is not
// consulted.
func sealPolicyV1(maxCount, writtenCount, allocatedCount int64) bool {
	threshold := Params.SegmentSealProportion * float64(maxCount)
	return float64(writtenCount) >= threshold
}
// flushPolicy is a predicate over a segment and a timestamp.
// NOTE(review): true presumably means the segment should be flushed — confirm
// against the caller.
type flushPolicy func(segment *SegmentInfo, t Timestamp) bool
// flushInterval is a fixed duration of two seconds.
// NOTE(review): presumably the minimum gap used by the flush policy — its use
// is not visible here.
const flushInterval = 2 * time.Second

View File

@ -0,0 +1,35 @@
package datacoord
import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/stretchr/testify/assert"
)
func TestSealSegmentPolicy(t *testing.T) {
t.Run("test seal segment by lifetime", func(t *testing.T) {
lifetime := 2 * time.Second
now := time.Now()
curTS := now.UnixNano() / int64(time.Millisecond)
nosealTs := (now.Add(lifetime / 2)).UnixNano() / int64(time.Millisecond)
sealTs := (now.Add(lifetime)).UnixNano() / int64(time.Millisecond)
p := sealByLifetimePolicy(lifetime)
segment := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
LastExpireTime: tsoutil.ComposeTS(curTS, 0),
},
}
shouldSeal := p(segment, tsoutil.ComposeTS(nosealTs, 0))
assert.False(t, shouldSeal)
shouldSeal = p(segment, tsoutil.ComposeTS(sealTs, 0))
assert.True(t, shouldSeal)
})
}

View File

@ -29,6 +29,8 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
)
// segmentMaxLifetime is the lifetime limit (24 hours) that
// defaultSegmentSealPolicy passes to sealByLifetimePolicy.
const segmentMaxLifetime = 24 * time.Hour
// Manager manage segment related operations.
type Manager interface {
// AllocSegment allocate rows and record the allocation.
@ -133,12 +135,11 @@ func defaultAlocatePolicy() AllocatePolicy {
return AllocatePolicyV1
}
// defaultSealPolicy returns the sealPolicy used by default, which is
// sealPolicyV1.
func defaultSealPolicy() sealPolicy {
	var policy sealPolicy = sealPolicyV1
	return policy
}
func defaultSegmentSealPolicy() segmentSealPolicy {
return getSegmentCapacityPolicy(Params.SegmentSealProportion)
func defaultSegmentSealPolicy() []segmentSealPolicy {
return []segmentSealPolicy{
sealByLifetimePolicy(segmentMaxLifetime),
getSegmentCapacityPolicy(Params.SegmentSealProportion),
}
}
func defaultFlushPolicy() flushPolicy {
@ -154,8 +155,8 @@ func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) *Se
segments: make([]UniqueID, 0),
estimatePolicy: defaultCalUpperLimitPolicy(),
allocPolicy: defaultAlocatePolicy(),
segmentSealPolicies: []segmentSealPolicy{defaultSegmentSealPolicy()}, // default only segment size policy
channelSealPolicies: []channelSealPolicy{}, // no default channel seal policy
segmentSealPolicies: defaultSegmentSealPolicy(), // defaults: segment lifetime policy and segment size policy
channelSealPolicies: []channelSealPolicy{}, // no default channel seal policy
flushPolicy: defaultFlushPolicy(),
allocPool: sync.Pool{
New: func() interface{} {