mirror of https://github.com/milvus-io/milvus.git
feat: adding deltalog stream reader and writer (#33844)

See #31679

Signed-off-by: Ted Xu <ted.xu@zilliz.com>

parent 7b9462c0d3
commit 6d5747cb3e
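Before the diff itself, a minimal round-trip sketch of the deltalog API added by this commit may help. It is written against the signatures introduced below; the zero collection/partition/segment IDs and the batch size of 1024 are placeholder values, and the helper name deltalogRoundTrip is illustrative rather than part of the change:

// Write a batch of DeleteLog entries through the new stream writer, then read them back.
func deltalogRoundTrip(logs []*DeleteLog) ([]*DeleteLog, error) {
	eventWriter := NewDeltalogStreamWriter(0, 0, 0) // collectionID, partitionID, segmentID placeholders
	writer, err := NewDeltalogSerializeWriter(0, 0, eventWriter, 1024)
	if err != nil {
		return nil, err
	}
	for _, l := range logs {
		if err := writer.Write(l); err != nil {
			return nil, err
		}
	}
	if err := writer.Close(); err != nil { // flushes the final batch
		return nil, err
	}
	blob, err := eventWriter.Finalize() // prepends the deltalog headers to the parquet payload
	if err != nil {
		return nil, err
	}

	reader, err := NewDeltalogDeserializeReader([]*Blob{blob})
	if err != nil {
		return nil, err
	}
	defer reader.Close()
	out := make([]*DeleteLog, 0, len(logs))
	for {
		if err := reader.Next(); err != nil {
			if err == io.EOF {
				break
			}
			return nil, err
		}
		entry := *reader.Value() // copy: the reader may reuse the returned value
		out = append(out, &entry)
	}
	return out, nil
}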
@@ -17,26 +17,18 @@

package storage

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"math"
	"sort"
	"strconv"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
	"github.com/apache/arrow/go/v12/parquet"
	"github.com/apache/arrow/go/v12/parquet/pqarrow"
	"github.com/cockroachdb/errors"
	"github.com/golang/protobuf/proto"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/util/metautil"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type Record interface {
@@ -89,115 +81,6 @@ func (r *compositeRecord) Schema() map[FieldID]schemapb.DataType {
|
|||
return r.schema
|
||||
}
|
||||
|
||||
var _ RecordReader = (*compositeRecordReader)(nil)
|
||||
|
||||
type compositeRecordReader struct {
|
||||
blobs [][]*Blob
|
||||
|
||||
blobPos int
|
||||
rrs []array.RecordReader
|
||||
closers []func()
|
||||
fields []FieldID
|
||||
|
||||
r compositeRecord
|
||||
}
|
||||
|
||||
func (crr *compositeRecordReader) iterateNextBatch() error {
|
||||
if crr.closers != nil {
|
||||
for _, close := range crr.closers {
|
||||
if close != nil {
|
||||
close()
|
||||
}
|
||||
}
|
||||
}
|
||||
crr.blobPos++
|
||||
if crr.blobPos >= len(crr.blobs[0]) {
|
||||
return io.EOF
|
||||
}
|
||||
|
||||
for i, b := range crr.blobs {
|
||||
reader, err := NewBinlogReader(b[crr.blobPos].Value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
crr.fields[i] = reader.FieldID
|
||||
// TODO: assert schema being the same in every blobs
|
||||
crr.r.schema[reader.FieldID] = reader.PayloadDataType
|
||||
er, err := reader.NextEventReader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rr, err := er.GetArrowRecordReader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
crr.rrs[i] = rr
|
||||
crr.closers[i] = func() {
|
||||
rr.Release()
|
||||
er.Close()
|
||||
reader.Close()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (crr *compositeRecordReader) Next() error {
|
||||
if crr.rrs == nil {
|
||||
if crr.blobs == nil || len(crr.blobs) == 0 {
|
||||
return io.EOF
|
||||
}
|
||||
crr.rrs = make([]array.RecordReader, len(crr.blobs))
|
||||
crr.closers = make([]func(), len(crr.blobs))
|
||||
crr.blobPos = -1
|
||||
crr.fields = make([]FieldID, len(crr.rrs))
|
||||
crr.r = compositeRecord{
|
||||
recs: make(map[FieldID]arrow.Record, len(crr.rrs)),
|
||||
schema: make(map[FieldID]schemapb.DataType, len(crr.rrs)),
|
||||
}
|
||||
if err := crr.iterateNextBatch(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
composeRecord := func() bool {
|
||||
for i, rr := range crr.rrs {
|
||||
if ok := rr.Next(); !ok {
|
||||
return false
|
||||
}
|
||||
// compose record
|
||||
crr.r.recs[crr.fields[i]] = rr.Record()
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Try compose records
|
||||
if ok := composeRecord(); !ok {
|
||||
// If failed the first time, try iterate next batch (blob), the error may be io.EOF
|
||||
if err := crr.iterateNextBatch(); err != nil {
|
||||
return err
|
||||
}
|
||||
// If iterate next batch success, try compose again
|
||||
if ok := composeRecord(); !ok {
|
||||
// If the next blob is empty, return io.EOF (it's rare).
|
||||
return io.EOF
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (crr *compositeRecordReader) Record() Record {
|
||||
return &crr.r
|
||||
}
|
||||
|
||||
func (crr *compositeRecordReader) Close() {
|
||||
for _, close := range crr.closers {
|
||||
if close != nil {
|
||||
close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type serdeEntry struct {
|
||||
// arrowType returns the arrow type for the given dimension
|
||||
arrowType func(int) arrow.DataType
|
||||
|
@@ -627,52 +510,6 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
|
|||
return m
|
||||
}()
|
||||
|
||||
func parseBlobKey(bolbKey string) (colId FieldID, logId UniqueID) {
|
||||
if _, _, _, colId, logId, ok := metautil.ParseInsertLogPath(bolbKey); ok {
|
||||
return colId, logId
|
||||
}
|
||||
if colId, err := strconv.ParseInt(bolbKey, 10, 64); err == nil {
|
||||
// data_codec.go generate single field id as blob key.
|
||||
return colId, 0
|
||||
}
|
||||
return -1, -1
|
||||
}
|
||||
|
||||
func newCompositeRecordReader(blobs []*Blob) (*compositeRecordReader, error) {
|
||||
sort.Slice(blobs, func(i, j int) bool {
|
||||
iCol, iLog := parseBlobKey(blobs[i].Key)
|
||||
jCol, jLog := parseBlobKey(blobs[j].Key)
|
||||
|
||||
if iCol == jCol {
|
||||
return iLog < jLog
|
||||
}
|
||||
return iCol < jCol
|
||||
})
|
||||
|
||||
blobm := make([][]*Blob, 0)
|
||||
var fieldId FieldID = -1
|
||||
var currentCol []*Blob
|
||||
|
||||
for _, blob := range blobs {
|
||||
colId, _ := parseBlobKey(blob.Key)
|
||||
if colId != fieldId {
|
||||
if currentCol != nil {
|
||||
blobm = append(blobm, currentCol)
|
||||
}
|
||||
currentCol = make([]*Blob, 0)
|
||||
fieldId = colId
|
||||
}
|
||||
currentCol = append(currentCol, blob)
|
||||
}
|
||||
if currentCol != nil {
|
||||
blobm = append(blobm, currentCol)
|
||||
}
|
||||
|
||||
return &compositeRecordReader{
|
||||
blobs: blobm,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type DeserializeReader[T any] struct {
|
||||
rr RecordReader
|
||||
deserializer Deserializer[T]
|
||||
|
@@ -722,56 +559,6 @@ func NewDeserializeReader[T any](rr RecordReader, deserializer Deserializer[T])
|
|||
}
|
||||
}
|
||||
|
||||
func NewBinlogDeserializeReader(blobs []*Blob, PKfieldID UniqueID) (*DeserializeReader[*Value], error) {
|
||||
reader, err := newCompositeRecordReader(blobs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewDeserializeReader(reader, func(r Record, v []*Value) error {
|
||||
// Note: the return value `Value` is reused.
|
||||
for i := 0; i < r.Len(); i++ {
|
||||
value := v[i]
|
||||
if value == nil {
|
||||
value = &Value{}
|
||||
m := make(map[FieldID]interface{}, len(r.Schema()))
|
||||
value.Value = m
|
||||
v[i] = value
|
||||
}
|
||||
|
||||
m := value.Value.(map[FieldID]interface{})
|
||||
for j, dt := range r.Schema() {
|
||||
if r.Column(j).IsNull(i) {
|
||||
m[j] = nil
|
||||
} else {
|
||||
d, ok := serdeMap[dt].deserialize(r.Column(j), i)
|
||||
if ok {
|
||||
m[j] = d // TODO: avoid memory copy here.
|
||||
} else {
|
||||
return errors.New(fmt.Sprintf("unexpected type %s", dt))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if _, ok := m[common.RowIDField]; !ok {
|
||||
panic("no row id column found")
|
||||
}
|
||||
value.ID = m[common.RowIDField].(int64)
|
||||
value.Timestamp = m[common.TimeStampField].(int64)
|
||||
|
||||
pk, err := GenPrimaryKeyByRawData(m[PKfieldID], r.Schema()[PKfieldID])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
value.PK = pk
|
||||
value.IsDeleted = false
|
||||
value.Value = m
|
||||
}
|
||||
return nil
|
||||
}), nil
|
||||
}
|
||||
|
||||
var _ Record = (*selectiveRecord)(nil)
|
||||
|
||||
// selectiveRecord is a Record that only contains a single field, reusing existing Record.
|
||||
|
@@ -985,163 +772,3 @@ func newSimpleArrowRecord(r arrow.Record, schema map[FieldID]schemapb.DataType,
|
|||
field2Col: field2Col,
|
||||
}
|
||||
}
|
||||
|
||||
type BinlogStreamWriter struct {
|
||||
collectionID UniqueID
|
||||
partitionID UniqueID
|
||||
segmentID UniqueID
|
||||
fieldSchema *schemapb.FieldSchema
|
||||
|
||||
memorySize int // To be updated on the fly
|
||||
|
||||
buf bytes.Buffer
|
||||
rw *singleFieldRecordWriter
|
||||
}
|
||||
|
||||
func (bsw *BinlogStreamWriter) GetRecordWriter() (RecordWriter, error) {
|
||||
if bsw.rw != nil {
|
||||
return bsw.rw, nil
|
||||
}
|
||||
|
||||
fid := bsw.fieldSchema.FieldID
|
||||
dim, _ := typeutil.GetDim(bsw.fieldSchema)
|
||||
rw, err := newSingleFieldRecordWriter(fid, arrow.Field{
|
||||
Name: strconv.Itoa(int(fid)),
|
||||
Type: serdeMap[bsw.fieldSchema.DataType].arrowType(int(dim)),
|
||||
Nullable: true, // No nullable check here.
|
||||
}, &bsw.buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bsw.rw = rw
|
||||
return rw, nil
|
||||
}
|
||||
|
||||
func (bsw *BinlogStreamWriter) Finalize() (*Blob, error) {
|
||||
if bsw.rw == nil {
|
||||
return nil, io.ErrUnexpectedEOF
|
||||
}
|
||||
bsw.rw.Close()
|
||||
|
||||
var b bytes.Buffer
|
||||
if err := bsw.writeBinlogHeaders(&b); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err := b.Write(bsw.buf.Bytes()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Blob{
|
||||
Key: strconv.Itoa(int(bsw.fieldSchema.FieldID)),
|
||||
Value: b.Bytes(),
|
||||
RowNum: int64(bsw.rw.numRows),
|
||||
MemorySize: int64(bsw.memorySize),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (bsw *BinlogStreamWriter) writeBinlogHeaders(w io.Writer) error {
|
||||
// Write magic number
|
||||
if err := binary.Write(w, common.Endian, MagicNumber); err != nil {
|
||||
return err
|
||||
}
|
||||
// Write descriptor
|
||||
de := newDescriptorEvent()
|
||||
de.PayloadDataType = bsw.fieldSchema.DataType
|
||||
de.CollectionID = bsw.collectionID
|
||||
de.PartitionID = bsw.partitionID
|
||||
de.SegmentID = bsw.segmentID
|
||||
de.FieldID = bsw.fieldSchema.FieldID
|
||||
de.StartTimestamp = 0
|
||||
de.EndTimestamp = 0
|
||||
de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(bsw.memorySize)) // FIXME: enable original size
|
||||
if err := de.Write(w); err != nil {
|
||||
return err
|
||||
}
|
||||
// Write event header
|
||||
eh := newEventHeader(InsertEventType)
|
||||
// Write event data
|
||||
ev := newInsertEventData()
|
||||
ev.StartTimestamp = 1 // Fixme: enable start/end timestamp
|
||||
ev.EndTimestamp = 1
|
||||
eh.EventLength = int32(bsw.buf.Len()) + eh.GetMemoryUsageInBytes() + int32(binary.Size(ev))
|
||||
// eh.NextPosition = eh.EventLength + w.Offset()
|
||||
if err := eh.Write(w); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := ev.WriteEventData(w); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewBinlogStreamWriters(collectionID, partitionID, segmentID UniqueID,
|
||||
schema []*schemapb.FieldSchema,
|
||||
) map[FieldID]*BinlogStreamWriter {
|
||||
bws := make(map[FieldID]*BinlogStreamWriter, len(schema))
|
||||
for _, f := range schema {
|
||||
bws[f.FieldID] = &BinlogStreamWriter{
|
||||
collectionID: collectionID,
|
||||
partitionID: partitionID,
|
||||
segmentID: segmentID,
|
||||
fieldSchema: f,
|
||||
}
|
||||
}
|
||||
return bws
|
||||
}
|
||||
|
||||
func NewBinlogSerializeWriter(schema *schemapb.CollectionSchema, partitionID, segmentID UniqueID,
|
||||
writers map[FieldID]*BinlogStreamWriter, batchSize int,
|
||||
) (*SerializeWriter[*Value], error) {
|
||||
rws := make(map[FieldID]RecordWriter, len(writers))
|
||||
for fid := range writers {
|
||||
w := writers[fid]
|
||||
rw, err := w.GetRecordWriter()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rws[fid] = rw
|
||||
}
|
||||
compositeRecordWriter := newCompositeRecordWriter(rws)
|
||||
return NewSerializeRecordWriter[*Value](compositeRecordWriter, func(v []*Value) (Record, uint64, error) {
|
||||
builders := make(map[FieldID]array.Builder, len(schema.Fields))
|
||||
types := make(map[FieldID]schemapb.DataType, len(schema.Fields))
|
||||
for _, f := range schema.Fields {
|
||||
dim, _ := typeutil.GetDim(f)
|
||||
builders[f.FieldID] = array.NewBuilder(memory.DefaultAllocator, serdeMap[f.DataType].arrowType(int(dim)))
|
||||
types[f.FieldID] = f.DataType
|
||||
}
|
||||
|
||||
var memorySize uint64
|
||||
for _, vv := range v {
|
||||
m := vv.Value.(map[FieldID]any)
|
||||
|
||||
for fid, e := range m {
|
||||
typeEntry, ok := serdeMap[types[fid]]
|
||||
if !ok {
|
||||
panic("unknown type")
|
||||
}
|
||||
ok = typeEntry.serialize(builders[fid], e)
|
||||
if !ok {
|
||||
return nil, 0, errors.New(fmt.Sprintf("serialize error on type %s", types[fid]))
|
||||
}
|
||||
writers[fid].memorySize += int(typeEntry.sizeof(e))
|
||||
memorySize += typeEntry.sizeof(e)
|
||||
}
|
||||
}
|
||||
arrays := make([]arrow.Array, len(types))
|
||||
fields := make([]arrow.Field, len(types))
|
||||
field2Col := make(map[FieldID]int, len(types))
|
||||
i := 0
|
||||
for fid, builder := range builders {
|
||||
arrays[i] = builder.NewArray()
|
||||
builder.Release()
|
||||
fields[i] = arrow.Field{
|
||||
Name: strconv.Itoa(int(fid)),
|
||||
Type: arrays[i].DataType(),
|
||||
Nullable: true, // No nullable check here.
|
||||
}
|
||||
field2Col[fid] = i
|
||||
i++
|
||||
}
|
||||
return newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(len(v))), types, field2Col), memorySize, nil
|
||||
}, batchSize), nil
|
||||
}
|
||||
|
|
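The hunks above remove the binlog stream writer, serialize writer, and composite record reader from serde.go; they reappear in the new serde_events.go further down. As orientation for that API, a minimal write-path sketch, with placeholder IDs and batch size and an illustrative helper name, could look like this:

// Serialize a slice of *Value rows into one binlog blob per field.
func writeBinlogs(schema *schemapb.CollectionSchema, rows []*Value) ([]*Blob, error) {
	writers := NewBinlogStreamWriters(0, 0, 0, schema.Fields) // collection, partition, segment IDs are placeholders
	writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, 1024)
	if err != nil {
		return nil, err
	}
	for _, row := range rows {
		if err := writer.Write(row); err != nil {
			return nil, err
		}
	}
	if err := writer.Close(); err != nil { // flushes the final batch
		return nil, err
	}
	blobs := make([]*Blob, 0, len(writers))
	for _, w := range writers {
		blob, err := w.Finalize() // writes binlog headers followed by the parquet payload
		if err != nil {
			return nil, err
		}
		blobs = append(blobs, blob)
	}
	return blobs, nil
}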
|
@@ -0,0 +1,562 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/apache/arrow/go/v12/arrow"
|
||||
"github.com/apache/arrow/go/v12/arrow/array"
|
||||
"github.com/apache/arrow/go/v12/arrow/memory"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/metautil"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
var _ RecordReader = (*compositeBinlogRecordReader)(nil)
|
||||
|
||||
type compositeBinlogRecordReader struct {
|
||||
blobs [][]*Blob
|
||||
|
||||
blobPos int
|
||||
rrs []array.RecordReader
|
||||
closers []func()
|
||||
fields []FieldID
|
||||
|
||||
r compositeRecord
|
||||
}
|
||||
|
||||
func (crr *compositeBinlogRecordReader) iterateNextBatch() error {
|
||||
if crr.closers != nil {
|
||||
for _, close := range crr.closers {
|
||||
if close != nil {
|
||||
close()
|
||||
}
|
||||
}
|
||||
}
|
||||
crr.blobPos++
|
||||
if crr.blobPos >= len(crr.blobs[0]) {
|
||||
return io.EOF
|
||||
}
|
||||
|
||||
for i, b := range crr.blobs {
|
||||
reader, err := NewBinlogReader(b[crr.blobPos].Value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
crr.fields[i] = reader.FieldID
|
||||
// TODO: assert schema being the same in every blobs
|
||||
crr.r.schema[reader.FieldID] = reader.PayloadDataType
|
||||
er, err := reader.NextEventReader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rr, err := er.GetArrowRecordReader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
crr.rrs[i] = rr
|
||||
crr.closers[i] = func() {
|
||||
rr.Release()
|
||||
er.Close()
|
||||
reader.Close()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (crr *compositeBinlogRecordReader) Next() error {
|
||||
if crr.rrs == nil {
|
||||
if crr.blobs == nil || len(crr.blobs) == 0 {
|
||||
return io.EOF
|
||||
}
|
||||
crr.rrs = make([]array.RecordReader, len(crr.blobs))
|
||||
crr.closers = make([]func(), len(crr.blobs))
|
||||
crr.blobPos = -1
|
||||
crr.fields = make([]FieldID, len(crr.rrs))
|
||||
crr.r = compositeRecord{
|
||||
recs: make(map[FieldID]arrow.Record, len(crr.rrs)),
|
||||
schema: make(map[FieldID]schemapb.DataType, len(crr.rrs)),
|
||||
}
|
||||
if err := crr.iterateNextBatch(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
composeRecord := func() bool {
|
||||
for i, rr := range crr.rrs {
|
||||
if ok := rr.Next(); !ok {
|
||||
return false
|
||||
}
|
||||
// compose record
|
||||
crr.r.recs[crr.fields[i]] = rr.Record()
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Try compose records
|
||||
if ok := composeRecord(); !ok {
|
||||
// If failed the first time, try iterate next batch (blob), the error may be io.EOF
|
||||
if err := crr.iterateNextBatch(); err != nil {
|
||||
return err
|
||||
}
|
||||
// If iterate next batch success, try compose again
|
||||
if ok := composeRecord(); !ok {
|
||||
// If the next blob is empty, return io.EOF (it's rare).
|
||||
return io.EOF
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (crr *compositeBinlogRecordReader) Record() Record {
|
||||
return &crr.r
|
||||
}
|
||||
|
||||
func (crr *compositeBinlogRecordReader) Close() {
|
||||
for _, close := range crr.closers {
|
||||
if close != nil {
|
||||
close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func parseBlobKey(blobKey string) (colId FieldID, logId UniqueID) {
|
||||
if _, _, _, colId, logId, ok := metautil.ParseInsertLogPath(blobKey); ok {
|
||||
return colId, logId
|
||||
}
|
||||
if colId, err := strconv.ParseInt(blobKey, 10, 64); err == nil {
|
||||
// data_codec.go generate single field id as blob key.
|
||||
return colId, 0
|
||||
}
|
||||
return InvalidUniqueID, InvalidUniqueID
|
||||
}
|
||||
|
||||
func newCompositeBinlogRecordReader(blobs []*Blob) (*compositeBinlogRecordReader, error) {
|
||||
blobMap := make(map[FieldID][]*Blob)
|
||||
for _, blob := range blobs {
|
||||
colId, _ := parseBlobKey(blob.Key)
|
||||
if _, exists := blobMap[colId]; !exists {
|
||||
blobMap[colId] = []*Blob{blob}
|
||||
} else {
|
||||
blobMap[colId] = append(blobMap[colId], blob)
|
||||
}
|
||||
}
|
||||
sortedBlobs := make([][]*Blob, 0, len(blobMap))
|
||||
for _, blobsForField := range blobMap {
|
||||
sort.Slice(blobsForField, func(i, j int) bool {
|
||||
_, iLog := parseBlobKey(blobsForField[i].Key)
|
||||
_, jLog := parseBlobKey(blobsForField[j].Key)
|
||||
|
||||
return iLog < jLog
|
||||
})
|
||||
sortedBlobs = append(sortedBlobs, blobsForField)
|
||||
}
|
||||
return &compositeBinlogRecordReader{
|
||||
blobs: sortedBlobs,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewBinlogDeserializeReader(blobs []*Blob, PKfieldID UniqueID) (*DeserializeReader[*Value], error) {
|
||||
reader, err := newCompositeBinlogRecordReader(blobs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewDeserializeReader(reader, func(r Record, v []*Value) error {
|
||||
// Note: the return value `Value` is reused.
|
||||
for i := 0; i < r.Len(); i++ {
|
||||
value := v[i]
|
||||
if value == nil {
|
||||
value = &Value{}
|
||||
value.Value = make(map[FieldID]interface{}, len(r.Schema()))
|
||||
v[i] = value
|
||||
}
|
||||
|
||||
m := value.Value.(map[FieldID]interface{})
|
||||
for j, dt := range r.Schema() {
|
||||
if r.Column(j).IsNull(i) {
|
||||
m[j] = nil
|
||||
} else {
|
||||
d, ok := serdeMap[dt].deserialize(r.Column(j), i)
|
||||
if ok {
|
||||
m[j] = d // TODO: avoid memory copy here.
|
||||
} else {
|
||||
return merr.WrapErrServiceInternal(fmt.Sprintf("unexpected type %s", dt))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
rowID, ok := m[common.RowIDField].(int64)
|
||||
if !ok {
|
||||
return merr.WrapErrIoKeyNotFound("no row id column found")
|
||||
}
|
||||
value.ID = rowID
|
||||
value.Timestamp = m[common.TimeStampField].(int64)
|
||||
|
||||
pk, err := GenPrimaryKeyByRawData(m[PKfieldID], r.Schema()[PKfieldID])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
value.PK = pk
|
||||
value.IsDeleted = false
|
||||
value.Value = m
|
||||
}
|
||||
return nil
|
||||
}), nil
|
||||
}

func NewDeltalogDeserializeReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) {
	reader, err := newCompositeBinlogRecordReader(blobs)
	if err != nil {
		return nil, err
	}
	return NewDeserializeReader(reader, func(r Record, v []*DeleteLog) error {
		var fid FieldID // The only fid from delete file
		for k := range r.Schema() {
			fid = k
			break
		}
		for i := 0; i < r.Len(); i++ {
			if v[i] == nil {
				v[i] = &DeleteLog{}
			}
			a := r.Column(fid).(*array.String)
			strVal := a.Value(i)
			if err = json.Unmarshal([]byte(strVal), v[i]); err != nil {
				// compatible with versions that only support int64 type primary keys
				// compatible with fmt.Sprintf("%d,%d", pk, ts)
				// compatible error info (unmarshal err invalid character ',' after top-level value)
				splits := strings.Split(strVal, ",")
				if len(splits) != 2 {
					return fmt.Errorf("the format of delta log is incorrect, %v can not be split", strVal)
				}
				pk, err := strconv.ParseInt(splits[0], 10, 64)
				if err != nil {
					return err
				}
				v[i].Pk = &Int64PrimaryKey{
					Value: pk,
				}
				v[i].Ts, err = strconv.ParseUint(splits[1], 10, 64)
				if err != nil {
					return err
				}
			}
		}
		return nil
	}), nil
}

type BinlogStreamWriter struct {
|
||||
collectionID UniqueID
|
||||
partitionID UniqueID
|
||||
segmentID UniqueID
|
||||
fieldSchema *schemapb.FieldSchema
|
||||
|
||||
memorySize int // To be updated on the fly
|
||||
|
||||
buf bytes.Buffer
|
||||
rw *singleFieldRecordWriter
|
||||
}
|
||||
|
||||
func (bsw *BinlogStreamWriter) GetRecordWriter() (RecordWriter, error) {
|
||||
if bsw.rw != nil {
|
||||
return bsw.rw, nil
|
||||
}
|
||||
|
||||
fid := bsw.fieldSchema.FieldID
|
||||
dim, _ := typeutil.GetDim(bsw.fieldSchema)
|
||||
rw, err := newSingleFieldRecordWriter(fid, arrow.Field{
|
||||
Name: strconv.Itoa(int(fid)),
|
||||
Type: serdeMap[bsw.fieldSchema.DataType].arrowType(int(dim)),
|
||||
Nullable: true, // No nullable check here.
|
||||
}, &bsw.buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bsw.rw = rw
|
||||
return rw, nil
|
||||
}
|
||||
|
||||
func (bsw *BinlogStreamWriter) Finalize() (*Blob, error) {
|
||||
if bsw.rw == nil {
|
||||
return nil, io.ErrUnexpectedEOF
|
||||
}
|
||||
bsw.rw.Close()
|
||||
|
||||
var b bytes.Buffer
|
||||
if err := bsw.writeBinlogHeaders(&b); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err := b.Write(bsw.buf.Bytes()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Blob{
|
||||
Key: strconv.Itoa(int(bsw.fieldSchema.FieldID)),
|
||||
Value: b.Bytes(),
|
||||
RowNum: int64(bsw.rw.numRows),
|
||||
MemorySize: int64(bsw.memorySize),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (bsw *BinlogStreamWriter) writeBinlogHeaders(w io.Writer) error {
|
||||
// Write magic number
|
||||
if err := binary.Write(w, common.Endian, MagicNumber); err != nil {
|
||||
return err
|
||||
}
|
||||
// Write descriptor
|
||||
de := newDescriptorEvent()
|
||||
de.PayloadDataType = bsw.fieldSchema.DataType
|
||||
de.CollectionID = bsw.collectionID
|
||||
de.PartitionID = bsw.partitionID
|
||||
de.SegmentID = bsw.segmentID
|
||||
de.FieldID = bsw.fieldSchema.FieldID
|
||||
de.StartTimestamp = 0
|
||||
de.EndTimestamp = 0
|
||||
de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(bsw.memorySize))
|
||||
if err := de.Write(w); err != nil {
|
||||
return err
|
||||
}
|
||||
// Write event header
|
||||
eh := newEventHeader(InsertEventType)
|
||||
// Write event data
|
||||
ev := newInsertEventData()
|
||||
ev.StartTimestamp = 1
|
||||
ev.EndTimestamp = 1
|
||||
eh.EventLength = int32(bsw.buf.Len()) + eh.GetMemoryUsageInBytes() + int32(binary.Size(ev))
|
||||
// eh.NextPosition = eh.EventLength + w.Offset()
|
||||
if err := eh.Write(w); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := ev.WriteEventData(w); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewBinlogStreamWriters(collectionID, partitionID, segmentID UniqueID,
|
||||
schema []*schemapb.FieldSchema,
|
||||
) map[FieldID]*BinlogStreamWriter {
|
||||
bws := make(map[FieldID]*BinlogStreamWriter, len(schema))
|
||||
for _, f := range schema {
|
||||
bws[f.FieldID] = &BinlogStreamWriter{
|
||||
collectionID: collectionID,
|
||||
partitionID: partitionID,
|
||||
segmentID: segmentID,
|
||||
fieldSchema: f,
|
||||
}
|
||||
}
|
||||
return bws
|
||||
}
|
||||
|
||||
func NewBinlogSerializeWriter(schema *schemapb.CollectionSchema, partitionID, segmentID UniqueID,
|
||||
eventWriters map[FieldID]*BinlogStreamWriter, batchSize int,
|
||||
) (*SerializeWriter[*Value], error) {
|
||||
rws := make(map[FieldID]RecordWriter, len(eventWriters))
|
||||
for fid := range eventWriters {
|
||||
w := eventWriters[fid]
|
||||
rw, err := w.GetRecordWriter()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rws[fid] = rw
|
||||
}
|
||||
compositeRecordWriter := newCompositeRecordWriter(rws)
|
||||
return NewSerializeRecordWriter[*Value](compositeRecordWriter, func(v []*Value) (Record, uint64, error) {
|
||||
builders := make(map[FieldID]array.Builder, len(schema.Fields))
|
||||
types := make(map[FieldID]schemapb.DataType, len(schema.Fields))
|
||||
for _, f := range schema.Fields {
|
||||
dim, _ := typeutil.GetDim(f)
|
||||
builders[f.FieldID] = array.NewBuilder(memory.DefaultAllocator, serdeMap[f.DataType].arrowType(int(dim)))
|
||||
types[f.FieldID] = f.DataType
|
||||
}
|
||||
|
||||
var memorySize uint64
|
||||
for _, vv := range v {
|
||||
m := vv.Value.(map[FieldID]any)
|
||||
|
||||
for fid, e := range m {
|
||||
typeEntry, ok := serdeMap[types[fid]]
|
||||
if !ok {
|
||||
panic("unknown type")
|
||||
}
|
||||
ok = typeEntry.serialize(builders[fid], e)
|
||||
if !ok {
|
||||
return nil, 0, merr.WrapErrServiceInternal(fmt.Sprintf("serialize error on type %s", types[fid]))
|
||||
}
|
||||
eventWriters[fid].memorySize += int(typeEntry.sizeof(e))
|
||||
memorySize += typeEntry.sizeof(e)
|
||||
}
|
||||
}
|
||||
arrays := make([]arrow.Array, len(types))
|
||||
fields := make([]arrow.Field, len(types))
|
||||
field2Col := make(map[FieldID]int, len(types))
|
||||
i := 0
|
||||
for fid, builder := range builders {
|
||||
arrays[i] = builder.NewArray()
|
||||
builder.Release()
|
||||
fields[i] = arrow.Field{
|
||||
Name: strconv.Itoa(int(fid)),
|
||||
Type: arrays[i].DataType(),
|
||||
Nullable: true, // No nullable check here.
|
||||
}
|
||||
field2Col[fid] = i
|
||||
i++
|
||||
}
|
||||
return newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(len(v))), types, field2Col), memorySize, nil
|
||||
}, batchSize), nil
|
||||
}

type DeltalogStreamWriter struct {
	collectionID UniqueID
	partitionID  UniqueID
	segmentID    UniqueID

	memorySize int // To be updated on the fly
	buf        bytes.Buffer
	rw         *singleFieldRecordWriter
}

func (dsw *DeltalogStreamWriter) GetRecordWriter() (RecordWriter, error) {
	if dsw.rw != nil {
		return dsw.rw, nil
	}

	rw, err := newSingleFieldRecordWriter(0, arrow.Field{
		Name:     "delta",
		Type:     arrow.BinaryTypes.String,
		Nullable: false,
	}, &dsw.buf)
	if err != nil {
		return nil, err
	}
	dsw.rw = rw
	return rw, nil
}

func (dsw *DeltalogStreamWriter) Finalize() (*Blob, error) {
	if dsw.rw == nil {
		return nil, io.ErrUnexpectedEOF
	}
	dsw.rw.Close()

	var b bytes.Buffer
	if err := dsw.writeDeltalogHeaders(&b); err != nil {
		return nil, err
	}
	if _, err := b.Write(dsw.buf.Bytes()); err != nil {
		return nil, err
	}
	return &Blob{
		Value:      b.Bytes(),
		RowNum:     int64(dsw.rw.numRows),
		MemorySize: int64(dsw.memorySize),
	}, nil
}

func (dsw *DeltalogStreamWriter) writeDeltalogHeaders(w io.Writer) error {
	// Write magic number
	if err := binary.Write(w, common.Endian, MagicNumber); err != nil {
		return err
	}
	// Write descriptor
	de := newDescriptorEvent()
	de.PayloadDataType = schemapb.DataType_String
	de.CollectionID = dsw.collectionID
	de.PartitionID = dsw.partitionID
	de.SegmentID = dsw.segmentID
	de.StartTimestamp = 0
	de.EndTimestamp = 0
	de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(dsw.memorySize))
	if err := de.Write(w); err != nil {
		return err
	}
	// Write event header
	eh := newEventHeader(DeleteEventType)
	// Write event data
	ev := newDeleteEventData()
	ev.StartTimestamp = 1
	ev.EndTimestamp = 1
	eh.EventLength = int32(dsw.buf.Len()) + eh.GetMemoryUsageInBytes() + int32(binary.Size(ev))
	// eh.NextPosition = eh.EventLength + w.Offset()
	if err := eh.Write(w); err != nil {
		return err
	}
	if err := ev.WriteEventData(w); err != nil {
		return err
	}
	return nil
}

func NewDeltalogStreamWriter(collectionID, partitionID, segmentID UniqueID) *DeltalogStreamWriter {
	return &DeltalogStreamWriter{
		collectionID: collectionID,
		partitionID:  partitionID,
		segmentID:    segmentID,
	}
}

func NewDeltalogSerializeWriter(partitionID, segmentID UniqueID, eventWriter *DeltalogStreamWriter, batchSize int,
) (*SerializeWriter[*DeleteLog], error) {
	rws := make(map[FieldID]RecordWriter, 1)
	rw, err := eventWriter.GetRecordWriter()
	if err != nil {
		return nil, err
	}
	rws[0] = rw
	compositeRecordWriter := newCompositeRecordWriter(rws)
	return NewSerializeRecordWriter[*DeleteLog](compositeRecordWriter, func(v []*DeleteLog) (Record, uint64, error) {
		builder := array.NewBuilder(memory.DefaultAllocator, arrow.BinaryTypes.String)

		var memorySize uint64
		for _, vv := range v {
			strVal, err := json.Marshal(vv)
			if err != nil {
				return nil, memorySize, err
			}

			builder.AppendValueFromString(string(strVal))
			memorySize += uint64(len(strVal))
		}
		arr := []arrow.Array{builder.NewArray()}
		field := []arrow.Field{{
			Name:     "delta",
			Type:     arrow.BinaryTypes.String,
			Nullable: false,
		}}
		field2Col := map[FieldID]int{
			0: 0,
		}
		schema := map[FieldID]schemapb.DataType{
			0: schemapb.DataType_String,
		}
		return newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(field, nil), arr, int64(len(v))), schema, field2Col), memorySize, nil
	}, batchSize), nil
}
|
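A note on the payload format above: NewDeltalogSerializeWriter JSON-encodes every DeleteLog into a single string column named "delta", and NewDeltalogDeserializeReader also accepts the older "<pk>,<ts>" plain-text form for int64 primary keys. A standalone sketch of that fallback parse follows; the helper name is illustrative and not part of the commit:

// Parse a legacy deltalog entry of the form "pk,ts" into a DeleteLog.
func parseLegacyDeltalogEntry(s string) (*DeleteLog, error) {
	parts := strings.Split(s, ",")
	if len(parts) != 2 {
		return nil, fmt.Errorf("the format of delta log is incorrect, %v can not be split", s)
	}
	pk, err := strconv.ParseInt(parts[0], 10, 64)
	if err != nil {
		return nil, err
	}
	ts, err := strconv.ParseUint(parts[1], 10, 64)
	if err != nil {
		return nil, err
	}
	return &DeleteLog{Pk: &Int64PrimaryKey{Value: pk}, Ts: ts}, nil
}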
@@ -0,0 +1,345 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/apache/arrow/go/v12/arrow"
|
||||
"github.com/apache/arrow/go/v12/arrow/array"
|
||||
"github.com/apache/arrow/go/v12/arrow/memory"
|
||||
"github.com/apache/arrow/go/v12/parquet/file"
|
||||
"github.com/apache/arrow/go/v12/parquet/pqarrow"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
)
|
||||
|
||||
func TestBinlogDeserializeReader(t *testing.T) {
|
||||
t.Run("test empty data", func(t *testing.T) {
|
||||
reader, err := NewBinlogDeserializeReader(nil, common.RowIDField)
|
||||
assert.NoError(t, err)
|
||||
defer reader.Close()
|
||||
err = reader.Next()
|
||||
assert.Equal(t, io.EOF, err)
|
||||
|
||||
// blobs := generateTestData(t, 0)
|
||||
// reader, err = NewBinlogDeserializeReader(blobs, common.RowIDField)
|
||||
// assert.NoError(t, err)
|
||||
// err = reader.Next()
|
||||
// assert.Equal(t, io.EOF, err)
|
||||
})
|
||||
|
||||
t.Run("test deserialize", func(t *testing.T) {
|
||||
size := 3
|
||||
blobs, err := generateTestData(size)
|
||||
assert.NoError(t, err)
|
||||
reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
|
||||
assert.NoError(t, err)
|
||||
defer reader.Close()
|
||||
|
||||
for i := 1; i <= size; i++ {
|
||||
err = reader.Next()
|
||||
assert.NoError(t, err)
|
||||
|
||||
value := reader.Value()
|
||||
assertTestData(t, i, value)
|
||||
}
|
||||
|
||||
err = reader.Next()
|
||||
assert.Equal(t, io.EOF, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestBinlogStreamWriter(t *testing.T) {
|
||||
t.Run("test write", func(t *testing.T) {
|
||||
size := 3
|
||||
|
||||
field := arrow.Field{Name: "bool", Type: arrow.FixedWidthTypes.Boolean}
|
||||
var w bytes.Buffer
|
||||
rw, err := newSingleFieldRecordWriter(1, field, &w)
|
||||
assert.NoError(t, err)
|
||||
|
||||
builder := array.NewBooleanBuilder(memory.DefaultAllocator)
|
||||
builder.AppendValues([]bool{true, false, true}, nil)
|
||||
arr := builder.NewArray()
|
||||
defer arr.Release()
|
||||
ar := array.NewRecord(
|
||||
arrow.NewSchema(
|
||||
[]arrow.Field{field},
|
||||
nil,
|
||||
),
|
||||
[]arrow.Array{arr},
|
||||
int64(size),
|
||||
)
|
||||
r := newSimpleArrowRecord(ar, map[FieldID]schemapb.DataType{1: schemapb.DataType_Bool}, map[FieldID]int{1: 0})
|
||||
defer r.Release()
|
||||
err = rw.Write(r)
|
||||
assert.NoError(t, err)
|
||||
rw.Close()
|
||||
|
||||
reader, err := file.NewParquetReader(bytes.NewReader(w.Bytes()))
|
||||
assert.NoError(t, err)
|
||||
arrowReader, err := pqarrow.NewFileReader(reader, pqarrow.ArrowReadProperties{BatchSize: 1024}, memory.DefaultAllocator)
|
||||
assert.NoError(t, err)
|
||||
rr, err := arrowReader.GetRecordReader(context.Background(), nil, nil)
|
||||
assert.NoError(t, err)
|
||||
defer rr.Release()
|
||||
ok := rr.Next()
|
||||
assert.True(t, ok)
|
||||
rec := rr.Record()
|
||||
defer rec.Release()
|
||||
assert.Equal(t, int64(size), rec.NumRows())
|
||||
ok = rr.Next()
|
||||
assert.False(t, ok)
|
||||
})
|
||||
}
|
||||
|
||||
func TestBinlogSerializeWriter(t *testing.T) {
|
||||
t.Run("test empty data", func(t *testing.T) {
|
||||
reader, err := NewBinlogDeserializeReader(nil, common.RowIDField)
|
||||
assert.NoError(t, err)
|
||||
defer reader.Close()
|
||||
err = reader.Next()
|
||||
assert.Equal(t, io.EOF, err)
|
||||
})
|
||||
|
||||
t.Run("test serialize", func(t *testing.T) {
|
||||
size := 16
|
||||
blobs, err := generateTestData(size)
|
||||
assert.NoError(t, err)
|
||||
reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
|
||||
assert.NoError(t, err)
|
||||
defer reader.Close()
|
||||
|
||||
schema := generateTestSchema()
|
||||
// Copy write the generated data
|
||||
writers := NewBinlogStreamWriters(0, 0, 0, schema.Fields)
|
||||
writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, 7)
|
||||
assert.NoError(t, err)
|
||||
|
||||
for i := 1; i <= size; i++ {
|
||||
err = reader.Next()
|
||||
assert.NoError(t, err)
|
||||
|
||||
value := reader.Value()
|
||||
assertTestData(t, i, value)
|
||||
err := writer.Write(value)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
err = reader.Next()
|
||||
assert.Equal(t, io.EOF, err)
|
||||
err = writer.Close()
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, writer.WrittenMemorySize() >= 429)
|
||||
|
||||
// Read from the written data
|
||||
newblobs := make([]*Blob, len(writers))
|
||||
i := 0
|
||||
for _, w := range writers {
|
||||
blob, err := w.Finalize()
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, blob)
|
||||
assert.True(t, blob.MemorySize > 0)
|
||||
newblobs[i] = blob
|
||||
i++
|
||||
}
|
||||
// assert.Equal(t, blobs[0].Value, newblobs[0].Value)
|
||||
reader, err = NewBinlogDeserializeReader(blobs, common.RowIDField)
|
||||
assert.NoError(t, err)
|
||||
defer reader.Close()
|
||||
for i := 1; i <= size; i++ {
|
||||
err = reader.Next()
|
||||
assert.NoError(t, err, i)
|
||||
|
||||
value := reader.Value()
|
||||
assertTestData(t, i, value)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestNull(t *testing.T) {
|
||||
t.Run("test null", func(t *testing.T) {
|
||||
schema := generateTestSchema()
|
||||
// Copy write the generated data
|
||||
writers := NewBinlogStreamWriters(0, 0, 0, schema.Fields)
|
||||
writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, 1024)
|
||||
assert.NoError(t, err)
|
||||
|
||||
m := make(map[FieldID]any)
|
||||
m[common.RowIDField] = int64(0)
|
||||
m[common.TimeStampField] = int64(0)
|
||||
m[10] = nil
|
||||
m[11] = nil
|
||||
m[12] = nil
|
||||
m[13] = nil
|
||||
m[14] = nil
|
||||
m[15] = nil
|
||||
m[16] = nil
|
||||
m[17] = nil
|
||||
m[18] = nil
|
||||
m[19] = nil
|
||||
m[101] = nil
|
||||
m[102] = nil
|
||||
m[103] = nil
|
||||
m[104] = nil
|
||||
m[105] = nil
|
||||
m[106] = nil
|
||||
pk, err := GenPrimaryKeyByRawData(m[common.RowIDField], schemapb.DataType_Int64)
|
||||
assert.NoError(t, err)
|
||||
|
||||
value := &Value{
|
||||
ID: 0,
|
||||
PK: pk,
|
||||
Timestamp: 0,
|
||||
IsDeleted: false,
|
||||
Value: m,
|
||||
}
|
||||
writer.Write(value)
|
||||
err = writer.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Read from the written data
|
||||
blobs := make([]*Blob, len(writers))
|
||||
i := 0
|
||||
for _, w := range writers {
|
||||
blob, err := w.Finalize()
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, blob)
|
||||
blobs[i] = blob
|
||||
i++
|
||||
}
|
||||
reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
|
||||
assert.NoError(t, err)
|
||||
defer reader.Close()
|
||||
err = reader.Next()
|
||||
assert.NoError(t, err)
|
||||
|
||||
readValue := reader.Value()
|
||||
assert.Equal(t, value, readValue)
|
||||
})
|
||||
}

func generateTestDeltalogData(size int) (*Blob, error) {
	codec := NewDeleteCodec()
	pks := make([]int64, size)
	tss := make([]uint64, size)
	for i := 0; i < size; i++ {
		pks[i] = int64(i)
		tss[i] = uint64(i + 1)
	}
	data := &DeleteData{}
	for i := range pks {
		data.Append(NewInt64PrimaryKey(pks[i]), tss[i])
	}
	return codec.Serialize(0, 0, 0, data)
}

func assertTestDeltalogData(t *testing.T, i int, value *DeleteLog) {
	assert.Equal(t, &Int64PrimaryKey{int64(i)}, value.Pk)
	assert.Equal(t, uint64(i+1), value.Ts)
}

func TestDeltalogDeserializeReader(t *testing.T) {
	t.Run("test empty data", func(t *testing.T) {
		reader, err := NewDeltalogDeserializeReader(nil)
		assert.NoError(t, err)
		defer reader.Close()
		err = reader.Next()
		assert.Equal(t, io.EOF, err)
	})

	t.Run("test deserialize", func(t *testing.T) {
		size := 3
		blob, err := generateTestDeltalogData(size)
		assert.NoError(t, err)
		reader, err := NewDeltalogDeserializeReader([]*Blob{blob})
		assert.NoError(t, err)
		defer reader.Close()

		for i := 0; i < size; i++ {
			err = reader.Next()
			assert.NoError(t, err)

			value := reader.Value()
			assertTestDeltalogData(t, i, value)
		}

		err = reader.Next()
		assert.Equal(t, io.EOF, err)
	})
}

func TestDeltalogSerializeWriter(t *testing.T) {
	t.Run("test empty data", func(t *testing.T) {
		reader, err := NewDeltalogDeserializeReader(nil)
		assert.NoError(t, err)
		defer reader.Close()
		err = reader.Next()
		assert.Equal(t, io.EOF, err)
	})

	t.Run("test serialize", func(t *testing.T) {
		size := 16
		blob, err := generateTestDeltalogData(size)
		assert.NoError(t, err)
		reader, err := NewDeltalogDeserializeReader([]*Blob{blob})
		assert.NoError(t, err)
		defer reader.Close()

		// Copy write the generated data
		eventWriter := NewDeltalogStreamWriter(0, 0, 0)
		writer, err := NewDeltalogSerializeWriter(0, 0, eventWriter, 7)
		assert.NoError(t, err)

		for i := 0; i < size; i++ {
			err = reader.Next()
			assert.NoError(t, err)

			value := reader.Value()
			assertTestDeltalogData(t, i, value)
			err := writer.Write(value)
			assert.NoError(t, err)
		}

		err = reader.Next()
		assert.Equal(t, io.EOF, err)
		err = writer.Close()
		assert.NoError(t, err)

		// Read from the written data
		newblob, err := eventWriter.Finalize()
		assert.NoError(t, err)
		assert.NotNil(t, newblob)
		// assert.Equal(t, blobs[0].Value, newblobs[0].Value)
		reader, err = NewDeltalogDeserializeReader([]*Blob{newblob})
		assert.NoError(t, err)
		defer reader.Close()
		for i := 0; i < size; i++ {
			err = reader.Next()
			assert.NoError(t, err, i)

			value := reader.Value()
			assertTestDeltalogData(t, i, value)
		}
	})
}
|
@@ -17,229 +17,18 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/apache/arrow/go/v12/arrow"
|
||||
"github.com/apache/arrow/go/v12/arrow/array"
|
||||
"github.com/apache/arrow/go/v12/arrow/memory"
|
||||
"github.com/apache/arrow/go/v12/parquet/file"
|
||||
"github.com/apache/arrow/go/v12/parquet/pqarrow"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
)
|
||||
|
||||
func TestBinlogDeserializeReader(t *testing.T) {
|
||||
t.Run("test empty data", func(t *testing.T) {
|
||||
reader, err := NewBinlogDeserializeReader(nil, common.RowIDField)
|
||||
assert.NoError(t, err)
|
||||
defer reader.Close()
|
||||
err = reader.Next()
|
||||
assert.Equal(t, io.EOF, err)
|
||||
|
||||
// blobs := generateTestData(t, 0)
|
||||
// reader, err = NewBinlogDeserializeReader(blobs, common.RowIDField)
|
||||
// assert.NoError(t, err)
|
||||
// err = reader.Next()
|
||||
// assert.Equal(t, io.EOF, err)
|
||||
})
|
||||
|
||||
t.Run("test deserialize", func(t *testing.T) {
|
||||
size := 3
|
||||
blobs, err := generateTestData(size)
|
||||
assert.NoError(t, err)
|
||||
reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
|
||||
assert.NoError(t, err)
|
||||
defer reader.Close()
|
||||
|
||||
for i := 1; i <= size; i++ {
|
||||
err = reader.Next()
|
||||
assert.NoError(t, err)
|
||||
|
||||
value := reader.Value()
|
||||
assertTestData(t, i, value)
|
||||
}
|
||||
|
||||
err = reader.Next()
|
||||
assert.Equal(t, io.EOF, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestBinlogStreamWriter(t *testing.T) {
|
||||
t.Run("test write", func(t *testing.T) {
|
||||
size := 3
|
||||
|
||||
field := arrow.Field{Name: "bool", Type: arrow.FixedWidthTypes.Boolean}
|
||||
var w bytes.Buffer
|
||||
rw, err := newSingleFieldRecordWriter(1, field, &w)
|
||||
assert.NoError(t, err)
|
||||
|
||||
builder := array.NewBooleanBuilder(memory.DefaultAllocator)
|
||||
builder.AppendValues([]bool{true, false, true}, nil)
|
||||
arr := builder.NewArray()
|
||||
defer arr.Release()
|
||||
ar := array.NewRecord(
|
||||
arrow.NewSchema(
|
||||
[]arrow.Field{field},
|
||||
nil,
|
||||
),
|
||||
[]arrow.Array{arr},
|
||||
int64(size),
|
||||
)
|
||||
r := newSimpleArrowRecord(ar, map[FieldID]schemapb.DataType{1: schemapb.DataType_Bool}, map[FieldID]int{1: 0})
|
||||
defer r.Release()
|
||||
err = rw.Write(r)
|
||||
assert.NoError(t, err)
|
||||
rw.Close()
|
||||
|
||||
reader, err := file.NewParquetReader(bytes.NewReader(w.Bytes()))
|
||||
assert.NoError(t, err)
|
||||
arrowReader, err := pqarrow.NewFileReader(reader, pqarrow.ArrowReadProperties{BatchSize: 1024}, memory.DefaultAllocator)
|
||||
assert.NoError(t, err)
|
||||
rr, err := arrowReader.GetRecordReader(context.Background(), nil, nil)
|
||||
assert.NoError(t, err)
|
||||
defer rr.Release()
|
||||
ok := rr.Next()
|
||||
assert.True(t, ok)
|
||||
rec := rr.Record()
|
||||
defer rec.Release()
|
||||
assert.Equal(t, int64(size), rec.NumRows())
|
||||
ok = rr.Next()
|
||||
assert.False(t, ok)
|
||||
})
|
||||
}
|
||||
|
||||
func TestBinlogSerializeWriter(t *testing.T) {
|
||||
t.Run("test empty data", func(t *testing.T) {
|
||||
reader, err := NewBinlogDeserializeReader(nil, common.RowIDField)
|
||||
assert.NoError(t, err)
|
||||
defer reader.Close()
|
||||
err = reader.Next()
|
||||
assert.Equal(t, io.EOF, err)
|
||||
})
|
||||
|
||||
t.Run("test serialize", func(t *testing.T) {
|
||||
size := 16
|
||||
blobs, err := generateTestData(size)
|
||||
assert.NoError(t, err)
|
||||
reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
|
||||
assert.NoError(t, err)
|
||||
defer reader.Close()
|
||||
|
||||
schema := generateTestSchema()
|
||||
// Copy write the generated data
|
||||
writers := NewBinlogStreamWriters(0, 0, 0, schema.Fields)
|
||||
writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, 7)
|
||||
assert.NoError(t, err)
|
||||
|
||||
for i := 1; i <= size; i++ {
|
||||
err = reader.Next()
|
||||
assert.NoError(t, err)
|
||||
|
||||
value := reader.Value()
|
||||
assertTestData(t, i, value)
|
||||
err := writer.Write(value)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
err = reader.Next()
|
||||
assert.Equal(t, io.EOF, err)
|
||||
err = writer.Close()
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, writer.WrittenMemorySize() >= 429)
|
||||
|
||||
// Read from the written data
|
||||
newblobs := make([]*Blob, len(writers))
|
||||
i := 0
|
||||
for _, w := range writers {
|
||||
blob, err := w.Finalize()
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, blob)
|
||||
assert.True(t, blob.MemorySize > 0)
|
||||
newblobs[i] = blob
|
||||
i++
|
||||
}
|
||||
// assert.Equal(t, blobs[0].Value, newblobs[0].Value)
|
||||
reader, err = NewBinlogDeserializeReader(blobs, common.RowIDField)
|
||||
assert.NoError(t, err)
|
||||
defer reader.Close()
|
||||
for i := 1; i <= size; i++ {
|
||||
err = reader.Next()
|
||||
assert.NoError(t, err, i)
|
||||
|
||||
value := reader.Value()
|
||||
assertTestData(t, i, value)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestNull(t *testing.T) {
|
||||
t.Run("test null", func(t *testing.T) {
|
||||
schema := generateTestSchema()
|
||||
// Copy write the generated data
|
||||
writers := NewBinlogStreamWriters(0, 0, 0, schema.Fields)
|
||||
writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, 1024)
|
||||
assert.NoError(t, err)
|
||||
|
||||
m := make(map[FieldID]any)
|
||||
m[common.RowIDField] = int64(0)
|
||||
m[common.TimeStampField] = int64(0)
|
||||
m[10] = nil
|
||||
m[11] = nil
|
||||
m[12] = nil
|
||||
m[13] = nil
|
||||
m[14] = nil
|
||||
m[15] = nil
|
||||
m[16] = nil
|
||||
m[17] = nil
|
||||
m[18] = nil
|
||||
m[19] = nil
|
||||
m[101] = nil
|
||||
m[102] = nil
|
||||
m[103] = nil
|
||||
m[104] = nil
|
||||
m[105] = nil
|
||||
m[106] = nil
|
||||
pk, err := GenPrimaryKeyByRawData(m[common.RowIDField], schemapb.DataType_Int64)
|
||||
assert.NoError(t, err)
|
||||
|
||||
value := &Value{
|
||||
ID: 0,
|
||||
PK: pk,
|
||||
Timestamp: 0,
|
||||
IsDeleted: false,
|
||||
Value: m,
|
||||
}
|
||||
writer.Write(value)
|
||||
err = writer.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Read from the written data
|
||||
blobs := make([]*Blob, len(writers))
|
||||
i := 0
|
||||
for _, w := range writers {
|
||||
blob, err := w.Finalize()
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, blob)
|
||||
blobs[i] = blob
|
||||
i++
|
||||
}
|
||||
reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
|
||||
assert.NoError(t, err)
|
||||
defer reader.Close()
|
||||
err = reader.Next()
|
||||
assert.NoError(t, err)
|
||||
|
||||
readValue := reader.Value()
|
||||
assert.Equal(t, value, readValue)
|
||||
})
|
||||
}

func TestSerDe(t *testing.T) {
	type args struct {
		dt schemapb.DataType