From 5f02e52561a40623d690202e9ea44ee34a93473a Mon Sep 17 00:00:00 2001 From: shaoting-huang <167743503+shaoting-huang@users.noreply.github.com> Date: Thu, 20 Jun 2024 11:17:59 +0800 Subject: [PATCH] enhance: Refactor data codec deserialize (#33923) #33922 Signed-off-by: shaoting-huang --- internal/storage/data_codec.go | 573 +++++++++++++-------------------- 1 file changed, 224 insertions(+), 349 deletions(-) diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index d8c44fc9f9..9f64808a73 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -451,7 +451,6 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int dataType := binlogReader.PayloadDataType fieldID := binlogReader.FieldID totalLength := 0 - dim := 0 for { eventReader, err := binlogReader.NextEventReader() @@ -461,337 +460,19 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int if eventReader == nil { break } - switch dataType { - case schemapb.DataType_Bool: - singleData, validData, err := eventReader.GetBoolFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &BoolFieldData{ - Data: make([]bool, 0, rowNum), - } - } - boolFieldData := insertData.Data[fieldID].(*BoolFieldData) - - boolFieldData.Data = append(boolFieldData.Data, singleData...) - boolFieldData.ValidData = append(boolFieldData.ValidData, validData...) - totalLength += len(singleData) - insertData.Data[fieldID] = boolFieldData - - case schemapb.DataType_Int8: - singleData, validData, err := eventReader.GetInt8FromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &Int8FieldData{ - Data: make([]int8, 0, rowNum), - } - } - int8FieldData := insertData.Data[fieldID].(*Int8FieldData) - - int8FieldData.Data = append(int8FieldData.Data, singleData...) - int8FieldData.ValidData = append(int8FieldData.ValidData, validData...) - totalLength += len(singleData) - insertData.Data[fieldID] = int8FieldData - - case schemapb.DataType_Int16: - singleData, validData, err := eventReader.GetInt16FromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &Int16FieldData{ - Data: make([]int16, 0, rowNum), - } - } - int16FieldData := insertData.Data[fieldID].(*Int16FieldData) - - int16FieldData.Data = append(int16FieldData.Data, singleData...) - int16FieldData.ValidData = append(int16FieldData.ValidData, validData...) - totalLength += len(singleData) - insertData.Data[fieldID] = int16FieldData - - case schemapb.DataType_Int32: - singleData, validData, err := eventReader.GetInt32FromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &Int32FieldData{ - Data: make([]int32, 0, rowNum), - } - } - int32FieldData := insertData.Data[fieldID].(*Int32FieldData) - - int32FieldData.Data = append(int32FieldData.Data, singleData...) - int32FieldData.ValidData = append(int32FieldData.ValidData, validData...) - totalLength += len(singleData) - insertData.Data[fieldID] = int32FieldData - - case schemapb.DataType_Int64: - singleData, validData, err := eventReader.GetInt64FromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &Int64FieldData{ - Data: make([]int64, 0, rowNum), - } - } - int64FieldData := insertData.Data[fieldID].(*Int64FieldData) - - int64FieldData.Data = append(int64FieldData.Data, singleData...) - int64FieldData.ValidData = append(int64FieldData.ValidData, validData...) - totalLength += len(singleData) - insertData.Data[fieldID] = int64FieldData - - case schemapb.DataType_Float: - singleData, validData, err := eventReader.GetFloatFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &FloatFieldData{ - Data: make([]float32, 0, rowNum), - } - } - floatFieldData := insertData.Data[fieldID].(*FloatFieldData) - - floatFieldData.Data = append(floatFieldData.Data, singleData...) - floatFieldData.ValidData = append(floatFieldData.ValidData, validData...) - totalLength += len(singleData) - insertData.Data[fieldID] = floatFieldData - - case schemapb.DataType_Double: - singleData, validData, err := eventReader.GetDoubleFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &DoubleFieldData{ - Data: make([]float64, 0, rowNum), - } - } - doubleFieldData := insertData.Data[fieldID].(*DoubleFieldData) - - doubleFieldData.Data = append(doubleFieldData.Data, singleData...) - doubleFieldData.ValidData = append(doubleFieldData.ValidData, validData...) - totalLength += len(singleData) - insertData.Data[fieldID] = doubleFieldData - - case schemapb.DataType_String, schemapb.DataType_VarChar: - stringPayload, validData, err := eventReader.GetStringFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &StringFieldData{ - Data: make([]string, 0, rowNum), - } - } - stringFieldData := insertData.Data[fieldID].(*StringFieldData) - stringFieldData.DataType = dataType - - stringFieldData.Data = append(stringFieldData.Data, stringPayload...) - stringFieldData.ValidData = append(stringFieldData.ValidData, validData...) - totalLength += len(stringPayload) - insertData.Data[fieldID] = stringFieldData - - case schemapb.DataType_Array: - arrayPayload, validData, err := eventReader.GetArrayFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &ArrayFieldData{ - Data: make([]*schemapb.ScalarField, 0, rowNum), - } - } - arrayFieldData := insertData.Data[fieldID].(*ArrayFieldData) - - arrayFieldData.Data = append(arrayFieldData.Data, arrayPayload...) - arrayFieldData.ValidData = append(arrayFieldData.ValidData, validData...) - totalLength += len(arrayPayload) - insertData.Data[fieldID] = arrayFieldData - - case schemapb.DataType_JSON: - jsonPayload, validData, err := eventReader.GetJSONFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &JSONFieldData{ - Data: make([][]byte, 0, rowNum), - } - } - jsonFieldData := insertData.Data[fieldID].(*JSONFieldData) - - jsonFieldData.Data = append(jsonFieldData.Data, jsonPayload...) - jsonFieldData.ValidData = append(jsonFieldData.ValidData, validData...) - totalLength += len(jsonPayload) - insertData.Data[fieldID] = jsonFieldData - - case schemapb.DataType_BinaryVector: - var singleData []byte - singleData, dim, err = eventReader.GetBinaryVectorFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &BinaryVectorFieldData{ - Data: make([]byte, 0, rowNum*dim), - } - } - binaryVectorFieldData := insertData.Data[fieldID].(*BinaryVectorFieldData) - - binaryVectorFieldData.Data = append(binaryVectorFieldData.Data, singleData...) - length, err := eventReader.GetPayloadLengthFromReader() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - totalLength += length - binaryVectorFieldData.Dim = dim - insertData.Data[fieldID] = binaryVectorFieldData - - case schemapb.DataType_Float16Vector: - var singleData []byte - singleData, dim, err = eventReader.GetFloat16VectorFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &Float16VectorFieldData{ - Data: make([]byte, 0, rowNum*dim), - } - } - float16VectorFieldData := insertData.Data[fieldID].(*Float16VectorFieldData) - - float16VectorFieldData.Data = append(float16VectorFieldData.Data, singleData...) - length, err := eventReader.GetPayloadLengthFromReader() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - totalLength += length - float16VectorFieldData.Dim = dim - insertData.Data[fieldID] = float16VectorFieldData - - case schemapb.DataType_BFloat16Vector: - var singleData []byte - singleData, dim, err = eventReader.GetBFloat16VectorFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &BFloat16VectorFieldData{ - Data: make([]byte, 0, rowNum*dim), - } - } - bfloat16VectorFieldData := insertData.Data[fieldID].(*BFloat16VectorFieldData) - - bfloat16VectorFieldData.Data = append(bfloat16VectorFieldData.Data, singleData...) - length, err := eventReader.GetPayloadLengthFromReader() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - totalLength += length - bfloat16VectorFieldData.Dim = dim - insertData.Data[fieldID] = bfloat16VectorFieldData - - case schemapb.DataType_FloatVector: - var singleData []float32 - singleData, dim, err = eventReader.GetFloatVectorFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &FloatVectorFieldData{ - Data: make([]float32, 0, rowNum*dim), - } - } - floatVectorFieldData := insertData.Data[fieldID].(*FloatVectorFieldData) - - floatVectorFieldData.Data = append(floatVectorFieldData.Data, singleData...) - length, err := eventReader.GetPayloadLengthFromReader() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - totalLength += length - floatVectorFieldData.Dim = dim - insertData.Data[fieldID] = floatVectorFieldData - - case schemapb.DataType_SparseFloatVector: - sparseData, _, err := eventReader.GetSparseFloatVectorFromPayload() - if err != nil { - eventReader.Close() - binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err - } - if insertData.Data[fieldID] == nil { - insertData.Data[fieldID] = &SparseFloatVectorFieldData{} - } - vec := insertData.Data[fieldID].(*SparseFloatVectorFieldData) - vec.AppendAllRows(sparseData) - - totalLength += sparseData.RowNum() - insertData.Data[fieldID] = vec - - default: + data, validData, dim, err := eventReader.GetDataFromPayload() + if err != nil { eventReader.Close() binlogReader.Close() - return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, fmt.Errorf("undefined data type %d", dataType) + return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err } + length, err := AddInsertData(dataType, data, insertData, fieldID, rowNum, eventReader, dim, validData) + if err != nil { + eventReader.Close() + binlogReader.Close() + return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err + } + totalLength += length eventReader.Close() } @@ -811,6 +492,209 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int return collectionID, partitionID, segmentID, nil } +func AddInsertData(dataType schemapb.DataType, data interface{}, insertData *InsertData, fieldID int64, rowNum int, eventReader *EventReader, dim int, validData []bool) (dataLength int, err error) { + fieldData := insertData.Data[fieldID] + switch dataType { + case schemapb.DataType_Bool: + singleData := data.([]bool) + if fieldData == nil { + fieldData = &BoolFieldData{Data: make([]bool, 0, rowNum)} + } + boolFieldData := fieldData.(*BoolFieldData) + + boolFieldData.Data = append(boolFieldData.Data, singleData...) + boolFieldData.ValidData = append(boolFieldData.ValidData, validData...) + insertData.Data[fieldID] = boolFieldData + return len(singleData), nil + + case schemapb.DataType_Int8: + singleData := data.([]int8) + if fieldData == nil { + fieldData = &Int8FieldData{Data: make([]int8, 0, rowNum)} + } + int8FieldData := fieldData.(*Int8FieldData) + + int8FieldData.Data = append(int8FieldData.Data, singleData...) + int8FieldData.ValidData = append(int8FieldData.ValidData, validData...) + insertData.Data[fieldID] = int8FieldData + return len(singleData), nil + + case schemapb.DataType_Int16: + singleData := data.([]int16) + if fieldData == nil { + fieldData = &Int16FieldData{Data: make([]int16, 0, rowNum)} + } + int16FieldData := fieldData.(*Int16FieldData) + + int16FieldData.Data = append(int16FieldData.Data, singleData...) + int16FieldData.ValidData = append(int16FieldData.ValidData, validData...) + insertData.Data[fieldID] = int16FieldData + return len(singleData), nil + + case schemapb.DataType_Int32: + singleData := data.([]int32) + if fieldData == nil { + fieldData = &Int32FieldData{Data: make([]int32, 0, rowNum)} + } + int32FieldData := fieldData.(*Int32FieldData) + + int32FieldData.Data = append(int32FieldData.Data, singleData...) + int32FieldData.ValidData = append(int32FieldData.ValidData, validData...) + insertData.Data[fieldID] = int32FieldData + return len(singleData), nil + + case schemapb.DataType_Int64: + singleData := data.([]int64) + if fieldData == nil { + fieldData = &Int64FieldData{Data: make([]int64, 0, rowNum)} + } + int64FieldData := fieldData.(*Int64FieldData) + + int64FieldData.Data = append(int64FieldData.Data, singleData...) + int64FieldData.ValidData = append(int64FieldData.ValidData, validData...) + insertData.Data[fieldID] = int64FieldData + return len(singleData), nil + + case schemapb.DataType_Float: + singleData := data.([]float32) + if fieldData == nil { + fieldData = &FloatFieldData{Data: make([]float32, 0, rowNum)} + } + floatFieldData := fieldData.(*FloatFieldData) + + floatFieldData.Data = append(floatFieldData.Data, singleData...) + floatFieldData.ValidData = append(floatFieldData.ValidData, validData...) + insertData.Data[fieldID] = floatFieldData + return len(singleData), nil + + case schemapb.DataType_Double: + singleData := data.([]float64) + if fieldData == nil { + fieldData = &DoubleFieldData{Data: make([]float64, 0, rowNum)} + } + doubleFieldData := fieldData.(*DoubleFieldData) + + doubleFieldData.Data = append(doubleFieldData.Data, singleData...) + doubleFieldData.ValidData = append(doubleFieldData.ValidData, validData...) + insertData.Data[fieldID] = doubleFieldData + return len(singleData), nil + + case schemapb.DataType_String, schemapb.DataType_VarChar: + singleData := data.([]string) + if fieldData == nil { + fieldData = &StringFieldData{Data: make([]string, 0, rowNum)} + } + stringFieldData := fieldData.(*StringFieldData) + + stringFieldData.Data = append(stringFieldData.Data, singleData...) + stringFieldData.ValidData = append(stringFieldData.ValidData, validData...) + stringFieldData.DataType = dataType + insertData.Data[fieldID] = stringFieldData + return len(singleData), nil + + case schemapb.DataType_Array: + singleData := data.([]*schemapb.ScalarField) + if fieldData == nil { + fieldData = &ArrayFieldData{Data: make([]*schemapb.ScalarField, 0, rowNum)} + } + arrayFieldData := fieldData.(*ArrayFieldData) + + arrayFieldData.Data = append(arrayFieldData.Data, singleData...) + arrayFieldData.ValidData = append(arrayFieldData.ValidData, validData...) + insertData.Data[fieldID] = arrayFieldData + return len(singleData), nil + + case schemapb.DataType_JSON: + singleData := data.([][]byte) + if fieldData == nil { + fieldData = &JSONFieldData{Data: make([][]byte, 0, rowNum)} + } + jsonFieldData := fieldData.(*JSONFieldData) + + jsonFieldData.Data = append(jsonFieldData.Data, singleData...) + jsonFieldData.ValidData = append(jsonFieldData.ValidData, validData...) + insertData.Data[fieldID] = jsonFieldData + return len(singleData), nil + + case schemapb.DataType_BinaryVector: + singleData := data.([]byte) + if fieldData == nil { + fieldData = &BinaryVectorFieldData{Data: make([]byte, 0, rowNum*dim)} + } + binaryVectorFieldData := fieldData.(*BinaryVectorFieldData) + + binaryVectorFieldData.Data = append(binaryVectorFieldData.Data, singleData...) + length, err := eventReader.GetPayloadLengthFromReader() + if err != nil { + return length, err + } + binaryVectorFieldData.Dim = dim + insertData.Data[fieldID] = binaryVectorFieldData + return length, nil + + case schemapb.DataType_Float16Vector: + singleData := data.([]byte) + if fieldData == nil { + fieldData = &Float16VectorFieldData{Data: make([]byte, 0, rowNum*dim)} + } + float16VectorFieldData := fieldData.(*Float16VectorFieldData) + + float16VectorFieldData.Data = append(float16VectorFieldData.Data, singleData...) + length, err := eventReader.GetPayloadLengthFromReader() + if err != nil { + return length, err + } + float16VectorFieldData.Dim = dim + insertData.Data[fieldID] = float16VectorFieldData + return length, nil + + case schemapb.DataType_BFloat16Vector: + singleData := data.([]byte) + if fieldData == nil { + fieldData = &BFloat16VectorFieldData{Data: make([]byte, 0, rowNum*dim)} + } + bfloat16VectorFieldData := fieldData.(*BFloat16VectorFieldData) + + bfloat16VectorFieldData.Data = append(bfloat16VectorFieldData.Data, singleData...) + length, err := eventReader.GetPayloadLengthFromReader() + if err != nil { + return length, err + } + bfloat16VectorFieldData.Dim = dim + insertData.Data[fieldID] = bfloat16VectorFieldData + return length, nil + + case schemapb.DataType_FloatVector: + singleData := data.([]float32) + if fieldData == nil { + fieldData = &FloatVectorFieldData{Data: make([]float32, 0, rowNum*dim)} + } + floatVectorFieldData := fieldData.(*FloatVectorFieldData) + + floatVectorFieldData.Data = append(floatVectorFieldData.Data, singleData...) + length, err := eventReader.GetPayloadLengthFromReader() + if err != nil { + return 0, err + } + floatVectorFieldData.Dim = dim + insertData.Data[fieldID] = floatVectorFieldData + return length, nil + + case schemapb.DataType_SparseFloatVector: + singleData := data.(*SparseFloatVectorFieldData) + if fieldData == nil { + fieldData = &SparseFloatVectorFieldData{} + } + vec := fieldData.(*SparseFloatVectorFieldData) + vec.AppendAllRows(singleData) + insertData.Data[fieldID] = vec + return singleData.RowNum(), nil + + default: + return 0, fmt.Errorf("undefined data type %d", dataType) + } +} + // Deserialize transfer blob back to insert data. // From schema, it get all fields. // For each field, it will create a binlog reader, and read all event to the buffer. @@ -838,13 +722,12 @@ func NewDeleteLog(pk PrimaryKey, ts Timestamp) *DeleteLog { func (dl *DeleteLog) UnmarshalJSON(data []byte) error { var messageMap map[string]*json.RawMessage - err := json.Unmarshal(data, &messageMap) - if err != nil { + var err error + if err = json.Unmarshal(data, &messageMap); err != nil { return err } - err = json.Unmarshal(*messageMap["pkType"], &dl.PkType) - if err != nil { + if err = json.Unmarshal(*messageMap["pkType"], &dl.PkType); err != nil { return err } @@ -855,13 +738,11 @@ func (dl *DeleteLog) UnmarshalJSON(data []byte) error { dl.Pk = &VarCharPrimaryKey{} } - err = json.Unmarshal(*messageMap["pk"], dl.Pk) - if err != nil { + if err = json.Unmarshal(*messageMap["pk"], dl.Pk); err != nil { return err } - err = json.Unmarshal(*messageMap["ts"], &dl.Ts) - if err != nil { + if err = json.Unmarshal(*messageMap["ts"], &dl.Ts); err != nil { return err } @@ -1145,8 +1026,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ if err != nil { return nil, err } - err = eventWriter.AddOneStringToPayload(req, true) - if err != nil { + if err = eventWriter.AddOneStringToPayload(req, true); err != nil { return nil, err } eventWriter.SetEventTimestamp(ts[pos], ts[pos]) @@ -1155,8 +1035,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ if err != nil { return nil, err } - err = eventWriter.AddOneStringToPayload(req, true) - if err != nil { + if err = eventWriter.AddOneStringToPayload(req, true); err != nil { return nil, err } eventWriter.SetEventTimestamp(ts[pos], ts[pos]) @@ -1165,8 +1044,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ if err != nil { return nil, err } - err = eventWriter.AddOneStringToPayload(req, true) - if err != nil { + if err = eventWriter.AddOneStringToPayload(req, true); err != nil { return nil, err } eventWriter.SetEventTimestamp(ts[pos], ts[pos]) @@ -1175,8 +1053,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ if err != nil { return nil, err } - err = eventWriter.AddOneStringToPayload(req, true) - if err != nil { + if err = eventWriter.AddOneStringToPayload(req, true); err != nil { return nil, err } eventWriter.SetEventTimestamp(ts[pos], ts[pos]) @@ -1187,12 +1064,10 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ // https://github.com/milvus-io/milvus/issues/9620 writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal)) - err = writer.Finish() - if err != nil { + if err = writer.Finish(); err != nil { return nil, err } - buffer, err = writer.GetBuffer() - if err != nil { + if buffer, err = writer.GetBuffer(); err != nil { return nil, err } blobs = append(blobs, &Blob{