mirror of https://github.com/milvus-io/milvus.git
enhance: [cherry-pick] Batch pick PRs related to data codec (#34345)
This PR cherry-picks the following commits related to data codec - Fix data codec writer close. #33818 - Legacy code clean up. #33838 issue: #33813 #33839 pr: #33818 #33838 Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>pull/34475/head
parent
b33243f236
commit
dd4dfbcd8d
|
@ -127,69 +127,6 @@ func (itr *InsertBinlogIterator) isDisposed() bool {
|
||||||
return atomic.LoadInt32(&itr.dispose) == 1
|
return atomic.LoadInt32(&itr.dispose) == 1
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
type DeltalogIterator struct {
|
|
||||||
dispose int32
|
|
||||||
values []*Value
|
|
||||||
pos int
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewDeltalogIterator(blob *Blob) (*DeltalogIterator, error) {
|
|
||||||
deltaCodec := NewDeleteCodec()
|
|
||||||
_, _, serData, err := deltaCodec.Deserialize(blob)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
values := make([]*Value, 0, len(serData.Data))
|
|
||||||
for pkstr, ts := range serData.Data {
|
|
||||||
pk, err := strconv.ParseInt(pkstr, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
values = append(values, &Value{pk, ts, true, nil})
|
|
||||||
}
|
|
||||||
|
|
||||||
sort.Slice(values, func(i, j int) bool { return values[i].id < values[j].id })
|
|
||||||
|
|
||||||
return &DeltalogIterator{values: values}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// HasNext returns true if the iterator have unread record
|
|
||||||
func (itr *DeltalogIterator) HasNext() bool {
|
|
||||||
return !itr.isDisposed() && itr.hasNext()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Next returns the next record
|
|
||||||
func (itr *DeltalogIterator) Next() (interface{}, error) {
|
|
||||||
if itr.isDisposed() {
|
|
||||||
return nil, ErrDisposed
|
|
||||||
}
|
|
||||||
|
|
||||||
if !itr.hasNext() {
|
|
||||||
return nil, ErrNoMoreRecord
|
|
||||||
}
|
|
||||||
|
|
||||||
tmp := itr.values[itr.pos]
|
|
||||||
itr.pos++
|
|
||||||
return tmp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Dispose disposes the iterator
|
|
||||||
func (itr *DeltalogIterator) Dispose() {
|
|
||||||
atomic.CompareAndSwapInt32(&itr.dispose, 0, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (itr *DeltalogIterator) hasNext() bool {
|
|
||||||
return itr.pos < len(itr.values)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (itr *DeltalogIterator) isDisposed() bool {
|
|
||||||
return atomic.LoadInt32(&itr.dispose) == 1
|
|
||||||
}
|
|
||||||
|
|
||||||
*/
|
|
||||||
|
|
||||||
// MergeIterator merge iterators.
|
// MergeIterator merge iterators.
|
||||||
type MergeIterator struct {
|
type MergeIterator struct {
|
||||||
disposed int32
|
disposed int32
|
||||||
|
@ -278,156 +215,3 @@ func (itr *MergeIterator) hasNext() bool {
|
||||||
itr.nextRecord = minRecord
|
itr.nextRecord = minRecord
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
func NewInsertlogMergeIterator(blobs [][]*Blob) (*MergeIterator, error) {
|
|
||||||
iterators := make([]Iterator, 0, len(blobs))
|
|
||||||
for _, fieldBlobs := range blobs {
|
|
||||||
itr, err := NewInsertBinlogIterator(fieldBlobs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
iterators = append(iterators, itr)
|
|
||||||
}
|
|
||||||
|
|
||||||
return NewMergeIterator(iterators), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewDeltalogMergeIterator(blobs []*Blob) (*MergeIterator, error) {
|
|
||||||
iterators := make([]Iterator, 0, len(blobs))
|
|
||||||
for _, blob := range blobs {
|
|
||||||
itr, err := NewDeltalogIterator(blob)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
iterators = append(iterators, itr)
|
|
||||||
}
|
|
||||||
return NewMergeIterator(iterators), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type MergeSingleSegmentIterator struct {
|
|
||||||
disposed int32
|
|
||||||
insertItr Iterator
|
|
||||||
deltaItr Iterator
|
|
||||||
timetravel int64
|
|
||||||
nextRecord *Value
|
|
||||||
insertTmpRecord *Value
|
|
||||||
deltaTmpRecord *Value
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewMergeSingleSegmentIterator(insertBlobs [][]*Blob, deltaBlobs []*Blob, timetravel int64) (*MergeSingleSegmentIterator, error) {
|
|
||||||
insertMergeItr, err := NewInsertlogMergeIterator(insertBlobs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
deltaMergeItr, err := NewDeltalogMergeIterator(deltaBlobs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &MergeSingleSegmentIterator{
|
|
||||||
insertItr: insertMergeItr,
|
|
||||||
deltaItr: deltaMergeItr,
|
|
||||||
timetravel: timetravel,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// HasNext returns true if the iterator have unread record
|
|
||||||
func (itr *MergeSingleSegmentIterator) HasNext() bool {
|
|
||||||
return !itr.isDisposed() && itr.hasNext()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Next returns the next record
|
|
||||||
func (itr *MergeSingleSegmentIterator) Next() (interface{}, error) {
|
|
||||||
if itr.isDisposed() {
|
|
||||||
return nil, ErrDisposed
|
|
||||||
}
|
|
||||||
if !itr.hasNext() {
|
|
||||||
return nil, ErrNoMoreRecord
|
|
||||||
}
|
|
||||||
|
|
||||||
tmp := itr.nextRecord
|
|
||||||
itr.nextRecord = nil
|
|
||||||
return tmp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Dispose disposes the iterator
|
|
||||||
func (itr *MergeSingleSegmentIterator) Dispose() {
|
|
||||||
if itr.isDisposed() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if itr.insertItr != nil {
|
|
||||||
itr.insertItr.Dispose()
|
|
||||||
}
|
|
||||||
if itr.deltaItr != nil {
|
|
||||||
itr.deltaItr.Dispose()
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic.CompareAndSwapInt32(&itr.disposed, 0, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (itr *MergeSingleSegmentIterator) isDisposed() bool {
|
|
||||||
return atomic.LoadInt32(&itr.disposed) == 1
|
|
||||||
}
|
|
||||||
|
|
||||||
func (itr *MergeSingleSegmentIterator) hasNext() bool {
|
|
||||||
if itr.nextRecord != nil {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
if itr.insertTmpRecord == nil && itr.insertItr.HasNext() {
|
|
||||||
r, _ := itr.insertItr.Next()
|
|
||||||
itr.insertTmpRecord = r.(*Value)
|
|
||||||
}
|
|
||||||
|
|
||||||
if itr.deltaTmpRecord == nil && itr.deltaItr.HasNext() {
|
|
||||||
r, _ := itr.deltaItr.Next()
|
|
||||||
itr.deltaTmpRecord = r.(*Value)
|
|
||||||
}
|
|
||||||
|
|
||||||
if itr.insertTmpRecord == nil && itr.deltaTmpRecord == nil {
|
|
||||||
return false
|
|
||||||
} else if itr.insertTmpRecord == nil {
|
|
||||||
itr.nextRecord = itr.deltaTmpRecord
|
|
||||||
itr.deltaTmpRecord = nil
|
|
||||||
return true
|
|
||||||
} else if itr.deltaTmpRecord == nil {
|
|
||||||
itr.nextRecord = itr.insertTmpRecord
|
|
||||||
itr.insertTmpRecord = nil
|
|
||||||
return true
|
|
||||||
} else {
|
|
||||||
// merge records
|
|
||||||
if itr.insertTmpRecord.timestamp >= itr.timetravel {
|
|
||||||
itr.nextRecord = itr.insertTmpRecord
|
|
||||||
itr.insertTmpRecord = nil
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if itr.deltaTmpRecord.timestamp >= itr.timetravel {
|
|
||||||
itr.nextRecord = itr.deltaTmpRecord
|
|
||||||
itr.deltaTmpRecord = nil
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
if itr.insertTmpRecord.id < itr.deltaTmpRecord.id {
|
|
||||||
itr.nextRecord = itr.insertTmpRecord
|
|
||||||
itr.insertTmpRecord = nil
|
|
||||||
return true
|
|
||||||
} else if itr.insertTmpRecord.id > itr.deltaTmpRecord.id {
|
|
||||||
itr.deltaTmpRecord = nil
|
|
||||||
continue
|
|
||||||
} else if itr.insertTmpRecord.id == itr.deltaTmpRecord.id {
|
|
||||||
if itr.insertTmpRecord.timestamp <= itr.deltaTmpRecord.timestamp {
|
|
||||||
itr.insertTmpRecord = nil
|
|
||||||
continue
|
|
||||||
} else {
|
|
||||||
itr.deltaTmpRecord = nil
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
|
@ -248,25 +248,10 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
|
||||||
var dim int64
|
var dim int64
|
||||||
if typeutil.IsVectorType(field.DataType) {
|
if typeutil.IsVectorType(field.DataType) {
|
||||||
switch field.DataType {
|
switch field.DataType {
|
||||||
case schemapb.DataType_FloatVector:
|
case schemapb.DataType_FloatVector,
|
||||||
dim, err = typeutil.GetDim(field)
|
schemapb.DataType_BinaryVector,
|
||||||
if err != nil {
|
schemapb.DataType_Float16Vector,
|
||||||
return nil, err
|
schemapb.DataType_BFloat16Vector:
|
||||||
}
|
|
||||||
eventWriter, err = writer.NextInsertEventWriter(int(dim))
|
|
||||||
case schemapb.DataType_BinaryVector:
|
|
||||||
dim, err = typeutil.GetDim(field)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
eventWriter, err = writer.NextInsertEventWriter(int(dim))
|
|
||||||
case schemapb.DataType_Float16Vector:
|
|
||||||
dim, err = typeutil.GetDim(field)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
eventWriter, err = writer.NextInsertEventWriter(int(dim))
|
|
||||||
case schemapb.DataType_BFloat16Vector:
|
|
||||||
dim, err = typeutil.GetDim(field)
|
dim, err = typeutil.GetDim(field)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -293,136 +278,12 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
|
||||||
|
|
||||||
blockMemorySize := singleData.GetMemorySize()
|
blockMemorySize := singleData.GetMemorySize()
|
||||||
memorySize += int64(blockMemorySize)
|
memorySize += int64(blockMemorySize)
|
||||||
|
if err = AddFieldDataToPayload(eventWriter, field.DataType, singleData); err != nil {
|
||||||
switch field.DataType {
|
eventWriter.Close()
|
||||||
case schemapb.DataType_Bool:
|
writer.Close()
|
||||||
err = eventWriter.AddBoolToPayload(singleData.(*BoolFieldData).Data)
|
|
||||||
if err != nil {
|
|
||||||
eventWriter.Close()
|
|
||||||
writer.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
|
|
||||||
case schemapb.DataType_Int8:
|
|
||||||
err = eventWriter.AddInt8ToPayload(singleData.(*Int8FieldData).Data)
|
|
||||||
if err != nil {
|
|
||||||
eventWriter.Close()
|
|
||||||
writer.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
|
|
||||||
case schemapb.DataType_Int16:
|
|
||||||
err = eventWriter.AddInt16ToPayload(singleData.(*Int16FieldData).Data)
|
|
||||||
if err != nil {
|
|
||||||
eventWriter.Close()
|
|
||||||
writer.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
|
|
||||||
case schemapb.DataType_Int32:
|
|
||||||
err = eventWriter.AddInt32ToPayload(singleData.(*Int32FieldData).Data)
|
|
||||||
if err != nil {
|
|
||||||
eventWriter.Close()
|
|
||||||
writer.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
|
|
||||||
case schemapb.DataType_Int64:
|
|
||||||
err = eventWriter.AddInt64ToPayload(singleData.(*Int64FieldData).Data)
|
|
||||||
if err != nil {
|
|
||||||
eventWriter.Close()
|
|
||||||
writer.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
|
|
||||||
case schemapb.DataType_Float:
|
|
||||||
err = eventWriter.AddFloatToPayload(singleData.(*FloatFieldData).Data)
|
|
||||||
if err != nil {
|
|
||||||
eventWriter.Close()
|
|
||||||
writer.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
|
|
||||||
case schemapb.DataType_Double:
|
|
||||||
err = eventWriter.AddDoubleToPayload(singleData.(*DoubleFieldData).Data)
|
|
||||||
if err != nil {
|
|
||||||
eventWriter.Close()
|
|
||||||
writer.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
|
|
||||||
case schemapb.DataType_String, schemapb.DataType_VarChar:
|
|
||||||
for _, singleString := range singleData.(*StringFieldData).Data {
|
|
||||||
err = eventWriter.AddOneStringToPayload(singleString)
|
|
||||||
if err != nil {
|
|
||||||
eventWriter.Close()
|
|
||||||
writer.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
|
|
||||||
case schemapb.DataType_Array:
|
|
||||||
for _, singleArray := range singleData.(*ArrayFieldData).Data {
|
|
||||||
err = eventWriter.AddOneArrayToPayload(singleArray)
|
|
||||||
if err != nil {
|
|
||||||
eventWriter.Close()
|
|
||||||
writer.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
|
|
||||||
case schemapb.DataType_JSON:
|
|
||||||
for _, singleJSON := range singleData.(*JSONFieldData).Data {
|
|
||||||
err = eventWriter.AddOneJSONToPayload(singleJSON)
|
|
||||||
if err != nil {
|
|
||||||
eventWriter.Close()
|
|
||||||
writer.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
|
|
||||||
case schemapb.DataType_BinaryVector:
|
|
||||||
err = eventWriter.AddBinaryVectorToPayload(singleData.(*BinaryVectorFieldData).Data, singleData.(*BinaryVectorFieldData).Dim)
|
|
||||||
if err != nil {
|
|
||||||
eventWriter.Close()
|
|
||||||
writer.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
|
|
||||||
case schemapb.DataType_FloatVector:
|
|
||||||
err = eventWriter.AddFloatVectorToPayload(singleData.(*FloatVectorFieldData).Data, singleData.(*FloatVectorFieldData).Dim)
|
|
||||||
if err != nil {
|
|
||||||
eventWriter.Close()
|
|
||||||
writer.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
|
|
||||||
case schemapb.DataType_Float16Vector:
|
|
||||||
err = eventWriter.AddFloat16VectorToPayload(singleData.(*Float16VectorFieldData).Data, singleData.(*Float16VectorFieldData).Dim)
|
|
||||||
if err != nil {
|
|
||||||
eventWriter.Close()
|
|
||||||
writer.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
|
|
||||||
case schemapb.DataType_BFloat16Vector:
|
|
||||||
err = eventWriter.AddBFloat16VectorToPayload(singleData.(*BFloat16VectorFieldData).Data, singleData.(*BFloat16VectorFieldData).Dim)
|
|
||||||
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
|
|
||||||
case schemapb.DataType_SparseFloatVector:
|
|
||||||
err = eventWriter.AddSparseFloatVectorToPayload(singleData.(*SparseFloatVectorFieldData))
|
|
||||||
if err != nil {
|
|
||||||
eventWriter.Close()
|
|
||||||
writer.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("undefined data type %d", field.DataType)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
|
||||||
writer.SetEventTimeStamp(startTs, endTs)
|
writer.SetEventTimeStamp(startTs, endTs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -453,6 +314,81 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
|
||||||
return blobs, nil
|
return blobs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func AddFieldDataToPayload(eventWriter *insertEventWriter, dataType schemapb.DataType, singleData FieldData) error {
|
||||||
|
var err error
|
||||||
|
switch dataType {
|
||||||
|
case schemapb.DataType_Bool:
|
||||||
|
if err = eventWriter.AddBoolToPayload(singleData.(*BoolFieldData).Data); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case schemapb.DataType_Int8:
|
||||||
|
if err = eventWriter.AddInt8ToPayload(singleData.(*Int8FieldData).Data); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case schemapb.DataType_Int16:
|
||||||
|
if err = eventWriter.AddInt16ToPayload(singleData.(*Int16FieldData).Data); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case schemapb.DataType_Int32:
|
||||||
|
if err = eventWriter.AddInt32ToPayload(singleData.(*Int32FieldData).Data); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case schemapb.DataType_Int64:
|
||||||
|
if err = eventWriter.AddInt64ToPayload(singleData.(*Int64FieldData).Data); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case schemapb.DataType_Float:
|
||||||
|
if err = eventWriter.AddFloatToPayload(singleData.(*FloatFieldData).Data); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case schemapb.DataType_Double:
|
||||||
|
if err = eventWriter.AddDoubleToPayload(singleData.(*DoubleFieldData).Data); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case schemapb.DataType_String, schemapb.DataType_VarChar:
|
||||||
|
for _, singleString := range singleData.(*StringFieldData).Data {
|
||||||
|
if err = eventWriter.AddOneStringToPayload(singleString); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case schemapb.DataType_Array:
|
||||||
|
for _, singleArray := range singleData.(*ArrayFieldData).Data {
|
||||||
|
if err = eventWriter.AddOneArrayToPayload(singleArray); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case schemapb.DataType_JSON:
|
||||||
|
for _, singleJSON := range singleData.(*JSONFieldData).Data {
|
||||||
|
if err = eventWriter.AddOneJSONToPayload(singleJSON); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case schemapb.DataType_BinaryVector:
|
||||||
|
if err = eventWriter.AddBinaryVectorToPayload(singleData.(*BinaryVectorFieldData).Data, singleData.(*BinaryVectorFieldData).Dim); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case schemapb.DataType_FloatVector:
|
||||||
|
if err = eventWriter.AddFloatVectorToPayload(singleData.(*FloatVectorFieldData).Data, singleData.(*FloatVectorFieldData).Dim); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case schemapb.DataType_Float16Vector:
|
||||||
|
if err = eventWriter.AddFloat16VectorToPayload(singleData.(*Float16VectorFieldData).Data, singleData.(*Float16VectorFieldData).Dim); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case schemapb.DataType_BFloat16Vector:
|
||||||
|
if err = eventWriter.AddBFloat16VectorToPayload(singleData.(*BFloat16VectorFieldData).Data, singleData.(*BFloat16VectorFieldData).Dim); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case schemapb.DataType_SparseFloatVector:
|
||||||
|
if err = eventWriter.AddSparseFloatVectorToPayload(singleData.(*SparseFloatVectorFieldData)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("undefined data type %d", dataType)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (insertCodec *InsertCodec) DeserializeAll(blobs []*Blob) (
|
func (insertCodec *InsertCodec) DeserializeAll(blobs []*Blob) (
|
||||||
collectionID UniqueID,
|
collectionID UniqueID,
|
||||||
partitionID UniqueID,
|
partitionID UniqueID,
|
||||||
|
@ -845,31 +781,6 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
|
||||||
return collectionID, partitionID, segmentID, nil
|
return collectionID, partitionID, segmentID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// func deserializeEntity[T any, U any](
|
|
||||||
// eventReader *EventReader,
|
|
||||||
// binlogReader *BinlogReader,
|
|
||||||
// insertData *InsertData,
|
|
||||||
// getPayloadFunc func() (U, error),
|
|
||||||
// fillDataFunc func() FieldData,
|
|
||||||
// ) error {
|
|
||||||
// fieldID := binlogReader.FieldID
|
|
||||||
// stringPayload, err := getPayloadFunc()
|
|
||||||
// if err != nil {
|
|
||||||
// eventReader.Close()
|
|
||||||
// binlogReader.Close()
|
|
||||||
// return err
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if insertData.Data[fieldID] == nil {
|
|
||||||
// insertData.Data[fieldID] = fillDataFunc()
|
|
||||||
// }
|
|
||||||
// stringFieldData := insertData.Data[fieldID].(*T)
|
|
||||||
//
|
|
||||||
// stringFieldData.Data = append(stringFieldData.Data, stringPayload...)
|
|
||||||
// totalLength += len(stringPayload)
|
|
||||||
// insertData.Data[fieldID] = stringFieldData
|
|
||||||
// }
|
|
||||||
|
|
||||||
// Deserialize transfer blob back to insert data.
|
// Deserialize transfer blob back to insert data.
|
||||||
// From schema, it get all fields.
|
// From schema, it get all fields.
|
||||||
// For each field, it will create a binlog reader, and read all event to the buffer.
|
// For each field, it will create a binlog reader, and read all event to the buffer.
|
||||||
|
|
|
@ -918,3 +918,51 @@ func TestDeleteData(t *testing.T) {
|
||||||
assert.EqualValues(t, dData.Size(), 72)
|
assert.EqualValues(t, dData.Size(), 72)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAddFieldDataToPayload(t *testing.T) {
|
||||||
|
w := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40)
|
||||||
|
e, _ := w.NextInsertEventWriter()
|
||||||
|
var err error
|
||||||
|
err = AddFieldDataToPayload(e, schemapb.DataType_Bool, &BoolFieldData{[]bool{}})
|
||||||
|
assert.Error(t, err)
|
||||||
|
err = AddFieldDataToPayload(e, schemapb.DataType_Int8, &Int8FieldData{[]int8{}})
|
||||||
|
assert.Error(t, err)
|
||||||
|
err = AddFieldDataToPayload(e, schemapb.DataType_Int16, &Int16FieldData{[]int16{}})
|
||||||
|
assert.Error(t, err)
|
||||||
|
err = AddFieldDataToPayload(e, schemapb.DataType_Int32, &Int32FieldData{[]int32{}})
|
||||||
|
assert.Error(t, err)
|
||||||
|
err = AddFieldDataToPayload(e, schemapb.DataType_Int64, &Int64FieldData{[]int64{}})
|
||||||
|
assert.Error(t, err)
|
||||||
|
err = AddFieldDataToPayload(e, schemapb.DataType_Float, &FloatFieldData{[]float32{}})
|
||||||
|
assert.Error(t, err)
|
||||||
|
err = AddFieldDataToPayload(e, schemapb.DataType_Double, &DoubleFieldData{[]float64{}})
|
||||||
|
assert.Error(t, err)
|
||||||
|
err = AddFieldDataToPayload(e, schemapb.DataType_String, &StringFieldData{[]string{"test"}, schemapb.DataType_VarChar})
|
||||||
|
assert.Error(t, err)
|
||||||
|
err = AddFieldDataToPayload(e, schemapb.DataType_Array, &ArrayFieldData{
|
||||||
|
ElementType: schemapb.DataType_VarChar,
|
||||||
|
Data: []*schemapb.ScalarField{{
|
||||||
|
Data: &schemapb.ScalarField_IntData{
|
||||||
|
IntData: &schemapb.IntArray{Data: []int32{1, 2, 3}},
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
})
|
||||||
|
assert.Error(t, err)
|
||||||
|
err = AddFieldDataToPayload(e, schemapb.DataType_JSON, &JSONFieldData{[][]byte{[]byte(`"batch":2}`)}})
|
||||||
|
assert.Error(t, err)
|
||||||
|
err = AddFieldDataToPayload(e, schemapb.DataType_BinaryVector, &BinaryVectorFieldData{[]byte{}, 8})
|
||||||
|
assert.Error(t, err)
|
||||||
|
err = AddFieldDataToPayload(e, schemapb.DataType_FloatVector, &FloatVectorFieldData{[]float32{}, 4})
|
||||||
|
assert.Error(t, err)
|
||||||
|
err = AddFieldDataToPayload(e, schemapb.DataType_Float16Vector, &Float16VectorFieldData{[]byte{}, 4})
|
||||||
|
assert.Error(t, err)
|
||||||
|
err = AddFieldDataToPayload(e, schemapb.DataType_BFloat16Vector, &BFloat16VectorFieldData{[]byte{}, 8})
|
||||||
|
assert.Error(t, err)
|
||||||
|
err = AddFieldDataToPayload(e, schemapb.DataType_SparseFloatVector, &SparseFloatVectorFieldData{
|
||||||
|
SparseFloatArray: schemapb.SparseFloatArray{
|
||||||
|
Dim: 0,
|
||||||
|
Contents: [][]byte{},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
assert.Error(t, err)
|
||||||
|
}
|
||||||
|
|
|
@ -243,7 +243,7 @@ func (w *NativePayloadWriter) AddInt16ToPayload(data []int16) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(data) == 0 {
|
if len(data) == 0 {
|
||||||
return errors.New("can't add empty msgs into int64 payload")
|
return errors.New("can't add empty msgs into int16 payload")
|
||||||
}
|
}
|
||||||
|
|
||||||
builder, ok := w.builder.(*array.Int16Builder)
|
builder, ok := w.builder.(*array.Int16Builder)
|
||||||
|
|
|
@ -982,44 +982,6 @@ func binaryWrite(endian binary.ByteOrder, data interface{}) ([]byte, error) {
|
||||||
return buf.Bytes(), nil
|
return buf.Bytes(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// FieldDataToBytes encode field data to byte slice.
|
|
||||||
// For some fixed-length data, such as int32, int64, float vector, use binary.Write directly.
|
|
||||||
// For binary vector, return it directly.
|
|
||||||
// For bool data, first transfer to schemapb.BoolArray and then marshal it. (TODO: handle bool like other scalar data.)
|
|
||||||
// For variable-length data, such as string, first transfer to schemapb.StringArray and then marshal it.
|
|
||||||
// TODO: find a proper way to store variable-length data. Or we should unify to use protobuf?
|
|
||||||
func FieldDataToBytes(endian binary.ByteOrder, fieldData FieldData) ([]byte, error) {
|
|
||||||
switch field := fieldData.(type) {
|
|
||||||
case *BoolFieldData:
|
|
||||||
// return binaryWrite(endian, field.Data)
|
|
||||||
return boolFieldDataToPbBytes(field)
|
|
||||||
case *StringFieldData:
|
|
||||||
return stringFieldDataToPbBytes(field)
|
|
||||||
case *ArrayFieldData:
|
|
||||||
return arrayFieldDataToPbBytes(field)
|
|
||||||
case *JSONFieldData:
|
|
||||||
return jsonFieldDataToPbBytes(field)
|
|
||||||
case *BinaryVectorFieldData:
|
|
||||||
return field.Data, nil
|
|
||||||
case *FloatVectorFieldData:
|
|
||||||
return binaryWrite(endian, field.Data)
|
|
||||||
case *Int8FieldData:
|
|
||||||
return binaryWrite(endian, field.Data)
|
|
||||||
case *Int16FieldData:
|
|
||||||
return binaryWrite(endian, field.Data)
|
|
||||||
case *Int32FieldData:
|
|
||||||
return binaryWrite(endian, field.Data)
|
|
||||||
case *Int64FieldData:
|
|
||||||
return binaryWrite(endian, field.Data)
|
|
||||||
case *FloatFieldData:
|
|
||||||
return binaryWrite(endian, field.Data)
|
|
||||||
case *DoubleFieldData:
|
|
||||||
return binaryWrite(endian, field.Data)
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("unsupported field data: %s", field)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TransferInsertDataToInsertRecord(insertData *InsertData) (*segcorepb.InsertRecord, error) {
|
func TransferInsertDataToInsertRecord(insertData *InsertData) (*segcorepb.InsertRecord, error) {
|
||||||
insertRecord := &segcorepb.InsertRecord{}
|
insertRecord := &segcorepb.InsertRecord{}
|
||||||
for fieldID, rawData := range insertData.Data {
|
for fieldID, rawData := range insertData.Data {
|
||||||
|
|
|
@ -1551,94 +1551,6 @@ func binaryRead(endian binary.ByteOrder, bs []byte, receiver interface{}) error
|
||||||
return binary.Read(reader, endian, receiver)
|
return binary.Read(reader, endian, receiver)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFieldDataToBytes(t *testing.T) {
|
|
||||||
// TODO: test big endian.
|
|
||||||
endian := common.Endian
|
|
||||||
|
|
||||||
var bs []byte
|
|
||||||
var err error
|
|
||||||
var receiver interface{}
|
|
||||||
|
|
||||||
f1 := &BoolFieldData{Data: []bool{true, false}}
|
|
||||||
bs, err = FieldDataToBytes(endian, f1)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
var barr schemapb.BoolArray
|
|
||||||
err = proto.Unmarshal(bs, &barr)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.ElementsMatch(t, f1.Data, barr.Data)
|
|
||||||
|
|
||||||
f2 := &StringFieldData{Data: []string{"true", "false"}}
|
|
||||||
bs, err = FieldDataToBytes(endian, f2)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
var sarr schemapb.StringArray
|
|
||||||
err = proto.Unmarshal(bs, &sarr)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.ElementsMatch(t, f2.Data, sarr.Data)
|
|
||||||
|
|
||||||
f3 := &Int8FieldData{Data: []int8{0, 1}}
|
|
||||||
bs, err = FieldDataToBytes(endian, f3)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
receiver = make([]int8, 2)
|
|
||||||
err = binaryRead(endian, bs, receiver)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.ElementsMatch(t, f3.Data, receiver)
|
|
||||||
|
|
||||||
f4 := &Int16FieldData{Data: []int16{0, 1}}
|
|
||||||
bs, err = FieldDataToBytes(endian, f4)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
receiver = make([]int16, 2)
|
|
||||||
err = binaryRead(endian, bs, receiver)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.ElementsMatch(t, f4.Data, receiver)
|
|
||||||
|
|
||||||
f5 := &Int32FieldData{Data: []int32{0, 1}}
|
|
||||||
bs, err = FieldDataToBytes(endian, f5)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
receiver = make([]int32, 2)
|
|
||||||
err = binaryRead(endian, bs, receiver)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.ElementsMatch(t, f5.Data, receiver)
|
|
||||||
|
|
||||||
f6 := &Int64FieldData{Data: []int64{0, 1}}
|
|
||||||
bs, err = FieldDataToBytes(endian, f6)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
receiver = make([]int64, 2)
|
|
||||||
err = binaryRead(endian, bs, receiver)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.ElementsMatch(t, f6.Data, receiver)
|
|
||||||
|
|
||||||
// in fact, hard to compare float point value.
|
|
||||||
|
|
||||||
f7 := &FloatFieldData{Data: []float32{0, 1}}
|
|
||||||
bs, err = FieldDataToBytes(endian, f7)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
receiver = make([]float32, 2)
|
|
||||||
err = binaryRead(endian, bs, receiver)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.ElementsMatch(t, f7.Data, receiver)
|
|
||||||
|
|
||||||
f8 := &DoubleFieldData{Data: []float64{0, 1}}
|
|
||||||
bs, err = FieldDataToBytes(endian, f8)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
receiver = make([]float64, 2)
|
|
||||||
err = binaryRead(endian, bs, receiver)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.ElementsMatch(t, f8.Data, receiver)
|
|
||||||
|
|
||||||
f9 := &BinaryVectorFieldData{Data: []byte{0, 1, 0}}
|
|
||||||
bs, err = FieldDataToBytes(endian, f9)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.ElementsMatch(t, f9.Data, bs)
|
|
||||||
|
|
||||||
f10 := &FloatVectorFieldData{Data: []float32{0, 1}}
|
|
||||||
bs, err = FieldDataToBytes(endian, f10)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
receiver = make([]float32, 2)
|
|
||||||
err = binaryRead(endian, bs, receiver)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.ElementsMatch(t, f10.Data, receiver)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestJson(t *testing.T) {
|
func TestJson(t *testing.T) {
|
||||||
extras := make(map[string]string)
|
extras := make(map[string]string)
|
||||||
extras["IndexBuildID"] = "10"
|
extras["IndexBuildID"] = "10"
|
||||||
|
|
Loading…
Reference in New Issue