Add blob info (#5792)

* Add blob info

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* fix error

Signed-off-by: godchen <qingxiang.chen@zilliz.com>

* fix error

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
pull/5796/head
godchen 2021-06-16 12:03:57 +08:00 committed by GitHub
parent 97049ce293
commit 1c6786f85c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 1 deletions

View File

@ -121,10 +121,17 @@ type FloatVectorFieldData struct {
// 101: second user field id
// 102: ...
// TODO: fill it
// info for each blob
type BlobInfo struct {
Length int
}
// example row_schema: {float_field, int_field, float_vector_field, string_field}
// Data {<0, row_id>, <1, timestamp>, <100, float_field>, <101, int_field>, <102, float_vector_field>, <103, string_field>}
type InsertData struct {
Data map[FieldID]FieldData // field id to field data
Data map[FieldID]FieldData // field id to field data
Infos []BlobInfo
}
// Blob key example:
@ -262,6 +269,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
dataType := binlogReader.PayloadDataType
fieldID := binlogReader.FieldID
totalLength := 0
for {
eventReader, err := binlogReader.NextEventReader()
if err != nil {
@ -285,6 +293,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
if err != nil {
return -1, -1, nil, err
}
totalLength += length
boolFieldData.NumRows += length
resultData.Data[fieldID] = boolFieldData
case schemapb.DataType_Int8:
@ -301,6 +310,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
if err != nil {
return -1, -1, nil, err
}
totalLength += length
int8FieldData.NumRows += length
resultData.Data[fieldID] = int8FieldData
case schemapb.DataType_Int16:
@ -317,6 +327,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
if err != nil {
return -1, -1, nil, err
}
totalLength += length
int16FieldData.NumRows += length
resultData.Data[fieldID] = int16FieldData
case schemapb.DataType_Int32:
@ -333,6 +344,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
if err != nil {
return -1, -1, nil, err
}
totalLength += length
int32FieldData.NumRows += length
resultData.Data[fieldID] = int32FieldData
case schemapb.DataType_Int64:
@ -349,6 +361,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
if err != nil {
return -1, -1, nil, err
}
totalLength += length
int64FieldData.NumRows += length
resultData.Data[fieldID] = int64FieldData
case schemapb.DataType_Float:
@ -365,6 +378,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
if err != nil {
return -1, -1, nil, err
}
totalLength += length
floatFieldData.NumRows += length
resultData.Data[fieldID] = floatFieldData
case schemapb.DataType_Double:
@ -381,6 +395,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
if err != nil {
return -1, -1, nil, err
}
totalLength += length
doubleFieldData.NumRows += length
resultData.Data[fieldID] = doubleFieldData
case schemapb.DataType_String:
@ -392,6 +407,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
if err != nil {
return -1, -1, nil, err
}
totalLength += length
stringFieldData.NumRows += length
for i := 0; i < length; i++ {
singleString, err := eventReader.GetOneStringFromPayload(i)
@ -416,6 +432,7 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
if err != nil {
return -1, -1, nil, err
}
totalLength += length
binaryVectorFieldData.NumRows += length
resultData.Data[fieldID] = binaryVectorFieldData
case schemapb.DataType_FloatVector:
@ -433,12 +450,19 @@ func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
if err != nil {
return -1, -1, nil, err
}
totalLength += length
floatVectorFieldData.NumRows += length
resultData.Data[fieldID] = floatVectorFieldData
default:
return -1, -1, nil, fmt.Errorf("undefined data type %d", dataType)
}
}
if fieldID == ms.TimeStampField {
blobInfo := BlobInfo{
Length: totalLength,
}
resultData.Infos = append(resultData.Infos, blobInfo)
}
insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
}

View File

@ -15,9 +15,11 @@ import (
"fmt"
"testing"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func TestInsertCodec(t *testing.T) {
@ -271,6 +273,8 @@ func TestInsertCodec(t *testing.T) {
assert.Equal(t, []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7},
resultData.Data[109].(*FloatVectorFieldData).Data)
assert.Nil(t, insertCodec.Close())
log.Debug("Data", zap.Any("Data", resultData.Data))
log.Debug("Infos", zap.Any("Infos", resultData.Infos))
blobs := []*Blob{}
_, _, _, err = insertCodec.Deserialize(blobs)