enhance: Refactor data codec deserialize (#33923)

#33922

Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
pull/34011/head
shaoting-huang 2024-06-20 11:17:59 +08:00 committed by GitHub
parent 31ef0a1fe8
commit 5f02e52561
1 changed file with 224 additions and 349 deletions

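For context, the heart of this change is that the long per-type switch inside DeserializeInto is replaced by a single generic read (GetDataFromPayload) followed by a call to the new AddInsertData helper, which owns the type-specific appends. Below is a self-contained toy sketch of that pattern; it is illustrative only, not Milvus code, and every name in it (fieldData, boolField, addData, ...) is invented for the example.

package main

import "fmt"

// Toy illustration (not Milvus code) of the refactoring pattern: one generic
// helper lazily creates the typed buffer for a field and appends into it,
// instead of repeating that boilerplate in every branch of a type switch.
type fieldData interface {
    appendRows(v interface{})
}

type boolField struct{ data []bool }

func (f *boolField) appendRows(v interface{}) { f.data = append(f.data, v.([]bool)...) }

type int64Field struct{ data []int64 }

func (f *int64Field) appendRows(v interface{}) { f.data = append(f.data, v.([]int64)...) }

// addData mirrors the shape of AddInsertData in the diff below: pick or create
// the field's buffer once, append the decoded payload, report the row count.
func addData(fields map[int64]fieldData, fieldID int64, newField func() fieldData, payload interface{}, rows int) int {
    if fields[fieldID] == nil {
        fields[fieldID] = newField()
    }
    fields[fieldID].appendRows(payload)
    return rows
}

func main() {
    fields := map[int64]fieldData{}
    total := 0
    total += addData(fields, 1, func() fieldData { return &boolField{} }, []bool{true, false}, 2)
    total += addData(fields, 2, func() fieldData { return &int64Field{} }, []int64{7, 8, 9}, 3)
    fmt.Println(total, fields[1].(*boolField).data, fields[2].(*int64Field).data)
}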

@@ -451,7 +451,6 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
dataType := binlogReader.PayloadDataType
fieldID := binlogReader.FieldID
totalLength := 0
dim := 0
for {
eventReader, err := binlogReader.NextEventReader()
@@ -461,337 +460,19 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
if eventReader == nil {
break
}
switch dataType {
case schemapb.DataType_Bool:
singleData, validData, err := eventReader.GetBoolFromPayload()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &BoolFieldData{
Data: make([]bool, 0, rowNum),
}
}
boolFieldData := insertData.Data[fieldID].(*BoolFieldData)
boolFieldData.Data = append(boolFieldData.Data, singleData...)
boolFieldData.ValidData = append(boolFieldData.ValidData, validData...)
totalLength += len(singleData)
insertData.Data[fieldID] = boolFieldData
case schemapb.DataType_Int8:
singleData, validData, err := eventReader.GetInt8FromPayload()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &Int8FieldData{
Data: make([]int8, 0, rowNum),
}
}
int8FieldData := insertData.Data[fieldID].(*Int8FieldData)
int8FieldData.Data = append(int8FieldData.Data, singleData...)
int8FieldData.ValidData = append(int8FieldData.ValidData, validData...)
totalLength += len(singleData)
insertData.Data[fieldID] = int8FieldData
case schemapb.DataType_Int16:
singleData, validData, err := eventReader.GetInt16FromPayload()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &Int16FieldData{
Data: make([]int16, 0, rowNum),
}
}
int16FieldData := insertData.Data[fieldID].(*Int16FieldData)
int16FieldData.Data = append(int16FieldData.Data, singleData...)
int16FieldData.ValidData = append(int16FieldData.ValidData, validData...)
totalLength += len(singleData)
insertData.Data[fieldID] = int16FieldData
case schemapb.DataType_Int32:
singleData, validData, err := eventReader.GetInt32FromPayload()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &Int32FieldData{
Data: make([]int32, 0, rowNum),
}
}
int32FieldData := insertData.Data[fieldID].(*Int32FieldData)
int32FieldData.Data = append(int32FieldData.Data, singleData...)
int32FieldData.ValidData = append(int32FieldData.ValidData, validData...)
totalLength += len(singleData)
insertData.Data[fieldID] = int32FieldData
case schemapb.DataType_Int64:
singleData, validData, err := eventReader.GetInt64FromPayload()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &Int64FieldData{
Data: make([]int64, 0, rowNum),
}
}
int64FieldData := insertData.Data[fieldID].(*Int64FieldData)
int64FieldData.Data = append(int64FieldData.Data, singleData...)
int64FieldData.ValidData = append(int64FieldData.ValidData, validData...)
totalLength += len(singleData)
insertData.Data[fieldID] = int64FieldData
case schemapb.DataType_Float:
singleData, validData, err := eventReader.GetFloatFromPayload()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &FloatFieldData{
Data: make([]float32, 0, rowNum),
}
}
floatFieldData := insertData.Data[fieldID].(*FloatFieldData)
floatFieldData.Data = append(floatFieldData.Data, singleData...)
floatFieldData.ValidData = append(floatFieldData.ValidData, validData...)
totalLength += len(singleData)
insertData.Data[fieldID] = floatFieldData
case schemapb.DataType_Double:
singleData, validData, err := eventReader.GetDoubleFromPayload()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &DoubleFieldData{
Data: make([]float64, 0, rowNum),
}
}
doubleFieldData := insertData.Data[fieldID].(*DoubleFieldData)
doubleFieldData.Data = append(doubleFieldData.Data, singleData...)
doubleFieldData.ValidData = append(doubleFieldData.ValidData, validData...)
totalLength += len(singleData)
insertData.Data[fieldID] = doubleFieldData
case schemapb.DataType_String, schemapb.DataType_VarChar:
stringPayload, validData, err := eventReader.GetStringFromPayload()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &StringFieldData{
Data: make([]string, 0, rowNum),
}
}
stringFieldData := insertData.Data[fieldID].(*StringFieldData)
stringFieldData.DataType = dataType
stringFieldData.Data = append(stringFieldData.Data, stringPayload...)
stringFieldData.ValidData = append(stringFieldData.ValidData, validData...)
totalLength += len(stringPayload)
insertData.Data[fieldID] = stringFieldData
case schemapb.DataType_Array:
arrayPayload, validData, err := eventReader.GetArrayFromPayload()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &ArrayFieldData{
Data: make([]*schemapb.ScalarField, 0, rowNum),
}
}
arrayFieldData := insertData.Data[fieldID].(*ArrayFieldData)
arrayFieldData.Data = append(arrayFieldData.Data, arrayPayload...)
arrayFieldData.ValidData = append(arrayFieldData.ValidData, validData...)
totalLength += len(arrayPayload)
insertData.Data[fieldID] = arrayFieldData
case schemapb.DataType_JSON:
jsonPayload, validData, err := eventReader.GetJSONFromPayload()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &JSONFieldData{
Data: make([][]byte, 0, rowNum),
}
}
jsonFieldData := insertData.Data[fieldID].(*JSONFieldData)
jsonFieldData.Data = append(jsonFieldData.Data, jsonPayload...)
jsonFieldData.ValidData = append(jsonFieldData.ValidData, validData...)
totalLength += len(jsonPayload)
insertData.Data[fieldID] = jsonFieldData
case schemapb.DataType_BinaryVector:
var singleData []byte
singleData, dim, err = eventReader.GetBinaryVectorFromPayload()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &BinaryVectorFieldData{
Data: make([]byte, 0, rowNum*dim),
}
}
binaryVectorFieldData := insertData.Data[fieldID].(*BinaryVectorFieldData)
binaryVectorFieldData.Data = append(binaryVectorFieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
totalLength += length
binaryVectorFieldData.Dim = dim
insertData.Data[fieldID] = binaryVectorFieldData
case schemapb.DataType_Float16Vector:
var singleData []byte
singleData, dim, err = eventReader.GetFloat16VectorFromPayload()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &Float16VectorFieldData{
Data: make([]byte, 0, rowNum*dim),
}
}
float16VectorFieldData := insertData.Data[fieldID].(*Float16VectorFieldData)
float16VectorFieldData.Data = append(float16VectorFieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
totalLength += length
float16VectorFieldData.Dim = dim
insertData.Data[fieldID] = float16VectorFieldData
case schemapb.DataType_BFloat16Vector:
var singleData []byte
singleData, dim, err = eventReader.GetBFloat16VectorFromPayload()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &BFloat16VectorFieldData{
Data: make([]byte, 0, rowNum*dim),
}
}
bfloat16VectorFieldData := insertData.Data[fieldID].(*BFloat16VectorFieldData)
bfloat16VectorFieldData.Data = append(bfloat16VectorFieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
totalLength += length
bfloat16VectorFieldData.Dim = dim
insertData.Data[fieldID] = bfloat16VectorFieldData
case schemapb.DataType_FloatVector:
var singleData []float32
singleData, dim, err = eventReader.GetFloatVectorFromPayload()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &FloatVectorFieldData{
Data: make([]float32, 0, rowNum*dim),
}
}
floatVectorFieldData := insertData.Data[fieldID].(*FloatVectorFieldData)
floatVectorFieldData.Data = append(floatVectorFieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
totalLength += length
floatVectorFieldData.Dim = dim
insertData.Data[fieldID] = floatVectorFieldData
case schemapb.DataType_SparseFloatVector:
sparseData, _, err := eventReader.GetSparseFloatVectorFromPayload()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &SparseFloatVectorFieldData{}
}
vec := insertData.Data[fieldID].(*SparseFloatVectorFieldData)
vec.AppendAllRows(sparseData)
totalLength += sparseData.RowNum()
insertData.Data[fieldID] = vec
default:
data, validData, dim, err := eventReader.GetDataFromPayload()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, fmt.Errorf("undefined data type %d", dataType)
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
length, err := AddInsertData(dataType, data, insertData, fieldID, rowNum, eventReader, dim, validData)
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
totalLength += length
eventReader.Close()
}
@@ -811,6 +492,209 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
return collectionID, partitionID, segmentID, nil
}
func AddInsertData(dataType schemapb.DataType, data interface{}, insertData *InsertData, fieldID int64, rowNum int, eventReader *EventReader, dim int, validData []bool) (dataLength int, err error) {
fieldData := insertData.Data[fieldID]
switch dataType {
case schemapb.DataType_Bool:
singleData := data.([]bool)
if fieldData == nil {
fieldData = &BoolFieldData{Data: make([]bool, 0, rowNum)}
}
boolFieldData := fieldData.(*BoolFieldData)
boolFieldData.Data = append(boolFieldData.Data, singleData...)
boolFieldData.ValidData = append(boolFieldData.ValidData, validData...)
insertData.Data[fieldID] = boolFieldData
return len(singleData), nil
case schemapb.DataType_Int8:
singleData := data.([]int8)
if fieldData == nil {
fieldData = &Int8FieldData{Data: make([]int8, 0, rowNum)}
}
int8FieldData := fieldData.(*Int8FieldData)
int8FieldData.Data = append(int8FieldData.Data, singleData...)
int8FieldData.ValidData = append(int8FieldData.ValidData, validData...)
insertData.Data[fieldID] = int8FieldData
return len(singleData), nil
case schemapb.DataType_Int16:
singleData := data.([]int16)
if fieldData == nil {
fieldData = &Int16FieldData{Data: make([]int16, 0, rowNum)}
}
int16FieldData := fieldData.(*Int16FieldData)
int16FieldData.Data = append(int16FieldData.Data, singleData...)
int16FieldData.ValidData = append(int16FieldData.ValidData, validData...)
insertData.Data[fieldID] = int16FieldData
return len(singleData), nil
case schemapb.DataType_Int32:
singleData := data.([]int32)
if fieldData == nil {
fieldData = &Int32FieldData{Data: make([]int32, 0, rowNum)}
}
int32FieldData := fieldData.(*Int32FieldData)
int32FieldData.Data = append(int32FieldData.Data, singleData...)
int32FieldData.ValidData = append(int32FieldData.ValidData, validData...)
insertData.Data[fieldID] = int32FieldData
return len(singleData), nil
case schemapb.DataType_Int64:
singleData := data.([]int64)
if fieldData == nil {
fieldData = &Int64FieldData{Data: make([]int64, 0, rowNum)}
}
int64FieldData := fieldData.(*Int64FieldData)
int64FieldData.Data = append(int64FieldData.Data, singleData...)
int64FieldData.ValidData = append(int64FieldData.ValidData, validData...)
insertData.Data[fieldID] = int64FieldData
return len(singleData), nil
case schemapb.DataType_Float:
singleData := data.([]float32)
if fieldData == nil {
fieldData = &FloatFieldData{Data: make([]float32, 0, rowNum)}
}
floatFieldData := fieldData.(*FloatFieldData)
floatFieldData.Data = append(floatFieldData.Data, singleData...)
floatFieldData.ValidData = append(floatFieldData.ValidData, validData...)
insertData.Data[fieldID] = floatFieldData
return len(singleData), nil
case schemapb.DataType_Double:
singleData := data.([]float64)
if fieldData == nil {
fieldData = &DoubleFieldData{Data: make([]float64, 0, rowNum)}
}
doubleFieldData := fieldData.(*DoubleFieldData)
doubleFieldData.Data = append(doubleFieldData.Data, singleData...)
doubleFieldData.ValidData = append(doubleFieldData.ValidData, validData...)
insertData.Data[fieldID] = doubleFieldData
return len(singleData), nil
case schemapb.DataType_String, schemapb.DataType_VarChar:
singleData := data.([]string)
if fieldData == nil {
fieldData = &StringFieldData{Data: make([]string, 0, rowNum)}
}
stringFieldData := fieldData.(*StringFieldData)
stringFieldData.Data = append(stringFieldData.Data, singleData...)
stringFieldData.ValidData = append(stringFieldData.ValidData, validData...)
stringFieldData.DataType = dataType
insertData.Data[fieldID] = stringFieldData
return len(singleData), nil
case schemapb.DataType_Array:
singleData := data.([]*schemapb.ScalarField)
if fieldData == nil {
fieldData = &ArrayFieldData{Data: make([]*schemapb.ScalarField, 0, rowNum)}
}
arrayFieldData := fieldData.(*ArrayFieldData)
arrayFieldData.Data = append(arrayFieldData.Data, singleData...)
arrayFieldData.ValidData = append(arrayFieldData.ValidData, validData...)
insertData.Data[fieldID] = arrayFieldData
return len(singleData), nil
case schemapb.DataType_JSON:
singleData := data.([][]byte)
if fieldData == nil {
fieldData = &JSONFieldData{Data: make([][]byte, 0, rowNum)}
}
jsonFieldData := fieldData.(*JSONFieldData)
jsonFieldData.Data = append(jsonFieldData.Data, singleData...)
jsonFieldData.ValidData = append(jsonFieldData.ValidData, validData...)
insertData.Data[fieldID] = jsonFieldData
return len(singleData), nil
case schemapb.DataType_BinaryVector:
singleData := data.([]byte)
if fieldData == nil {
fieldData = &BinaryVectorFieldData{Data: make([]byte, 0, rowNum*dim)}
}
binaryVectorFieldData := fieldData.(*BinaryVectorFieldData)
binaryVectorFieldData.Data = append(binaryVectorFieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
return length, err
}
binaryVectorFieldData.Dim = dim
insertData.Data[fieldID] = binaryVectorFieldData
return length, nil
case schemapb.DataType_Float16Vector:
singleData := data.([]byte)
if fieldData == nil {
fieldData = &Float16VectorFieldData{Data: make([]byte, 0, rowNum*dim)}
}
float16VectorFieldData := fieldData.(*Float16VectorFieldData)
float16VectorFieldData.Data = append(float16VectorFieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
return length, err
}
float16VectorFieldData.Dim = dim
insertData.Data[fieldID] = float16VectorFieldData
return length, nil
case schemapb.DataType_BFloat16Vector:
singleData := data.([]byte)
if fieldData == nil {
fieldData = &BFloat16VectorFieldData{Data: make([]byte, 0, rowNum*dim)}
}
bfloat16VectorFieldData := fieldData.(*BFloat16VectorFieldData)
bfloat16VectorFieldData.Data = append(bfloat16VectorFieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
return length, err
}
bfloat16VectorFieldData.Dim = dim
insertData.Data[fieldID] = bfloat16VectorFieldData
return length, nil
case schemapb.DataType_FloatVector:
singleData := data.([]float32)
if fieldData == nil {
fieldData = &FloatVectorFieldData{Data: make([]float32, 0, rowNum*dim)}
}
floatVectorFieldData := fieldData.(*FloatVectorFieldData)
floatVectorFieldData.Data = append(floatVectorFieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
return 0, err
}
floatVectorFieldData.Dim = dim
insertData.Data[fieldID] = floatVectorFieldData
return length, nil
case schemapb.DataType_SparseFloatVector:
singleData := data.(*SparseFloatVectorFieldData)
if fieldData == nil {
fieldData = &SparseFloatVectorFieldData{}
}
vec := fieldData.(*SparseFloatVectorFieldData)
vec.AppendAllRows(singleData)
insertData.Data[fieldID] = vec
return singleData.RowNum(), nil
default:
return 0, fmt.Errorf("undefined data type %d", dataType)
}
}
// Deserialize transfers blobs back to insert data.
// From the schema, it gets all fields.
// For each field, it creates a binlog reader and reads all events into the buffer.
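The deserialize path described above ultimately drives DeserializeInto. A minimal caller-side sketch follows; it is an assumption-laden illustration, not code from this change: readSegment, its signature (including the int64 rowNum), and the final-argument wiring are invented, only InsertCodec, Blob, InsertData and DeserializeInto come from the file above, and the sketch assumes it sits in the same package.

// Hypothetical caller sketch (not part of this diff); readSegment and the
// int64 rowNum are invented for illustration.
func readSegment(codec *InsertCodec, fieldBinlogs []*Blob, rowNum int64, insertData *InsertData) error {
    // DeserializeInto fills insertData field by field, dispatching each
    // event payload through AddInsertData, and reports the decoded IDs.
    collID, partID, segID, err := codec.DeserializeInto(fieldBinlogs, rowNum, insertData)
    if err != nil {
        return err
    }
    _, _, _ = collID, partID, segID // collection/partition/segment IDs read from the binlogs
    return nil
}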
@@ -838,13 +722,12 @@ func NewDeleteLog(pk PrimaryKey, ts Timestamp) *DeleteLog {
func (dl *DeleteLog) UnmarshalJSON(data []byte) error {
var messageMap map[string]*json.RawMessage
err := json.Unmarshal(data, &messageMap)
if err != nil {
var err error
if err = json.Unmarshal(data, &messageMap); err != nil {
return err
}
err = json.Unmarshal(*messageMap["pkType"], &dl.PkType)
if err != nil {
if err = json.Unmarshal(*messageMap["pkType"], &dl.PkType); err != nil {
return err
}
@@ -855,13 +738,11 @@ func (dl *DeleteLog) UnmarshalJSON(data []byte) error {
dl.Pk = &VarCharPrimaryKey{}
}
err = json.Unmarshal(*messageMap["pk"], dl.Pk)
if err != nil {
if err = json.Unmarshal(*messageMap["pk"], dl.Pk); err != nil {
return err
}
err = json.Unmarshal(*messageMap["ts"], &dl.Ts)
if err != nil {
if err = json.Unmarshal(*messageMap["ts"], &dl.Ts); err != nil {
return err
}
@@ -1145,8 +1026,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
if err != nil {
return nil, err
}
err = eventWriter.AddOneStringToPayload(req, true)
if err != nil {
if err = eventWriter.AddOneStringToPayload(req, true); err != nil {
return nil, err
}
eventWriter.SetEventTimestamp(ts[pos], ts[pos])
@@ -1155,8 +1035,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
if err != nil {
return nil, err
}
err = eventWriter.AddOneStringToPayload(req, true)
if err != nil {
if err = eventWriter.AddOneStringToPayload(req, true); err != nil {
return nil, err
}
eventWriter.SetEventTimestamp(ts[pos], ts[pos])
@@ -1165,8 +1044,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
if err != nil {
return nil, err
}
err = eventWriter.AddOneStringToPayload(req, true)
if err != nil {
if err = eventWriter.AddOneStringToPayload(req, true); err != nil {
return nil, err
}
eventWriter.SetEventTimestamp(ts[pos], ts[pos])
@@ -1175,8 +1053,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
if err != nil {
return nil, err
}
err = eventWriter.AddOneStringToPayload(req, true)
if err != nil {
if err = eventWriter.AddOneStringToPayload(req, true); err != nil {
return nil, err
}
eventWriter.SetEventTimestamp(ts[pos], ts[pos])
@@ -1187,12 +1064,10 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
// https://github.com/milvus-io/milvus/issues/9620
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
err = writer.Finish()
if err != nil {
if err = writer.Finish(); err != nil {
return nil, err
}
buffer, err = writer.GetBuffer()
if err != nil {
if buffer, err = writer.GetBuffer(); err != nil {
return nil, err
}
blobs = append(blobs, &Blob{