fix: Make FlushTs Sync Policy apply to all buffers (#31839)

See also #30552

FlushTS policy was orignally designed to flushed/L0 segments only, but
in some edge case, new growing segment buffer would by-pass flush
request and hold a buffer before flush ts, which caused flush timeout

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/31872/head
congqixia 2024-04-03 11:47:13 +08:00 committed by GitHub
parent ae307af19e
commit 49b8ee4339
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 2 additions and 5 deletions

View File

@ -10,7 +10,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil" "github.com/milvus-io/milvus/pkg/util/typeutil"
) )
@ -83,13 +82,11 @@ func GetFlushTsPolicy(flushTimestamp *atomic.Uint64, meta metacache.MetaCache) S
if flushTs != nonFlushTS && ts >= flushTs { if flushTs != nonFlushTS && ts >= flushTs {
// flush segment start pos < flushTs && checkpoint > flushTs // flush segment start pos < flushTs && checkpoint > flushTs
ids := lo.FilterMap(buffers, func(buf *segmentBuffer, _ int) (int64, bool) { ids := lo.FilterMap(buffers, func(buf *segmentBuffer, _ int) (int64, bool) {
seg, ok := meta.GetSegmentByID(buf.segmentID) _, ok := meta.GetSegmentByID(buf.segmentID)
if !ok { if !ok {
return buf.segmentID, false return buf.segmentID, false
} }
inRange := seg.State() == commonpb.SegmentState_Flushed || return buf.segmentID, buf.MinTimestamp() < flushTs
seg.Level() == datapb.SegmentLevel_L0
return buf.segmentID, inRange && buf.MinTimestamp() < flushTs
}) })
// flush all buffer // flush all buffer