// milvus/internal/flushcommon/syncmgr/pack_writer_v2_test.go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package syncmgr

import (
	"context"
	"fmt"
	"math"
	"math/rand"
	"sync/atomic"
	"testing"

	"github.com/samber/lo"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/suite"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/allocator"
	"github.com/milvus-io/milvus/internal/flushcommon/metacache"
	"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
	"github.com/milvus-io/milvus/internal/mocks/flushcommon/mock_util"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/internal/storagev2/packed"
	"github.com/milvus-io/milvus/internal/util/initcore"
	"github.com/milvus-io/milvus/pkg/v2/common"
	"github.com/milvus-io/milvus/pkg/v2/objectstorage"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)
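
// TestPackWriterV2Suite runs PackWriterV2Suite via testify's suite runner.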
func TestPackWriterV2Suite(t *testing.T) {
	suite.Run(t, new(PackWriterV2Suite))
}
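
// PackWriterV2Suite carries the shared fixtures for the BulkPackWriterV2
// tests: a log ID allocator, the collection schema, and a local chunk
// manager rooted at a temporary path.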
type PackWriterV2Suite struct {
	suite.Suite

	ctx          context.Context
	mockID       atomic.Int64
	logIDAlloc   allocator.Interface
	mockBinlogIO *mock_util.MockBinlogIO
	schema       *schemapb.CollectionSchema
	cm           storage.ChunkManager
	rootPath     string
	maxRowNum    int64
	chunkSize    uint64
}
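
// SetupTest rebuilds the fixtures before each test: a fresh allocator, a
// local Arrow filesystem under /tmp, and a schema with an int64 primary key
// and a 128-dim float vector alongside the row ID and timestamp fields.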
func (s *PackWriterV2Suite) SetupTest() {
	s.ctx = context.Background()
	s.logIDAlloc = allocator.NewLocalAllocator(1, math.MaxInt64)
	s.rootPath = "/tmp"
	initcore.InitLocalArrowFileSystem(s.rootPath)
	paramtable.Get().Init(paramtable.NewBaseTable())

	s.schema = &schemapb.CollectionSchema{
		Name: "sync_task_test_col",
		Fields: []*schemapb.FieldSchema{
			{FieldID: common.RowIDField, DataType: schemapb.DataType_Int64},
			{FieldID: common.TimeStampField, DataType: schemapb.DataType_Int64},
			{
				FieldID:      100,
				Name:         "pk",
				DataType:     schemapb.DataType_Int64,
				IsPrimaryKey: true,
			},
			{
				FieldID:  101,
				Name:     "vector",
				DataType: schemapb.DataType_FloatVector,
				TypeParams: []*commonpb.KeyValuePair{
					{Key: common.DimKey, Value: "128"},
				},
			},
		},
	}

	s.cm = storage.NewLocalChunkManager(objectstorage.RootPath(s.rootPath))
}
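
// TestPackWriterV2_Write covers the happy path: a pack carrying both insert
// rows and delete records is written, and the returned insert binlogs report
// the expected entry count and storage v2 log paths.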
func (s *PackWriterV2Suite) TestPackWriterV2_Write() {
	collectionID := int64(123)
	partitionID := int64(456)
	segmentID := int64(789)
	channelName := fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", collectionID)
	rows := 10

	bfs := pkoracle.NewBloomFilterSet()
	seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs, nil)
	metacache.UpdateNumOfRows(1000)(seg)

	mc := metacache.NewMockMetaCache(s.T())
	mc.EXPECT().Collection().Return(collectionID).Maybe()
	mc.EXPECT().GetSchema(mock.Anything).Return(s.schema).Maybe()
	mc.EXPECT().GetSegmentByID(segmentID).Return(seg, true).Maybe()
	mc.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}).Maybe()
	mc.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Run(func(action metacache.SegmentAction, filters ...metacache.SegmentFilter) {
		action(seg)
	}).Return().Maybe()
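
	// Accumulate delete records for pks 1..rows with increasing timestamps.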
	deletes := &storage.DeleteData{}
	for i := 0; i < rows; i++ {
		pk := storage.NewInt64PrimaryKey(int64(i + 1))
		ts := uint64(100 + i)
		deletes.Append(pk, ts)
	}

	pack := new(SyncPack).
		WithCollectionID(collectionID).
		WithPartitionID(partitionID).
		WithSegmentID(segmentID).
		WithChannelName(channelName).
		WithInsertData(genInsertData(rows, s.schema)).
		WithDeleteData(deletes)

	bw := NewBulkPackWriterV2(mc, s.schema, s.cm, s.logIDAlloc, packed.DefaultWriteBufferSize, 0, nil)
	gotInserts, _, _, _, _, err := bw.Write(context.Background(), pack)
	s.NoError(err)
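
	// Log paths follow <root>/insert_log/<collection>/<partition>/<segment>/<column group>/<log ID>;
	// the scalar columns land in group 0 while the vector column (field 101) gets its own group.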
	s.Equal(gotInserts[0].Binlogs[0].GetEntriesNum(), int64(rows))
	s.Equal(gotInserts[0].Binlogs[0].GetLogPath(), "/tmp/insert_log/123/456/789/0/1")
	s.Equal(gotInserts[101].Binlogs[0].GetLogPath(), "/tmp/insert_log/123/456/789/101/2")
}
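
// TestWriteEmptyInsertData verifies that a pack with no insert or delete data
// succeeds; the allocator is given a minimal ID range, so the test also shows
// that an empty pack does not need to allocate any log IDs.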
func (s *PackWriterV2Suite) TestWriteEmptyInsertData() {
	s.logIDAlloc = allocator.NewLocalAllocator(1, 1)
	collectionID := int64(123)
	partitionID := int64(456)
	segmentID := int64(789)
	channelName := fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", collectionID)

	mc := metacache.NewMockMetaCache(s.T())
	mc.EXPECT().GetSchema(mock.Anything).Return(s.schema).Maybe()

	pack := new(SyncPack).
		WithCollectionID(collectionID).
		WithPartitionID(partitionID).
		WithSegmentID(segmentID).
		WithChannelName(channelName)

	bw := NewBulkPackWriterV2(mc, s.schema, s.cm, s.logIDAlloc, packed.DefaultWriteBufferSize, 0, nil)
	_, _, _, _, _, err := bw.Write(context.Background(), pack)
	s.NoError(err)
}
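
// TestNoPkField expects Write to fail when the schema declares no primary
// key field.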
func (s *PackWriterV2Suite) TestNoPkField() {
	s.schema = &schemapb.CollectionSchema{
		Name: "no pk field",
		Fields: []*schemapb.FieldSchema{
			{FieldID: common.RowIDField, DataType: schemapb.DataType_Int64},
			{FieldID: common.TimeStampField, DataType: schemapb.DataType_Int64},
		},
	}
	s.logIDAlloc = allocator.NewLocalAllocator(1, math.MaxInt64)
	collectionID := int64(123)
	partitionID := int64(456)
	segmentID := int64(789)
	channelName := fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", collectionID)

	mc := metacache.NewMockMetaCache(s.T())
	mc.EXPECT().GetSchema(mock.Anything).Return(s.schema).Maybe()

	buf, _ := storage.NewInsertData(s.schema)
	data := make(map[storage.FieldID]any)
	data[common.RowIDField] = int64(1)
	data[common.TimeStampField] = int64(1)
	buf.Append(data)

	pack := new(SyncPack).
		WithCollectionID(collectionID).
		WithPartitionID(partitionID).
		WithSegmentID(segmentID).
		WithChannelName(channelName).
		WithInsertData([]*storage.InsertData{buf})

	bw := NewBulkPackWriterV2(mc, s.schema, s.cm, s.logIDAlloc, packed.DefaultWriteBufferSize, 0, nil)
	_, _, _, _, _, err := bw.Write(context.Background(), pack)
	s.Error(err)
}
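
// TestAllocIDExhaustedError expects Write to fail when the log ID allocator
// runs out of IDs for a non-empty pack.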
func (s *PackWriterV2Suite) TestAllocIDExhaustedError() {
	s.logIDAlloc = allocator.NewLocalAllocator(1, 1)
	collectionID := int64(123)
	partitionID := int64(456)
	segmentID := int64(789)
	channelName := fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", collectionID)
	rows := 10

	mc := metacache.NewMockMetaCache(s.T())
	mc.EXPECT().GetSchema(mock.Anything).Return(s.schema).Maybe()

	pack := new(SyncPack).
		WithCollectionID(collectionID).
		WithPartitionID(partitionID).
		WithSegmentID(segmentID).
		WithChannelName(channelName).
		WithInsertData(genInsertData(rows, s.schema))

	bw := NewBulkPackWriterV2(mc, s.schema, s.cm, s.logIDAlloc, packed.DefaultWriteBufferSize, 0, nil)
	_, _, _, _, _, err := bw.Write(context.Background(), pack)
	s.Error(err)
}
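
// TestWriteInsertDataError expects Write to fail when a row is missing
// required fields (here only the row ID is populated).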
func (s *PackWriterV2Suite) TestWriteInsertDataError() {
	s.logIDAlloc = allocator.NewLocalAllocator(1, math.MaxInt64)
	collectionID := int64(123)
	partitionID := int64(456)
	segmentID := int64(789)
	channelName := fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", collectionID)

	mc := metacache.NewMockMetaCache(s.T())
	mc.EXPECT().GetSchema(mock.Anything).Return(s.schema).Maybe()

	buf, _ := storage.NewInsertData(s.schema)
	data := make(map[storage.FieldID]any)
	data[common.RowIDField] = int64(1)
	buf.Append(data)

	pack := new(SyncPack).
		WithCollectionID(collectionID).
		WithPartitionID(partitionID).
		WithSegmentID(segmentID).
		WithChannelName(channelName).
		WithInsertData([]*storage.InsertData{buf})

	bw := NewBulkPackWriterV2(mc, s.schema, s.cm, s.logIDAlloc, packed.DefaultWriteBufferSize, 0, nil)
	_, _, _, _, _, err := bw.Write(context.Background(), pack)
	s.Error(err)
}
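
// genInsertData builds a single InsertData buffer with size rows: sequential
// row IDs, timestamps, and pks, plus random 128-dim float vectors.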
func genInsertData(size int, schema *schemapb.CollectionSchema) []*storage.InsertData {
	buf, _ := storage.NewInsertData(schema)
	for i := 0; i < size; i++ {
		data := make(map[storage.FieldID]any)
		data[common.RowIDField] = int64(i + 1)
		data[common.TimeStampField] = int64(i + 1)
		data[100] = int64(i + 1)
		vector := lo.RepeatBy(128, func(_ int) float32 {
			return rand.Float32()
		})
		data[101] = vector
		buf.Append(data)
	}
	return []*storage.InsertData{buf}
}