mirror of https://github.com/milvus-io/milvus.git

enhance: adding a streaming deserialize reader for binlogs (#30860)

See #30863

Signed-off-by: Ted Xu <ted.xu@zilliz.com>

parent 4b0c3dd377
commit 71adafa933
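For orientation, here is a minimal usage sketch of the streaming reader this commit introduces, assembled from the test code in the diff below. It assumes it lives in the same `storage` package as the new reader (the package is internal to the milvus repo), and that `blobs` are insert binlogs such as those produced by `InsertCodec.Serialize` in `generateTestData`; `exampleStreamBinlogs` is an illustrative name, not part of the commit.

	// Sketch, package storage: stream rows out of a set of insert binlogs.
	func exampleStreamBinlogs(blobs []*Blob) error {
	    // common.RowIDField selects which column is treated as the primary key,
	    // mirroring the tests below.
	    reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
	    if err != nil {
	        return err
	    }
	    defer reader.Close()

	    for {
	        if err := reader.Next(); err != nil {
	            if err == io.EOF {
	                return nil // all rows consumed
	            }
	            return err
	        }
	        v := reader.Value() // *Value; note it is reused across Next calls
	        _ = v.ID            // row id
	        _ = v.Timestamp     // timestamp
	        _ = v.Value         // map[FieldID]interface{} with every field's value
	    }
	}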
@ -17,6 +17,7 @@

package storage

import (
    "fmt"
    "testing"

    "github.com/stretchr/testify/assert"
@ -30,23 +31,67 @@ func generateTestData(t *testing.T, num int) []*Blob {
    schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
        {FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
        {FieldID: common.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64},
        {FieldID: 10, Name: "bool", DataType: schemapb.DataType_Bool},
        {FieldID: 11, Name: "int8", DataType: schemapb.DataType_Int8},
        {FieldID: 12, Name: "int16", DataType: schemapb.DataType_Int16},
        {FieldID: 13, Name: "int64", DataType: schemapb.DataType_Int64},
        {FieldID: 14, Name: "float", DataType: schemapb.DataType_Float},
        {FieldID: 15, Name: "double", DataType: schemapb.DataType_Double},
        {FieldID: 16, Name: "varchar", DataType: schemapb.DataType_VarChar},
        {FieldID: 17, Name: "string", DataType: schemapb.DataType_String},
        {FieldID: 18, Name: "array", DataType: schemapb.DataType_Array},
        {FieldID: 19, Name: "json", DataType: schemapb.DataType_JSON},
        {FieldID: 101, Name: "int32", DataType: schemapb.DataType_Int32},
        {FieldID: 102, Name: "floatVector", DataType: schemapb.DataType_FloatVector},
        {FieldID: 103, Name: "binaryVector", DataType: schemapb.DataType_BinaryVector},
        {FieldID: 104, Name: "float16Vector", DataType: schemapb.DataType_Float16Vector},
        {FieldID: 105, Name: "bf16Vector", DataType: schemapb.DataType_BFloat16Vector},
    }}
    insertCodec := NewInsertCodecWithSchema(&etcdpb.CollectionMeta{ID: 1, Schema: schema})

    var (
        field0 []int64
        field1 []int64

        field10 []bool
        field11 []int8
        field12 []int16
        field13 []int64
        field14 []float32
        field15 []float64
        field16 []string
        field17 []string
        field18 []*schemapb.ScalarField
        field19 [][]byte

        field101 []int32
        field102 []float32
        field103 []byte

        field104 []byte
        field105 []byte
    )

    for i := 1; i <= num; i++ {
        field0 = append(field0, int64(i))
        field1 = append(field1, int64(i))
        field10 = append(field10, true)
        field11 = append(field11, int8(i))
        field12 = append(field12, int16(i))
        field13 = append(field13, int64(i))
        field14 = append(field14, float32(i))
        field15 = append(field15, float64(i))
        field16 = append(field16, fmt.Sprint(i))
        field17 = append(field17, fmt.Sprint(i))

        arr := &schemapb.ScalarField{
            Data: &schemapb.ScalarField_IntData{
                IntData: &schemapb.IntArray{Data: []int32{int32(i), int32(i), int32(i)}},
            },
        }
        field18 = append(field18, arr)

        field19 = append(field19, []byte{byte(i)})
        field101 = append(field101, int32(i))

        f102 := make([]float32, 8)
@ -55,13 +100,31 @@ func generateTestData(t *testing.T, num int) []*Blob {
        }

        field102 = append(field102, f102...)
        field103 = append(field103, 0xff)

        f104 := make([]byte, 8)
        for j := range f104 {
            f104[j] = byte(i)
        }
        field104 = append(field104, f104...)
        field105 = append(field105, f104...)
    }

    data := &InsertData{Data: map[FieldID]FieldData{
        common.RowIDField:     &Int64FieldData{Data: field0},
        common.TimeStampField: &Int64FieldData{Data: field1},

        10:  &BoolFieldData{Data: field10},
        11:  &Int8FieldData{Data: field11},
        12:  &Int16FieldData{Data: field12},
        13:  &Int64FieldData{Data: field13},
        14:  &FloatFieldData{Data: field14},
        15:  &DoubleFieldData{Data: field15},
        16:  &StringFieldData{Data: field16},
        17:  &StringFieldData{Data: field17},
        18:  &ArrayFieldData{Data: field18},
        19:  &JSONFieldData{Data: field19},
        101: &Int32FieldData{Data: field101},
        102: &FloatVectorFieldData{
            Data: field102,
            Dim:  8,
@ -70,6 +133,14 @@ func generateTestData(t *testing.T, num int) []*Blob {
            Data: field103,
            Dim:  8,
        },
        104: &Float16VectorFieldData{
            Data: field104,
            Dim:  4,
        },
        105: &BFloat16VectorFieldData{
            Data: field105,
            Dim:  4,
        },
    }}

    blobs, err := insertCodec.Serialize(1, 1, data)
@ -77,6 +148,46 @@ func generateTestData(t *testing.T, num int) []*Blob {
    return blobs
}

// Verify the value at index i (1-based) in the data generated by generateTestData.
func assertTestData(t *testing.T, i int, value *Value) {
    f102 := make([]float32, 8)
    for j := range f102 {
        f102[j] = float32(i)
    }

    f104 := make([]byte, 8)
    for j := range f104 {
        f104[j] = byte(i)
    }

    assert.EqualValues(t, &Value{
        int64(i),
        &Int64PrimaryKey{Value: int64(i)},
        int64(i),
        false,
        map[FieldID]interface{}{
            common.TimeStampField: int64(i),
            common.RowIDField:     int64(i),

            10:  true,
            11:  int8(i),
            12:  int16(i),
            13:  int64(i),
            14:  float32(i),
            15:  float64(i),
            16:  fmt.Sprint(i),
            17:  fmt.Sprint(i),
            18:  &schemapb.ScalarField{Data: &schemapb.ScalarField_IntData{IntData: &schemapb.IntArray{Data: []int32{int32(i), int32(i), int32(i)}}}},
            19:  []byte{byte(i)},
            101: int32(i),
            102: f102,
            103: []byte{0xff},
            104: f104,
            105: f104,
        },
    }, value)
}

func TestInsertlogIterator(t *testing.T) {
    t.Run("empty iterator", func(t *testing.T) {
        itr := &InsertBinlogIterator{
@ -108,29 +219,7 @@ func TestInsertlogIterator(t *testing.T) {
            v, err := itr.Next()
            assert.NoError(t, err)
            value := v.(*Value)
            assertTestData(t, i, value)
        }

        assert.False(t, itr.HasNext())
@ -169,28 +258,7 @@ func TestMergeIterator(t *testing.T) {
            v, err := itr.Next()
            assert.NoError(t, err)
            value := v.(*Value)
            assertTestData(t, i, value)
        }
        assert.False(t, itr.HasNext())
        _, err = itr.Next()
@ -207,33 +275,12 @@ func TestMergeIterator(t *testing.T) {
        itr := NewMergeIterator(iterators)

        for i := 1; i <= 3; i++ {
            for j := 0; j < 2; j++ {
                assert.True(t, itr.HasNext())
                v, err := itr.Next()
                assert.NoError(t, err)
                value := v.(*Value)
                assertTestData(t, i, value)
            }
        }
@ -31,6 +31,7 @@ import (
    "github.com/milvus-io/milvus/internal/proto/etcdpb"
    "github.com/milvus-io/milvus/pkg/common"
    "github.com/milvus-io/milvus/pkg/util/merr"
    "github.com/milvus-io/milvus/pkg/util/metautil"
    "github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -83,11 +84,15 @@ func (s BlobList) Len() int {

// Less implements Less in sort.Interface
func (s BlobList) Less(i, j int) bool {
    _, _, _, _, iLog, ok := metautil.ParseInsertLogPath(s[i].Key)
    if !ok {
        return false
    }
    _, _, _, _, jLog, ok := metautil.ParseInsertLogPath(s[j].Key)
    if !ok {
        return false
    }
    return iLog < jLog
}

// Swap implements Swap in sort.Interface
@ -1115,7 +1120,6 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
    writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", binary.Size(int64Ts)))

    err = writer.Finish()
    if err != nil {
        return nil, err
    }
@ -0,0 +1,412 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
    "fmt"
    "io"
    "sort"
    "strconv"

    "github.com/apache/arrow/go/v12/arrow"
    "github.com/apache/arrow/go/v12/arrow/array"
    "github.com/cockroachdb/errors"
    "github.com/golang/protobuf/proto"

    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
    "github.com/milvus-io/milvus/pkg/common"
    "github.com/milvus-io/milvus/pkg/util/metautil"
)
type Record interface {
    Schema() map[FieldID]schemapb.DataType
    Column(i FieldID) arrow.Array
    Len() int
    Release()
}

type RecordReader interface {
    Next() error
    Record() Record
    Close()
}

// compositeRecord is a record composed of multiple records, each of which has only one column.
type compositeRecord struct {
    recs   map[FieldID]arrow.Record
    schema map[FieldID]schemapb.DataType
}

func (r *compositeRecord) Column(i FieldID) arrow.Array {
    return r.recs[i].Column(0)
}

func (r *compositeRecord) Len() int {
    for _, rec := range r.recs {
        return rec.Column(0).Len()
    }
    return 0
}

func (r *compositeRecord) Release() {
    for _, rec := range r.recs {
        rec.Release()
    }
}

func (r *compositeRecord) Schema() map[FieldID]schemapb.DataType {
    return r.schema
}

type compositeRecordReader struct {
    RecordReader
    blobs [][]*Blob

    blobPos int
    rrs     []array.RecordReader
    closers []func()
    fields  []FieldID

    r compositeRecord
}
func (crr *compositeRecordReader) iterateNextBatch() error {
    if crr.closers != nil {
        for _, close := range crr.closers {
            if close != nil {
                close()
            }
        }
    }
    crr.blobPos++
    if crr.blobPos >= len(crr.blobs[0]) {
        return io.EOF
    }

    for i, b := range crr.blobs {
        reader, err := NewBinlogReader(b[crr.blobPos].Value)
        if err != nil {
            return err
        }

        crr.fields[i] = reader.FieldID
        // TODO: assert that the schema is the same in every blob
        crr.r.schema[reader.FieldID] = reader.PayloadDataType
        er, err := reader.NextEventReader()
        if err != nil {
            return err
        }
        rr, err := er.GetArrowRecordReader()
        if err != nil {
            return err
        }
        crr.rrs[i] = rr
        crr.closers[i] = func() {
            rr.Release()
            er.Close()
            reader.Close()
        }
    }
    return nil
}
func (crr *compositeRecordReader) Next() error {
    if crr.rrs == nil {
        if len(crr.blobs) == 0 {
            return io.EOF
        }
        crr.rrs = make([]array.RecordReader, len(crr.blobs))
        crr.closers = make([]func(), len(crr.blobs))
        crr.blobPos = -1
        crr.fields = make([]FieldID, len(crr.rrs))
        crr.r = compositeRecord{
            recs:   make(map[FieldID]arrow.Record, len(crr.rrs)),
            schema: make(map[FieldID]schemapb.DataType, len(crr.rrs)),
        }
        if err := crr.iterateNextBatch(); err != nil {
            return err
        }
    }

    composeRecord := func() bool {
        for i, rr := range crr.rrs {
            if ok := rr.Next(); !ok {
                return false
            }
            // compose the record from the single-column records
            crr.r.recs[crr.fields[i]] = rr.Record()
        }
        return true
    }

    // Try to compose a record.
    if ok := composeRecord(); !ok {
        // If composing fails, advance to the next batch (blob); the error may be io.EOF.
        if err := crr.iterateNextBatch(); err != nil {
            return err
        }
        // If advancing succeeded, try to compose again.
        if ok := composeRecord(); !ok {
            // If the next blob is empty (rare), return io.EOF.
            return io.EOF
        }
    }
    return nil
}
func (crr *compositeRecordReader) Record() Record {
    return &crr.r
}

func (crr *compositeRecordReader) Close() {
    for _, close := range crr.closers {
        if close != nil {
            close()
        }
    }
}

func parseBlobKey(blobKey string) (colId FieldID, logId UniqueID) {
    if _, _, _, colId, logId, ok := metautil.ParseInsertLogPath(blobKey); ok {
        return colId, logId
    }
    if colId, err := strconv.ParseInt(blobKey, 10, 64); err == nil {
        // data_codec.go generates a single field id as the blob key.
        return colId, 0
    }
    return -1, -1
}
func newCompositeRecordReader(blobs []*Blob) (*compositeRecordReader, error) {
    sort.Slice(blobs, func(i, j int) bool {
        iCol, iLog := parseBlobKey(blobs[i].Key)
        jCol, jLog := parseBlobKey(blobs[j].Key)

        if iCol == jCol {
            return iLog < jLog
        }
        return iCol < jCol
    })

    blobm := make([][]*Blob, 0)
    var fieldId FieldID = -1
    var currentCol []*Blob

    for _, blob := range blobs {
        colId, _ := parseBlobKey(blob.Key)
        if colId != fieldId {
            if currentCol != nil {
                blobm = append(blobm, currentCol)
            }
            currentCol = make([]*Blob, 0)
            fieldId = colId
        }
        currentCol = append(currentCol, blob)
    }
    if currentCol != nil {
        blobm = append(blobm, currentCol)
    }

    return &compositeRecordReader{
        blobs: blobm,
    }, nil
}
type DeserializeReader[T any] struct {
    rr           RecordReader
    deserializer func(Record, []T) error
    rec          Record
    values       []T
    pos          int
}

// Next iterates to the next value; it returns an error, or io.EOF when there are no more values.
func (deser *DeserializeReader[T]) Next() error {
    if deser.rec == nil || deser.pos >= deser.rec.Len()-1 {
        if err := deser.rr.Next(); err != nil {
            return err
        }
        deser.pos = 0
        if deser.rec != nil {
            deser.rec.Release()
        }
        deser.rec = deser.rr.Record()

        if deser.values == nil {
            deser.values = make([]T, deser.rec.Len())
        }
        if err := deser.deserializer(deser.rec, deser.values); err != nil {
            return err
        }
    } else {
        deser.pos++
    }

    return nil
}

func (deser *DeserializeReader[T]) Value() T {
    return deser.values[deser.pos]
}

func (deser *DeserializeReader[T]) Close() {
    if deser.rec != nil {
        deser.rec.Release()
    }
    if deser.rr != nil {
        deser.rr.Close()
    }
}

func NewDeserializeReader[T any](rr RecordReader, deserializer func(Record, []T) error) *DeserializeReader[T] {
    return &DeserializeReader[T]{
        rr:           rr,
        deserializer: deserializer,
    }
}
func deserializeCell(col arrow.Array, dataType schemapb.DataType, i int) (interface{}, bool) {
    switch dataType {
    case schemapb.DataType_Bool:
        arr, ok := col.(*array.Boolean)
        if !ok {
            return nil, false
        }
        return arr.Value(i), true

    case schemapb.DataType_Int8:
        arr, ok := col.(*array.Int8)
        if !ok {
            return nil, false
        }
        return arr.Value(i), true

    case schemapb.DataType_Int16:
        arr, ok := col.(*array.Int16)
        if !ok {
            return nil, false
        }
        return arr.Value(i), true

    case schemapb.DataType_Int32:
        arr, ok := col.(*array.Int32)
        if !ok {
            return nil, false
        }
        return arr.Value(i), true

    case schemapb.DataType_Int64:
        arr, ok := col.(*array.Int64)
        if !ok {
            return nil, false
        }
        return arr.Value(i), true

    case schemapb.DataType_Float:
        arr, ok := col.(*array.Float32)
        if !ok {
            return nil, false
        }
        return arr.Value(i), true

    case schemapb.DataType_Double:
        arr, ok := col.(*array.Float64)
        if !ok {
            return nil, false
        }
        return arr.Value(i), true

    case schemapb.DataType_String, schemapb.DataType_VarChar:
        arr, ok := col.(*array.String)
        if !ok {
            return nil, false
        }
        return arr.Value(i), true

    case schemapb.DataType_Array:
        arr, ok := col.(*array.Binary)
        if !ok {
            return nil, false
        }
        v := &schemapb.ScalarField{}
        if err := proto.Unmarshal(arr.Value(i), v); err != nil {
            return nil, false
        }
        return v, true

    case schemapb.DataType_JSON:
        arr, ok := col.(*array.Binary)
        if !ok {
            return nil, false
        }
        return arr.Value(i), true

    case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
        arr, ok := col.(*array.FixedSizeBinary)
        if !ok {
            return nil, false
        }
        return arr.Value(i), true

    case schemapb.DataType_FloatVector:
        arr, ok := col.(*array.FixedSizeBinary)
        if !ok {
            return nil, false
        }
        return arrow.Float32Traits.CastFromBytes(arr.Value(i)), true

    default:
        panic(fmt.Sprintf("unsupported type %s", dataType))
    }
}
func NewBinlogDeserializeReader(blobs []*Blob, PKfieldID UniqueID) (*DeserializeReader[*Value], error) {
    reader, err := newCompositeRecordReader(blobs)
    if err != nil {
        return nil, err
    }

    return NewDeserializeReader(reader, func(r Record, v []*Value) error {
        // Note: the returned *Value is reused across calls.
        for i := 0; i < r.Len(); i++ {
            value := v[i]
            if value == nil {
                value = &Value{}
                v[i] = value
            }

            m := make(map[FieldID]interface{})
            for j, dt := range r.Schema() {
                d, ok := deserializeCell(r.Column(j), dt, i)
                if ok {
                    m[j] = d
                } else {
                    return errors.Newf("unexpected type %s", dt)
                }
            }

            value.ID = m[common.RowIDField].(int64)
            value.Timestamp = m[common.TimeStampField].(int64)

            pk, err := GenPrimaryKeyByRawData(m[PKfieldID], r.Schema()[PKfieldID])
            if err != nil {
                return err
            }

            value.PK = pk
            value.IsDeleted = false
            value.Value = m
        }
        return nil
    }), nil
}
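Because NewDeserializeReader is generic over T, the same streaming machinery above can drive row types other than *Value by supplying a custom deserializer. A minimal sketch under that assumption; newTimestampReader is a hypothetical helper in the same package, not part of this commit:

    // Sketch, package storage: stream only the timestamp column as int64 values.
    func newTimestampReader(rr RecordReader) *DeserializeReader[int64] {
        return NewDeserializeReader(rr, func(r Record, v []int64) error {
            arr, ok := r.Column(common.TimeStampField).(*array.Int64)
            if !ok {
                return errors.New("timestamp column is not int64")
            }
            for i := 0; i < r.Len(); i++ {
                v[i] = arr.Value(i) // copy out one timestamp per row
            }
            return nil
        })
    }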
@ -0,0 +1,175 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
    "io"
    "reflect"
    "testing"

    "github.com/apache/arrow/go/v12/arrow"
    "github.com/apache/arrow/go/v12/arrow/array"
    "github.com/apache/arrow/go/v12/arrow/memory"
    "github.com/stretchr/testify/assert"

    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
    "github.com/milvus-io/milvus/pkg/common"
)
func TestBinlogDeserializeReader(t *testing.T) {
    t.Run("test empty data", func(t *testing.T) {
        reader, err := NewBinlogDeserializeReader(nil, common.RowIDField)
        assert.NoError(t, err)
        defer reader.Close()
        err = reader.Next()
        assert.Equal(t, io.EOF, err)

        // blobs := generateTestData(t, 0)
        // reader, err = NewBinlogDeserializeReader(blobs, common.RowIDField)
        // assert.NoError(t, err)
        // err = reader.Next()
        // assert.Equal(t, io.EOF, err)
    })

    t.Run("test deserialize", func(t *testing.T) {
        size := 3
        blobs := generateTestData(t, size)
        reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
        assert.NoError(t, err)
        defer reader.Close()

        for i := 1; i <= size; i++ {
            err = reader.Next()
            assert.NoError(t, err)

            value := reader.Value()
            assertTestData(t, i, value)
        }

        err = reader.Next()
        assert.Equal(t, io.EOF, err)
    })
}
func Test_deserializeCell(t *testing.T) {
    onelinerArray := func(dtype arrow.DataType, payload interface{}) arrow.Array {
        mem := memory.DefaultAllocator

        switch dtype.ID() {
        case arrow.BOOL:
            builder := array.NewBooleanBuilder(mem)
            builder.Append(payload.(bool))
            return builder.NewBooleanArray()
        case arrow.INT8:
            builder := array.NewInt8Builder(mem)
            builder.Append(payload.(int8))
            return builder.NewInt8Array()
        case arrow.INT16:
            builder := array.NewInt16Builder(mem)
            builder.Append(payload.(int16))
            return builder.NewInt16Array()
        case arrow.INT32:
            builder := array.NewInt32Builder(mem)
            builder.Append(payload.(int32))
            return builder.NewInt32Array()
        case arrow.INT64:
            builder := array.NewInt64Builder(mem)
            builder.Append(payload.(int64))
            return builder.NewInt64Array()
        case arrow.FLOAT32:
            builder := array.NewFloat32Builder(mem)
            builder.Append(payload.(float32))
            return builder.NewFloat32Array()
        case arrow.FLOAT64:
            builder := array.NewFloat64Builder(mem)
            builder.Append(payload.(float64))
            return builder.NewFloat64Array()
        case arrow.STRING:
            builder := array.NewStringBuilder(mem)
            builder.Append(payload.(string))
            return builder.NewStringArray()
        case arrow.BINARY:
            builder := array.NewBinaryBuilder(mem, arrow.BinaryTypes.Binary)
            builder.Append(payload.([]byte))
            return builder.NewBinaryArray()
        case arrow.FIXED_SIZE_BINARY:
            typ := dtype.(*arrow.FixedSizeBinaryType)
            builder := array.NewFixedSizeBinaryBuilder(mem, typ)
            builder.Append(payload.([]byte))
            return builder.NewFixedSizeBinaryArray()
        }

        return nil
    }

    type args struct {
        col      arrow.Array
        dataType schemapb.DataType
        i        int
    }
    tests := []struct {
        name  string
        args  args
        want  interface{}
        want1 bool
    }{
        {"test bool", args{col: onelinerArray(arrow.FixedWidthTypes.Boolean, true), dataType: schemapb.DataType_Bool, i: 0}, true, true},
        {"test bool negative", args{col: onelinerArray(arrow.Null, nil), dataType: schemapb.DataType_Bool, i: 0}, nil, false},
        {"test int8", args{col: onelinerArray(arrow.PrimitiveTypes.Int8, int8(1)), dataType: schemapb.DataType_Int8, i: 0}, int8(1), true},
        {"test int8 negative", args{col: onelinerArray(arrow.Null, nil), dataType: schemapb.DataType_Int8, i: 0}, nil, false},
        {"test int16", args{col: onelinerArray(arrow.PrimitiveTypes.Int16, int16(1)), dataType: schemapb.DataType_Int16, i: 0}, int16(1), true},
        {"test int16 negative", args{col: onelinerArray(arrow.Null, nil), dataType: schemapb.DataType_Int16, i: 0}, nil, false},
        {"test int32", args{col: onelinerArray(arrow.PrimitiveTypes.Int32, int32(1)), dataType: schemapb.DataType_Int32, i: 0}, int32(1), true},
        {"test int32 negative", args{col: onelinerArray(arrow.Null, nil), dataType: schemapb.DataType_Int32, i: 0}, nil, false},
        {"test int64", args{col: onelinerArray(arrow.PrimitiveTypes.Int64, int64(1)), dataType: schemapb.DataType_Int64, i: 0}, int64(1), true},
        {"test int64 negative", args{col: onelinerArray(arrow.Null, nil), dataType: schemapb.DataType_Int64, i: 0}, nil, false},
        {"test float32", args{col: onelinerArray(arrow.PrimitiveTypes.Float32, float32(1)), dataType: schemapb.DataType_Float, i: 0}, float32(1), true},
        {"test float32 negative", args{col: onelinerArray(arrow.Null, nil), dataType: schemapb.DataType_Float, i: 0}, nil, false},
        {"test float64", args{col: onelinerArray(arrow.PrimitiveTypes.Float64, float64(1)), dataType: schemapb.DataType_Double, i: 0}, float64(1), true},
        {"test float64 negative", args{col: onelinerArray(arrow.Null, nil), dataType: schemapb.DataType_Double, i: 0}, nil, false},
        {"test string", args{col: onelinerArray(arrow.BinaryTypes.String, "test"), dataType: schemapb.DataType_String, i: 0}, "test", true},
        {"test string negative", args{col: onelinerArray(arrow.Null, nil), dataType: schemapb.DataType_String, i: 0}, nil, false},
        {"test varchar", args{col: onelinerArray(arrow.BinaryTypes.String, "test"), dataType: schemapb.DataType_VarChar, i: 0}, "test", true},
        {"test varchar negative", args{col: onelinerArray(arrow.Null, nil), dataType: schemapb.DataType_VarChar, i: 0}, nil, false},
        {"test array negative", args{col: onelinerArray(arrow.BinaryTypes.Binary, []byte("{}")), dataType: schemapb.DataType_Array, i: 0}, nil, false},
        {"test array negative null", args{col: onelinerArray(arrow.Null, nil), dataType: schemapb.DataType_Array, i: 0}, nil, false},
        {"test json", args{col: onelinerArray(arrow.BinaryTypes.Binary, []byte("{}")), dataType: schemapb.DataType_JSON, i: 0}, []byte("{}"), true},
        {"test json negative", args{col: onelinerArray(arrow.Null, nil), dataType: schemapb.DataType_JSON, i: 0}, nil, false},
        {"test float vector", args{col: onelinerArray(&arrow.FixedSizeBinaryType{ByteWidth: 4}, []byte{0, 0, 0, 0}), dataType: schemapb.DataType_FloatVector, i: 0}, []float32{0.0}, true},
        {"test float vector negative", args{col: onelinerArray(arrow.Null, nil), dataType: schemapb.DataType_FloatVector, i: 0}, nil, false},
        {"test binary vector", args{col: onelinerArray(&arrow.FixedSizeBinaryType{ByteWidth: 4}, []byte("test")), dataType: schemapb.DataType_BinaryVector, i: 0}, []byte("test"), true},
        {"test float16 vector", args{col: onelinerArray(&arrow.FixedSizeBinaryType{ByteWidth: 4}, []byte("test")), dataType: schemapb.DataType_Float16Vector, i: 0}, []byte("test"), true},
        {"test bfloat16 vector", args{col: onelinerArray(&arrow.FixedSizeBinaryType{ByteWidth: 4}, []byte("test")), dataType: schemapb.DataType_BFloat16Vector, i: 0}, []byte("test"), true},
        {"test bfloat16 vector negative", args{col: onelinerArray(arrow.Null, nil), dataType: schemapb.DataType_BFloat16Vector, i: 0}, nil, false},
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            got, got1 := deserializeCell(tt.args.col, tt.args.dataType, tt.args.i)
            if !reflect.DeepEqual(got, tt.want) {
                t.Errorf("deserializeCell() got = %v, want %v", got, tt.want)
            }
            if got1 != tt.want1 {
                t.Errorf("deserializeCell() got1 = %v, want %v", got1, tt.want1)
            }
        })
    }
}
@ -16,6 +16,33 @@ func BuildInsertLogPath(rootPath string, collectionID, partitionID, segmentID, f
    return path.Join(rootPath, common.SegmentInsertLogPath, k)
}

func ParseInsertLogPath(path string) (collectionID, partitionID, segmentID, fieldID, logID typeutil.UniqueID, ok bool) {
    infos := strings.Split(path, pathSep)
    l := len(infos)
    if l < 6 {
        return 0, 0, 0, 0, 0, false
    }
    var err error
    if collectionID, err = strconv.ParseInt(infos[l-5], 10, 64); err != nil {
        return 0, 0, 0, 0, 0, false
    }
    if partitionID, err = strconv.ParseInt(infos[l-4], 10, 64); err != nil {
        return 0, 0, 0, 0, 0, false
    }
    if segmentID, err = strconv.ParseInt(infos[l-3], 10, 64); err != nil {
        return 0, 0, 0, 0, 0, false
    }
    if fieldID, err = strconv.ParseInt(infos[l-2], 10, 64); err != nil {
        return 0, 0, 0, 0, 0, false
    }
    if logID, err = strconv.ParseInt(infos[l-1], 10, 64); err != nil {
        return 0, 0, 0, 0, 0, false
    }
    return collectionID, partitionID, segmentID, fieldID, logID, true
}

func GetSegmentIDFromInsertLogPath(logPath string) typeutil.UniqueID {
    return getSegmentIDFromPath(logPath, 3)
}
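A round-trip sketch of the path layout ParseInsertLogPath expects, i.e. .../insert_log/<collectionID>/<partitionID>/<segmentID>/<fieldID>/<logID>. It assumes BuildInsertLogPath's remaining parameters (truncated in the hunk header above) are fieldID and logID, and that fmt and metautil are imported; the IDs are made up:

    // Sketch: build an insert-log path, then parse it back.
    func exampleInsertLogPathRoundTrip() {
        p := metautil.BuildInsertLogPath("files", 1, 2, 3, 100, 42)
        collectionID, partitionID, segmentID, fieldID, logID, ok := metautil.ParseInsertLogPath(p)
        fmt.Println(collectionID, partitionID, segmentID, fieldID, logID, ok) // 1 2 3 100 42 true
    }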
@ -0,0 +1,140 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metautil

import (
    "reflect"
    "testing"

    "github.com/milvus-io/milvus/pkg/util/typeutil"
)

func TestParseInsertLogPath(t *testing.T) {
    type args struct {
        path string
    }
    tests := []struct {
        name             string
        args             args
        wantCollectionID typeutil.UniqueID
        wantPartitionID  typeutil.UniqueID
        wantSegmentID    typeutil.UniqueID
        wantFieldID      typeutil.UniqueID
        wantLogID        typeutil.UniqueID
        wantOk           bool
    }{
        {
            "test parse insert log path",
            args{path: "8a8c3ac2298b12f/insert_log/446266956600703270/446266956600703326/447985737531772787/102/447985737523710526"},
            446266956600703270,
            446266956600703326,
            447985737531772787,
            102,
            447985737523710526,
            true,
        },
        {
            "test parse insert log path negative1",
            args{path: "foobar"},
            0, 0, 0, 0, 0, false,
        },
        {
            "test parse insert log path negative2",
            args{path: "8a8c3ac2298b12f/insert_log/446266956600703270/446266956600703326/447985737531772787/102/foo"},
            0, 0, 0, 0, 0, false,
        },
        {
            "test parse insert log path negative3",
            args{path: "8a8c3ac2298b12f/insert_log/446266956600703270/446266956600703326/447985737531772787/foo/447985737523710526"},
            0, 0, 0, 0, 0, false,
        },
        {
            "test parse insert log path negative4",
            args{path: "8a8c3ac2298b12f/insert_log/446266956600703270/446266956600703326/foo/102/447985737523710526"},
            0, 0, 0, 0, 0, false,
        },
        {
            "test parse insert log path negative5",
            args{path: "8a8c3ac2298b12f/insert_log/446266956600703270/foo/447985737531772787/102/447985737523710526"},
            0, 0, 0, 0, 0, false,
        },
        {
            "test parse insert log path negative6",
            args{path: "8a8c3ac2298b12f/insert_log/foo/446266956600703326/447985737531772787/102/447985737523710526"},
            0, 0, 0, 0, 0, false,
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            gotCollectionID, gotPartitionID, gotSegmentID, gotFieldID, gotLogID, gotOk := ParseInsertLogPath(tt.args.path)
            if !reflect.DeepEqual(gotCollectionID, tt.wantCollectionID) {
                t.Errorf("ParseInsertLogPath() gotCollectionID = %v, want %v", gotCollectionID, tt.wantCollectionID)
            }
            if !reflect.DeepEqual(gotPartitionID, tt.wantPartitionID) {
                t.Errorf("ParseInsertLogPath() gotPartitionID = %v, want %v", gotPartitionID, tt.wantPartitionID)
            }
            if !reflect.DeepEqual(gotSegmentID, tt.wantSegmentID) {
                t.Errorf("ParseInsertLogPath() gotSegmentID = %v, want %v", gotSegmentID, tt.wantSegmentID)
            }
            if !reflect.DeepEqual(gotFieldID, tt.wantFieldID) {
                t.Errorf("ParseInsertLogPath() gotFieldID = %v, want %v", gotFieldID, tt.wantFieldID)
            }
            if !reflect.DeepEqual(gotLogID, tt.wantLogID) {
                t.Errorf("ParseInsertLogPath() gotLogID = %v, want %v", gotLogID, tt.wantLogID)
            }
            if gotOk != tt.wantOk {
                t.Errorf("ParseInsertLogPath() gotOk = %v, want %v", gotOk, tt.wantOk)
            }
        })
    }
}