mirror of https://github.com/milvus-io/milvus.git
Fix memory calculation in dataCodec (#21800)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>pull/21839/head
parent
d7bb908148
commit
949d5d078f
|
@ -693,12 +693,10 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
|
|||
// TODO copy maybe expensive, but this seems to be the only convinent way.
|
||||
func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}, numRows int64) (storage.FieldData, error) {
|
||||
var rst storage.FieldData
|
||||
numOfRows := []int64{numRows}
|
||||
switch schemaDataType {
|
||||
case schemapb.DataType_Bool:
|
||||
var data = &storage.BoolFieldData{
|
||||
NumRows: numOfRows,
|
||||
Data: make([]bool, 0, len(content)),
|
||||
Data: make([]bool, 0, len(content)),
|
||||
}
|
||||
|
||||
for _, c := range content {
|
||||
|
@ -712,8 +710,7 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
|
|||
|
||||
case schemapb.DataType_Int8:
|
||||
var data = &storage.Int8FieldData{
|
||||
NumRows: numOfRows,
|
||||
Data: make([]int8, 0, len(content)),
|
||||
Data: make([]int8, 0, len(content)),
|
||||
}
|
||||
|
||||
for _, c := range content {
|
||||
|
@ -727,8 +724,7 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
|
|||
|
||||
case schemapb.DataType_Int16:
|
||||
var data = &storage.Int16FieldData{
|
||||
NumRows: numOfRows,
|
||||
Data: make([]int16, 0, len(content)),
|
||||
Data: make([]int16, 0, len(content)),
|
||||
}
|
||||
|
||||
for _, c := range content {
|
||||
|
@ -742,8 +738,7 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
|
|||
|
||||
case schemapb.DataType_Int32:
|
||||
var data = &storage.Int32FieldData{
|
||||
NumRows: numOfRows,
|
||||
Data: make([]int32, 0, len(content)),
|
||||
Data: make([]int32, 0, len(content)),
|
||||
}
|
||||
|
||||
for _, c := range content {
|
||||
|
@ -757,8 +752,7 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
|
|||
|
||||
case schemapb.DataType_Int64:
|
||||
var data = &storage.Int64FieldData{
|
||||
NumRows: numOfRows,
|
||||
Data: make([]int64, 0, len(content)),
|
||||
Data: make([]int64, 0, len(content)),
|
||||
}
|
||||
|
||||
for _, c := range content {
|
||||
|
@ -772,8 +766,7 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
|
|||
|
||||
case schemapb.DataType_Float:
|
||||
var data = &storage.FloatFieldData{
|
||||
NumRows: numOfRows,
|
||||
Data: make([]float32, 0, len(content)),
|
||||
Data: make([]float32, 0, len(content)),
|
||||
}
|
||||
|
||||
for _, c := range content {
|
||||
|
@ -787,8 +780,7 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
|
|||
|
||||
case schemapb.DataType_Double:
|
||||
var data = &storage.DoubleFieldData{
|
||||
NumRows: numOfRows,
|
||||
Data: make([]float64, 0, len(content)),
|
||||
Data: make([]float64, 0, len(content)),
|
||||
}
|
||||
|
||||
for _, c := range content {
|
||||
|
@ -802,8 +794,7 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
|
|||
|
||||
case schemapb.DataType_String, schemapb.DataType_VarChar:
|
||||
var data = &storage.StringFieldData{
|
||||
NumRows: numOfRows,
|
||||
Data: make([]string, 0, len(content)),
|
||||
Data: make([]string, 0, len(content)),
|
||||
}
|
||||
|
||||
for _, c := range content {
|
||||
|
@ -817,8 +808,7 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
|
|||
|
||||
case schemapb.DataType_FloatVector:
|
||||
var data = &storage.FloatVectorFieldData{
|
||||
NumRows: numOfRows,
|
||||
Data: []float32{},
|
||||
Data: []float32{},
|
||||
}
|
||||
|
||||
for _, c := range content {
|
||||
|
@ -834,8 +824,7 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
|
|||
|
||||
case schemapb.DataType_BinaryVector:
|
||||
var data = &storage.BinaryVectorFieldData{
|
||||
NumRows: numOfRows,
|
||||
Data: []byte{},
|
||||
Data: []byte{},
|
||||
}
|
||||
|
||||
for _, c := range content {
|
||||
|
|
|
@ -1159,54 +1159,42 @@ func genInsertData() *InsertData {
|
|||
return &InsertData{
|
||||
Data: map[int64]s.FieldData{
|
||||
0: &s.Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{11, 22},
|
||||
Data: []int64{1, 2},
|
||||
},
|
||||
1: &s.Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{3, 4},
|
||||
Data: []int64{3, 4},
|
||||
},
|
||||
100: &s.FloatVectorFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []float32{1.0, 6.0, 7.0, 8.0},
|
||||
Dim: 2,
|
||||
Data: []float32{1.0, 6.0, 7.0, 8.0},
|
||||
Dim: 2,
|
||||
},
|
||||
101: &s.BinaryVectorFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []byte{0, 255, 255, 255, 128, 128, 128, 0},
|
||||
Dim: 32,
|
||||
Data: []byte{0, 255, 255, 255, 128, 128, 128, 0},
|
||||
Dim: 32,
|
||||
},
|
||||
102: &s.BoolFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []bool{true, false},
|
||||
Data: []bool{true, false},
|
||||
},
|
||||
103: &s.Int8FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int8{5, 6},
|
||||
Data: []int8{5, 6},
|
||||
},
|
||||
104: &s.Int16FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int16{7, 8},
|
||||
Data: []int16{7, 8},
|
||||
},
|
||||
105: &s.Int32FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int32{9, 10},
|
||||
Data: []int32{9, 10},
|
||||
},
|
||||
106: &s.Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{1, 2},
|
||||
Data: []int64{1, 2},
|
||||
},
|
||||
107: &s.FloatFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []float32{2.333, 2.334},
|
||||
Data: []float32{2.333, 2.334},
|
||||
},
|
||||
108: &s.DoubleFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []float64{3.333, 3.334},
|
||||
Data: []float64{3.333, 3.334},
|
||||
},
|
||||
109: &s.StringFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []string{"test1", "test2"},
|
||||
Data: []string{"test1", "test2"},
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
@ -1215,54 +1203,42 @@ func genEmptyInsertData() *InsertData {
|
|||
return &InsertData{
|
||||
Data: map[int64]s.FieldData{
|
||||
0: &s.Int64FieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: []int64{},
|
||||
Data: []int64{},
|
||||
},
|
||||
1: &s.Int64FieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: []int64{},
|
||||
Data: []int64{},
|
||||
},
|
||||
100: &s.FloatVectorFieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: []float32{},
|
||||
Dim: 2,
|
||||
Data: []float32{},
|
||||
Dim: 2,
|
||||
},
|
||||
101: &s.BinaryVectorFieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: []byte{},
|
||||
Dim: 32,
|
||||
Data: []byte{},
|
||||
Dim: 32,
|
||||
},
|
||||
102: &s.BoolFieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: []bool{},
|
||||
Data: []bool{},
|
||||
},
|
||||
103: &s.Int8FieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: []int8{},
|
||||
Data: []int8{},
|
||||
},
|
||||
104: &s.Int16FieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: []int16{},
|
||||
Data: []int16{},
|
||||
},
|
||||
105: &s.Int32FieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: []int32{},
|
||||
Data: []int32{},
|
||||
},
|
||||
106: &s.Int64FieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: []int64{},
|
||||
Data: []int64{},
|
||||
},
|
||||
107: &s.FloatFieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: []float32{},
|
||||
Data: []float32{},
|
||||
},
|
||||
108: &s.DoubleFieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: []float64{},
|
||||
Data: []float64{},
|
||||
},
|
||||
109: &s.StringFieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: []string{},
|
||||
Data: []string{},
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
@ -1271,54 +1247,42 @@ func genInsertDataWithExpiredTS() *InsertData {
|
|||
return &InsertData{
|
||||
Data: map[int64]s.FieldData{
|
||||
0: &s.Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{11, 22},
|
||||
Data: []int64{11, 22},
|
||||
},
|
||||
1: &s.Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{329749364736000000, 329500223078400000}, // 2009-11-10 23:00:00 +0000 UTC, 2009-10-31 23:00:00 +0000 UTC
|
||||
Data: []int64{329749364736000000, 329500223078400000}, // 2009-11-10 23:00:00 +0000 UTC, 2009-10-31 23:00:00 +0000 UTC
|
||||
},
|
||||
100: &s.FloatVectorFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []float32{1.0, 6.0, 7.0, 8.0},
|
||||
Dim: 2,
|
||||
Data: []float32{1.0, 6.0, 7.0, 8.0},
|
||||
Dim: 2,
|
||||
},
|
||||
101: &s.BinaryVectorFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []byte{0, 255, 255, 255, 128, 128, 128, 0},
|
||||
Dim: 32,
|
||||
Data: []byte{0, 255, 255, 255, 128, 128, 128, 0},
|
||||
Dim: 32,
|
||||
},
|
||||
102: &s.BoolFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []bool{true, false},
|
||||
Data: []bool{true, false},
|
||||
},
|
||||
103: &s.Int8FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int8{5, 6},
|
||||
Data: []int8{5, 6},
|
||||
},
|
||||
104: &s.Int16FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int16{7, 8},
|
||||
Data: []int16{7, 8},
|
||||
},
|
||||
105: &s.Int32FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int32{9, 10},
|
||||
Data: []int32{9, 10},
|
||||
},
|
||||
106: &s.Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{1, 2},
|
||||
Data: []int64{1, 2},
|
||||
},
|
||||
107: &s.FloatFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []float32{2.333, 2.334},
|
||||
Data: []float32{2.333, 2.334},
|
||||
},
|
||||
108: &s.DoubleFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []float64{3.333, 3.334},
|
||||
Data: []float64{3.333, 3.334},
|
||||
},
|
||||
109: &s.StringFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []string{"test1", "test2"},
|
||||
Data: []string{"test1", "test2"},
|
||||
},
|
||||
}}
|
||||
}
|
||||
|
|
|
@ -894,8 +894,7 @@ func createBinLogs(rowNum int, schema *schemapb.CollectionSchema, ts Timestamp,
|
|||
tsFieldData[i] = int64(ts)
|
||||
}
|
||||
fields[common.TimeStampField] = &storage.Int64FieldData{
|
||||
Data: tsFieldData,
|
||||
NumRows: []int64{int64(rowNum)},
|
||||
Data: tsFieldData,
|
||||
}
|
||||
|
||||
if status, _ := node.dataCoord.UpdateSegmentStatistics(context.TODO(), &datapb.UpdateSegmentStatisticsRequest{
|
||||
|
|
|
@ -183,17 +183,14 @@ func (c *mockChunkmgr) mockFieldData(numrows, dim int, collectionID, partitionID
|
|||
}
|
||||
vecs := randomFloats(numrows, dim)
|
||||
idField := storage.Int64FieldData{
|
||||
NumRows: []int64{},
|
||||
Data: idList,
|
||||
Data: idList,
|
||||
}
|
||||
tsField := storage.Int64FieldData{
|
||||
NumRows: []int64{},
|
||||
Data: tsList,
|
||||
Data: tsList,
|
||||
}
|
||||
vecField := storage.FloatVectorFieldData{
|
||||
NumRows: []int64{},
|
||||
Data: vecs,
|
||||
Dim: dim,
|
||||
Data: vecs,
|
||||
Dim: dim,
|
||||
}
|
||||
insertData := &storage.InsertData{
|
||||
Data: map[int64]storage.FieldData{
|
||||
|
|
|
@ -831,70 +831,58 @@ func genInsertData(msgLength int, schema *schemapb.CollectionSchema) (*storage.I
|
|||
|
||||
// set data for rowID field
|
||||
insertData.Data[rowIDFieldID] = &storage.Int64FieldData{
|
||||
NumRows: []int64{int64(msgLength)},
|
||||
Data: generateInt64Array(msgLength),
|
||||
Data: generateInt64Array(msgLength),
|
||||
}
|
||||
// set data for ts field
|
||||
insertData.Data[timestampFieldID] = &storage.Int64FieldData{
|
||||
NumRows: []int64{int64(msgLength)},
|
||||
Data: genTimestampFieldData(msgLength),
|
||||
Data: genTimestampFieldData(msgLength),
|
||||
}
|
||||
|
||||
for _, f := range schema.Fields {
|
||||
switch f.DataType {
|
||||
case schemapb.DataType_Bool:
|
||||
insertData.Data[f.FieldID] = &storage.BoolFieldData{
|
||||
NumRows: []int64{int64(msgLength)},
|
||||
Data: generateBoolArray(msgLength),
|
||||
Data: generateBoolArray(msgLength),
|
||||
}
|
||||
case schemapb.DataType_Int8:
|
||||
insertData.Data[f.FieldID] = &storage.Int8FieldData{
|
||||
NumRows: []int64{int64(msgLength)},
|
||||
Data: generateInt8Array(msgLength),
|
||||
Data: generateInt8Array(msgLength),
|
||||
}
|
||||
case schemapb.DataType_Int16:
|
||||
insertData.Data[f.FieldID] = &storage.Int16FieldData{
|
||||
NumRows: []int64{int64(msgLength)},
|
||||
Data: generateInt16Array(msgLength),
|
||||
Data: generateInt16Array(msgLength),
|
||||
}
|
||||
case schemapb.DataType_Int32:
|
||||
insertData.Data[f.FieldID] = &storage.Int32FieldData{
|
||||
NumRows: []int64{int64(msgLength)},
|
||||
Data: generateInt32Array(msgLength),
|
||||
Data: generateInt32Array(msgLength),
|
||||
}
|
||||
case schemapb.DataType_Int64:
|
||||
insertData.Data[f.FieldID] = &storage.Int64FieldData{
|
||||
NumRows: []int64{int64(msgLength)},
|
||||
Data: generateInt64Array(msgLength),
|
||||
Data: generateInt64Array(msgLength),
|
||||
}
|
||||
case schemapb.DataType_Float:
|
||||
insertData.Data[f.FieldID] = &storage.FloatFieldData{
|
||||
NumRows: []int64{int64(msgLength)},
|
||||
Data: generateFloat32Array(msgLength),
|
||||
Data: generateFloat32Array(msgLength),
|
||||
}
|
||||
case schemapb.DataType_Double:
|
||||
insertData.Data[f.FieldID] = &storage.DoubleFieldData{
|
||||
NumRows: []int64{int64(msgLength)},
|
||||
Data: generateFloat64Array(msgLength),
|
||||
Data: generateFloat64Array(msgLength),
|
||||
}
|
||||
case schemapb.DataType_String, schemapb.DataType_VarChar:
|
||||
insertData.Data[f.FieldID] = &storage.StringFieldData{
|
||||
NumRows: []int64{int64(msgLength)},
|
||||
Data: generateStringArray(msgLength),
|
||||
Data: generateStringArray(msgLength),
|
||||
}
|
||||
case schemapb.DataType_FloatVector:
|
||||
dim := simpleFloatVecField.dim // if no dim specified, use simpleFloatVecField's dim
|
||||
insertData.Data[f.FieldID] = &storage.FloatVectorFieldData{
|
||||
NumRows: []int64{int64(msgLength)},
|
||||
Data: generateFloatVectors(msgLength, dim),
|
||||
Dim: dim,
|
||||
Data: generateFloatVectors(msgLength, dim),
|
||||
Dim: dim,
|
||||
}
|
||||
case schemapb.DataType_BinaryVector:
|
||||
dim := simpleBinVecField.dim
|
||||
insertData.Data[f.FieldID] = &storage.BinaryVectorFieldData{
|
||||
NumRows: []int64{int64(msgLength)},
|
||||
Data: generateBinaryVectors(msgLength, dim),
|
||||
Dim: dim,
|
||||
Data: generateBinaryVectors(msgLength, dim),
|
||||
Dim: dim,
|
||||
}
|
||||
default:
|
||||
err := errors.New("data type not supported")
|
||||
|
|
|
@ -89,12 +89,9 @@ type Segment struct {
|
|||
lastMemSize int64
|
||||
lastRowCount int64
|
||||
|
||||
recentlyModified *atomic.Bool
|
||||
segmentType *atomic.Int32
|
||||
destroyed *atomic.Bool
|
||||
|
||||
idBinlogRowSizes []int64
|
||||
|
||||
recentlyModified *atomic.Bool
|
||||
segmentType *atomic.Int32
|
||||
destroyed *atomic.Bool
|
||||
indexedFieldInfos *typeutil.ConcurrentMap[UniqueID, *IndexedFieldInfo]
|
||||
|
||||
statLock sync.Mutex
|
||||
|
@ -108,14 +105,6 @@ func (s *Segment) ID() UniqueID {
|
|||
return s.segmentID
|
||||
}
|
||||
|
||||
func (s *Segment) setIDBinlogRowSizes(sizes []int64) {
|
||||
s.idBinlogRowSizes = sizes
|
||||
}
|
||||
|
||||
func (s *Segment) getIDBinlogRowSizes() []int64 {
|
||||
return s.idBinlogRowSizes
|
||||
}
|
||||
|
||||
func (s *Segment) setRecentlyModified(modify bool) {
|
||||
s.recentlyModified.Store(modify)
|
||||
}
|
||||
|
@ -386,12 +375,12 @@ func (s *Segment) retrieve(plan *RetrievePlan) (*segcorepb.RetrieveResults, erro
|
|||
|
||||
func (s *Segment) getFieldDataPath(indexedFieldInfo *IndexedFieldInfo, offset int64) (dataPath string, offsetInBinlog int64) {
|
||||
offsetInBinlog = offset
|
||||
for index, idBinlogRowSize := range s.idBinlogRowSizes {
|
||||
if offsetInBinlog < idBinlogRowSize {
|
||||
dataPath = indexedFieldInfo.fieldBinlog.Binlogs[index].GetLogPath()
|
||||
for _, binlog := range indexedFieldInfo.fieldBinlog.Binlogs {
|
||||
if offsetInBinlog < binlog.EntriesNum {
|
||||
dataPath = binlog.GetLogPath()
|
||||
break
|
||||
} else {
|
||||
offsetInBinlog -= idBinlogRowSize
|
||||
offsetInBinlog -= binlog.EntriesNum
|
||||
}
|
||||
}
|
||||
return dataPath, offsetInBinlog
|
||||
|
|
|
@ -626,11 +626,6 @@ func (loader *segmentLoader) loadSealedSegments(segment *Segment, insertData *st
|
|||
numRows := insertRecord.NumRows
|
||||
for _, fieldData := range insertRecord.FieldsData {
|
||||
fieldID := fieldData.FieldId
|
||||
if fieldID == common.TimeStampField {
|
||||
timestampsData := insertData.Data[fieldID].(*storage.Int64FieldData)
|
||||
segment.setIDBinlogRowSizes(timestampsData.NumRows)
|
||||
}
|
||||
|
||||
err := segment.segmentLoadFieldData(fieldID, numRows, fieldData)
|
||||
if err != nil {
|
||||
// TODO: return or continue?
|
||||
|
|
|
@ -590,14 +590,6 @@ func TestSegment_BasicMetrics(t *testing.T) {
|
|||
defaultSegmentStartPosition)
|
||||
assert.Nil(t, err)
|
||||
|
||||
t.Run("test id binlog row size", func(t *testing.T) {
|
||||
size := int64(1024)
|
||||
segment.setIDBinlogRowSizes([]int64{size})
|
||||
sizes := segment.getIDBinlogRowSizes()
|
||||
assert.Len(t, sizes, 1)
|
||||
assert.Equal(t, size, sizes[0])
|
||||
})
|
||||
|
||||
t.Run("test type", func(t *testing.T) {
|
||||
sType := segmentTypeGrowing
|
||||
segment.setType(sType)
|
||||
|
@ -685,17 +677,17 @@ func Test_getFieldDataPath(t *testing.T) {
|
|||
FieldID: 0,
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
LogPath: funcutil.GenRandomStr(),
|
||||
EntriesNum: 10,
|
||||
LogPath: funcutil.GenRandomStr(),
|
||||
},
|
||||
{
|
||||
LogPath: funcutil.GenRandomStr(),
|
||||
EntriesNum: 15,
|
||||
LogPath: funcutil.GenRandomStr(),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
s := &Segment{
|
||||
idBinlogRowSizes: []int64{10, 15},
|
||||
}
|
||||
s := &Segment{}
|
||||
|
||||
path, offsetInBinlog := s.getFieldDataPath(indexedFieldInfo, 4)
|
||||
assert.Equal(t, indexedFieldInfo.fieldBinlog.Binlogs[0].LogPath, path)
|
||||
|
|
|
@ -63,14 +63,12 @@ func generateTestData(t *testing.T, num int) []*Blob {
|
|||
common.TimeStampField: &Int64FieldData{Data: field1},
|
||||
101: &Int32FieldData{Data: field101},
|
||||
102: &FloatVectorFieldData{
|
||||
NumRows: []int64{int64(num)},
|
||||
Data: field102,
|
||||
Dim: 8,
|
||||
Data: field102,
|
||||
Dim: 8,
|
||||
},
|
||||
103: &BinaryVectorFieldData{
|
||||
NumRows: []int64{int64(num)},
|
||||
Data: field103,
|
||||
Dim: 8,
|
||||
Data: field103,
|
||||
Dim: 8,
|
||||
},
|
||||
}}
|
||||
|
||||
|
|
|
@ -110,46 +110,36 @@ type FieldData interface {
|
|||
}
|
||||
|
||||
type BoolFieldData struct {
|
||||
NumRows []int64
|
||||
Data []bool
|
||||
Data []bool
|
||||
}
|
||||
type Int8FieldData struct {
|
||||
NumRows []int64
|
||||
Data []int8
|
||||
Data []int8
|
||||
}
|
||||
type Int16FieldData struct {
|
||||
NumRows []int64
|
||||
Data []int16
|
||||
Data []int16
|
||||
}
|
||||
type Int32FieldData struct {
|
||||
NumRows []int64
|
||||
Data []int32
|
||||
Data []int32
|
||||
}
|
||||
type Int64FieldData struct {
|
||||
NumRows []int64
|
||||
Data []int64
|
||||
Data []int64
|
||||
}
|
||||
type FloatFieldData struct {
|
||||
NumRows []int64
|
||||
Data []float32
|
||||
Data []float32
|
||||
}
|
||||
type DoubleFieldData struct {
|
||||
NumRows []int64
|
||||
Data []float64
|
||||
Data []float64
|
||||
}
|
||||
type StringFieldData struct {
|
||||
NumRows []int64
|
||||
Data []string
|
||||
Data []string
|
||||
}
|
||||
type BinaryVectorFieldData struct {
|
||||
NumRows []int64
|
||||
Data []byte
|
||||
Dim int
|
||||
Data []byte
|
||||
Dim int
|
||||
}
|
||||
type FloatVectorFieldData struct {
|
||||
NumRows []int64
|
||||
Data []float32
|
||||
Dim int
|
||||
Data []float32
|
||||
Dim int
|
||||
}
|
||||
|
||||
// RowNum implements FieldData.RowNum
|
||||
|
@ -187,47 +177,51 @@ func (data *FloatVectorFieldData) GetRow(i int) interface{} {
|
|||
|
||||
// GetMemorySize implements FieldData.GetMemorySize
|
||||
func (data *BoolFieldData) GetMemorySize() int {
|
||||
return binary.Size(data.NumRows) + binary.Size(data.Data)
|
||||
return binary.Size(data.Data)
|
||||
}
|
||||
|
||||
// GetMemorySize implements FieldData.GetMemorySize
|
||||
func (data *Int8FieldData) GetMemorySize() int {
|
||||
return binary.Size(data.NumRows) + binary.Size(data.Data)
|
||||
return binary.Size(data.Data)
|
||||
}
|
||||
|
||||
// GetMemorySize implements FieldData.GetMemorySize
|
||||
func (data *Int16FieldData) GetMemorySize() int {
|
||||
return binary.Size(data.NumRows) + binary.Size(data.Data)
|
||||
return binary.Size(data.Data)
|
||||
}
|
||||
|
||||
// GetMemorySize implements FieldData.GetMemorySize
|
||||
func (data *Int32FieldData) GetMemorySize() int {
|
||||
return binary.Size(data.NumRows) + binary.Size(data.Data)
|
||||
return binary.Size(data.Data)
|
||||
}
|
||||
|
||||
// GetMemorySize implements FieldData.GetMemorySize
|
||||
func (data *Int64FieldData) GetMemorySize() int {
|
||||
return binary.Size(data.NumRows) + binary.Size(data.Data)
|
||||
return binary.Size(data.Data)
|
||||
}
|
||||
|
||||
func (data *FloatFieldData) GetMemorySize() int {
|
||||
return binary.Size(data.NumRows) + binary.Size(data.Data)
|
||||
return binary.Size(data.Data)
|
||||
}
|
||||
|
||||
func (data *DoubleFieldData) GetMemorySize() int {
|
||||
return binary.Size(data.NumRows) + binary.Size(data.Data)
|
||||
return binary.Size(data.Data)
|
||||
}
|
||||
|
||||
func (data *StringFieldData) GetMemorySize() int {
|
||||
return binary.Size(data.NumRows) + binary.Size(data.Data)
|
||||
var size int
|
||||
for _, val := range data.Data {
|
||||
size += len(val) + 16
|
||||
}
|
||||
return size
|
||||
}
|
||||
|
||||
func (data *BinaryVectorFieldData) GetMemorySize() int {
|
||||
return binary.Size(data.NumRows) + binary.Size(data.Data) + binary.Size(data.Dim)
|
||||
return binary.Size(data.Data) + 4
|
||||
}
|
||||
|
||||
func (data *FloatVectorFieldData) GetMemorySize() int {
|
||||
return binary.Size(data.NumRows) + binary.Size(data.Data) + binary.Size(data.Dim)
|
||||
return binary.Size(data.Data) + 4
|
||||
}
|
||||
|
||||
// system filed id:
|
||||
|
@ -511,15 +505,13 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
|
|||
|
||||
if insertData.Data[fieldID] == nil {
|
||||
insertData.Data[fieldID] = &BoolFieldData{
|
||||
NumRows: make([]int64, 0),
|
||||
Data: make([]bool, 0, rowNum),
|
||||
Data: make([]bool, 0, rowNum),
|
||||
}
|
||||
}
|
||||
boolFieldData := insertData.Data[fieldID].(*BoolFieldData)
|
||||
|
||||
boolFieldData.Data = append(boolFieldData.Data, singleData...)
|
||||
totalLength += len(singleData)
|
||||
boolFieldData.NumRows = append(boolFieldData.NumRows, int64(len(singleData)))
|
||||
insertData.Data[fieldID] = boolFieldData
|
||||
|
||||
case schemapb.DataType_Int8:
|
||||
|
@ -532,15 +524,13 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
|
|||
|
||||
if insertData.Data[fieldID] == nil {
|
||||
insertData.Data[fieldID] = &Int8FieldData{
|
||||
NumRows: make([]int64, 0),
|
||||
Data: make([]int8, 0, rowNum),
|
||||
Data: make([]int8, 0, rowNum),
|
||||
}
|
||||
}
|
||||
int8FieldData := insertData.Data[fieldID].(*Int8FieldData)
|
||||
|
||||
int8FieldData.Data = append(int8FieldData.Data, singleData...)
|
||||
totalLength += len(singleData)
|
||||
int8FieldData.NumRows = append(int8FieldData.NumRows, int64(len(singleData)))
|
||||
insertData.Data[fieldID] = int8FieldData
|
||||
|
||||
case schemapb.DataType_Int16:
|
||||
|
@ -553,15 +543,13 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
|
|||
|
||||
if insertData.Data[fieldID] == nil {
|
||||
insertData.Data[fieldID] = &Int16FieldData{
|
||||
NumRows: make([]int64, 0),
|
||||
Data: make([]int16, 0, rowNum),
|
||||
Data: make([]int16, 0, rowNum),
|
||||
}
|
||||
}
|
||||
int16FieldData := insertData.Data[fieldID].(*Int16FieldData)
|
||||
|
||||
int16FieldData.Data = append(int16FieldData.Data, singleData...)
|
||||
totalLength += len(singleData)
|
||||
int16FieldData.NumRows = append(int16FieldData.NumRows, int64(len(singleData)))
|
||||
insertData.Data[fieldID] = int16FieldData
|
||||
|
||||
case schemapb.DataType_Int32:
|
||||
|
@ -574,15 +562,13 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
|
|||
|
||||
if insertData.Data[fieldID] == nil {
|
||||
insertData.Data[fieldID] = &Int32FieldData{
|
||||
NumRows: make([]int64, 0),
|
||||
Data: make([]int32, 0, rowNum),
|
||||
Data: make([]int32, 0, rowNum),
|
||||
}
|
||||
}
|
||||
int32FieldData := insertData.Data[fieldID].(*Int32FieldData)
|
||||
|
||||
int32FieldData.Data = append(int32FieldData.Data, singleData...)
|
||||
totalLength += len(singleData)
|
||||
int32FieldData.NumRows = append(int32FieldData.NumRows, int64(len(singleData)))
|
||||
insertData.Data[fieldID] = int32FieldData
|
||||
|
||||
case schemapb.DataType_Int64:
|
||||
|
@ -595,15 +581,13 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
|
|||
|
||||
if insertData.Data[fieldID] == nil {
|
||||
insertData.Data[fieldID] = &Int64FieldData{
|
||||
NumRows: make([]int64, 0),
|
||||
Data: make([]int64, 0, rowNum),
|
||||
Data: make([]int64, 0, rowNum),
|
||||
}
|
||||
}
|
||||
int64FieldData := insertData.Data[fieldID].(*Int64FieldData)
|
||||
|
||||
int64FieldData.Data = append(int64FieldData.Data, singleData...)
|
||||
totalLength += len(singleData)
|
||||
int64FieldData.NumRows = append(int64FieldData.NumRows, int64(len(singleData)))
|
||||
insertData.Data[fieldID] = int64FieldData
|
||||
|
||||
case schemapb.DataType_Float:
|
||||
|
@ -616,15 +600,13 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
|
|||
|
||||
if insertData.Data[fieldID] == nil {
|
||||
insertData.Data[fieldID] = &FloatFieldData{
|
||||
NumRows: make([]int64, 0),
|
||||
Data: make([]float32, 0, rowNum),
|
||||
Data: make([]float32, 0, rowNum),
|
||||
}
|
||||
}
|
||||
floatFieldData := insertData.Data[fieldID].(*FloatFieldData)
|
||||
|
||||
floatFieldData.Data = append(floatFieldData.Data, singleData...)
|
||||
totalLength += len(singleData)
|
||||
floatFieldData.NumRows = append(floatFieldData.NumRows, int64(len(singleData)))
|
||||
insertData.Data[fieldID] = floatFieldData
|
||||
|
||||
case schemapb.DataType_Double:
|
||||
|
@ -637,15 +619,13 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
|
|||
|
||||
if insertData.Data[fieldID] == nil {
|
||||
insertData.Data[fieldID] = &DoubleFieldData{
|
||||
NumRows: make([]int64, 0),
|
||||
Data: make([]float64, 0, rowNum),
|
||||
Data: make([]float64, 0, rowNum),
|
||||
}
|
||||
}
|
||||
doubleFieldData := insertData.Data[fieldID].(*DoubleFieldData)
|
||||
|
||||
doubleFieldData.Data = append(doubleFieldData.Data, singleData...)
|
||||
totalLength += len(singleData)
|
||||
doubleFieldData.NumRows = append(doubleFieldData.NumRows, int64(len(singleData)))
|
||||
insertData.Data[fieldID] = doubleFieldData
|
||||
|
||||
case schemapb.DataType_String, schemapb.DataType_VarChar:
|
||||
|
@ -658,15 +638,13 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
|
|||
|
||||
if insertData.Data[fieldID] == nil {
|
||||
insertData.Data[fieldID] = &StringFieldData{
|
||||
NumRows: make([]int64, 0),
|
||||
Data: make([]string, 0, rowNum),
|
||||
Data: make([]string, 0, rowNum),
|
||||
}
|
||||
}
|
||||
stringFieldData := insertData.Data[fieldID].(*StringFieldData)
|
||||
|
||||
stringFieldData.Data = append(stringFieldData.Data, stringPayload...)
|
||||
totalLength += len(stringPayload)
|
||||
stringFieldData.NumRows = append(stringFieldData.NumRows, int64(len(stringPayload)))
|
||||
insertData.Data[fieldID] = stringFieldData
|
||||
|
||||
case schemapb.DataType_BinaryVector:
|
||||
|
@ -680,8 +658,7 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
|
|||
|
||||
if insertData.Data[fieldID] == nil {
|
||||
insertData.Data[fieldID] = &BinaryVectorFieldData{
|
||||
NumRows: make([]int64, 0),
|
||||
Data: make([]byte, 0, rowNum*dim),
|
||||
Data: make([]byte, 0, rowNum*dim),
|
||||
}
|
||||
}
|
||||
binaryVectorFieldData := insertData.Data[fieldID].(*BinaryVectorFieldData)
|
||||
|
@ -694,7 +671,6 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
|
|||
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
|
||||
}
|
||||
totalLength += length
|
||||
binaryVectorFieldData.NumRows = append(binaryVectorFieldData.NumRows, int64(length))
|
||||
binaryVectorFieldData.Dim = dim
|
||||
insertData.Data[fieldID] = binaryVectorFieldData
|
||||
|
||||
|
@ -709,8 +685,7 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
|
|||
|
||||
if insertData.Data[fieldID] == nil {
|
||||
insertData.Data[fieldID] = &FloatVectorFieldData{
|
||||
NumRows: make([]int64, 0),
|
||||
Data: make([]float32, 0, rowNum*dim),
|
||||
Data: make([]float32, 0, rowNum*dim),
|
||||
}
|
||||
}
|
||||
floatVectorFieldData := insertData.Data[fieldID].(*FloatVectorFieldData)
|
||||
|
@ -723,7 +698,6 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
|
|||
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
|
||||
}
|
||||
totalLength += length
|
||||
floatVectorFieldData.NumRows = append(floatVectorFieldData.NumRows, int64(length))
|
||||
floatVectorFieldData.Dim = dim
|
||||
insertData.Data[fieldID] = floatVectorFieldData
|
||||
|
||||
|
|
|
@ -21,11 +21,12 @@ import (
|
|||
"fmt"
|
||||
"testing"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/etcdpb"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -148,54 +149,42 @@ func TestInsertCodec(t *testing.T) {
|
|||
insertData1 := &InsertData{
|
||||
Data: map[int64]FieldData{
|
||||
RowIDField: &Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{3, 4},
|
||||
Data: []int64{3, 4},
|
||||
},
|
||||
TimestampField: &Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{3, 4},
|
||||
Data: []int64{3, 4},
|
||||
},
|
||||
BoolField: &BoolFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []bool{true, false},
|
||||
Data: []bool{true, false},
|
||||
},
|
||||
Int8Field: &Int8FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int8{3, 4},
|
||||
Data: []int8{3, 4},
|
||||
},
|
||||
Int16Field: &Int16FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int16{3, 4},
|
||||
Data: []int16{3, 4},
|
||||
},
|
||||
Int32Field: &Int32FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int32{3, 4},
|
||||
Data: []int32{3, 4},
|
||||
},
|
||||
Int64Field: &Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{3, 4},
|
||||
Data: []int64{3, 4},
|
||||
},
|
||||
FloatField: &FloatFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []float32{3, 4},
|
||||
Data: []float32{3, 4},
|
||||
},
|
||||
DoubleField: &DoubleFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []float64{3, 4},
|
||||
Data: []float64{3, 4},
|
||||
},
|
||||
StringField: &StringFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []string{"3", "4"},
|
||||
Data: []string{"3", "4"},
|
||||
},
|
||||
BinaryVectorField: &BinaryVectorFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []byte{0, 255},
|
||||
Dim: 8,
|
||||
Data: []byte{0, 255},
|
||||
Dim: 8,
|
||||
},
|
||||
FloatVectorField: &FloatVectorFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []float32{4, 5, 6, 7, 4, 5, 6, 7},
|
||||
Dim: 4,
|
||||
Data: []float32{4, 5, 6, 7, 4, 5, 6, 7},
|
||||
Dim: 4,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -203,72 +192,60 @@ func TestInsertCodec(t *testing.T) {
|
|||
insertData2 := &InsertData{
|
||||
Data: map[int64]FieldData{
|
||||
RowIDField: &Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{1, 2},
|
||||
Data: []int64{1, 2},
|
||||
},
|
||||
TimestampField: &Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{1, 2},
|
||||
Data: []int64{1, 2},
|
||||
},
|
||||
BoolField: &BoolFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []bool{true, false},
|
||||
Data: []bool{true, false},
|
||||
},
|
||||
Int8Field: &Int8FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int8{1, 2},
|
||||
Data: []int8{1, 2},
|
||||
},
|
||||
Int16Field: &Int16FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int16{1, 2},
|
||||
Data: []int16{1, 2},
|
||||
},
|
||||
Int32Field: &Int32FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int32{1, 2},
|
||||
Data: []int32{1, 2},
|
||||
},
|
||||
Int64Field: &Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{1, 2},
|
||||
Data: []int64{1, 2},
|
||||
},
|
||||
FloatField: &FloatFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []float32{1, 2},
|
||||
Data: []float32{1, 2},
|
||||
},
|
||||
DoubleField: &DoubleFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []float64{1, 2},
|
||||
Data: []float64{1, 2},
|
||||
},
|
||||
StringField: &StringFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []string{"1", "2"},
|
||||
Data: []string{"1", "2"},
|
||||
},
|
||||
BinaryVectorField: &BinaryVectorFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []byte{0, 255},
|
||||
Dim: 8,
|
||||
Data: []byte{0, 255},
|
||||
Dim: 8,
|
||||
},
|
||||
FloatVectorField: &FloatVectorFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []float32{0, 1, 2, 3, 0, 1, 2, 3},
|
||||
Dim: 4,
|
||||
Data: []float32{0, 1, 2, 3, 0, 1, 2, 3},
|
||||
Dim: 4,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
insertDataEmpty := &InsertData{
|
||||
Data: map[int64]FieldData{
|
||||
RowIDField: &Int64FieldData{[]int64{}, []int64{}},
|
||||
TimestampField: &Int64FieldData{[]int64{}, []int64{}},
|
||||
BoolField: &BoolFieldData{[]int64{}, []bool{}},
|
||||
Int8Field: &Int8FieldData{[]int64{}, []int8{}},
|
||||
Int16Field: &Int16FieldData{[]int64{}, []int16{}},
|
||||
Int32Field: &Int32FieldData{[]int64{}, []int32{}},
|
||||
Int64Field: &Int64FieldData{[]int64{}, []int64{}},
|
||||
FloatField: &FloatFieldData{[]int64{}, []float32{}},
|
||||
DoubleField: &DoubleFieldData{[]int64{}, []float64{}},
|
||||
StringField: &StringFieldData{[]int64{}, []string{}},
|
||||
BinaryVectorField: &BinaryVectorFieldData{[]int64{}, []byte{}, 8},
|
||||
FloatVectorField: &FloatVectorFieldData{[]int64{}, []float32{}, 4},
|
||||
RowIDField: &Int64FieldData{[]int64{}},
|
||||
TimestampField: &Int64FieldData{[]int64{}},
|
||||
BoolField: &BoolFieldData{[]bool{}},
|
||||
Int8Field: &Int8FieldData{[]int8{}},
|
||||
Int16Field: &Int16FieldData{[]int16{}},
|
||||
Int32Field: &Int32FieldData{[]int32{}},
|
||||
Int64Field: &Int64FieldData{[]int64{}},
|
||||
FloatField: &FloatFieldData{[]float32{}},
|
||||
DoubleField: &DoubleFieldData{[]float64{}},
|
||||
StringField: &StringFieldData{[]string{}},
|
||||
BinaryVectorField: &BinaryVectorFieldData{[]byte{}, 8},
|
||||
FloatVectorField: &FloatVectorFieldData{[]float32{}, 4},
|
||||
},
|
||||
}
|
||||
b, s, err := insertCodec.Serialize(PartitionID, SegmentID, insertDataEmpty)
|
||||
|
@ -294,18 +271,6 @@ func TestInsertCodec(t *testing.T) {
|
|||
assert.Equal(t, UniqueID(CollectionID), collID)
|
||||
assert.Equal(t, UniqueID(PartitionID), partID)
|
||||
assert.Equal(t, UniqueID(SegmentID), segID)
|
||||
assert.Equal(t, []int64{2, 2}, resultData.Data[RowIDField].(*Int64FieldData).NumRows)
|
||||
assert.Equal(t, []int64{2, 2}, resultData.Data[TimestampField].(*Int64FieldData).NumRows)
|
||||
assert.Equal(t, []int64{2, 2}, resultData.Data[BoolField].(*BoolFieldData).NumRows)
|
||||
assert.Equal(t, []int64{2, 2}, resultData.Data[Int8Field].(*Int8FieldData).NumRows)
|
||||
assert.Equal(t, []int64{2, 2}, resultData.Data[Int16Field].(*Int16FieldData).NumRows)
|
||||
assert.Equal(t, []int64{2, 2}, resultData.Data[Int32Field].(*Int32FieldData).NumRows)
|
||||
assert.Equal(t, []int64{2, 2}, resultData.Data[Int64Field].(*Int64FieldData).NumRows)
|
||||
assert.Equal(t, []int64{2, 2}, resultData.Data[FloatField].(*FloatFieldData).NumRows)
|
||||
assert.Equal(t, []int64{2, 2}, resultData.Data[DoubleField].(*DoubleFieldData).NumRows)
|
||||
assert.Equal(t, []int64{2, 2}, resultData.Data[StringField].(*StringFieldData).NumRows)
|
||||
assert.Equal(t, []int64{2, 2}, resultData.Data[BinaryVectorField].(*BinaryVectorFieldData).NumRows)
|
||||
assert.Equal(t, []int64{2, 2}, resultData.Data[FloatVectorField].(*FloatVectorFieldData).NumRows)
|
||||
assert.Equal(t, []int64{1, 2, 3, 4}, resultData.Data[RowIDField].(*Int64FieldData).Data)
|
||||
assert.Equal(t, []int64{1, 2, 3, 4}, resultData.Data[TimestampField].(*Int64FieldData).Data)
|
||||
assert.Equal(t, []bool{true, false, true, false}, resultData.Data[BoolField].(*BoolFieldData).Data)
|
||||
|
@ -459,59 +424,146 @@ func TestTsError(t *testing.T) {
|
|||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
//func TestSchemaError(t *testing.T) {
|
||||
// schema := &etcdpb.CollectionMeta{
|
||||
// ID: CollectionID,
|
||||
// CreateTime: 1,
|
||||
// SegmentIDs: []int64{SegmentID},
|
||||
// PartitionTags: []string{"partition_0", "partition_1"},
|
||||
// Schema: &schemapb.CollectionSchema{
|
||||
// Name: "schema",
|
||||
// Description: "schema",
|
||||
// AutoID: true,
|
||||
// Fields: []*schemapb.FieldSchema{
|
||||
// {
|
||||
// FieldID: RowIDField,
|
||||
// Name: "row_id",
|
||||
// IsPrimaryKey: false,
|
||||
// Description: "row_id",
|
||||
// DataType: schemapb.DataType_Int64,
|
||||
// },
|
||||
// {
|
||||
// FieldID: TimestampField,
|
||||
// Name: "Timestamp",
|
||||
// IsPrimaryKey: false,
|
||||
// Description: "Timestamp",
|
||||
// DataType: schemapb.DataType_Int64,
|
||||
// },
|
||||
// {
|
||||
// FieldID: BoolField,
|
||||
// Name: "field_bool",
|
||||
// IsPrimaryKey: false,
|
||||
// Description: "bool",
|
||||
// DataType: 999,
|
||||
// },
|
||||
// },
|
||||
// },
|
||||
// }
|
||||
// insertData := &InsertData{
|
||||
// Data: map[int64]FieldData{
|
||||
// RowIDField: &Int64FieldData{
|
||||
// NumRows: []int64{2},
|
||||
// Data: []int64{3, 4},
|
||||
// },
|
||||
// TimestampField: &Int64FieldData{
|
||||
// NumRows: []int64{2},
|
||||
// Data: []int64{3, 4},
|
||||
// },
|
||||
// BoolField: &BoolFieldData{
|
||||
// NumRows: []int64{2},
|
||||
// Data: []bool{true, false},
|
||||
// },
|
||||
// },
|
||||
// }
|
||||
// insertCodec := NewInsertCodec(schema)
|
||||
// blobs, _, err := insertCodec.Serialize(PartitionID, SegmentID, insertData)
|
||||
// assert.Nil(t, blobs)
|
||||
// assert.NotNil(t, err)
|
||||
//}
|
||||
func TestMemorySize(t *testing.T) {
|
||||
insertData1 := &InsertData{
|
||||
Data: map[int64]FieldData{
|
||||
RowIDField: &Int64FieldData{
|
||||
Data: []int64{3},
|
||||
},
|
||||
TimestampField: &Int64FieldData{
|
||||
Data: []int64{3},
|
||||
},
|
||||
BoolField: &BoolFieldData{
|
||||
Data: []bool{true},
|
||||
},
|
||||
Int8Field: &Int8FieldData{
|
||||
Data: []int8{3},
|
||||
},
|
||||
Int16Field: &Int16FieldData{
|
||||
Data: []int16{3},
|
||||
},
|
||||
Int32Field: &Int32FieldData{
|
||||
Data: []int32{3},
|
||||
},
|
||||
Int64Field: &Int64FieldData{
|
||||
Data: []int64{3},
|
||||
},
|
||||
FloatField: &FloatFieldData{
|
||||
Data: []float32{3},
|
||||
},
|
||||
DoubleField: &DoubleFieldData{
|
||||
Data: []float64{3},
|
||||
},
|
||||
StringField: &StringFieldData{
|
||||
Data: []string{"3"},
|
||||
},
|
||||
BinaryVectorField: &BinaryVectorFieldData{
|
||||
Data: []byte{0},
|
||||
Dim: 8,
|
||||
},
|
||||
FloatVectorField: &FloatVectorFieldData{
|
||||
Data: []float32{4, 5, 6, 7},
|
||||
Dim: 4,
|
||||
},
|
||||
},
|
||||
}
|
||||
assert.Equal(t, insertData1.Data[RowIDField].GetMemorySize(), 8)
|
||||
assert.Equal(t, insertData1.Data[TimestampField].GetMemorySize(), 8)
|
||||
assert.Equal(t, insertData1.Data[BoolField].GetMemorySize(), 1)
|
||||
assert.Equal(t, insertData1.Data[Int8Field].GetMemorySize(), 1)
|
||||
assert.Equal(t, insertData1.Data[Int16Field].GetMemorySize(), 2)
|
||||
assert.Equal(t, insertData1.Data[Int32Field].GetMemorySize(), 4)
|
||||
assert.Equal(t, insertData1.Data[Int64Field].GetMemorySize(), 8)
|
||||
assert.Equal(t, insertData1.Data[FloatField].GetMemorySize(), 4)
|
||||
assert.Equal(t, insertData1.Data[DoubleField].GetMemorySize(), 8)
|
||||
assert.Equal(t, insertData1.Data[StringField].GetMemorySize(), 17)
|
||||
assert.Equal(t, insertData1.Data[BinaryVectorField].GetMemorySize(), 5)
|
||||
assert.Equal(t, insertData1.Data[FloatField].GetMemorySize(), 4)
|
||||
|
||||
insertData2 := &InsertData{
|
||||
Data: map[int64]FieldData{
|
||||
RowIDField: &Int64FieldData{
|
||||
Data: []int64{1, 2},
|
||||
},
|
||||
TimestampField: &Int64FieldData{
|
||||
Data: []int64{1, 2},
|
||||
},
|
||||
BoolField: &BoolFieldData{
|
||||
Data: []bool{true, false},
|
||||
},
|
||||
Int8Field: &Int8FieldData{
|
||||
Data: []int8{1, 2},
|
||||
},
|
||||
Int16Field: &Int16FieldData{
|
||||
Data: []int16{1, 2},
|
||||
},
|
||||
Int32Field: &Int32FieldData{
|
||||
Data: []int32{1, 2},
|
||||
},
|
||||
Int64Field: &Int64FieldData{
|
||||
Data: []int64{1, 2},
|
||||
},
|
||||
FloatField: &FloatFieldData{
|
||||
Data: []float32{1, 2},
|
||||
},
|
||||
DoubleField: &DoubleFieldData{
|
||||
Data: []float64{1, 2},
|
||||
},
|
||||
StringField: &StringFieldData{
|
||||
Data: []string{"1", "23"},
|
||||
},
|
||||
BinaryVectorField: &BinaryVectorFieldData{
|
||||
Data: []byte{0, 255},
|
||||
Dim: 8,
|
||||
},
|
||||
FloatVectorField: &FloatVectorFieldData{
|
||||
Data: []float32{0, 1, 2, 3, 0, 1, 2, 3},
|
||||
Dim: 4,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
assert.Equal(t, insertData2.Data[RowIDField].GetMemorySize(), 16)
|
||||
assert.Equal(t, insertData2.Data[TimestampField].GetMemorySize(), 16)
|
||||
assert.Equal(t, insertData2.Data[BoolField].GetMemorySize(), 2)
|
||||
assert.Equal(t, insertData2.Data[Int8Field].GetMemorySize(), 2)
|
||||
assert.Equal(t, insertData2.Data[Int16Field].GetMemorySize(), 4)
|
||||
assert.Equal(t, insertData2.Data[Int32Field].GetMemorySize(), 8)
|
||||
assert.Equal(t, insertData2.Data[Int64Field].GetMemorySize(), 16)
|
||||
assert.Equal(t, insertData2.Data[FloatField].GetMemorySize(), 8)
|
||||
assert.Equal(t, insertData2.Data[DoubleField].GetMemorySize(), 16)
|
||||
assert.Equal(t, insertData2.Data[StringField].GetMemorySize(), 35)
|
||||
assert.Equal(t, insertData2.Data[BinaryVectorField].GetMemorySize(), 6)
|
||||
assert.Equal(t, insertData2.Data[FloatField].GetMemorySize(), 8)
|
||||
|
||||
insertDataEmpty := &InsertData{
|
||||
Data: map[int64]FieldData{
|
||||
RowIDField: &Int64FieldData{[]int64{}},
|
||||
TimestampField: &Int64FieldData{[]int64{}},
|
||||
BoolField: &BoolFieldData{[]bool{}},
|
||||
Int8Field: &Int8FieldData{[]int8{}},
|
||||
Int16Field: &Int16FieldData{[]int16{}},
|
||||
Int32Field: &Int32FieldData{[]int32{}},
|
||||
Int64Field: &Int64FieldData{[]int64{}},
|
||||
FloatField: &FloatFieldData{[]float32{}},
|
||||
DoubleField: &DoubleFieldData{[]float64{}},
|
||||
StringField: &StringFieldData{[]string{}},
|
||||
BinaryVectorField: &BinaryVectorFieldData{[]byte{}, 8},
|
||||
FloatVectorField: &FloatVectorFieldData{[]float32{}, 4},
|
||||
},
|
||||
}
|
||||
|
||||
assert.Equal(t, insertDataEmpty.Data[RowIDField].GetMemorySize(), 0)
|
||||
assert.Equal(t, insertDataEmpty.Data[TimestampField].GetMemorySize(), 0)
|
||||
assert.Equal(t, insertDataEmpty.Data[BoolField].GetMemorySize(), 0)
|
||||
assert.Equal(t, insertDataEmpty.Data[Int8Field].GetMemorySize(), 0)
|
||||
assert.Equal(t, insertDataEmpty.Data[Int16Field].GetMemorySize(), 0)
|
||||
assert.Equal(t, insertDataEmpty.Data[Int32Field].GetMemorySize(), 0)
|
||||
assert.Equal(t, insertDataEmpty.Data[Int64Field].GetMemorySize(), 0)
|
||||
assert.Equal(t, insertDataEmpty.Data[FloatField].GetMemorySize(), 0)
|
||||
assert.Equal(t, insertDataEmpty.Data[DoubleField].GetMemorySize(), 0)
|
||||
assert.Equal(t, insertDataEmpty.Data[StringField].GetMemorySize(), 0)
|
||||
assert.Equal(t, insertDataEmpty.Data[BinaryVectorField].GetMemorySize(), 4)
|
||||
assert.Equal(t, insertDataEmpty.Data[FloatVectorField].GetMemorySize(), 4)
|
||||
|
||||
}
|
||||
|
|
|
@ -128,54 +128,42 @@ func TestDataSorter(t *testing.T) {
|
|||
insertDataFirst := &InsertData{
|
||||
Data: map[int64]FieldData{
|
||||
0: &Int64FieldData{
|
||||
NumRows: []int64{3},
|
||||
Data: []int64{3, 4, 2},
|
||||
Data: []int64{3, 4, 2},
|
||||
},
|
||||
1: &Int64FieldData{
|
||||
NumRows: []int64{3},
|
||||
Data: []int64{3, 4, 5},
|
||||
Data: []int64{3, 4, 5},
|
||||
},
|
||||
100: &BoolFieldData{
|
||||
NumRows: []int64{3},
|
||||
Data: []bool{true, false, true},
|
||||
Data: []bool{true, false, true},
|
||||
},
|
||||
101: &Int8FieldData{
|
||||
NumRows: []int64{3},
|
||||
Data: []int8{3, 4, 5},
|
||||
Data: []int8{3, 4, 5},
|
||||
},
|
||||
102: &Int16FieldData{
|
||||
NumRows: []int64{3},
|
||||
Data: []int16{3, 4, 5},
|
||||
Data: []int16{3, 4, 5},
|
||||
},
|
||||
103: &Int32FieldData{
|
||||
NumRows: []int64{3},
|
||||
Data: []int32{3, 4, 5},
|
||||
Data: []int32{3, 4, 5},
|
||||
},
|
||||
104: &Int64FieldData{
|
||||
NumRows: []int64{3},
|
||||
Data: []int64{3, 4, 5},
|
||||
Data: []int64{3, 4, 5},
|
||||
},
|
||||
105: &FloatFieldData{
|
||||
NumRows: []int64{3},
|
||||
Data: []float32{3, 4, 5},
|
||||
Data: []float32{3, 4, 5},
|
||||
},
|
||||
106: &DoubleFieldData{
|
||||
NumRows: []int64{3},
|
||||
Data: []float64{3, 4, 5},
|
||||
Data: []float64{3, 4, 5},
|
||||
},
|
||||
107: &StringFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []string{"3", "4", "5"},
|
||||
Data: []string{"3", "4", "5"},
|
||||
},
|
||||
108: &BinaryVectorFieldData{
|
||||
NumRows: []int64{3},
|
||||
Data: []byte{0, 255, 128},
|
||||
Dim: 8,
|
||||
Data: []byte{0, 255, 128},
|
||||
Dim: 8,
|
||||
},
|
||||
109: &FloatVectorFieldData{
|
||||
NumRows: []int64{3},
|
||||
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23},
|
||||
Dim: 8,
|
||||
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23},
|
||||
Dim: 8,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -244,8 +232,7 @@ func TestDataSorter_Len(t *testing.T) {
|
|||
insertData := &InsertData{
|
||||
Data: map[int64]FieldData{
|
||||
1: &Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{6, 4},
|
||||
Data: []int64{6, 4},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -261,8 +248,7 @@ func TestDataSorter_Len(t *testing.T) {
|
|||
insertData = &InsertData{
|
||||
Data: map[int64]FieldData{
|
||||
0: &Int8FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int8{3, 4},
|
||||
Data: []int8{3, 4},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -280,8 +266,7 @@ func TestDataSorter_Less(t *testing.T) {
|
|||
insertData := &InsertData{
|
||||
Data: map[int64]FieldData{
|
||||
1: &Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{6, 4},
|
||||
Data: []int64{6, 4},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -297,8 +282,7 @@ func TestDataSorter_Less(t *testing.T) {
|
|||
insertData = &InsertData{
|
||||
Data: map[int64]FieldData{
|
||||
0: &Int8FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int8{3, 4},
|
||||
Data: []int8{3, 4},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -314,8 +298,7 @@ func TestDataSorter_Less(t *testing.T) {
|
|||
insertData = &InsertData{
|
||||
Data: map[int64]FieldData{
|
||||
0: &Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{6, 4},
|
||||
Data: []int64{6, 4},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -185,54 +185,42 @@ func TestPrintBinlogFiles(t *testing.T) {
|
|||
insertDataFirst := &InsertData{
|
||||
Data: map[int64]FieldData{
|
||||
0: &Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{3, 4},
|
||||
Data: []int64{3, 4},
|
||||
},
|
||||
1: &Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{3, 4},
|
||||
Data: []int64{3, 4},
|
||||
},
|
||||
100: &BoolFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []bool{true, false},
|
||||
Data: []bool{true, false},
|
||||
},
|
||||
101: &Int8FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int8{3, 4},
|
||||
Data: []int8{3, 4},
|
||||
},
|
||||
102: &Int16FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int16{3, 4},
|
||||
Data: []int16{3, 4},
|
||||
},
|
||||
103: &Int32FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int32{3, 4},
|
||||
Data: []int32{3, 4},
|
||||
},
|
||||
104: &Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{3, 4},
|
||||
Data: []int64{3, 4},
|
||||
},
|
||||
105: &FloatFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []float32{3, 4},
|
||||
Data: []float32{3, 4},
|
||||
},
|
||||
106: &DoubleFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []float64{3, 4},
|
||||
Data: []float64{3, 4},
|
||||
},
|
||||
107: &StringFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []string{"3", "4"},
|
||||
Data: []string{"3", "4"},
|
||||
},
|
||||
108: &BinaryVectorFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []byte{0, 255},
|
||||
Dim: 8,
|
||||
Data: []byte{0, 255},
|
||||
Dim: 8,
|
||||
},
|
||||
109: &FloatVectorFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7},
|
||||
Dim: 8,
|
||||
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7},
|
||||
Dim: 8,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -240,54 +228,42 @@ func TestPrintBinlogFiles(t *testing.T) {
|
|||
insertDataSecond := &InsertData{
|
||||
Data: map[int64]FieldData{
|
||||
0: &Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{1, 2},
|
||||
Data: []int64{1, 2},
|
||||
},
|
||||
1: &Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{1, 2},
|
||||
Data: []int64{1, 2},
|
||||
},
|
||||
100: &BoolFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []bool{true, false},
|
||||
Data: []bool{true, false},
|
||||
},
|
||||
101: &Int8FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int8{1, 2},
|
||||
Data: []int8{1, 2},
|
||||
},
|
||||
102: &Int16FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int16{1, 2},
|
||||
Data: []int16{1, 2},
|
||||
},
|
||||
103: &Int32FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int32{1, 2},
|
||||
Data: []int32{1, 2},
|
||||
},
|
||||
104: &Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{1, 2},
|
||||
Data: []int64{1, 2},
|
||||
},
|
||||
105: &FloatFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []float32{1, 2},
|
||||
Data: []float32{1, 2},
|
||||
},
|
||||
106: &DoubleFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []float64{1, 2},
|
||||
Data: []float64{1, 2},
|
||||
},
|
||||
107: &StringFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []string{"1", "2"},
|
||||
Data: []string{"1", "2"},
|
||||
},
|
||||
108: &BinaryVectorFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []byte{0, 255},
|
||||
Dim: 8,
|
||||
Data: []byte{0, 255},
|
||||
Dim: 8,
|
||||
},
|
||||
109: &FloatVectorFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7},
|
||||
Dim: 8,
|
||||
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7},
|
||||
Dim: 8,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -315,9 +315,8 @@ func RowBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *schemap
|
|||
|
||||
vecs := readFloatVectors(blobReaders, dim)
|
||||
idata.Data[field.FieldID] = &FloatVectorFieldData{
|
||||
NumRows: []int64{int64(msg.NRows())},
|
||||
Data: vecs,
|
||||
Dim: dim,
|
||||
Data: vecs,
|
||||
Dim: dim,
|
||||
}
|
||||
|
||||
case schemapb.DataType_BinaryVector:
|
||||
|
@ -330,39 +329,33 @@ func RowBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *schemap
|
|||
|
||||
vecs := readBinaryVectors(blobReaders, dim)
|
||||
idata.Data[field.FieldID] = &BinaryVectorFieldData{
|
||||
NumRows: []int64{int64(msg.NRows())},
|
||||
Data: vecs,
|
||||
Dim: dim,
|
||||
Data: vecs,
|
||||
Dim: dim,
|
||||
}
|
||||
|
||||
case schemapb.DataType_Bool:
|
||||
idata.Data[field.FieldID] = &BoolFieldData{
|
||||
NumRows: []int64{int64(msg.NRows())},
|
||||
Data: readBoolArray(blobReaders),
|
||||
Data: readBoolArray(blobReaders),
|
||||
}
|
||||
|
||||
case schemapb.DataType_Int8:
|
||||
idata.Data[field.FieldID] = &Int8FieldData{
|
||||
NumRows: []int64{int64(msg.NRows())},
|
||||
Data: readInt8Array(blobReaders),
|
||||
Data: readInt8Array(blobReaders),
|
||||
}
|
||||
|
||||
case schemapb.DataType_Int16:
|
||||
idata.Data[field.FieldID] = &Int16FieldData{
|
||||
NumRows: []int64{int64(msg.NRows())},
|
||||
Data: readInt16Array(blobReaders),
|
||||
Data: readInt16Array(blobReaders),
|
||||
}
|
||||
|
||||
case schemapb.DataType_Int32:
|
||||
idata.Data[field.FieldID] = &Int32FieldData{
|
||||
NumRows: []int64{int64(msg.NRows())},
|
||||
Data: readInt32Array(blobReaders),
|
||||
Data: readInt32Array(blobReaders),
|
||||
}
|
||||
|
||||
case schemapb.DataType_Int64:
|
||||
idata.Data[field.FieldID] = &Int64FieldData{
|
||||
NumRows: []int64{int64(msg.NRows())},
|
||||
Data: nil,
|
||||
Data: nil,
|
||||
}
|
||||
|
||||
fieldData := idata.Data[field.FieldID].(*Int64FieldData)
|
||||
|
@ -379,14 +372,12 @@ func RowBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *schemap
|
|||
|
||||
case schemapb.DataType_Float:
|
||||
idata.Data[field.FieldID] = &FloatFieldData{
|
||||
NumRows: []int64{int64(msg.NRows())},
|
||||
Data: readFloatArray(blobReaders),
|
||||
Data: readFloatArray(blobReaders),
|
||||
}
|
||||
|
||||
case schemapb.DataType_Double:
|
||||
idata.Data[field.FieldID] = &DoubleFieldData{
|
||||
NumRows: []int64{int64(msg.NRows())},
|
||||
Data: readDoubleArray(blobReaders),
|
||||
Data: readDoubleArray(blobReaders),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -418,9 +409,8 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
|
|||
srcData := srcFields[field.FieldID].GetVectors().GetFloatVector().GetData()
|
||||
|
||||
fieldData := &FloatVectorFieldData{
|
||||
NumRows: []int64{int64(msg.NRows())},
|
||||
Data: make([]float32, 0, len(srcData)),
|
||||
Dim: dim,
|
||||
Data: make([]float32, 0, len(srcData)),
|
||||
Dim: dim,
|
||||
}
|
||||
fieldData.Data = append(fieldData.Data, srcData...)
|
||||
|
||||
|
@ -436,9 +426,8 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
|
|||
srcData := srcFields[field.FieldID].GetVectors().GetBinaryVector()
|
||||
|
||||
fieldData := &BinaryVectorFieldData{
|
||||
NumRows: []int64{int64(msg.NRows())},
|
||||
Data: make([]byte, 0, len(srcData)),
|
||||
Dim: dim,
|
||||
Data: make([]byte, 0, len(srcData)),
|
||||
Dim: dim,
|
||||
}
|
||||
fieldData.Data = append(fieldData.Data, srcData...)
|
||||
|
||||
|
@ -448,8 +437,7 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
|
|||
srcData := srcFields[field.FieldID].GetScalars().GetBoolData().GetData()
|
||||
|
||||
fieldData := &BoolFieldData{
|
||||
NumRows: []int64{int64(msg.NRows())},
|
||||
Data: make([]bool, 0, len(srcData)),
|
||||
Data: make([]bool, 0, len(srcData)),
|
||||
}
|
||||
fieldData.Data = append(fieldData.Data, srcData...)
|
||||
|
||||
|
@ -459,8 +447,7 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
|
|||
srcData := srcFields[field.FieldID].GetScalars().GetIntData().GetData()
|
||||
|
||||
fieldData := &Int8FieldData{
|
||||
NumRows: []int64{int64(msg.NRows())},
|
||||
Data: make([]int8, 0, len(srcData)),
|
||||
Data: make([]int8, 0, len(srcData)),
|
||||
}
|
||||
int8SrcData := make([]int8, len(srcData))
|
||||
for i := 0; i < len(srcData); i++ {
|
||||
|
@ -474,8 +461,7 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
|
|||
srcData := srcFields[field.FieldID].GetScalars().GetIntData().GetData()
|
||||
|
||||
fieldData := &Int16FieldData{
|
||||
NumRows: []int64{int64(msg.NRows())},
|
||||
Data: make([]int16, 0, len(srcData)),
|
||||
Data: make([]int16, 0, len(srcData)),
|
||||
}
|
||||
int16SrcData := make([]int16, len(srcData))
|
||||
for i := 0; i < len(srcData); i++ {
|
||||
|
@ -489,8 +475,7 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
|
|||
srcData := srcFields[field.FieldID].GetScalars().GetIntData().GetData()
|
||||
|
||||
fieldData := &Int32FieldData{
|
||||
NumRows: []int64{int64(msg.NRows())},
|
||||
Data: make([]int32, 0, len(srcData)),
|
||||
Data: make([]int32, 0, len(srcData)),
|
||||
}
|
||||
fieldData.Data = append(fieldData.Data, srcData...)
|
||||
|
||||
|
@ -498,8 +483,7 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
|
|||
|
||||
case schemapb.DataType_Int64:
|
||||
fieldData := &Int64FieldData{
|
||||
NumRows: []int64{int64(msg.NRows())},
|
||||
Data: make([]int64, 0),
|
||||
Data: make([]int64, 0),
|
||||
}
|
||||
|
||||
switch field.FieldID {
|
||||
|
@ -523,8 +507,7 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
|
|||
srcData := srcFields[field.FieldID].GetScalars().GetFloatData().GetData()
|
||||
|
||||
fieldData := &FloatFieldData{
|
||||
NumRows: []int64{int64(msg.NRows())},
|
||||
Data: make([]float32, 0, len(srcData)),
|
||||
Data: make([]float32, 0, len(srcData)),
|
||||
}
|
||||
fieldData.Data = append(fieldData.Data, srcData...)
|
||||
|
||||
|
@ -534,8 +517,7 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
|
|||
srcData := srcFields[field.FieldID].GetScalars().GetDoubleData().GetData()
|
||||
|
||||
fieldData := &DoubleFieldData{
|
||||
NumRows: []int64{int64(msg.NRows())},
|
||||
Data: make([]float64, 0, len(srcData)),
|
||||
Data: make([]float64, 0, len(srcData)),
|
||||
}
|
||||
fieldData.Data = append(fieldData.Data, srcData...)
|
||||
|
||||
|
@ -544,8 +526,7 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
|
|||
srcData := srcFields[field.FieldID].GetScalars().GetStringData().GetData()
|
||||
|
||||
fieldData := &StringFieldData{
|
||||
NumRows: []int64{int64(msg.NumRows)},
|
||||
Data: make([]string, 0, len(srcData)),
|
||||
Data: make([]string, 0, len(srcData)),
|
||||
}
|
||||
|
||||
fieldData.Data = append(fieldData.Data, srcData...)
|
||||
|
@ -566,133 +547,113 @@ func InsertMsgToInsertData(msg *msgstream.InsertMsg, schema *schemapb.Collection
|
|||
func mergeBoolField(data *InsertData, fid FieldID, field *BoolFieldData) {
|
||||
if _, ok := data.Data[fid]; !ok {
|
||||
fieldData := &BoolFieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: nil,
|
||||
Data: nil,
|
||||
}
|
||||
data.Data[fid] = fieldData
|
||||
}
|
||||
fieldData := data.Data[fid].(*BoolFieldData)
|
||||
fieldData.Data = append(fieldData.Data, field.Data...)
|
||||
fieldData.NumRows[0] += int64(field.RowNum())
|
||||
}
|
||||
|
||||
func mergeInt8Field(data *InsertData, fid FieldID, field *Int8FieldData) {
|
||||
if _, ok := data.Data[fid]; !ok {
|
||||
fieldData := &Int8FieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: nil,
|
||||
Data: nil,
|
||||
}
|
||||
data.Data[fid] = fieldData
|
||||
}
|
||||
fieldData := data.Data[fid].(*Int8FieldData)
|
||||
fieldData.Data = append(fieldData.Data, field.Data...)
|
||||
fieldData.NumRows[0] += int64(field.RowNum())
|
||||
}
|
||||
|
||||
func mergeInt16Field(data *InsertData, fid FieldID, field *Int16FieldData) {
|
||||
if _, ok := data.Data[fid]; !ok {
|
||||
fieldData := &Int16FieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: nil,
|
||||
Data: nil,
|
||||
}
|
||||
data.Data[fid] = fieldData
|
||||
}
|
||||
fieldData := data.Data[fid].(*Int16FieldData)
|
||||
fieldData.Data = append(fieldData.Data, field.Data...)
|
||||
fieldData.NumRows[0] += int64(field.RowNum())
|
||||
}
|
||||
|
||||
func mergeInt32Field(data *InsertData, fid FieldID, field *Int32FieldData) {
|
||||
if _, ok := data.Data[fid]; !ok {
|
||||
fieldData := &Int32FieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: nil,
|
||||
Data: nil,
|
||||
}
|
||||
data.Data[fid] = fieldData
|
||||
}
|
||||
fieldData := data.Data[fid].(*Int32FieldData)
|
||||
fieldData.Data = append(fieldData.Data, field.Data...)
|
||||
fieldData.NumRows[0] += int64(field.RowNum())
|
||||
}
|
||||
|
||||
func mergeInt64Field(data *InsertData, fid FieldID, field *Int64FieldData) {
|
||||
if _, ok := data.Data[fid]; !ok {
|
||||
fieldData := &Int64FieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: nil,
|
||||
Data: nil,
|
||||
}
|
||||
data.Data[fid] = fieldData
|
||||
}
|
||||
fieldData := data.Data[fid].(*Int64FieldData)
|
||||
fieldData.Data = append(fieldData.Data, field.Data...)
|
||||
fieldData.NumRows[0] += int64(field.RowNum())
|
||||
}
|
||||
|
||||
func mergeFloatField(data *InsertData, fid FieldID, field *FloatFieldData) {
|
||||
if _, ok := data.Data[fid]; !ok {
|
||||
fieldData := &FloatFieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: nil,
|
||||
Data: nil,
|
||||
}
|
||||
data.Data[fid] = fieldData
|
||||
}
|
||||
fieldData := data.Data[fid].(*FloatFieldData)
|
||||
fieldData.Data = append(fieldData.Data, field.Data...)
|
||||
fieldData.NumRows[0] += int64(field.RowNum())
|
||||
}
|
||||
|
||||
func mergeDoubleField(data *InsertData, fid FieldID, field *DoubleFieldData) {
|
||||
if _, ok := data.Data[fid]; !ok {
|
||||
fieldData := &DoubleFieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: nil,
|
||||
Data: nil,
|
||||
}
|
||||
data.Data[fid] = fieldData
|
||||
}
|
||||
fieldData := data.Data[fid].(*DoubleFieldData)
|
||||
fieldData.Data = append(fieldData.Data, field.Data...)
|
||||
fieldData.NumRows[0] += int64(field.RowNum())
|
||||
}
|
||||
|
||||
func mergeStringField(data *InsertData, fid FieldID, field *StringFieldData) {
|
||||
if _, ok := data.Data[fid]; !ok {
|
||||
fieldData := &StringFieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: nil,
|
||||
Data: nil,
|
||||
}
|
||||
data.Data[fid] = fieldData
|
||||
}
|
||||
fieldData := data.Data[fid].(*StringFieldData)
|
||||
fieldData.Data = append(fieldData.Data, field.Data...)
|
||||
fieldData.NumRows[0] += int64(field.RowNum())
|
||||
}
|
||||
|
||||
func mergeBinaryVectorField(data *InsertData, fid FieldID, field *BinaryVectorFieldData) {
|
||||
if _, ok := data.Data[fid]; !ok {
|
||||
fieldData := &BinaryVectorFieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: nil,
|
||||
Dim: field.Dim,
|
||||
Data: nil,
|
||||
Dim: field.Dim,
|
||||
}
|
||||
data.Data[fid] = fieldData
|
||||
}
|
||||
fieldData := data.Data[fid].(*BinaryVectorFieldData)
|
||||
fieldData.Data = append(fieldData.Data, field.Data...)
|
||||
fieldData.NumRows[0] += int64(field.RowNum())
|
||||
}
|
||||
|
||||
func mergeFloatVectorField(data *InsertData, fid FieldID, field *FloatVectorFieldData) {
|
||||
if _, ok := data.Data[fid]; !ok {
|
||||
fieldData := &FloatVectorFieldData{
|
||||
NumRows: []int64{0},
|
||||
Data: nil,
|
||||
Dim: field.Dim,
|
||||
Data: nil,
|
||||
Dim: field.Dim,
|
||||
}
|
||||
data.Data[fid] = fieldData
|
||||
}
|
||||
fieldData := data.Data[fid].(*FloatVectorFieldData)
|
||||
fieldData.Data = append(fieldData.Data, field.Data...)
|
||||
fieldData.NumRows[0] += int64(field.RowNum())
|
||||
}
|
||||
|
||||
// MergeFieldData merge field into data.
|
||||
|
|
|
@ -68,16 +68,13 @@ func TestCheckNumRows(t *testing.T) {
|
|||
assert.True(t, checkNumRows())
|
||||
|
||||
f1 := &Int64FieldData{
|
||||
NumRows: nil,
|
||||
Data: []int64{1, 2, 3},
|
||||
Data: []int64{1, 2, 3},
|
||||
}
|
||||
f2 := &Int64FieldData{
|
||||
NumRows: nil,
|
||||
Data: []int64{1, 2, 3},
|
||||
Data: []int64{1, 2, 3},
|
||||
}
|
||||
f3 := &Int64FieldData{
|
||||
NumRows: nil,
|
||||
Data: []int64{1, 2, 3, 4},
|
||||
Data: []int64{1, 2, 3, 4},
|
||||
}
|
||||
|
||||
assert.True(t, checkNumRows(f1, f2))
|
||||
|
@ -88,16 +85,13 @@ func TestCheckNumRows(t *testing.T) {
|
|||
|
||||
func TestSortFieldDataList(t *testing.T) {
|
||||
f1 := &Int16FieldData{
|
||||
NumRows: nil,
|
||||
Data: []int16{1, 2, 3},
|
||||
Data: []int16{1, 2, 3},
|
||||
}
|
||||
f2 := &Int32FieldData{
|
||||
NumRows: nil,
|
||||
Data: []int32{4, 5, 6},
|
||||
Data: []int32{4, 5, 6},
|
||||
}
|
||||
f3 := &Int64FieldData{
|
||||
NumRows: nil,
|
||||
Data: []int64{7, 8, 9},
|
||||
Data: []int64{7, 8, 9},
|
||||
}
|
||||
|
||||
ls := fieldDataList{
|
||||
|
@ -840,54 +834,42 @@ func TestMergeInsertData(t *testing.T) {
|
|||
d1 := &InsertData{
|
||||
Data: map[int64]FieldData{
|
||||
common.RowIDField: &Int64FieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []int64{1},
|
||||
Data: []int64{1},
|
||||
},
|
||||
common.TimeStampField: &Int64FieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []int64{1},
|
||||
Data: []int64{1},
|
||||
},
|
||||
BoolField: &BoolFieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []bool{true},
|
||||
Data: []bool{true},
|
||||
},
|
||||
Int8Field: &Int8FieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []int8{1},
|
||||
Data: []int8{1},
|
||||
},
|
||||
Int16Field: &Int16FieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []int16{1},
|
||||
Data: []int16{1},
|
||||
},
|
||||
Int32Field: &Int32FieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []int32{1},
|
||||
Data: []int32{1},
|
||||
},
|
||||
Int64Field: &Int64FieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []int64{1},
|
||||
Data: []int64{1},
|
||||
},
|
||||
FloatField: &FloatFieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []float32{0},
|
||||
Data: []float32{0},
|
||||
},
|
||||
DoubleField: &DoubleFieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []float64{0},
|
||||
Data: []float64{0},
|
||||
},
|
||||
StringField: &StringFieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []string{"1"},
|
||||
Data: []string{"1"},
|
||||
},
|
||||
BinaryVectorField: &BinaryVectorFieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []byte{0},
|
||||
Dim: 8,
|
||||
Data: []byte{0},
|
||||
Dim: 8,
|
||||
},
|
||||
FloatVectorField: &FloatVectorFieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []float32{0},
|
||||
Dim: 1,
|
||||
Data: []float32{0},
|
||||
Dim: 1,
|
||||
},
|
||||
},
|
||||
Infos: nil,
|
||||
|
@ -895,54 +877,42 @@ func TestMergeInsertData(t *testing.T) {
|
|||
d2 := &InsertData{
|
||||
Data: map[int64]FieldData{
|
||||
common.RowIDField: &Int64FieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []int64{2},
|
||||
Data: []int64{2},
|
||||
},
|
||||
common.TimeStampField: &Int64FieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []int64{2},
|
||||
Data: []int64{2},
|
||||
},
|
||||
BoolField: &BoolFieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []bool{false},
|
||||
Data: []bool{false},
|
||||
},
|
||||
Int8Field: &Int8FieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []int8{2},
|
||||
Data: []int8{2},
|
||||
},
|
||||
Int16Field: &Int16FieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []int16{2},
|
||||
Data: []int16{2},
|
||||
},
|
||||
Int32Field: &Int32FieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []int32{2},
|
||||
Data: []int32{2},
|
||||
},
|
||||
Int64Field: &Int64FieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []int64{2},
|
||||
Data: []int64{2},
|
||||
},
|
||||
FloatField: &FloatFieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []float32{0},
|
||||
Data: []float32{0},
|
||||
},
|
||||
DoubleField: &DoubleFieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []float64{0},
|
||||
Data: []float64{0},
|
||||
},
|
||||
StringField: &StringFieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []string{"2"},
|
||||
Data: []string{"2"},
|
||||
},
|
||||
BinaryVectorField: &BinaryVectorFieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []byte{0},
|
||||
Dim: 8,
|
||||
Data: []byte{0},
|
||||
Dim: 8,
|
||||
},
|
||||
FloatVectorField: &FloatVectorFieldData{
|
||||
NumRows: []int64{1},
|
||||
Data: []float32{0},
|
||||
Dim: 1,
|
||||
Data: []float32{0},
|
||||
Dim: 1,
|
||||
},
|
||||
},
|
||||
Infos: nil,
|
||||
|
@ -952,62 +922,50 @@ func TestMergeInsertData(t *testing.T) {
|
|||
|
||||
f, ok := merged.Data[common.RowIDField]
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, []int64{2}, f.(*Int64FieldData).NumRows)
|
||||
assert.Equal(t, []int64{1, 2}, f.(*Int64FieldData).Data)
|
||||
|
||||
f, ok = merged.Data[common.TimeStampField]
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, []int64{2}, f.(*Int64FieldData).NumRows)
|
||||
assert.Equal(t, []int64{1, 2}, f.(*Int64FieldData).Data)
|
||||
|
||||
f, ok = merged.Data[BoolField]
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, []int64{2}, f.(*BoolFieldData).NumRows)
|
||||
assert.Equal(t, []bool{true, false}, f.(*BoolFieldData).Data)
|
||||
|
||||
f, ok = merged.Data[Int8Field]
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, []int64{2}, f.(*Int8FieldData).NumRows)
|
||||
assert.Equal(t, []int8{1, 2}, f.(*Int8FieldData).Data)
|
||||
|
||||
f, ok = merged.Data[Int16Field]
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, []int64{2}, f.(*Int16FieldData).NumRows)
|
||||
assert.Equal(t, []int16{1, 2}, f.(*Int16FieldData).Data)
|
||||
|
||||
f, ok = merged.Data[Int32Field]
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, []int64{2}, f.(*Int32FieldData).NumRows)
|
||||
assert.Equal(t, []int32{1, 2}, f.(*Int32FieldData).Data)
|
||||
|
||||
f, ok = merged.Data[Int64Field]
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, []int64{2}, f.(*Int64FieldData).NumRows)
|
||||
assert.Equal(t, []int64{1, 2}, f.(*Int64FieldData).Data)
|
||||
|
||||
f, ok = merged.Data[FloatField]
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, []int64{2}, f.(*FloatFieldData).NumRows)
|
||||
assert.Equal(t, []float32{0, 0}, f.(*FloatFieldData).Data)
|
||||
|
||||
f, ok = merged.Data[DoubleField]
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, []int64{2}, f.(*DoubleFieldData).NumRows)
|
||||
assert.Equal(t, []float64{0, 0}, f.(*DoubleFieldData).Data)
|
||||
|
||||
f, ok = merged.Data[StringField]
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, []int64{2}, f.(*StringFieldData).NumRows)
|
||||
assert.Equal(t, []string{"1", "2"}, f.(*StringFieldData).Data)
|
||||
|
||||
f, ok = merged.Data[BinaryVectorField]
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, []int64{2}, f.(*BinaryVectorFieldData).NumRows)
|
||||
assert.Equal(t, []byte{0, 0}, f.(*BinaryVectorFieldData).Data)
|
||||
|
||||
f, ok = merged.Data[FloatVectorField]
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, []int64{2}, f.(*FloatVectorFieldData).NumRows)
|
||||
assert.Equal(t, []float32{0, 0}, f.(*FloatVectorFieldData).Data)
|
||||
}
|
||||
|
||||
|
|
|
@ -88,26 +88,21 @@ func initBinlogFile(schema *etcdpb.CollectionMeta) []*Blob {
|
|||
insertData := &InsertData{
|
||||
Data: map[int64]FieldData{
|
||||
0: &Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{3, 4},
|
||||
Data: []int64{3, 4},
|
||||
},
|
||||
1: &Int64FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int64{3, 4},
|
||||
Data: []int64{3, 4},
|
||||
},
|
||||
101: &Int8FieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []int8{3, 4},
|
||||
Data: []int8{3, 4},
|
||||
},
|
||||
108: &BinaryVectorFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []byte{0, 255},
|
||||
Dim: 8,
|
||||
Data: []byte{0, 255},
|
||||
Dim: 8,
|
||||
},
|
||||
109: &FloatVectorFieldData{
|
||||
NumRows: []int64{2},
|
||||
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 111, 222, 333, 444, 555, 777, 666},
|
||||
Dim: 8,
|
||||
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 111, 222, 333, 444, 555, 777, 666},
|
||||
Dim: 8,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -557,7 +557,6 @@ func (p *BinlogAdapter) getShardingListByPrimaryInt64(primaryKeys []int64,
|
|||
|
||||
// append the entity to primary key's FieldData
|
||||
field.(*storage.Int64FieldData).Data = append(field.(*storage.Int64FieldData).Data, key)
|
||||
field.(*storage.Int64FieldData).NumRows[0]++
|
||||
|
||||
shardList = append(shardList, int32(shardID))
|
||||
}
|
||||
|
@ -609,7 +608,6 @@ func (p *BinlogAdapter) getShardingListByPrimaryVarchar(primaryKeys []string,
|
|||
|
||||
// append the entity to primary key's FieldData
|
||||
field.(*storage.StringFieldData).Data = append(field.(*storage.StringFieldData).Data, key)
|
||||
field.(*storage.StringFieldData).NumRows[0]++
|
||||
|
||||
shardList = append(shardList, int32(shardID))
|
||||
}
|
||||
|
@ -773,7 +771,6 @@ func (p *BinlogAdapter) dispatchBoolToShards(data []bool, memoryData []map[stora
|
|||
fields := memoryData[shardID] // initSegmentData() can ensure the existence, no need to check bound here
|
||||
field := fields[fieldID] // initSegmentData() can ensure the existence, no need to check existence here
|
||||
field.(*storage.BoolFieldData).Data = append(field.(*storage.BoolFieldData).Data, val)
|
||||
field.(*storage.BoolFieldData).NumRows[0]++
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -797,7 +794,6 @@ func (p *BinlogAdapter) dispatchInt8ToShards(data []int8, memoryData []map[stora
|
|||
fields := memoryData[shardID] // initSegmentData() can ensure the existence, no need to check bound here
|
||||
field := fields[fieldID] // initSegmentData() can ensure the existence, no need to check existence here
|
||||
field.(*storage.Int8FieldData).Data = append(field.(*storage.Int8FieldData).Data, val)
|
||||
field.(*storage.Int8FieldData).NumRows[0]++
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -821,7 +817,6 @@ func (p *BinlogAdapter) dispatchInt16ToShards(data []int16, memoryData []map[sto
|
|||
fields := memoryData[shardID] // initSegmentData() can ensure the existence, no need to check bound here
|
||||
field := fields[fieldID] // initSegmentData() can ensure the existence, no need to check existence here
|
||||
field.(*storage.Int16FieldData).Data = append(field.(*storage.Int16FieldData).Data, val)
|
||||
field.(*storage.Int16FieldData).NumRows[0]++
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -845,7 +840,6 @@ func (p *BinlogAdapter) dispatchInt32ToShards(data []int32, memoryData []map[sto
|
|||
fields := memoryData[shardID] // initSegmentData() can ensure the existence, no need to check bound here
|
||||
field := fields[fieldID] // initSegmentData() can ensure the existence, no need to check existence here
|
||||
field.(*storage.Int32FieldData).Data = append(field.(*storage.Int32FieldData).Data, val)
|
||||
field.(*storage.Int32FieldData).NumRows[0]++
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -869,7 +863,6 @@ func (p *BinlogAdapter) dispatchInt64ToShards(data []int64, memoryData []map[sto
|
|||
fields := memoryData[shardID] // initSegmentData() can ensure the existence, no need to check bound here
|
||||
field := fields[fieldID] // initSegmentData() can ensure the existence, no need to check existence here
|
||||
field.(*storage.Int64FieldData).Data = append(field.(*storage.Int64FieldData).Data, val)
|
||||
field.(*storage.Int64FieldData).NumRows[0]++
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -893,7 +886,6 @@ func (p *BinlogAdapter) dispatchFloatToShards(data []float32, memoryData []map[s
|
|||
fields := memoryData[shardID] // initSegmentData() can ensure the existence, no need to check bound here
|
||||
field := fields[fieldID] // initSegmentData() can ensure the existence, no need to check existence here
|
||||
field.(*storage.FloatFieldData).Data = append(field.(*storage.FloatFieldData).Data, val)
|
||||
field.(*storage.FloatFieldData).NumRows[0]++
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -917,7 +909,6 @@ func (p *BinlogAdapter) dispatchDoubleToShards(data []float64, memoryData []map[
|
|||
fields := memoryData[shardID] // initSegmentData() can ensure the existence, no need to check bound here
|
||||
field := fields[fieldID] // initSegmentData() can ensure the existence, no need to check existence here
|
||||
field.(*storage.DoubleFieldData).Data = append(field.(*storage.DoubleFieldData).Data, val)
|
||||
field.(*storage.DoubleFieldData).NumRows[0]++
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -941,7 +932,6 @@ func (p *BinlogAdapter) dispatchVarcharToShards(data []string, memoryData []map[
|
|||
fields := memoryData[shardID] // initSegmentData() can ensure the existence, no need to check bound here
|
||||
field := fields[fieldID] // initSegmentData() can ensure the existence, no need to check existence here
|
||||
field.(*storage.StringFieldData).Data = append(field.(*storage.StringFieldData).Data, val)
|
||||
field.(*storage.StringFieldData).NumRows[0]++
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -983,7 +973,6 @@ func (p *BinlogAdapter) dispatchBinaryVecToShards(data []byte, dim int, memoryDa
|
|||
|
||||
binVecField.Data = append(binVecField.Data, val)
|
||||
}
|
||||
binVecField.NumRows[0]++
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -1023,7 +1012,6 @@ func (p *BinlogAdapter) dispatchFloatVecToShards(data []float32, dim int, memory
|
|||
val := data[dim*i+j]
|
||||
floatVecField.Data = append(floatVecField.Data, val)
|
||||
}
|
||||
floatVecField.NumRows[0]++
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -54,8 +54,7 @@ func initSegmentData(collectionSchema *schemapb.CollectionSchema) map[storage.Fi
|
|||
// rowID field is a hidden field with fieldID=0, it is always auto-generated by IDAllocator
|
||||
// if primary key is int64 and autoID=true, primary key field is equal to rowID field
|
||||
segmentData[common.RowIDField] = &storage.Int64FieldData{
|
||||
Data: make([]int64, 0),
|
||||
NumRows: []int64{0},
|
||||
Data: make([]int64, 0),
|
||||
}
|
||||
|
||||
for i := 0; i < len(collectionSchema.Fields); i++ {
|
||||
|
@ -63,57 +62,47 @@ func initSegmentData(collectionSchema *schemapb.CollectionSchema) map[storage.Fi
|
|||
switch schema.DataType {
|
||||
case schemapb.DataType_Bool:
|
||||
segmentData[schema.GetFieldID()] = &storage.BoolFieldData{
|
||||
Data: make([]bool, 0),
|
||||
NumRows: []int64{0},
|
||||
Data: make([]bool, 0),
|
||||
}
|
||||
case schemapb.DataType_Float:
|
||||
segmentData[schema.GetFieldID()] = &storage.FloatFieldData{
|
||||
Data: make([]float32, 0),
|
||||
NumRows: []int64{0},
|
||||
Data: make([]float32, 0),
|
||||
}
|
||||
case schemapb.DataType_Double:
|
||||
segmentData[schema.GetFieldID()] = &storage.DoubleFieldData{
|
||||
Data: make([]float64, 0),
|
||||
NumRows: []int64{0},
|
||||
Data: make([]float64, 0),
|
||||
}
|
||||
case schemapb.DataType_Int8:
|
||||
segmentData[schema.GetFieldID()] = &storage.Int8FieldData{
|
||||
Data: make([]int8, 0),
|
||||
NumRows: []int64{0},
|
||||
Data: make([]int8, 0),
|
||||
}
|
||||
case schemapb.DataType_Int16:
|
||||
segmentData[schema.GetFieldID()] = &storage.Int16FieldData{
|
||||
Data: make([]int16, 0),
|
||||
NumRows: []int64{0},
|
||||
Data: make([]int16, 0),
|
||||
}
|
||||
case schemapb.DataType_Int32:
|
||||
segmentData[schema.GetFieldID()] = &storage.Int32FieldData{
|
||||
Data: make([]int32, 0),
|
||||
NumRows: []int64{0},
|
||||
Data: make([]int32, 0),
|
||||
}
|
||||
case schemapb.DataType_Int64:
|
||||
segmentData[schema.GetFieldID()] = &storage.Int64FieldData{
|
||||
Data: make([]int64, 0),
|
||||
NumRows: []int64{0},
|
||||
Data: make([]int64, 0),
|
||||
}
|
||||
case schemapb.DataType_BinaryVector:
|
||||
dim, _ := getFieldDimension(schema)
|
||||
segmentData[schema.GetFieldID()] = &storage.BinaryVectorFieldData{
|
||||
Data: make([]byte, 0),
|
||||
NumRows: []int64{0},
|
||||
Dim: dim,
|
||||
Data: make([]byte, 0),
|
||||
Dim: dim,
|
||||
}
|
||||
case schemapb.DataType_FloatVector:
|
||||
dim, _ := getFieldDimension(schema)
|
||||
segmentData[schema.GetFieldID()] = &storage.FloatVectorFieldData{
|
||||
Data: make([]float32, 0),
|
||||
NumRows: []int64{0},
|
||||
Dim: dim,
|
||||
Data: make([]float32, 0),
|
||||
Dim: dim,
|
||||
}
|
||||
case schemapb.DataType_String, schemapb.DataType_VarChar:
|
||||
segmentData[schema.GetFieldID()] = &storage.StringFieldData{
|
||||
Data: make([]string, 0),
|
||||
NumRows: []int64{0},
|
||||
Data: make([]string, 0),
|
||||
}
|
||||
default:
|
||||
log.Error("Import util: unsupported data type", zap.String("DataType", getTypeName(schema.DataType)))
|
||||
|
@ -158,7 +147,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
|
|||
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
|
||||
if value, ok := obj.(bool); ok {
|
||||
field.(*storage.BoolFieldData).Data = append(field.(*storage.BoolFieldData).Data, value)
|
||||
field.(*storage.BoolFieldData).NumRows[0]++
|
||||
} else {
|
||||
return fmt.Errorf("illegal value '%v' for bool type field '%s'", obj, schema.GetName())
|
||||
}
|
||||
|
@ -173,7 +161,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
|
|||
return err
|
||||
}
|
||||
field.(*storage.FloatFieldData).Data = append(field.(*storage.FloatFieldData).Data, float32(value))
|
||||
field.(*storage.FloatFieldData).NumRows[0]++
|
||||
} else {
|
||||
return fmt.Errorf("illegal value '%v' for float type field '%s'", obj, schema.GetName())
|
||||
}
|
||||
|
@ -188,7 +175,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
|
|||
return err
|
||||
}
|
||||
field.(*storage.DoubleFieldData).Data = append(field.(*storage.DoubleFieldData).Data, value)
|
||||
field.(*storage.DoubleFieldData).NumRows[0]++
|
||||
} else {
|
||||
return fmt.Errorf("illegal value '%v' for double type field '%s'", obj, schema.GetName())
|
||||
}
|
||||
|
@ -202,7 +188,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
|
|||
return fmt.Errorf("failed to parse value '%v' for int8 field '%s', error: %w", num, schema.GetName(), err)
|
||||
}
|
||||
field.(*storage.Int8FieldData).Data = append(field.(*storage.Int8FieldData).Data, int8(value))
|
||||
field.(*storage.Int8FieldData).NumRows[0]++
|
||||
} else {
|
||||
return fmt.Errorf("illegal value '%v' for int8 type field '%s'", obj, schema.GetName())
|
||||
}
|
||||
|
@ -216,7 +201,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
|
|||
return fmt.Errorf("failed to parse value '%v' for int16 field '%s', error: %w", num, schema.GetName(), err)
|
||||
}
|
||||
field.(*storage.Int16FieldData).Data = append(field.(*storage.Int16FieldData).Data, int16(value))
|
||||
field.(*storage.Int16FieldData).NumRows[0]++
|
||||
} else {
|
||||
return fmt.Errorf("illegal value '%v' for int16 type field '%s'", obj, schema.GetName())
|
||||
}
|
||||
|
@ -230,7 +214,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
|
|||
return fmt.Errorf("failed to parse value '%v' for int32 field '%s', error: %w", num, schema.GetName(), err)
|
||||
}
|
||||
field.(*storage.Int32FieldData).Data = append(field.(*storage.Int32FieldData).Data, int32(value))
|
||||
field.(*storage.Int32FieldData).NumRows[0]++
|
||||
} else {
|
||||
return fmt.Errorf("illegal value '%v' for int32 type field '%s'", obj, schema.GetName())
|
||||
}
|
||||
|
@ -244,7 +227,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
|
|||
return fmt.Errorf("failed to parse value '%v' for int64 field '%s', error: %w", num, schema.GetName(), err)
|
||||
}
|
||||
field.(*storage.Int64FieldData).Data = append(field.(*storage.Int64FieldData).Data, value)
|
||||
field.(*storage.Int64FieldData).NumRows[0]++
|
||||
} else {
|
||||
return fmt.Errorf("illegal value '%v' for int64 type field '%s'", obj, schema.GetName())
|
||||
}
|
||||
|
@ -279,7 +261,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
|
|||
}
|
||||
}
|
||||
|
||||
field.(*storage.BinaryVectorFieldData).NumRows[0]++
|
||||
return nil
|
||||
}
|
||||
case schemapb.DataType_FloatVector:
|
||||
|
@ -310,7 +291,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
|
|||
}
|
||||
}
|
||||
|
||||
field.(*storage.FloatVectorFieldData).NumRows[0]++
|
||||
return nil
|
||||
}
|
||||
case schemapb.DataType_String, schemapb.DataType_VarChar:
|
||||
|
@ -319,7 +299,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
|
|||
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
|
||||
if value, ok := obj.(string); ok {
|
||||
field.(*storage.StringFieldData).Data = append(field.(*storage.StringFieldData).Data, value)
|
||||
field.(*storage.StringFieldData).NumRows[0]++
|
||||
} else {
|
||||
return fmt.Errorf("illegal value '%v' for varchar type field '%s'", obj, schema.GetName())
|
||||
}
|
||||
|
|
|
@ -504,7 +504,7 @@ func Test_TryFlushBlocks(t *testing.T) {
|
|||
}
|
||||
|
||||
blockSize := int64(1024)
|
||||
maxTotalSize := int64(2048)
|
||||
maxTotalSize := int64(4096)
|
||||
shardNum := int32(3)
|
||||
|
||||
// prepare flush data, 3 shards, each shard 10 rows
|
||||
|
|
|
@ -240,7 +240,6 @@ func (v *JSONRowConsumer) Handle(rows []map[storage.FieldID]interface{}) error {
|
|||
shard = hash % uint32(v.shardNum)
|
||||
pkArray := v.segmentsData[shard][v.primaryKey].(*storage.StringFieldData)
|
||||
pkArray.Data = append(pkArray.Data, pk)
|
||||
pkArray.NumRows[0]++
|
||||
} else {
|
||||
// get/generate the row id
|
||||
var pk int64
|
||||
|
@ -269,13 +268,11 @@ func (v *JSONRowConsumer) Handle(rows []map[storage.FieldID]interface{}) error {
|
|||
shard = hash % uint32(v.shardNum)
|
||||
pkArray := v.segmentsData[shard][v.primaryKey].(*storage.Int64FieldData)
|
||||
pkArray.Data = append(pkArray.Data, pk)
|
||||
pkArray.NumRows[0]++
|
||||
}
|
||||
|
||||
// set rowid field
|
||||
rowIDField := v.segmentsData[shard][common.RowIDField].(*storage.Int64FieldData)
|
||||
rowIDField.Data = append(rowIDField.Data, rowIDBegin+int64(i))
|
||||
rowIDField.NumRows[0]++
|
||||
|
||||
// convert value and consume
|
||||
for name, validator := range v.validators {
|
||||
|
|
|
@ -141,7 +141,6 @@ func Test_JSONRowConsumerFlush(t *testing.T) {
|
|||
for j := 0; j < rowCountEachShard; j++ {
|
||||
pkFieldData.Data = append(pkFieldData.Data, int64(j))
|
||||
}
|
||||
pkFieldData.NumRows = []int64{int64(rowCountEachShard)}
|
||||
}
|
||||
|
||||
err = consumer.flush(true)
|
||||
|
@ -162,7 +161,6 @@ func Test_JSONRowConsumerFlush(t *testing.T) {
|
|||
for j := 0; j < rowCountEachShard; j++ {
|
||||
pkFieldData.Data = append(pkFieldData.Data, int64(j))
|
||||
}
|
||||
pkFieldData.NumRows = []int64{int64(rowCountEachShard)}
|
||||
}
|
||||
err = consumer.flush(true)
|
||||
assert.Nil(t, err)
|
||||
|
@ -208,7 +206,6 @@ func Test_JSONRowConsumerHandle(t *testing.T) {
|
|||
for i := 0; i < 10; i++ {
|
||||
pkFieldData.Data = append(pkFieldData.Data, int64(i))
|
||||
}
|
||||
pkFieldData.NumRows = []int64{int64(10)}
|
||||
|
||||
// nil input will trigger flush
|
||||
err = consumer.Handle(nil)
|
||||
|
@ -222,7 +219,6 @@ func Test_JSONRowConsumerHandle(t *testing.T) {
|
|||
for j := 0; j < rowCount; j++ {
|
||||
pkFieldData.Data = append(pkFieldData.Data, int64(j))
|
||||
}
|
||||
pkFieldData.NumRows = []int64{int64(rowCount)}
|
||||
|
||||
input := make([]map[storage.FieldID]interface{}, rowCount)
|
||||
for j := 0; j < rowCount; j++ {
|
||||
|
|
|
@ -444,8 +444,7 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
|
|||
}
|
||||
|
||||
return &storage.BoolFieldData{
|
||||
NumRows: []int64{int64(len(data))},
|
||||
Data: data,
|
||||
Data: data,
|
||||
}, nil
|
||||
case schemapb.DataType_Int8:
|
||||
data, err := columnReader.reader.ReadInt8(rowCount)
|
||||
|
@ -455,8 +454,7 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
|
|||
}
|
||||
|
||||
return &storage.Int8FieldData{
|
||||
NumRows: []int64{int64(len(data))},
|
||||
Data: data,
|
||||
Data: data,
|
||||
}, nil
|
||||
case schemapb.DataType_Int16:
|
||||
data, err := columnReader.reader.ReadInt16(rowCount)
|
||||
|
@ -466,8 +464,7 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
|
|||
}
|
||||
|
||||
return &storage.Int16FieldData{
|
||||
NumRows: []int64{int64(len(data))},
|
||||
Data: data,
|
||||
Data: data,
|
||||
}, nil
|
||||
case schemapb.DataType_Int32:
|
||||
data, err := columnReader.reader.ReadInt32(rowCount)
|
||||
|
@ -477,8 +474,7 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
|
|||
}
|
||||
|
||||
return &storage.Int32FieldData{
|
||||
NumRows: []int64{int64(len(data))},
|
||||
Data: data,
|
||||
Data: data,
|
||||
}, nil
|
||||
case schemapb.DataType_Int64:
|
||||
data, err := columnReader.reader.ReadInt64(rowCount)
|
||||
|
@ -488,8 +484,7 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
|
|||
}
|
||||
|
||||
return &storage.Int64FieldData{
|
||||
NumRows: []int64{int64(len(data))},
|
||||
Data: data,
|
||||
Data: data,
|
||||
}, nil
|
||||
case schemapb.DataType_Float:
|
||||
data, err := columnReader.reader.ReadFloat32(rowCount)
|
||||
|
@ -499,8 +494,7 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
|
|||
}
|
||||
|
||||
return &storage.FloatFieldData{
|
||||
NumRows: []int64{int64(len(data))},
|
||||
Data: data,
|
||||
Data: data,
|
||||
}, nil
|
||||
case schemapb.DataType_Double:
|
||||
data, err := columnReader.reader.ReadFloat64(rowCount)
|
||||
|
@ -510,8 +504,7 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
|
|||
}
|
||||
|
||||
return &storage.DoubleFieldData{
|
||||
NumRows: []int64{int64(len(data))},
|
||||
Data: data,
|
||||
Data: data,
|
||||
}, nil
|
||||
case schemapb.DataType_VarChar:
|
||||
data, err := columnReader.reader.ReadString(rowCount)
|
||||
|
@ -521,8 +514,7 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
|
|||
}
|
||||
|
||||
return &storage.StringFieldData{
|
||||
NumRows: []int64{int64(len(data))},
|
||||
Data: data,
|
||||
Data: data,
|
||||
}, nil
|
||||
case schemapb.DataType_BinaryVector:
|
||||
data, err := columnReader.reader.ReadUint8(rowCount * (columnReader.dimension / 8))
|
||||
|
@ -532,9 +524,8 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
|
|||
}
|
||||
|
||||
return &storage.BinaryVectorFieldData{
|
||||
NumRows: []int64{int64(len(data) * 8 / columnReader.dimension)},
|
||||
Data: data,
|
||||
Dim: columnReader.dimension,
|
||||
Data: data,
|
||||
Dim: columnReader.dimension,
|
||||
}, nil
|
||||
case schemapb.DataType_FloatVector:
|
||||
// float32/float64 numpy file can be used for float vector file, 2 reasons:
|
||||
|
@ -564,9 +555,8 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
|
|||
}
|
||||
|
||||
return &storage.FloatVectorFieldData{
|
||||
NumRows: []int64{int64(len(data) / columnReader.dimension)},
|
||||
Data: data,
|
||||
Dim: columnReader.dimension,
|
||||
Data: data,
|
||||
Dim: columnReader.dimension,
|
||||
}, nil
|
||||
default:
|
||||
log.Error("Numpy parser: unsupported data type of field", zap.Any("dataType", columnReader.dataType),
|
||||
|
@ -583,63 +573,54 @@ func (p *NumpyParser) appendFunc(schema *schemapb.FieldSchema) func(src storage.
|
|||
return func(src storage.FieldData, n int, target storage.FieldData) error {
|
||||
arr := target.(*storage.BoolFieldData)
|
||||
arr.Data = append(arr.Data, src.GetRow(n).(bool))
|
||||
arr.NumRows[0]++
|
||||
return nil
|
||||
}
|
||||
case schemapb.DataType_Float:
|
||||
return func(src storage.FieldData, n int, target storage.FieldData) error {
|
||||
arr := target.(*storage.FloatFieldData)
|
||||
arr.Data = append(arr.Data, src.GetRow(n).(float32))
|
||||
arr.NumRows[0]++
|
||||
return nil
|
||||
}
|
||||
case schemapb.DataType_Double:
|
||||
return func(src storage.FieldData, n int, target storage.FieldData) error {
|
||||
arr := target.(*storage.DoubleFieldData)
|
||||
arr.Data = append(arr.Data, src.GetRow(n).(float64))
|
||||
arr.NumRows[0]++
|
||||
return nil
|
||||
}
|
||||
case schemapb.DataType_Int8:
|
||||
return func(src storage.FieldData, n int, target storage.FieldData) error {
|
||||
arr := target.(*storage.Int8FieldData)
|
||||
arr.Data = append(arr.Data, src.GetRow(n).(int8))
|
||||
arr.NumRows[0]++
|
||||
return nil
|
||||
}
|
||||
case schemapb.DataType_Int16:
|
||||
return func(src storage.FieldData, n int, target storage.FieldData) error {
|
||||
arr := target.(*storage.Int16FieldData)
|
||||
arr.Data = append(arr.Data, src.GetRow(n).(int16))
|
||||
arr.NumRows[0]++
|
||||
return nil
|
||||
}
|
||||
case schemapb.DataType_Int32:
|
||||
return func(src storage.FieldData, n int, target storage.FieldData) error {
|
||||
arr := target.(*storage.Int32FieldData)
|
||||
arr.Data = append(arr.Data, src.GetRow(n).(int32))
|
||||
arr.NumRows[0]++
|
||||
return nil
|
||||
}
|
||||
case schemapb.DataType_Int64:
|
||||
return func(src storage.FieldData, n int, target storage.FieldData) error {
|
||||
arr := target.(*storage.Int64FieldData)
|
||||
arr.Data = append(arr.Data, src.GetRow(n).(int64))
|
||||
arr.NumRows[0]++
|
||||
return nil
|
||||
}
|
||||
case schemapb.DataType_BinaryVector:
|
||||
return func(src storage.FieldData, n int, target storage.FieldData) error {
|
||||
arr := target.(*storage.BinaryVectorFieldData)
|
||||
arr.Data = append(arr.Data, src.GetRow(n).([]byte)...)
|
||||
arr.NumRows[0]++
|
||||
return nil
|
||||
}
|
||||
case schemapb.DataType_FloatVector:
|
||||
return func(src storage.FieldData, n int, target storage.FieldData) error {
|
||||
arr := target.(*storage.FloatVectorFieldData)
|
||||
arr.Data = append(arr.Data, src.GetRow(n).([]float32)...)
|
||||
arr.NumRows[0]++
|
||||
return nil
|
||||
}
|
||||
case schemapb.DataType_String, schemapb.DataType_VarChar:
|
||||
|
@ -736,8 +717,7 @@ func (p *NumpyParser) splitFieldsData(fieldsData map[storage.FieldID]storage.Fie
|
|||
rowIDField, ok := fieldsData[common.RowIDField]
|
||||
if !ok {
|
||||
rowIDField = &storage.Int64FieldData{
|
||||
Data: make([]int64, 0),
|
||||
NumRows: []int64{0},
|
||||
Data: make([]int64, 0),
|
||||
}
|
||||
fieldsData[common.RowIDField] = rowIDField
|
||||
}
|
||||
|
@ -755,8 +735,7 @@ func (p *NumpyParser) splitFieldsData(fieldsData map[storage.FieldID]storage.Fie
|
|||
}
|
||||
|
||||
primaryDataArr := &storage.Int64FieldData{
|
||||
NumRows: []int64{int64(rowCount)},
|
||||
Data: make([]int64, 0, rowCount),
|
||||
Data: make([]int64, 0, rowCount),
|
||||
}
|
||||
for i := rowIDBegin; i < rowIDEnd; i++ {
|
||||
primaryDataArr.Data = append(primaryDataArr.Data, i)
|
||||
|
|
Loading…
Reference in New Issue