enhance: Optimize the performance of stats task (#37374)

1. Increase the writer's `batchSize` to avoid multiple serialization
operations.
2. Perform asynchronous upload of binlog files to prevent blocking the
data processing flow.
3. Reduce multiple calls to `writer.Flush()`.

issue: https://github.com/milvus-io/milvus/issues/37373

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/37524/head
yihao.dai 2024-11-08 10:08:27 +08:00 committed by GitHub
parent bc9562feb1
commit 81879425e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 397 additions and 65 deletions

View File

@ -1280,7 +1280,7 @@ func (t *clusteringCompactionTask) refreshBufferWriterWithPack(buffer *ClusterBu
buffer.currentSegmentRowNum.Store(0)
}
writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, segmentID, t.partitionID, t.collectionID, t.bm25FieldIds)
writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, compactionBatchSize, segmentID, t.partitionID, t.collectionID, t.bm25FieldIds)
if err != nil {
return pack, err
}
@ -1295,7 +1295,7 @@ func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) er
segmentID = buffer.writer.Load().(*SegmentWriter).GetSegmentID()
buffer.bufferMemorySize.Add(int64(buffer.writer.Load().(*SegmentWriter).WrittenMemorySize()))
writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, segmentID, t.partitionID, t.collectionID, t.bm25FieldIds)
writer, err := NewSegmentWriter(t.plan.GetSchema(), t.plan.MaxSegmentRows, compactionBatchSize, segmentID, t.partitionID, t.collectionID, t.bm25FieldIds)
if err != nil {
return err
}

View File

@ -171,7 +171,7 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionInit() {
func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
schema := genCollectionSchema()
var segmentID int64 = 1001
segWriter, err := NewSegmentWriter(schema, 1000, segmentID, PartitionID, CollectionID, []int64{})
segWriter, err := NewSegmentWriter(schema, 1000, compactionBatchSize, segmentID, PartitionID, CollectionID, []int64{})
s.Require().NoError(err)
for i := 0; i < 10240; i++ {
v := storage.Value{
@ -240,7 +240,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() {
schema := genCollectionSchemaWithBM25()
var segmentID int64 = 1001
segWriter, err := NewSegmentWriter(schema, 1000, segmentID, PartitionID, CollectionID, []int64{102})
segWriter, err := NewSegmentWriter(schema, 1000, compactionBatchSize, segmentID, PartitionID, CollectionID, []int64{102})
s.Require().NoError(err)
for i := 0; i < 10240; i++ {
@ -453,7 +453,7 @@ func (s *ClusteringCompactionTaskSuite) TestGeneratePkStats() {
s.Run("upload failed", func() {
schema := genCollectionSchema()
segWriter, err := NewSegmentWriter(schema, 1000, SegmentID, PartitionID, CollectionID, []int64{})
segWriter, err := NewSegmentWriter(schema, 1000, compactionBatchSize, SegmentID, PartitionID, CollectionID, []int64{})
s.Require().NoError(err)
for i := 0; i < 2000; i++ {
v := storage.Value{

View File

@ -35,6 +35,8 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const compactionBatchSize = 100
func isExpiredEntity(ttl int64, now, ts typeutil.Timestamp) bool {
// entity expire is not enabled if duration <= 0
if ttl <= 0 {

View File

@ -651,7 +651,7 @@ func getRow(magic int64) map[int64]interface{} {
}
func (s *MixCompactionTaskSuite) initMultiRowsSegBuffer(magic, numRows, step int64) {
segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 65535, magic, PartitionID, CollectionID, []int64{})
segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 65535, compactionBatchSize, magic, PartitionID, CollectionID, []int64{})
s.Require().NoError(err)
for i := int64(0); i < numRows; i++ {
@ -670,7 +670,7 @@ func (s *MixCompactionTaskSuite) initMultiRowsSegBuffer(magic, numRows, step int
}
func (s *MixCompactionTaskSuite) initSegBufferWithBM25(magic int64) {
segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 100, magic, PartitionID, CollectionID, []int64{102})
segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 100, compactionBatchSize, magic, PartitionID, CollectionID, []int64{102})
s.Require().NoError(err)
v := storage.Value{
@ -686,7 +686,7 @@ func (s *MixCompactionTaskSuite) initSegBufferWithBM25(magic int64) {
}
func (s *MixCompactionTaskSuite) initSegBuffer(size int, seed int64) {
segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 100, seed, PartitionID, CollectionID, []int64{})
segWriter, err := NewSegmentWriter(s.meta.GetSchema(), 100, compactionBatchSize, seed, PartitionID, CollectionID, []int64{})
s.Require().NoError(err)
for i := 0; i < size; i++ {

View File

@ -1,6 +1,18 @@
// SegmentInsertBuffer can be reused to buffer all insert data of one segment
// buffer.Serialize will serialize the InsertBuffer and clear it
// pkstats keeps tracking pkstats of the segment until Finish
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compaction
@ -155,7 +167,7 @@ func (w *MultiSegmentWriter) addNewWriter() error {
if err != nil {
return err
}
writer, err := NewSegmentWriter(w.schema, w.maxRows, newSegmentID, w.partitionID, w.collectionID, w.bm25Fields)
writer, err := NewSegmentWriter(w.schema, w.maxRows, compactionBatchSize, newSegmentID, w.partitionID, w.collectionID, w.bm25Fields)
if err != nil {
return err
}
@ -351,6 +363,7 @@ type SegmentWriter struct {
rowCount *atomic.Int64
syncedSize *atomic.Int64
batchSize int
maxBinlogSize uint64
}
@ -484,8 +497,7 @@ func (w *SegmentWriter) FlushAndIsFull() bool {
return w.writer.WrittenMemorySize() > w.maxBinlogSize
}
func (w *SegmentWriter) FlushAndIsFullWithBinlogMaxSize(binLogMaxSize uint64) bool {
w.writer.Flush()
func (w *SegmentWriter) IsFullWithBinlogMaxSize(binLogMaxSize uint64) bool {
return w.writer.WrittenMemorySize() > binLogMaxSize
}
@ -528,15 +540,15 @@ func (w *SegmentWriter) GetTotalSize() int64 {
func (w *SegmentWriter) clear() {
w.syncedSize.Add(int64(w.writer.WrittenMemorySize()))
writer, closers, _ := newBinlogWriter(w.collectionID, w.partitionID, w.segmentID, w.sch)
writer, closers, _ := newBinlogWriter(w.collectionID, w.partitionID, w.segmentID, w.sch, w.batchSize)
w.writer = writer
w.closers = closers
w.tsFrom = math.MaxUint64
w.tsTo = 0
}
func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, segID, partID, collID int64, Bm25Fields []int64) (*SegmentWriter, error) {
writer, closers, err := newBinlogWriter(collID, partID, segID, sch)
func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, batchSize int, segID, partID, collID int64, Bm25Fields []int64) (*SegmentWriter, error) {
writer, closers, err := newBinlogWriter(collID, partID, segID, sch, batchSize)
if err != nil {
return nil, err
}
@ -567,6 +579,7 @@ func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, segID, par
rowCount: atomic.NewInt64(0),
syncedSize: atomic.NewInt64(0),
batchSize: batchSize,
maxBinlogSize: paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64(),
}
@ -576,13 +589,13 @@ func NewSegmentWriter(sch *schemapb.CollectionSchema, maxCount int64, segID, par
return &segWriter, nil
}
func newBinlogWriter(collID, partID, segID int64, schema *schemapb.CollectionSchema,
func newBinlogWriter(collID, partID, segID int64, schema *schemapb.CollectionSchema, batchSize int,
) (writer *storage.SerializeWriter[*storage.Value], closers []func() (*storage.Blob, error), err error) {
fieldWriters := storage.NewBinlogStreamWriters(collID, partID, segID, schema.Fields)
closers = make([]func() (*storage.Blob, error), 0, len(fieldWriters))
for _, w := range fieldWriters {
closers = append(closers, w.Finalize)
}
writer, err = storage.NewBinlogSerializeWriter(schema, partID, segID, fieldWriters, 100)
writer, err = storage.NewBinlogSerializeWriter(schema, partID, segID, fieldWriters, batchSize)
return
}

View File

@ -0,0 +1,119 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compaction
import (
"fmt"
"math/rand"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
func testSegmentWriterBatchSize(b *testing.B, batchSize int) {
orgLevel := log.GetLevel()
log.SetLevel(zapcore.InfoLevel)
defer log.SetLevel(orgLevel)
paramtable.Init()
const (
dim = 128
numRows = 1000000
)
var (
rId = &schemapb.FieldSchema{FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64}
ts = &schemapb.FieldSchema{FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64}
pk = &schemapb.FieldSchema{FieldID: 100, Name: "pk", IsPrimaryKey: true, DataType: schemapb.DataType_VarChar, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "100"}}}
f = &schemapb.FieldSchema{FieldID: 101, Name: "random", DataType: schemapb.DataType_Double}
fVec = &schemapb.FieldSchema{FieldID: 102, Name: "vec", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: strconv.Itoa(dim)}}}
)
schema := &schemapb.CollectionSchema{Name: "test-aaa", Fields: []*schemapb.FieldSchema{rId, ts, pk, f, fVec}}
// prepare data values
start := time.Now()
vec := make([]float32, dim)
for j := 0; j < dim; j++ {
vec[j] = rand.Float32()
}
values := make([]*storage.Value, numRows)
for i := 0; i < numRows; i++ {
value := &storage.Value{}
value.Value = make(map[int64]interface{}, len(schema.GetFields()))
m := value.Value.(map[int64]interface{})
for _, field := range schema.GetFields() {
switch field.GetDataType() {
case schemapb.DataType_Int64:
m[field.GetFieldID()] = int64(i)
case schemapb.DataType_VarChar:
k := fmt.Sprintf("test_pk_%d", i)
m[field.GetFieldID()] = k
value.PK = &storage.VarCharPrimaryKey{
Value: k,
}
case schemapb.DataType_Double:
m[field.GetFieldID()] = float64(i)
case schemapb.DataType_FloatVector:
m[field.GetFieldID()] = vec
}
}
value.ID = int64(i)
value.Timestamp = int64(0)
value.IsDeleted = false
value.Value = m
values[i] = value
}
log.Info("prepare data done", zap.Int("len", len(values)), zap.Duration("dur", time.Since(start)))
writer, err := NewSegmentWriter(schema, numRows, batchSize, 1, 2, 3, nil)
assert.NoError(b, err)
b.N = 10
b.ResetTimer()
for i := 0; i < b.N; i++ {
start = time.Now()
for _, v := range values {
err = writer.Write(v)
assert.NoError(b, err)
}
log.Info("write done", zap.Int("len", len(values)), zap.Duration("dur", time.Since(start)))
}
b.StopTimer()
}
func Benchmark_SegmentWriter_BatchSize_100(b *testing.B) {
testSegmentWriterBatchSize(b, 100)
}
func Benchmark_SegmentWriter_BatchSize_1000(b *testing.B) {
testSegmentWriterBatchSize(b, 1000)
}
func Benchmark_SegmentWriter_BatchSize_10000(b *testing.B) {
testSegmentWriterBatchSize(b, 10000)
}

View File

@ -44,7 +44,7 @@ func (s *SegmentWriteSuite) TestWriteFailed() {
s.Run("get bm25 field failed", func() {
schema := genCollectionSchemaWithBM25()
// init segment writer with invalid bm25 fieldID
writer, err := NewSegmentWriter(schema, 1024, 1, s.parititonID, s.collectionID, []int64{1000})
writer, err := NewSegmentWriter(schema, 1024, compactionBatchSize, 1, s.parititonID, s.collectionID, []int64{1000})
s.Require().NoError(err)
v := storage.Value{
@ -59,7 +59,7 @@ func (s *SegmentWriteSuite) TestWriteFailed() {
s.Run("parse bm25 field data failed", func() {
schema := genCollectionSchemaWithBM25()
// init segment writer with wrong field as bm25 sparse field
writer, err := NewSegmentWriter(schema, 1024, 1, s.parititonID, s.collectionID, []int64{101})
writer, err := NewSegmentWriter(schema, 1024, compactionBatchSize, 1, s.parititonID, s.collectionID, []int64{101})
s.Require().NoError(err)
v := storage.Value{

View File

@ -33,7 +33,9 @@ import (
type BinlogIO interface {
Download(ctx context.Context, paths []string) ([][]byte, error)
AsyncDownload(ctx context.Context, paths []string) []*conc.Future[any]
Upload(ctx context.Context, kvs map[string][]byte) error
AsyncUpload(ctx context.Context, kvs map[string][]byte) []*conc.Future[any]
}
type BinlogIoImpl struct {
@ -46,6 +48,18 @@ func NewBinlogIO(cm storage.ChunkManager) BinlogIO {
}
func (b *BinlogIoImpl) Download(ctx context.Context, paths []string) ([][]byte, error) {
futures := b.AsyncDownload(ctx, paths)
err := conc.AwaitAll(futures...)
if err != nil {
return nil, err
}
return lo.Map(futures, func(future *conc.Future[any], _ int) []byte {
return future.Value().([]byte)
}), nil
}
func (b *BinlogIoImpl) AsyncDownload(ctx context.Context, paths []string) []*conc.Future[any] {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "Download")
defer span.End()
@ -74,17 +88,15 @@ func (b *BinlogIoImpl) Download(ctx context.Context, paths []string) ([][]byte,
futures = append(futures, future)
}
err := conc.AwaitAll(futures...)
if err != nil {
return nil, err
}
return lo.Map(futures, func(future *conc.Future[any], _ int) []byte {
return future.Value().([]byte)
}), nil
return futures
}
func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error {
futures := b.AsyncUpload(ctx, kvs)
return conc.AwaitAll(futures...)
}
func (b *BinlogIoImpl) AsyncUpload(ctx context.Context, kvs map[string][]byte) []*conc.Future[any] {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "Upload")
defer span.End()
@ -108,5 +120,5 @@ func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error
futures = append(futures, future)
}
return conc.AwaitAll(futures...)
return futures
}

View File

@ -1,10 +1,12 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
// Code generated by mockery v2.46.0. DO NOT EDIT.
package io
import (
context "context"
conc "github.com/milvus-io/milvus/pkg/util/conc"
mock "github.com/stretchr/testify/mock"
)
@ -21,10 +23,112 @@ func (_m *MockBinlogIO) EXPECT() *MockBinlogIO_Expecter {
return &MockBinlogIO_Expecter{mock: &_m.Mock}
}
// AsyncDownload provides a mock function with given fields: ctx, paths
func (_m *MockBinlogIO) AsyncDownload(ctx context.Context, paths []string) []*conc.Future[interface{}] {
ret := _m.Called(ctx, paths)
if len(ret) == 0 {
panic("no return value specified for AsyncDownload")
}
var r0 []*conc.Future[interface{}]
if rf, ok := ret.Get(0).(func(context.Context, []string) []*conc.Future[interface{}]); ok {
r0 = rf(ctx, paths)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*conc.Future[interface{}])
}
}
return r0
}
// MockBinlogIO_AsyncDownload_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AsyncDownload'
type MockBinlogIO_AsyncDownload_Call struct {
*mock.Call
}
// AsyncDownload is a helper method to define mock.On call
// - ctx context.Context
// - paths []string
func (_e *MockBinlogIO_Expecter) AsyncDownload(ctx interface{}, paths interface{}) *MockBinlogIO_AsyncDownload_Call {
return &MockBinlogIO_AsyncDownload_Call{Call: _e.mock.On("AsyncDownload", ctx, paths)}
}
func (_c *MockBinlogIO_AsyncDownload_Call) Run(run func(ctx context.Context, paths []string)) *MockBinlogIO_AsyncDownload_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].([]string))
})
return _c
}
func (_c *MockBinlogIO_AsyncDownload_Call) Return(_a0 []*conc.Future[interface{}]) *MockBinlogIO_AsyncDownload_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockBinlogIO_AsyncDownload_Call) RunAndReturn(run func(context.Context, []string) []*conc.Future[interface{}]) *MockBinlogIO_AsyncDownload_Call {
_c.Call.Return(run)
return _c
}
// AsyncUpload provides a mock function with given fields: ctx, kvs
func (_m *MockBinlogIO) AsyncUpload(ctx context.Context, kvs map[string][]byte) []*conc.Future[interface{}] {
ret := _m.Called(ctx, kvs)
if len(ret) == 0 {
panic("no return value specified for AsyncUpload")
}
var r0 []*conc.Future[interface{}]
if rf, ok := ret.Get(0).(func(context.Context, map[string][]byte) []*conc.Future[interface{}]); ok {
r0 = rf(ctx, kvs)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*conc.Future[interface{}])
}
}
return r0
}
// MockBinlogIO_AsyncUpload_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AsyncUpload'
type MockBinlogIO_AsyncUpload_Call struct {
*mock.Call
}
// AsyncUpload is a helper method to define mock.On call
// - ctx context.Context
// - kvs map[string][]byte
func (_e *MockBinlogIO_Expecter) AsyncUpload(ctx interface{}, kvs interface{}) *MockBinlogIO_AsyncUpload_Call {
return &MockBinlogIO_AsyncUpload_Call{Call: _e.mock.On("AsyncUpload", ctx, kvs)}
}
func (_c *MockBinlogIO_AsyncUpload_Call) Run(run func(ctx context.Context, kvs map[string][]byte)) *MockBinlogIO_AsyncUpload_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(map[string][]byte))
})
return _c
}
func (_c *MockBinlogIO_AsyncUpload_Call) Return(_a0 []*conc.Future[interface{}]) *MockBinlogIO_AsyncUpload_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockBinlogIO_AsyncUpload_Call) RunAndReturn(run func(context.Context, map[string][]byte) []*conc.Future[interface{}]) *MockBinlogIO_AsyncUpload_Call {
_c.Call.Return(run)
return _c
}
// Download provides a mock function with given fields: ctx, paths
func (_m *MockBinlogIO) Download(ctx context.Context, paths []string) ([][]byte, error) {
ret := _m.Called(ctx, paths)
if len(ret) == 0 {
panic("no return value specified for Download")
}
var r0 [][]byte
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, []string) ([][]byte, error)); ok {
@ -80,6 +184,10 @@ func (_c *MockBinlogIO_Download_Call) RunAndReturn(run func(context.Context, []s
func (_m *MockBinlogIO) Upload(ctx context.Context, kvs map[string][]byte) error {
ret := _m.Called(ctx, kvs)
if len(ret) == 0 {
panic("no return value specified for Upload")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, map[string][]byte) error); ok {
r0 = rf(ctx, kvs)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
// Code generated by mockery v2.46.0. DO NOT EDIT.
package syncmgr
@ -25,6 +25,10 @@ func (_m *MockMetaWriter) EXPECT() *MockMetaWriter_Expecter {
func (_m *MockMetaWriter) DropChannel(_a0 context.Context, _a1 string) error {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for DropChannel")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
r0 = rf(_a0, _a1)
@ -68,6 +72,10 @@ func (_c *MockMetaWriter_DropChannel_Call) RunAndReturn(run func(context.Context
func (_m *MockMetaWriter) UpdateSync(_a0 context.Context, _a1 *SyncTask) error {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for UpdateSync")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *SyncTask) error); ok {
r0 = rf(_a0, _a1)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
// Code generated by mockery v2.46.0. DO NOT EDIT.
package syncmgr
@ -25,6 +25,10 @@ func (_m *MockSerializer) EXPECT() *MockSerializer_Expecter {
func (_m *MockSerializer) EncodeBuffer(ctx context.Context, pack *SyncPack) (Task, error) {
ret := _m.Called(ctx, pack)
if len(ret) == 0 {
panic("no return value specified for EncodeBuffer")
}
var r0 Task
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *SyncPack) (Task, error)); ok {

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
// Code generated by mockery v2.46.0. DO NOT EDIT.
package syncmgr
@ -26,6 +26,10 @@ func (_m *MockTask) EXPECT() *MockTask_Expecter {
func (_m *MockTask) ChannelName() string {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for ChannelName")
}
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
@ -67,6 +71,10 @@ func (_c *MockTask_ChannelName_Call) RunAndReturn(run func() string) *MockTask_C
func (_m *MockTask) Checkpoint() *msgpb.MsgPosition {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Checkpoint")
}
var r0 *msgpb.MsgPosition
if rf, ok := ret.Get(0).(func() *msgpb.MsgPosition); ok {
r0 = rf()
@ -143,6 +151,10 @@ func (_c *MockTask_HandleError_Call) RunAndReturn(run func(error)) *MockTask_Han
func (_m *MockTask) IsFlush() bool {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for IsFlush")
}
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
@ -184,6 +196,10 @@ func (_c *MockTask_IsFlush_Call) RunAndReturn(run func() bool) *MockTask_IsFlush
func (_m *MockTask) Run(_a0 context.Context) error {
ret := _m.Called(_a0)
if len(ret) == 0 {
panic("no return value specified for Run")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(_a0)
@ -226,6 +242,10 @@ func (_c *MockTask_Run_Call) RunAndReturn(run func(context.Context) error) *Mock
func (_m *MockTask) SegmentID() int64 {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for SegmentID")
}
var r0 int64
if rf, ok := ret.Get(0).(func() int64); ok {
r0 = rf()
@ -267,6 +287,10 @@ func (_c *MockTask_SegmentID_Call) RunAndReturn(run func() int64) *MockTask_Segm
func (_m *MockTask) StartPosition() *msgpb.MsgPosition {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for StartPosition")
}
var r0 *msgpb.MsgPosition
if rf, ok := ret.Get(0).(func() *msgpb.MsgPosition); ok {
r0 = rf()

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
// Code generated by mockery v2.46.0. DO NOT EDIT.
package writebuffer
@ -30,6 +30,10 @@ func (_m *MockBufferManager) EXPECT() *MockBufferManager_Expecter {
func (_m *MockBufferManager) BufferData(channel string, insertData []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos *msgpb.MsgPosition, endPos *msgpb.MsgPosition) error {
ret := _m.Called(channel, insertData, deleteMsgs, startPos, endPos)
if len(ret) == 0 {
panic("no return value specified for BufferData")
}
var r0 error
if rf, ok := ret.Get(0).(func(string, []*InsertData, []*msgstream.DeleteMsg, *msgpb.MsgPosition, *msgpb.MsgPosition) error); ok {
r0 = rf(channel, insertData, deleteMsgs, startPos, endPos)
@ -143,6 +147,10 @@ func (_c *MockBufferManager_DropPartitions_Call) RunAndReturn(run func(string, [
func (_m *MockBufferManager) FlushChannel(ctx context.Context, channel string, flushTs uint64) error {
ret := _m.Called(ctx, channel, flushTs)
if len(ret) == 0 {
panic("no return value specified for FlushChannel")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, uint64) error); ok {
r0 = rf(ctx, channel, flushTs)
@ -187,6 +195,10 @@ func (_c *MockBufferManager_FlushChannel_Call) RunAndReturn(run func(context.Con
func (_m *MockBufferManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error) {
ret := _m.Called(channel)
if len(ret) == 0 {
panic("no return value specified for GetCheckpoint")
}
var r0 *msgpb.MsgPosition
var r1 bool
var r2 error
@ -289,6 +301,10 @@ func (_m *MockBufferManager) Register(channel string, _a1 metacache.MetaCache, o
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
if len(ret) == 0 {
panic("no return value specified for Register")
}
var r0 error
if rf, ok := ret.Get(0).(func(string, metacache.MetaCache, ...WriteBufferOption) error); ok {
r0 = rf(channel, _a1, opts...)
@ -373,6 +389,10 @@ func (_c *MockBufferManager_RemoveChannel_Call) RunAndReturn(run func(string)) *
func (_m *MockBufferManager) SealSegments(ctx context.Context, channel string, segmentIDs []int64) error {
ret := _m.Called(ctx, channel, segmentIDs)
if len(ret) == 0 {
panic("no return value specified for SealSegments")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, []int64) error); ok {
r0 = rf(ctx, channel, segmentIDs)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
// Code generated by mockery v2.46.0. DO NOT EDIT.
package writebuffer
@ -28,6 +28,10 @@ func (_m *MockWriteBuffer) EXPECT() *MockWriteBuffer_Expecter {
func (_m *MockWriteBuffer) BufferData(insertMsgs []*InsertData, deleteMsgs []*msgstream.DeleteMsg, startPos *msgpb.MsgPosition, endPos *msgpb.MsgPosition) error {
ret := _m.Called(insertMsgs, deleteMsgs, startPos, endPos)
if len(ret) == 0 {
panic("no return value specified for BufferData")
}
var r0 error
if rf, ok := ret.Get(0).(func([]*InsertData, []*msgstream.DeleteMsg, *msgpb.MsgPosition, *msgpb.MsgPosition) error); ok {
r0 = rf(insertMsgs, deleteMsgs, startPos, endPos)
@ -186,6 +190,10 @@ func (_c *MockWriteBuffer_EvictBuffer_Call) RunAndReturn(run func(...SyncPolicy)
func (_m *MockWriteBuffer) GetCheckpoint() *msgpb.MsgPosition {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for GetCheckpoint")
}
var r0 *msgpb.MsgPosition
if rf, ok := ret.Get(0).(func() *msgpb.MsgPosition); ok {
r0 = rf()
@ -229,6 +237,10 @@ func (_c *MockWriteBuffer_GetCheckpoint_Call) RunAndReturn(run func() *msgpb.Msg
func (_m *MockWriteBuffer) GetFlushTimestamp() uint64 {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for GetFlushTimestamp")
}
var r0 uint64
if rf, ok := ret.Get(0).(func() uint64); ok {
r0 = rf()
@ -270,6 +282,10 @@ func (_c *MockWriteBuffer_GetFlushTimestamp_Call) RunAndReturn(run func() uint64
func (_m *MockWriteBuffer) HasSegment(segmentID int64) bool {
ret := _m.Called(segmentID)
if len(ret) == 0 {
panic("no return value specified for HasSegment")
}
var r0 bool
if rf, ok := ret.Get(0).(func(int64) bool); ok {
r0 = rf(segmentID)
@ -312,6 +328,10 @@ func (_c *MockWriteBuffer_HasSegment_Call) RunAndReturn(run func(int64) bool) *M
func (_m *MockWriteBuffer) MemorySize() int64 {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for MemorySize")
}
var r0 int64
if rf, ok := ret.Get(0).(func() int64); ok {
r0 = rf()
@ -353,6 +373,10 @@ func (_c *MockWriteBuffer_MemorySize_Call) RunAndReturn(run func() int64) *MockW
func (_m *MockWriteBuffer) SealSegments(ctx context.Context, segmentIDs []int64) error {
ret := _m.Called(ctx, segmentIDs)
if len(ret) == 0 {
panic("no return value specified for SealSegments")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, []int64) error); ok {
r0 = rf(ctx, segmentIDs)

View File

@ -41,6 +41,7 @@ import (
"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
_ "github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/timerecord"
@ -50,6 +51,8 @@ import (
var _ task = (*statsTask)(nil)
const statsBatchSize = 10000
type statsTask struct {
ident string
ctx context.Context
@ -157,7 +160,7 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er
numRows := st.req.GetNumRows()
bm25FieldIds := compaction.GetBM25FieldIDs(st.req.GetSchema())
writer, err := compaction.NewSegmentWriter(st.req.GetSchema(), numRows, st.req.GetTargetSegmentID(), st.req.GetPartitionID(), st.req.GetCollectionID(), bm25FieldIds)
writer, err := compaction.NewSegmentWriter(st.req.GetSchema(), numRows, statsBatchSize, st.req.GetTargetSegmentID(), st.req.GetPartitionID(), st.req.GetCollectionID(), bm25FieldIds)
if err != nil {
log.Warn("sort segment wrong, unable to init segment writer",
zap.Int64("taskID", st.req.GetTaskID()), zap.Error(err))
@ -165,22 +168,23 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er
}
var (
flushBatchCount int // binlog batch count
unFlushedRowCount int64 = 0
flushBatchCount int // binlog batch count
// All binlog meta of a segment
allBinlogs = make(map[typeutil.UniqueID]*datapb.FieldBinlog)
allBinlogs = make(map[typeutil.UniqueID]*datapb.FieldBinlog) // All binlog meta of a segment
uploadFutures = make([]*conc.Future[any], 0)
downloadCost time.Duration
serWriteTimeCost time.Duration
sortTimeCost time.Duration
)
serWriteTimeCost := time.Duration(0)
uploadTimeCost := time.Duration(0)
sortTimeCost := time.Duration(0)
downloadStart := time.Now()
values, err := st.downloadData(ctx, numRows, writer.GetPkID(), bm25FieldIds)
if err != nil {
log.Warn("download data failed", zap.Int64("taskID", st.req.GetTaskID()), zap.Error(err))
return nil, err
}
downloadCost = time.Since(downloadStart)
sortStart := time.Now()
sort.Slice(values, func(i, j int) bool {
@ -188,15 +192,14 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er
})
sortTimeCost += time.Since(sortStart)
for _, v := range values {
for i, v := range values {
err := writer.Write(v)
if err != nil {
log.Warn("write value wrong, failed to writer row", zap.Int64("taskID", st.req.GetTaskID()), zap.Error(err))
return nil, err
}
unFlushedRowCount++
if (unFlushedRowCount+1)%100 == 0 && writer.FlushAndIsFullWithBinlogMaxSize(st.req.GetBinlogMaxSize()) {
if (i+1)%statsBatchSize == 0 && writer.IsFullWithBinlogMaxSize(st.req.GetBinlogMaxSize()) {
serWriteStart := time.Now()
binlogNum, kvs, partialBinlogs, err := serializeWrite(ctx, st.req.GetStartLogID()+st.logIDOffset, writer)
if err != nil {
@ -205,17 +208,10 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er
}
serWriteTimeCost += time.Since(serWriteStart)
uploadStart := time.Now()
if err := st.binlogIO.Upload(ctx, kvs); err != nil {
log.Warn("stats wrong, failed to upload kvs", zap.Int64("taskID", st.req.GetTaskID()), zap.Error(err))
return nil, err
}
uploadTimeCost += time.Since(uploadStart)
uploadFutures = append(uploadFutures, st.binlogIO.AsyncUpload(ctx, kvs)...)
mergeFieldBinlogs(allBinlogs, partialBinlogs)
flushBatchCount++
unFlushedRowCount = 0
st.logIDOffset += binlogNum
if st.req.GetStartLogID()+st.logIDOffset >= st.req.GetEndLogID() {
log.Warn("binlog files too much, log is not enough", zap.Int64("taskID", st.req.GetTaskID()),
@ -236,16 +232,17 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er
serWriteTimeCost += time.Since(serWriteStart)
st.logIDOffset += binlogNum
uploadStart := time.Now()
if err := st.binlogIO.Upload(ctx, kvs); err != nil {
return nil, err
}
uploadTimeCost += time.Since(uploadStart)
uploadFutures = append(uploadFutures, st.binlogIO.AsyncUpload(ctx, kvs)...)
mergeFieldBinlogs(allBinlogs, partialBinlogs)
flushBatchCount++
}
err = conc.AwaitAll(uploadFutures...)
if err != nil {
log.Warn("stats wrong, failed to upload kvs", zap.Int64("taskID", st.req.GetTaskID()), zap.Error(err))
return nil, err
}
serWriteStart := time.Now()
binlogNums, sPath, err := statSerializeWrite(ctx, st.binlogIO, st.req.GetStartLogID()+st.logIDOffset, writer, numRows)
if err != nil {
@ -302,7 +299,7 @@ func (st *statsTask) sortSegment(ctx context.Context) ([]*datapb.FieldBinlog, er
zap.Int64("old rows", numRows),
zap.Int("valid rows", len(values)),
zap.Int("binlog batch count", flushBatchCount),
zap.Duration("upload binlogs elapse", uploadTimeCost),
zap.Duration("download elapse", downloadCost),
zap.Duration("sort elapse", sortTimeCost),
zap.Duration("serWrite elapse", serWriteTimeCost),
zap.Duration("total elapse", totalElapse))

View File

@ -66,7 +66,7 @@ func (s *TaskStatsSuite) SetupSubTest() {
}
func (s *TaskStatsSuite) GenSegmentWriterWithBM25(magic int64) {
segWriter, err := compaction.NewSegmentWriter(s.schema, 100, magic, s.partitionID, s.collectionID, []int64{102})
segWriter, err := compaction.NewSegmentWriter(s.schema, 100, statsBatchSize, magic, s.partitionID, s.collectionID, []int64{102})
s.Require().NoError(err)
v := storage.Value{
@ -115,6 +115,7 @@ func (s *TaskStatsSuite) TestSortSegmentWithBM25() {
return result, nil
})
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
s.mockBinlogIO.EXPECT().AsyncUpload(mock.Anything, mock.Anything).Return(nil)
ctx, cancel := context.WithCancel(context.Background())
@ -157,8 +158,8 @@ func (s *TaskStatsSuite) TestSortSegmentWithBM25() {
}
return result, nil
})
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Times(2)
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(fmt.Errorf("mock error")).Once()
s.mockBinlogIO.EXPECT().AsyncUpload(mock.Anything, mock.Anything).Return(nil)
ctx, cancel := context.WithCancel(context.Background())