mirror of https://github.com/milvus-io/milvus.git
enhance: Remove not inuse binlog iterators (#39243)
1. Remove datanode/iterators/binlog_iterator 2. Remove storage/binlog_iterator/MergeIterator See also: #39242 --------- Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/39489/head
parent
9cb4c4e8ac
commit
7261128df0
|
@ -101,6 +101,7 @@ cwrapper_rocksdb_build/
|
|||
**/data/*
|
||||
|
||||
internal/proto/**/*.pb.go
|
||||
pkg/streaming/**/*.pb.go
|
||||
internal/core/src/pb/*.pb.h
|
||||
internal/core/src/pb/*.pb.cc
|
||||
**/legacypb/*.pb.go
|
||||
|
|
|
@ -1,103 +0,0 @@
|
|||
package iterator
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"go.uber.org/atomic"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
type BinlogIterator struct {
|
||||
disposed atomic.Bool
|
||||
disposedCh chan struct{}
|
||||
disposedOnce sync.Once
|
||||
|
||||
data *storage.InsertData
|
||||
label *Label
|
||||
pkFieldID int64
|
||||
pkType schemapb.DataType
|
||||
pos int
|
||||
}
|
||||
|
||||
var _ Iterator = (*BinlogIterator)(nil)
|
||||
|
||||
// NewInsertBinlogIterator creates a new iterator
|
||||
func NewInsertBinlogIterator(v [][]byte, pkFieldID typeutil.UniqueID, pkType schemapb.DataType, label *Label) (*BinlogIterator, error) {
|
||||
blobs := make([]*storage.Blob, len(v))
|
||||
for i := range blobs {
|
||||
blobs[i] = &storage.Blob{Value: v[i]}
|
||||
}
|
||||
|
||||
reader := storage.NewInsertCodec()
|
||||
_, _, iData, err := reader.Deserialize(blobs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &BinlogIterator{
|
||||
disposedCh: make(chan struct{}),
|
||||
data: iData,
|
||||
pkFieldID: pkFieldID,
|
||||
pkType: pkType,
|
||||
label: label,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// HasNext returns true if the iterator have unread record
|
||||
func (i *BinlogIterator) HasNext() bool {
|
||||
return !i.isDisposed() && i.hasNext()
|
||||
}
|
||||
|
||||
func (i *BinlogIterator) Next() (*LabeledRowData, error) {
|
||||
if i.isDisposed() {
|
||||
return nil, ErrDisposed
|
||||
}
|
||||
|
||||
if !i.hasNext() {
|
||||
return nil, ErrNoMoreRecord
|
||||
}
|
||||
|
||||
fields := make(map[int64]interface{})
|
||||
for fieldID, fieldData := range i.data.Data {
|
||||
fields[fieldID] = fieldData.GetRow(i.pos)
|
||||
}
|
||||
|
||||
pk, err := storage.GenPrimaryKeyByRawData(i.data.Data[i.pkFieldID].GetRow(i.pos), i.pkType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
row := &InsertRow{
|
||||
ID: i.data.Data[common.RowIDField].GetRow(i.pos).(int64),
|
||||
Timestamp: uint64(i.data.Data[common.TimeStampField].GetRow(i.pos).(int64)),
|
||||
Pk: pk,
|
||||
Value: fields,
|
||||
}
|
||||
i.pos++
|
||||
return NewLabeledRowData(row, i.label), nil
|
||||
}
|
||||
|
||||
// Dispose disposes the iterator
|
||||
func (i *BinlogIterator) Dispose() {
|
||||
i.disposed.CompareAndSwap(false, true)
|
||||
i.disposedOnce.Do(func() {
|
||||
close(i.disposedCh)
|
||||
})
|
||||
}
|
||||
|
||||
func (i *BinlogIterator) hasNext() bool {
|
||||
return i.pos < i.data.GetRowNum()
|
||||
}
|
||||
|
||||
func (i *BinlogIterator) isDisposed() bool {
|
||||
return i.disposed.Load()
|
||||
}
|
||||
|
||||
// Disposed wait forever for the iterator to dispose
|
||||
func (i *BinlogIterator) WaitForDisposed() {
|
||||
<-i.disposedCh
|
||||
}
|
|
@ -1,338 +0,0 @@
|
|||
package iterator
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/proto/etcdpb"
|
||||
)
|
||||
|
||||
func TestInsertBinlogIteratorSuite(t *testing.T) {
|
||||
suite.Run(t, new(InsertBinlogIteratorSuite))
|
||||
}
|
||||
|
||||
const (
|
||||
CollectionID = 10000
|
||||
PartitionID = 10001
|
||||
SegmentID = 10002
|
||||
RowIDField = 0
|
||||
TimestampField = 1
|
||||
BoolField = 100
|
||||
Int8Field = 101
|
||||
Int16Field = 102
|
||||
Int32Field = 103
|
||||
Int64Field = 104
|
||||
FloatField = 105
|
||||
DoubleField = 106
|
||||
StringField = 107
|
||||
BinaryVectorField = 108
|
||||
FloatVectorField = 109
|
||||
ArrayField = 110
|
||||
JSONField = 111
|
||||
Float16VectorField = 112
|
||||
BFloat16VectorField = 113
|
||||
)
|
||||
|
||||
type InsertBinlogIteratorSuite struct {
|
||||
suite.Suite
|
||||
|
||||
i *BinlogIterator
|
||||
}
|
||||
|
||||
func (s *InsertBinlogIteratorSuite) TestBinlogIterator() {
|
||||
insertData, meta := genTestInsertData()
|
||||
writer := storage.NewInsertCodecWithSchema(meta)
|
||||
blobs, err := writer.Serialize(PartitionID, SegmentID, insertData)
|
||||
s.Require().NoError(err)
|
||||
|
||||
values := [][]byte{}
|
||||
for _, b := range blobs {
|
||||
values = append(values, b.Value)
|
||||
}
|
||||
s.Run("invalid blobs", func() {
|
||||
iter, err := NewInsertBinlogIterator([][]byte{}, Int64Field, schemapb.DataType_Int64, nil)
|
||||
s.Error(err)
|
||||
s.Nil(iter)
|
||||
})
|
||||
|
||||
s.Run("invalid pk type", func() {
|
||||
iter, err := NewInsertBinlogIterator(values, Int64Field, schemapb.DataType_Float, &Label{segmentID: 19530})
|
||||
s.NoError(err)
|
||||
|
||||
_, err = iter.Next()
|
||||
s.Error(err)
|
||||
})
|
||||
|
||||
s.Run("normal", func() {
|
||||
iter, err := NewInsertBinlogIterator(values, Int64Field, schemapb.DataType_Int64, &Label{segmentID: 19530})
|
||||
s.NoError(err)
|
||||
|
||||
rows := []interface{}{}
|
||||
var idx int = 0 // row number
|
||||
|
||||
for iter.HasNext() {
|
||||
labeled, err := iter.Next()
|
||||
s.NoError(err)
|
||||
s.Equal(int64(19530), labeled.GetSegmentID())
|
||||
|
||||
rows = append(rows, labeled.data)
|
||||
|
||||
label := labeled.GetLabel()
|
||||
s.NotNil(label)
|
||||
s.EqualValues(19530, label.segmentID)
|
||||
s.EqualValues(19530, labeled.GetSegmentID())
|
||||
|
||||
insertRow, ok := labeled.data.(*InsertRow)
|
||||
s.True(ok)
|
||||
|
||||
s.EqualValues(insertData.Data[TimestampField].GetRow(idx).(int64), labeled.GetTimestamp())
|
||||
s.Equal(insertData.Data[Int64Field].GetRow(idx).(int64), labeled.GetPk().GetValue().(int64))
|
||||
s.Equal(insertData.Data[RowIDField].GetRow(idx).(int64), insertRow.ID)
|
||||
s.Equal(insertData.Data[BoolField].GetRow(idx).(bool), insertRow.Value[BoolField].(bool))
|
||||
s.Equal(insertData.Data[Int8Field].GetRow(idx).(int8), insertRow.Value[Int8Field].(int8))
|
||||
s.Equal(insertData.Data[Int16Field].GetRow(idx).(int16), insertRow.Value[Int16Field].(int16))
|
||||
s.Equal(insertData.Data[Int32Field].GetRow(idx).(int32), insertRow.Value[Int32Field].(int32))
|
||||
s.Equal(insertData.Data[Int64Field].GetRow(idx).(int64), insertRow.Value[Int64Field].(int64))
|
||||
s.Equal(insertData.Data[Int64Field].GetRow(idx).(int64), insertRow.Value[Int64Field].(int64))
|
||||
s.Equal(insertData.Data[FloatField].GetRow(idx).(float32), insertRow.Value[FloatField].(float32))
|
||||
s.Equal(insertData.Data[DoubleField].GetRow(idx).(float64), insertRow.Value[DoubleField].(float64))
|
||||
s.Equal(insertData.Data[StringField].GetRow(idx).(string), insertRow.Value[StringField].(string))
|
||||
s.Equal(insertData.Data[ArrayField].GetRow(idx).(*schemapb.ScalarField).GetIntData().Data, insertRow.Value[ArrayField].(*schemapb.ScalarField).GetIntData().Data)
|
||||
s.Equal(insertData.Data[JSONField].GetRow(idx).([]byte), insertRow.Value[JSONField].([]byte))
|
||||
s.Equal(insertData.Data[BinaryVectorField].GetRow(idx).([]byte), insertRow.Value[BinaryVectorField].([]byte))
|
||||
s.Equal(insertData.Data[FloatVectorField].GetRow(idx).([]float32), insertRow.Value[FloatVectorField].([]float32))
|
||||
s.Equal(insertData.Data[Float16VectorField].GetRow(idx).([]byte), insertRow.Value[Float16VectorField].([]byte))
|
||||
s.Equal(insertData.Data[BFloat16VectorField].GetRow(idx).([]byte), insertRow.Value[BFloat16VectorField].([]byte))
|
||||
|
||||
idx++
|
||||
}
|
||||
|
||||
s.Equal(2, len(rows))
|
||||
|
||||
_, err = iter.Next()
|
||||
s.ErrorIs(err, ErrNoMoreRecord)
|
||||
|
||||
iter.Dispose()
|
||||
iter.WaitForDisposed()
|
||||
|
||||
_, err = iter.Next()
|
||||
s.ErrorIs(err, ErrDisposed)
|
||||
})
|
||||
}
|
||||
|
||||
func genTestInsertData() (*storage.InsertData, *etcdpb.CollectionMeta) {
|
||||
meta := &etcdpb.CollectionMeta{
|
||||
ID: CollectionID,
|
||||
CreateTime: 1,
|
||||
SegmentIDs: []int64{SegmentID},
|
||||
PartitionTags: []string{"partition_0", "partition_1"},
|
||||
Schema: &schemapb.CollectionSchema{
|
||||
Name: "schema",
|
||||
Description: "schema",
|
||||
AutoID: true,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: RowIDField,
|
||||
Name: "row_id",
|
||||
IsPrimaryKey: false,
|
||||
Description: "row_id",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
FieldID: TimestampField,
|
||||
Name: "Timestamp",
|
||||
IsPrimaryKey: false,
|
||||
Description: "Timestamp",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
FieldID: BoolField,
|
||||
Name: "field_bool",
|
||||
IsPrimaryKey: false,
|
||||
Description: "bool",
|
||||
DataType: schemapb.DataType_Bool,
|
||||
},
|
||||
{
|
||||
FieldID: Int8Field,
|
||||
Name: "field_int8",
|
||||
IsPrimaryKey: false,
|
||||
Description: "int8",
|
||||
DataType: schemapb.DataType_Int8,
|
||||
},
|
||||
{
|
||||
FieldID: Int16Field,
|
||||
Name: "field_int16",
|
||||
IsPrimaryKey: false,
|
||||
Description: "int16",
|
||||
DataType: schemapb.DataType_Int16,
|
||||
},
|
||||
{
|
||||
FieldID: Int32Field,
|
||||
Name: "field_int32",
|
||||
IsPrimaryKey: false,
|
||||
Description: "int32",
|
||||
DataType: schemapb.DataType_Int32,
|
||||
},
|
||||
{
|
||||
FieldID: Int64Field,
|
||||
Name: "field_int64",
|
||||
IsPrimaryKey: true,
|
||||
Description: "int64",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
FieldID: FloatField,
|
||||
Name: "field_float",
|
||||
IsPrimaryKey: false,
|
||||
Description: "float",
|
||||
DataType: schemapb.DataType_Float,
|
||||
},
|
||||
{
|
||||
FieldID: DoubleField,
|
||||
Name: "field_double",
|
||||
IsPrimaryKey: false,
|
||||
Description: "double",
|
||||
DataType: schemapb.DataType_Double,
|
||||
},
|
||||
{
|
||||
FieldID: StringField,
|
||||
Name: "field_string",
|
||||
IsPrimaryKey: false,
|
||||
Description: "string",
|
||||
DataType: schemapb.DataType_String,
|
||||
},
|
||||
{
|
||||
FieldID: ArrayField,
|
||||
Name: "field_int32_array",
|
||||
Description: "int32 array",
|
||||
DataType: schemapb.DataType_Array,
|
||||
ElementType: schemapb.DataType_Int32,
|
||||
},
|
||||
{
|
||||
FieldID: JSONField,
|
||||
Name: "field_json",
|
||||
Description: "json",
|
||||
DataType: schemapb.DataType_JSON,
|
||||
},
|
||||
{
|
||||
FieldID: BinaryVectorField,
|
||||
Name: "field_binary_vector",
|
||||
IsPrimaryKey: false,
|
||||
Description: "binary_vector",
|
||||
DataType: schemapb.DataType_BinaryVector,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{Key: common.DimKey, Value: "8"},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: FloatVectorField,
|
||||
Name: "field_float_vector",
|
||||
IsPrimaryKey: false,
|
||||
Description: "float_vector",
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{Key: common.DimKey, Value: "4"},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: Float16VectorField,
|
||||
Name: "field_float16_vector",
|
||||
IsPrimaryKey: false,
|
||||
Description: "float16_vector",
|
||||
DataType: schemapb.DataType_Float16Vector,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{Key: common.DimKey, Value: "4"},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: BFloat16VectorField,
|
||||
Name: "field_bfloat16_vector",
|
||||
IsPrimaryKey: false,
|
||||
Description: "bfloat16_vector",
|
||||
DataType: schemapb.DataType_BFloat16Vector,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{Key: common.DimKey, Value: "4"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
insertData := storage.InsertData{
|
||||
Data: map[int64]storage.FieldData{
|
||||
RowIDField: &storage.Int64FieldData{
|
||||
Data: []int64{3, 4},
|
||||
},
|
||||
TimestampField: &storage.Int64FieldData{
|
||||
Data: []int64{3, 4},
|
||||
},
|
||||
BoolField: &storage.BoolFieldData{
|
||||
Data: []bool{true, false},
|
||||
},
|
||||
Int8Field: &storage.Int8FieldData{
|
||||
Data: []int8{3, 4},
|
||||
},
|
||||
Int16Field: &storage.Int16FieldData{
|
||||
Data: []int16{3, 4},
|
||||
},
|
||||
Int32Field: &storage.Int32FieldData{
|
||||
Data: []int32{3, 4},
|
||||
},
|
||||
Int64Field: &storage.Int64FieldData{
|
||||
Data: []int64{3, 4},
|
||||
},
|
||||
FloatField: &storage.FloatFieldData{
|
||||
Data: []float32{3, 4},
|
||||
},
|
||||
DoubleField: &storage.DoubleFieldData{
|
||||
Data: []float64{3, 4},
|
||||
},
|
||||
StringField: &storage.StringFieldData{
|
||||
Data: []string{"3", "4"},
|
||||
},
|
||||
BinaryVectorField: &storage.BinaryVectorFieldData{
|
||||
Data: []byte{0, 255},
|
||||
Dim: 8,
|
||||
},
|
||||
FloatVectorField: &storage.FloatVectorFieldData{
|
||||
Data: []float32{4, 5, 6, 7, 4, 5, 6, 7},
|
||||
Dim: 4,
|
||||
},
|
||||
ArrayField: &storage.ArrayFieldData{
|
||||
ElementType: schemapb.DataType_Int32,
|
||||
Data: []*schemapb.ScalarField{
|
||||
{
|
||||
Data: &schemapb.ScalarField_IntData{
|
||||
IntData: &schemapb.IntArray{Data: []int32{3, 2, 1}},
|
||||
},
|
||||
},
|
||||
{
|
||||
Data: &schemapb.ScalarField_IntData{
|
||||
IntData: &schemapb.IntArray{Data: []int32{6, 5, 4}},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
JSONField: &storage.JSONFieldData{
|
||||
Data: [][]byte{
|
||||
[]byte(`{"batch":2}`),
|
||||
[]byte(`{"key":"world"}`),
|
||||
},
|
||||
},
|
||||
Float16VectorField: &storage.Float16VectorFieldData{
|
||||
Data: []byte{0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255},
|
||||
Dim: 4,
|
||||
},
|
||||
BFloat16VectorField: &storage.BFloat16VectorFieldData{
|
||||
Data: []byte{0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255},
|
||||
Dim: 4,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return &insertData, meta
|
||||
}
|
|
@ -33,7 +33,7 @@ func (s *DeltalogIteratorSuite) TestDeltalogIteratorIntPK() {
|
|||
}
|
||||
|
||||
dCodec := storage.NewDeleteCodec()
|
||||
blob, err := dCodec.Serialize(CollectionID, 1, 1, dData)
|
||||
blob, err := dCodec.Serialize(10000, 1, 1, dData)
|
||||
s.Require().NoError(err)
|
||||
value := [][]byte{blob.Value}
|
||||
|
||||
|
|
|
@ -126,92 +126,3 @@ func (itr *InsertBinlogIterator) hasNext() bool {
|
|||
func (itr *InsertBinlogIterator) isDisposed() bool {
|
||||
return atomic.LoadInt32(&itr.dispose) == 1
|
||||
}
|
||||
|
||||
// MergeIterator merge iterators.
|
||||
type MergeIterator struct {
|
||||
disposed int32
|
||||
pos int
|
||||
iteraotrs []Iterator
|
||||
tmpRecords []*Value
|
||||
nextRecord *Value
|
||||
}
|
||||
|
||||
// NewMergeIterator return a new MergeIterator.
|
||||
func NewMergeIterator(iterators []Iterator) *MergeIterator {
|
||||
return &MergeIterator{
|
||||
iteraotrs: iterators,
|
||||
tmpRecords: make([]*Value, len(iterators)),
|
||||
}
|
||||
}
|
||||
|
||||
// HasNext returns true if the iterator have unread record
|
||||
func (itr *MergeIterator) HasNext() bool {
|
||||
return !itr.isDisposed() && itr.hasNext()
|
||||
}
|
||||
|
||||
// Next returns the next record
|
||||
func (itr *MergeIterator) Next() (interface{}, error) {
|
||||
if itr.isDisposed() {
|
||||
return nil, ErrDisposed
|
||||
}
|
||||
|
||||
if !itr.hasNext() {
|
||||
return nil, ErrNoMoreRecord
|
||||
}
|
||||
|
||||
tmpRecord := itr.nextRecord
|
||||
itr.nextRecord = nil
|
||||
return tmpRecord, nil
|
||||
}
|
||||
|
||||
// Dispose disposes the iterator
|
||||
func (itr *MergeIterator) Dispose() {
|
||||
if itr.isDisposed() {
|
||||
return
|
||||
}
|
||||
|
||||
for _, tmpItr := range itr.iteraotrs {
|
||||
if tmpItr != nil {
|
||||
tmpItr.Dispose()
|
||||
}
|
||||
}
|
||||
atomic.CompareAndSwapInt32(&itr.disposed, 0, 1)
|
||||
}
|
||||
|
||||
func (itr *MergeIterator) isDisposed() bool {
|
||||
return atomic.LoadInt32(&itr.disposed) == 1
|
||||
}
|
||||
|
||||
func (itr *MergeIterator) hasNext() bool {
|
||||
if itr.nextRecord != nil {
|
||||
return true
|
||||
}
|
||||
|
||||
var minRecord *Value
|
||||
var minPos int
|
||||
for i, tmpRecord := range itr.tmpRecords {
|
||||
if tmpRecord == nil {
|
||||
if itr.iteraotrs[i] != nil && itr.iteraotrs[i].HasNext() {
|
||||
next, _ := itr.iteraotrs[i].Next()
|
||||
itr.tmpRecords[i] = next.(*Value)
|
||||
tmpRecord = itr.tmpRecords[i]
|
||||
}
|
||||
}
|
||||
if tmpRecord == nil {
|
||||
continue
|
||||
}
|
||||
if minRecord == nil || tmpRecord.ID < minRecord.ID {
|
||||
minRecord = tmpRecord
|
||||
minPos = i
|
||||
}
|
||||
}
|
||||
|
||||
if minRecord == nil {
|
||||
// all iterators have no more records
|
||||
return false
|
||||
}
|
||||
|
||||
itr.tmpRecords[minPos] = nil
|
||||
itr.nextRecord = minRecord
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -279,80 +279,3 @@ func TestInsertlogIterator(t *testing.T) {
|
|||
assert.Equal(t, ErrNoMoreRecord, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestMergeIterator(t *testing.T) {
|
||||
t.Run("empty iterators", func(t *testing.T) {
|
||||
iterators := make([]Iterator, 0)
|
||||
for i := 0; i < 3; i++ {
|
||||
iterators = append(iterators, &InsertBinlogIterator{
|
||||
data: &InsertData{},
|
||||
})
|
||||
}
|
||||
itr := NewMergeIterator(iterators)
|
||||
assert.False(t, itr.HasNext())
|
||||
_, err := itr.Next()
|
||||
assert.Equal(t, ErrNoMoreRecord, err)
|
||||
})
|
||||
|
||||
t.Run("empty and non-empty iterators", func(t *testing.T) {
|
||||
blobs, err := generateTestData(3)
|
||||
assert.NoError(t, err)
|
||||
insertItr, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64)
|
||||
assert.NoError(t, err)
|
||||
iterators := []Iterator{
|
||||
&InsertBinlogIterator{data: &InsertData{}},
|
||||
insertItr,
|
||||
}
|
||||
|
||||
itr := NewMergeIterator(iterators)
|
||||
|
||||
for i := 1; i <= 3; i++ {
|
||||
assert.True(t, itr.HasNext())
|
||||
v, err := itr.Next()
|
||||
assert.NoError(t, err)
|
||||
value := v.(*Value)
|
||||
assertTestDataInternal(t, i, value, false)
|
||||
}
|
||||
assert.False(t, itr.HasNext())
|
||||
_, err = itr.Next()
|
||||
assert.Equal(t, ErrNoMoreRecord, err)
|
||||
})
|
||||
|
||||
t.Run("non-empty iterators", func(t *testing.T) {
|
||||
blobs, err := generateTestData(3)
|
||||
assert.NoError(t, err)
|
||||
itr1, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64)
|
||||
assert.NoError(t, err)
|
||||
itr2, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64)
|
||||
assert.NoError(t, err)
|
||||
iterators := []Iterator{itr1, itr2}
|
||||
itr := NewMergeIterator(iterators)
|
||||
|
||||
for i := 1; i <= 3; i++ {
|
||||
for j := 0; j < 2; j++ {
|
||||
assert.True(t, itr.HasNext())
|
||||
v, err := itr.Next()
|
||||
assert.NoError(t, err)
|
||||
value := v.(*Value)
|
||||
assertTestDataInternal(t, i, value, false)
|
||||
}
|
||||
}
|
||||
|
||||
assert.False(t, itr.HasNext())
|
||||
_, err = itr.Next()
|
||||
assert.Equal(t, ErrNoMoreRecord, err)
|
||||
})
|
||||
|
||||
t.Run("test dispose", func(t *testing.T) {
|
||||
blobs, err := generateTestData(3)
|
||||
assert.NoError(t, err)
|
||||
itr1, err := NewInsertBinlogIterator(blobs, common.RowIDField, schemapb.DataType_Int64)
|
||||
assert.NoError(t, err)
|
||||
itr := NewMergeIterator([]Iterator{itr1})
|
||||
|
||||
itr.Dispose()
|
||||
assert.False(t, itr.HasNext())
|
||||
_, err = itr.Next()
|
||||
assert.Equal(t, ErrDisposed, err)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue