fix: fix data codec writer close (#33818)

issue:#33813

Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
pull/33783/head
shaoting-huang 2024-06-18 13:59:57 +08:00 committed by GitHub
parent e83ecd5074
commit 8cdc0e6233
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 132 additions and 148 deletions

View File

@ -253,25 +253,10 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
var dim int64
if typeutil.IsVectorType(field.DataType) {
switch field.DataType {
case schemapb.DataType_FloatVector:
dim, err = typeutil.GetDim(field)
if err != nil {
return nil, err
}
eventWriter, err = writer.NextInsertEventWriter(int(dim))
case schemapb.DataType_BinaryVector:
dim, err = typeutil.GetDim(field)
if err != nil {
return nil, err
}
eventWriter, err = writer.NextInsertEventWriter(int(dim))
case schemapb.DataType_Float16Vector:
dim, err = typeutil.GetDim(field)
if err != nil {
return nil, err
}
eventWriter, err = writer.NextInsertEventWriter(int(dim))
case schemapb.DataType_BFloat16Vector:
case schemapb.DataType_FloatVector,
schemapb.DataType_BinaryVector,
schemapb.DataType_Float16Vector,
schemapb.DataType_BFloat16Vector:
dim, err = typeutil.GetDim(field)
if err != nil {
return nil, err
@ -298,136 +283,12 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
blockMemorySize := singleData.GetMemorySize()
memorySize += int64(blockMemorySize)
switch field.DataType {
case schemapb.DataType_Bool:
err = eventWriter.AddBoolToPayload(singleData.(*BoolFieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_Int8:
err = eventWriter.AddInt8ToPayload(singleData.(*Int8FieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_Int16:
err = eventWriter.AddInt16ToPayload(singleData.(*Int16FieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_Int32:
err = eventWriter.AddInt32ToPayload(singleData.(*Int32FieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_Int64:
err = eventWriter.AddInt64ToPayload(singleData.(*Int64FieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_Float:
err = eventWriter.AddFloatToPayload(singleData.(*FloatFieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_Double:
err = eventWriter.AddDoubleToPayload(singleData.(*DoubleFieldData).Data)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_String, schemapb.DataType_VarChar:
for _, singleString := range singleData.(*StringFieldData).Data {
err = eventWriter.AddOneStringToPayload(singleString)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_Array:
for _, singleArray := range singleData.(*ArrayFieldData).Data {
err = eventWriter.AddOneArrayToPayload(singleArray)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_JSON:
for _, singleJSON := range singleData.(*JSONFieldData).Data {
err = eventWriter.AddOneJSONToPayload(singleJSON)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_BinaryVector:
err = eventWriter.AddBinaryVectorToPayload(singleData.(*BinaryVectorFieldData).Data, singleData.(*BinaryVectorFieldData).Dim)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_FloatVector:
err = eventWriter.AddFloatVectorToPayload(singleData.(*FloatVectorFieldData).Data, singleData.(*FloatVectorFieldData).Dim)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_Float16Vector:
err = eventWriter.AddFloat16VectorToPayload(singleData.(*Float16VectorFieldData).Data, singleData.(*Float16VectorFieldData).Dim)
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_BFloat16Vector:
err = eventWriter.AddBFloat16VectorToPayload(singleData.(*BFloat16VectorFieldData).Data, singleData.(*BFloat16VectorFieldData).Dim)
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
case schemapb.DataType_SparseFloatVector:
err = eventWriter.AddSparseFloatVectorToPayload(singleData.(*SparseFloatVectorFieldData))
if err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
default:
return nil, fmt.Errorf("undefined data type %d", field.DataType)
}
if err != nil {
if err = AddFieldDataToPayload(eventWriter, field.DataType, singleData); err != nil {
eventWriter.Close()
writer.Close()
return nil, err
}
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
writer.SetEventTimeStamp(startTs, endTs)
}
@ -458,6 +319,81 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
return blobs, nil
}
// AddFieldDataToPayload appends the contents of singleData to eventWriter,
// choosing the payload method that corresponds to dataType.
//
// singleData must hold the concrete FieldData implementation that matches
// dataType (for example *Int64FieldData for DataType_Int64). A nil or
// mismatched value is reported as an error instead of panicking.
//
// String, VarChar, Array and JSON data are appended one row at a time; the
// first failing row aborts the call and rows appended before it are not
// undone here. An unsupported dataType yields an "undefined data type" error.
//
// The function never closes eventWriter, so the caller remains responsible
// for releasing it when an error is returned.
func AddFieldDataToPayload(eventWriter *insertEventWriter, dataType schemapb.DataType, singleData FieldData) error {
	switch dataType {
	case schemapb.DataType_Bool:
		data, ok := singleData.(*BoolFieldData)
		if !ok {
			return fieldDataMismatchError(dataType, singleData)
		}
		return eventWriter.AddBoolToPayload(data.Data)
	case schemapb.DataType_Int8:
		data, ok := singleData.(*Int8FieldData)
		if !ok {
			return fieldDataMismatchError(dataType, singleData)
		}
		return eventWriter.AddInt8ToPayload(data.Data)
	case schemapb.DataType_Int16:
		data, ok := singleData.(*Int16FieldData)
		if !ok {
			return fieldDataMismatchError(dataType, singleData)
		}
		return eventWriter.AddInt16ToPayload(data.Data)
	case schemapb.DataType_Int32:
		data, ok := singleData.(*Int32FieldData)
		if !ok {
			return fieldDataMismatchError(dataType, singleData)
		}
		return eventWriter.AddInt32ToPayload(data.Data)
	case schemapb.DataType_Int64:
		data, ok := singleData.(*Int64FieldData)
		if !ok {
			return fieldDataMismatchError(dataType, singleData)
		}
		return eventWriter.AddInt64ToPayload(data.Data)
	case schemapb.DataType_Float:
		data, ok := singleData.(*FloatFieldData)
		if !ok {
			return fieldDataMismatchError(dataType, singleData)
		}
		return eventWriter.AddFloatToPayload(data.Data)
	case schemapb.DataType_Double:
		data, ok := singleData.(*DoubleFieldData)
		if !ok {
			return fieldDataMismatchError(dataType, singleData)
		}
		return eventWriter.AddDoubleToPayload(data.Data)
	case schemapb.DataType_String, schemapb.DataType_VarChar:
		data, ok := singleData.(*StringFieldData)
		if !ok {
			return fieldDataMismatchError(dataType, singleData)
		}
		// Variable-length rows are written one at a time.
		for _, singleString := range data.Data {
			if err := eventWriter.AddOneStringToPayload(singleString); err != nil {
				return err
			}
		}
	case schemapb.DataType_Array:
		data, ok := singleData.(*ArrayFieldData)
		if !ok {
			return fieldDataMismatchError(dataType, singleData)
		}
		for _, singleArray := range data.Data {
			if err := eventWriter.AddOneArrayToPayload(singleArray); err != nil {
				return err
			}
		}
	case schemapb.DataType_JSON:
		data, ok := singleData.(*JSONFieldData)
		if !ok {
			return fieldDataMismatchError(dataType, singleData)
		}
		for _, singleJSON := range data.Data {
			if err := eventWriter.AddOneJSONToPayload(singleJSON); err != nil {
				return err
			}
		}
	case schemapb.DataType_BinaryVector:
		data, ok := singleData.(*BinaryVectorFieldData)
		if !ok {
			return fieldDataMismatchError(dataType, singleData)
		}
		return eventWriter.AddBinaryVectorToPayload(data.Data, data.Dim)
	case schemapb.DataType_FloatVector:
		data, ok := singleData.(*FloatVectorFieldData)
		if !ok {
			return fieldDataMismatchError(dataType, singleData)
		}
		return eventWriter.AddFloatVectorToPayload(data.Data, data.Dim)
	case schemapb.DataType_Float16Vector:
		data, ok := singleData.(*Float16VectorFieldData)
		if !ok {
			return fieldDataMismatchError(dataType, singleData)
		}
		return eventWriter.AddFloat16VectorToPayload(data.Data, data.Dim)
	case schemapb.DataType_BFloat16Vector:
		data, ok := singleData.(*BFloat16VectorFieldData)
		if !ok {
			return fieldDataMismatchError(dataType, singleData)
		}
		return eventWriter.AddBFloat16VectorToPayload(data.Data, data.Dim)
	case schemapb.DataType_SparseFloatVector:
		data, ok := singleData.(*SparseFloatVectorFieldData)
		if !ok {
			return fieldDataMismatchError(dataType, singleData)
		}
		return eventWriter.AddSparseFloatVectorToPayload(data)
	default:
		return fmt.Errorf("undefined data type %d", dataType)
	}
	return nil
}

// fieldDataMismatchError builds the error returned when the dynamic type of
// singleData is not the FieldData implementation expected for dataType.
func fieldDataMismatchError(dataType schemapb.DataType, singleData FieldData) error {
	return fmt.Errorf("field data of type %T does not match data type %v", singleData, dataType)
}
func (insertCodec *InsertCodec) DeserializeAll(blobs []*Blob) (
collectionID UniqueID,
partitionID UniqueID,

View File

@ -918,3 +918,51 @@ func TestDeleteData(t *testing.T) {
assert.EqualValues(t, dData.Size(), 72)
})
}
// TestAddFieldDataToPayload checks that AddFieldDataToPayload surfaces an
// error for every data type it dispatches on.
//
// The event writer comes from a binlog writer created with DataType_Int64.
// Each case is expected to fail — presumably because the data is empty or
// does not fit the writer's Int64 payload (TODO confirm against the payload
// writer implementation).
func TestAddFieldDataToPayload(t *testing.T) {
	w := NewInsertBinlogWriter(schemapb.DataType_Int64, 10, 20, 30, 40)
	// Release the writers when the test ends, in the same order the
	// Serialize error path uses (event writer first, then binlog writer).
	defer w.Close()

	e, err := w.NextInsertEventWriter()
	if !assert.NoError(t, err) {
		// Without an event writer every call below would dereference nil.
		return
	}
	defer e.Close()

	cases := []struct {
		dataType schemapb.DataType
		data     FieldData
	}{
		{dataType: schemapb.DataType_Bool, data: &BoolFieldData{[]bool{}}},
		{dataType: schemapb.DataType_Int8, data: &Int8FieldData{[]int8{}}},
		{dataType: schemapb.DataType_Int16, data: &Int16FieldData{[]int16{}}},
		{dataType: schemapb.DataType_Int32, data: &Int32FieldData{[]int32{}}},
		{dataType: schemapb.DataType_Int64, data: &Int64FieldData{[]int64{}}},
		{dataType: schemapb.DataType_Float, data: &FloatFieldData{[]float32{}}},
		{dataType: schemapb.DataType_Double, data: &DoubleFieldData{[]float64{}}},
		{dataType: schemapb.DataType_String, data: &StringFieldData{[]string{"test"}, schemapb.DataType_VarChar}},
		{dataType: schemapb.DataType_Array, data: &ArrayFieldData{
			ElementType: schemapb.DataType_VarChar,
			Data: []*schemapb.ScalarField{{
				Data: &schemapb.ScalarField_IntData{
					IntData: &schemapb.IntArray{Data: []int32{1, 2, 3}},
				},
			}},
		}},
		{dataType: schemapb.DataType_JSON, data: &JSONFieldData{[][]byte{[]byte(`"batch":2}`)}}},
		{dataType: schemapb.DataType_BinaryVector, data: &BinaryVectorFieldData{[]byte{}, 8}},
		{dataType: schemapb.DataType_FloatVector, data: &FloatVectorFieldData{[]float32{}, 4}},
		{dataType: schemapb.DataType_Float16Vector, data: &Float16VectorFieldData{[]byte{}, 4}},
		{dataType: schemapb.DataType_BFloat16Vector, data: &BFloat16VectorFieldData{[]byte{}, 8}},
		{dataType: schemapb.DataType_SparseFloatVector, data: &SparseFloatVectorFieldData{
			SparseFloatArray: schemapb.SparseFloatArray{
				Dim:      0,
				Contents: [][]byte{},
			},
		}},
	}

	// Cases run in the original order against the same event writer.
	for _, tc := range cases {
		err := AddFieldDataToPayload(e, tc.dataType, tc.data)
		assert.Error(t, err, "data type %v", tc.dataType)
	}
}

View File

@ -243,7 +243,7 @@ func (w *NativePayloadWriter) AddInt16ToPayload(data []int16) error {
}
if len(data) == 0 {
return errors.New("can't add empty msgs into int64 payload")
return errors.New("can't add empty msgs into int16 payload")
}
builder, ok := w.builder.(*array.Int16Builder)