Fix Memory calculation (#21798)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
pull/21838/head
Xiaofan 2023-01-28 11:09:51 +08:00 committed by GitHub
parent 76ad254542
commit 375b2b355d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 459 additions and 710 deletions

View File

@ -689,12 +689,10 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
// TODO copy maybe expensive, but this seems to be the only convinent way.
func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}, numRows int64) (storage.FieldData, error) {
var rst storage.FieldData
numOfRows := []int64{numRows}
switch schemaDataType {
case schemapb.DataType_Bool:
var data = &storage.BoolFieldData{
NumRows: numOfRows,
Data: make([]bool, 0, len(content)),
Data: make([]bool, 0, len(content)),
}
for _, c := range content {
@ -708,8 +706,7 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
case schemapb.DataType_Int8:
var data = &storage.Int8FieldData{
NumRows: numOfRows,
Data: make([]int8, 0, len(content)),
Data: make([]int8, 0, len(content)),
}
for _, c := range content {
@ -723,8 +720,7 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
case schemapb.DataType_Int16:
var data = &storage.Int16FieldData{
NumRows: numOfRows,
Data: make([]int16, 0, len(content)),
Data: make([]int16, 0, len(content)),
}
for _, c := range content {
@ -738,8 +734,7 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
case schemapb.DataType_Int32:
var data = &storage.Int32FieldData{
NumRows: numOfRows,
Data: make([]int32, 0, len(content)),
Data: make([]int32, 0, len(content)),
}
for _, c := range content {
@ -753,8 +748,7 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
case schemapb.DataType_Int64:
var data = &storage.Int64FieldData{
NumRows: numOfRows,
Data: make([]int64, 0, len(content)),
Data: make([]int64, 0, len(content)),
}
for _, c := range content {
@ -768,8 +762,7 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
case schemapb.DataType_Float:
var data = &storage.FloatFieldData{
NumRows: numOfRows,
Data: make([]float32, 0, len(content)),
Data: make([]float32, 0, len(content)),
}
for _, c := range content {
@ -783,8 +776,7 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
case schemapb.DataType_Double:
var data = &storage.DoubleFieldData{
NumRows: numOfRows,
Data: make([]float64, 0, len(content)),
Data: make([]float64, 0, len(content)),
}
for _, c := range content {
@ -798,8 +790,7 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
case schemapb.DataType_String, schemapb.DataType_VarChar:
var data = &storage.StringFieldData{
NumRows: numOfRows,
Data: make([]string, 0, len(content)),
Data: make([]string, 0, len(content)),
}
for _, c := range content {
@ -813,8 +804,7 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
case schemapb.DataType_FloatVector:
var data = &storage.FloatVectorFieldData{
NumRows: numOfRows,
Data: []float32{},
Data: []float32{},
}
for _, c := range content {
@ -830,8 +820,7 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
case schemapb.DataType_BinaryVector:
var data = &storage.BinaryVectorFieldData{
NumRows: numOfRows,
Data: []byte{},
Data: []byte{},
}
for _, c := range content {

View File

@ -1413,8 +1413,7 @@ func createBinLogs(rowNum int, schema *schemapb.CollectionSchema, ts Timestamp,
tsFieldData[i] = int64(ts)
}
fields[common.TimeStampField] = &storage.Int64FieldData{
Data: tsFieldData,
NumRows: []int64{int64(rowNum)},
Data: tsFieldData,
}
if status, _ := node.dataCoord.UpdateSegmentStatistics(context.TODO(), &datapb.UpdateSegmentStatisticsRequest{

View File

@ -1157,54 +1157,42 @@ func genInsertData() *InsertData {
return &InsertData{
Data: map[int64]s.FieldData{
0: &s.Int64FieldData{
NumRows: []int64{2},
Data: []int64{11, 22},
Data: []int64{11, 22},
},
1: &s.Int64FieldData{
NumRows: []int64{2},
Data: []int64{3, 4},
Data: []int64{3, 4},
},
100: &s.FloatVectorFieldData{
NumRows: []int64{2},
Data: []float32{1.0, 6.0, 7.0, 8.0},
Dim: 2,
Data: []float32{1.0, 6.0, 7.0, 8.0},
Dim: 2,
},
101: &s.BinaryVectorFieldData{
NumRows: []int64{2},
Data: []byte{0, 255, 255, 255, 128, 128, 128, 0},
Dim: 32,
Data: []byte{0, 255, 255, 255, 128, 128, 128, 0},
Dim: 32,
},
102: &s.BoolFieldData{
NumRows: []int64{2},
Data: []bool{true, false},
Data: []bool{true, false},
},
103: &s.Int8FieldData{
NumRows: []int64{2},
Data: []int8{5, 6},
Data: []int8{5, 6},
},
104: &s.Int16FieldData{
NumRows: []int64{2},
Data: []int16{7, 8},
Data: []int16{7, 8},
},
105: &s.Int32FieldData{
NumRows: []int64{2},
Data: []int32{9, 10},
Data: []int32{9, 10},
},
106: &s.Int64FieldData{
NumRows: []int64{2},
Data: []int64{1, 2},
Data: []int64{1, 2},
},
107: &s.FloatFieldData{
NumRows: []int64{2},
Data: []float32{2.333, 2.334},
Data: []float32{2.333, 2.334},
},
108: &s.DoubleFieldData{
NumRows: []int64{2},
Data: []float64{3.333, 3.334},
Data: []float64{3.333, 3.334},
},
109: &s.StringFieldData{
NumRows: []int64{2},
Data: []string{"test1", "test2"},
Data: []string{"test1", "test2"},
},
}}
}
@ -1213,54 +1201,42 @@ func genEmptyInsertData() *InsertData {
return &InsertData{
Data: map[int64]s.FieldData{
0: &s.Int64FieldData{
NumRows: []int64{0},
Data: []int64{},
Data: []int64{},
},
1: &s.Int64FieldData{
NumRows: []int64{0},
Data: []int64{},
Data: []int64{},
},
100: &s.FloatVectorFieldData{
NumRows: []int64{0},
Data: []float32{},
Dim: 2,
Data: []float32{},
Dim: 2,
},
101: &s.BinaryVectorFieldData{
NumRows: []int64{0},
Data: []byte{},
Dim: 32,
Data: []byte{},
Dim: 32,
},
102: &s.BoolFieldData{
NumRows: []int64{0},
Data: []bool{},
Data: []bool{},
},
103: &s.Int8FieldData{
NumRows: []int64{0},
Data: []int8{},
Data: []int8{},
},
104: &s.Int16FieldData{
NumRows: []int64{0},
Data: []int16{},
Data: []int16{},
},
105: &s.Int32FieldData{
NumRows: []int64{0},
Data: []int32{},
Data: []int32{},
},
106: &s.Int64FieldData{
NumRows: []int64{0},
Data: []int64{},
Data: []int64{},
},
107: &s.FloatFieldData{
NumRows: []int64{0},
Data: []float32{},
Data: []float32{},
},
108: &s.DoubleFieldData{
NumRows: []int64{0},
Data: []float64{},
Data: []float64{},
},
109: &s.StringFieldData{
NumRows: []int64{0},
Data: []string{},
Data: []string{},
},
}}
}
@ -1269,54 +1245,42 @@ func genInsertDataWithExpiredTS() *InsertData {
return &InsertData{
Data: map[int64]s.FieldData{
0: &s.Int64FieldData{
NumRows: []int64{2},
Data: []int64{11, 22},
Data: []int64{11, 22},
},
1: &s.Int64FieldData{
NumRows: []int64{2},
Data: []int64{329749364736000000, 329500223078400000}, // 2009-11-10 23:00:00 +0000 UTC, 2009-10-31 23:00:00 +0000 UTC
Data: []int64{329749364736000000, 329500223078400000}, // 2009-11-10 23:00:00 +0000 UTC, 2009-10-31 23:00:00 +0000 UTC
},
100: &s.FloatVectorFieldData{
NumRows: []int64{2},
Data: []float32{1.0, 6.0, 7.0, 8.0},
Dim: 2,
Data: []float32{1.0, 6.0, 7.0, 8.0},
Dim: 2,
},
101: &s.BinaryVectorFieldData{
NumRows: []int64{2},
Data: []byte{0, 255, 255, 255, 128, 128, 128, 0},
Dim: 32,
Data: []byte{0, 255, 255, 255, 128, 128, 128, 0},
Dim: 32,
},
102: &s.BoolFieldData{
NumRows: []int64{2},
Data: []bool{true, false},
Data: []bool{true, false},
},
103: &s.Int8FieldData{
NumRows: []int64{2},
Data: []int8{5, 6},
Data: []int8{5, 6},
},
104: &s.Int16FieldData{
NumRows: []int64{2},
Data: []int16{7, 8},
Data: []int16{7, 8},
},
105: &s.Int32FieldData{
NumRows: []int64{2},
Data: []int32{9, 10},
Data: []int32{9, 10},
},
106: &s.Int64FieldData{
NumRows: []int64{2},
Data: []int64{1, 2},
Data: []int64{1, 2},
},
107: &s.FloatFieldData{
NumRows: []int64{2},
Data: []float32{2.333, 2.334},
Data: []float32{2.333, 2.334},
},
108: &s.DoubleFieldData{
NumRows: []int64{2},
Data: []float64{3.333, 3.334},
Data: []float64{3.333, 3.334},
},
109: &s.StringFieldData{
NumRows: []int64{2},
Data: []string{"test1", "test2"},
Data: []string{"test1", "test2"},
},
}}
}

View File

@ -183,17 +183,14 @@ func (c *mockChunkmgr) mockFieldData(numrows, dim int, collectionID, partitionID
}
vecs := randomFloats(numrows, dim)
idField := storage.Int64FieldData{
NumRows: []int64{},
Data: idList,
Data: idList,
}
tsField := storage.Int64FieldData{
NumRows: []int64{},
Data: tsList,
Data: tsList,
}
vecField := storage.FloatVectorFieldData{
NumRows: []int64{},
Data: vecs,
Dim: dim,
Data: vecs,
Dim: dim,
}
insertData := &storage.InsertData{
Data: map[int64]storage.FieldData{

View File

@ -830,70 +830,58 @@ func genInsertData(msgLength int, schema *schemapb.CollectionSchema) (*storage.I
// set data for rowID field
insertData.Data[rowIDFieldID] = &storage.Int64FieldData{
NumRows: []int64{int64(msgLength)},
Data: generateInt64Array(msgLength),
Data: generateInt64Array(msgLength),
}
// set data for ts field
insertData.Data[timestampFieldID] = &storage.Int64FieldData{
NumRows: []int64{int64(msgLength)},
Data: genTimestampFieldData(msgLength),
Data: genTimestampFieldData(msgLength),
}
for _, f := range schema.Fields {
switch f.DataType {
case schemapb.DataType_Bool:
insertData.Data[f.FieldID] = &storage.BoolFieldData{
NumRows: []int64{int64(msgLength)},
Data: generateBoolArray(msgLength),
Data: generateBoolArray(msgLength),
}
case schemapb.DataType_Int8:
insertData.Data[f.FieldID] = &storage.Int8FieldData{
NumRows: []int64{int64(msgLength)},
Data: generateInt8Array(msgLength),
Data: generateInt8Array(msgLength),
}
case schemapb.DataType_Int16:
insertData.Data[f.FieldID] = &storage.Int16FieldData{
NumRows: []int64{int64(msgLength)},
Data: generateInt16Array(msgLength),
Data: generateInt16Array(msgLength),
}
case schemapb.DataType_Int32:
insertData.Data[f.FieldID] = &storage.Int32FieldData{
NumRows: []int64{int64(msgLength)},
Data: generateInt32Array(msgLength),
Data: generateInt32Array(msgLength),
}
case schemapb.DataType_Int64:
insertData.Data[f.FieldID] = &storage.Int64FieldData{
NumRows: []int64{int64(msgLength)},
Data: generateInt64Array(msgLength),
Data: generateInt64Array(msgLength),
}
case schemapb.DataType_Float:
insertData.Data[f.FieldID] = &storage.FloatFieldData{
NumRows: []int64{int64(msgLength)},
Data: generateFloat32Array(msgLength),
Data: generateFloat32Array(msgLength),
}
case schemapb.DataType_Double:
insertData.Data[f.FieldID] = &storage.DoubleFieldData{
NumRows: []int64{int64(msgLength)},
Data: generateFloat64Array(msgLength),
Data: generateFloat64Array(msgLength),
}
case schemapb.DataType_String, schemapb.DataType_VarChar:
insertData.Data[f.FieldID] = &storage.StringFieldData{
NumRows: []int64{int64(msgLength)},
Data: generateStringArray(msgLength),
Data: generateStringArray(msgLength),
}
case schemapb.DataType_FloatVector:
dim := simpleFloatVecField.dim // if no dim specified, use simpleFloatVecField's dim
insertData.Data[f.FieldID] = &storage.FloatVectorFieldData{
NumRows: []int64{int64(msgLength)},
Data: generateFloatVectors(msgLength, dim),
Dim: dim,
Data: generateFloatVectors(msgLength, dim),
Dim: dim,
}
case schemapb.DataType_BinaryVector:
dim := simpleBinVecField.dim
insertData.Data[f.FieldID] = &storage.BinaryVectorFieldData{
NumRows: []int64{int64(msgLength)},
Data: generateBinaryVectors(msgLength, dim),
Dim: dim,
Data: generateBinaryVectors(msgLength, dim),
Dim: dim,
}
default:
err := errors.New("data type not supported")

View File

@ -88,12 +88,9 @@ type Segment struct {
lastMemSize int64
lastRowCount int64
recentlyModified *atomic.Bool
segmentType *atomic.Int32
destroyed *atomic.Bool
idBinlogRowSizes []int64
recentlyModified *atomic.Bool
segmentType *atomic.Int32
destroyed *atomic.Bool
indexedFieldInfos *typeutil.ConcurrentMap[UniqueID, *IndexedFieldInfo]
statLock sync.Mutex
@ -107,14 +104,6 @@ func (s *Segment) ID() UniqueID {
return s.segmentID
}
func (s *Segment) setIDBinlogRowSizes(sizes []int64) {
s.idBinlogRowSizes = sizes
}
func (s *Segment) getIDBinlogRowSizes() []int64 {
return s.idBinlogRowSizes
}
func (s *Segment) setRecentlyModified(modify bool) {
s.recentlyModified.Store(modify)
}
@ -386,12 +375,12 @@ func (s *Segment) retrieve(plan *RetrievePlan) (*segcorepb.RetrieveResults, erro
func (s *Segment) getFieldDataPath(indexedFieldInfo *IndexedFieldInfo, offset int64) (dataPath string, offsetInBinlog int64) {
offsetInBinlog = offset
for index, idBinlogRowSize := range s.idBinlogRowSizes {
if offsetInBinlog < idBinlogRowSize {
dataPath = indexedFieldInfo.fieldBinlog.Binlogs[index].GetLogPath()
for _, binlog := range indexedFieldInfo.fieldBinlog.Binlogs {
if offsetInBinlog < binlog.EntriesNum {
dataPath = binlog.GetLogPath()
break
} else {
offsetInBinlog -= idBinlogRowSize
offsetInBinlog -= binlog.EntriesNum
}
}
return dataPath, offsetInBinlog

View File

@ -626,11 +626,6 @@ func (loader *segmentLoader) loadSealedSegments(segment *Segment, insertData *st
numRows := insertRecord.NumRows
for _, fieldData := range insertRecord.FieldsData {
fieldID := fieldData.FieldId
if fieldID == common.TimeStampField {
timestampsData := insertData.Data[fieldID].(*storage.Int64FieldData)
segment.setIDBinlogRowSizes(timestampsData.NumRows)
}
err := segment.segmentLoadFieldData(fieldID, numRows, fieldData)
if err != nil {
// TODO: return or continue?

View File

@ -591,14 +591,6 @@ func TestSegment_BasicMetrics(t *testing.T) {
)
assert.Nil(t, err)
t.Run("test id binlog row size", func(t *testing.T) {
size := int64(1024)
segment.setIDBinlogRowSizes([]int64{size})
sizes := segment.getIDBinlogRowSizes()
assert.Len(t, sizes, 1)
assert.Equal(t, size, sizes[0])
})
t.Run("test type", func(t *testing.T) {
sType := segmentTypeGrowing
segment.setType(sType)
@ -687,17 +679,17 @@ func Test_getFieldDataPath(t *testing.T) {
FieldID: 0,
Binlogs: []*datapb.Binlog{
{
LogPath: funcutil.GenRandomStr(),
EntriesNum: 10,
LogPath: funcutil.GenRandomStr(),
},
{
LogPath: funcutil.GenRandomStr(),
EntriesNum: 15,
LogPath: funcutil.GenRandomStr(),
},
},
},
}
s := &Segment{
idBinlogRowSizes: []int64{10, 15},
}
s := &Segment{}
path, offsetInBinlog := s.getFieldDataPath(indexedFieldInfo, 4)
assert.Equal(t, indexedFieldInfo.fieldBinlog.Binlogs[0].LogPath, path)

View File

@ -63,14 +63,12 @@ func generateTestData(t *testing.T, num int) []*Blob {
common.TimeStampField: &Int64FieldData{Data: field1},
101: &Int32FieldData{Data: field101},
102: &FloatVectorFieldData{
NumRows: []int64{int64(num)},
Data: field102,
Dim: 8,
Data: field102,
Dim: 8,
},
103: &BinaryVectorFieldData{
NumRows: []int64{int64(num)},
Data: field103,
Dim: 8,
Data: field103,
Dim: 8,
},
}}

View File

@ -110,46 +110,36 @@ type FieldData interface {
}
type BoolFieldData struct {
NumRows []int64
Data []bool
Data []bool
}
type Int8FieldData struct {
NumRows []int64
Data []int8
Data []int8
}
type Int16FieldData struct {
NumRows []int64
Data []int16
Data []int16
}
type Int32FieldData struct {
NumRows []int64
Data []int32
Data []int32
}
type Int64FieldData struct {
NumRows []int64
Data []int64
Data []int64
}
type FloatFieldData struct {
NumRows []int64
Data []float32
Data []float32
}
type DoubleFieldData struct {
NumRows []int64
Data []float64
Data []float64
}
type StringFieldData struct {
NumRows []int64
Data []string
Data []string
}
type BinaryVectorFieldData struct {
NumRows []int64
Data []byte
Dim int
Data []byte
Dim int
}
type FloatVectorFieldData struct {
NumRows []int64
Data []float32
Dim int
Data []float32
Dim int
}
// RowNum implements FieldData.RowNum
@ -187,47 +177,51 @@ func (data *FloatVectorFieldData) GetRow(i int) interface{} {
// GetMemorySize implements FieldData.GetMemorySize
func (data *BoolFieldData) GetMemorySize() int {
return binary.Size(data.NumRows) + binary.Size(data.Data)
return binary.Size(data.Data)
}
// GetMemorySize implements FieldData.GetMemorySize
func (data *Int8FieldData) GetMemorySize() int {
return binary.Size(data.NumRows) + binary.Size(data.Data)
return binary.Size(data.Data)
}
// GetMemorySize implements FieldData.GetMemorySize
func (data *Int16FieldData) GetMemorySize() int {
return binary.Size(data.NumRows) + binary.Size(data.Data)
return binary.Size(data.Data)
}
// GetMemorySize implements FieldData.GetMemorySize
func (data *Int32FieldData) GetMemorySize() int {
return binary.Size(data.NumRows) + binary.Size(data.Data)
return binary.Size(data.Data)
}
// GetMemorySize implements FieldData.GetMemorySize
func (data *Int64FieldData) GetMemorySize() int {
return binary.Size(data.NumRows) + binary.Size(data.Data)
return binary.Size(data.Data)
}
func (data *FloatFieldData) GetMemorySize() int {
return binary.Size(data.NumRows) + binary.Size(data.Data)
return binary.Size(data.Data)
}
func (data *DoubleFieldData) GetMemorySize() int {
return binary.Size(data.NumRows) + binary.Size(data.Data)
return binary.Size(data.Data)
}
func (data *StringFieldData) GetMemorySize() int {
return binary.Size(data.NumRows) + binary.Size(data.Data)
var size int
for _, val := range data.Data {
size += len(val) + 16
}
return size
}
func (data *BinaryVectorFieldData) GetMemorySize() int {
return binary.Size(data.NumRows) + binary.Size(data.Data) + binary.Size(data.Dim)
return binary.Size(data.Data) + 4
}
func (data *FloatVectorFieldData) GetMemorySize() int {
return binary.Size(data.NumRows) + binary.Size(data.Data) + binary.Size(data.Dim)
return binary.Size(data.Data) + 4
}
// system filed id:
@ -511,15 +505,13 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &BoolFieldData{
NumRows: make([]int64, 0),
Data: make([]bool, 0, rowNum),
Data: make([]bool, 0, rowNum),
}
}
boolFieldData := insertData.Data[fieldID].(*BoolFieldData)
boolFieldData.Data = append(boolFieldData.Data, singleData...)
totalLength += len(singleData)
boolFieldData.NumRows = append(boolFieldData.NumRows, int64(len(singleData)))
insertData.Data[fieldID] = boolFieldData
case schemapb.DataType_Int8:
@ -532,15 +524,13 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &Int8FieldData{
NumRows: make([]int64, 0),
Data: make([]int8, 0, rowNum),
Data: make([]int8, 0, rowNum),
}
}
int8FieldData := insertData.Data[fieldID].(*Int8FieldData)
int8FieldData.Data = append(int8FieldData.Data, singleData...)
totalLength += len(singleData)
int8FieldData.NumRows = append(int8FieldData.NumRows, int64(len(singleData)))
insertData.Data[fieldID] = int8FieldData
case schemapb.DataType_Int16:
@ -553,15 +543,13 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &Int16FieldData{
NumRows: make([]int64, 0),
Data: make([]int16, 0, rowNum),
Data: make([]int16, 0, rowNum),
}
}
int16FieldData := insertData.Data[fieldID].(*Int16FieldData)
int16FieldData.Data = append(int16FieldData.Data, singleData...)
totalLength += len(singleData)
int16FieldData.NumRows = append(int16FieldData.NumRows, int64(len(singleData)))
insertData.Data[fieldID] = int16FieldData
case schemapb.DataType_Int32:
@ -574,15 +562,13 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &Int32FieldData{
NumRows: make([]int64, 0),
Data: make([]int32, 0, rowNum),
Data: make([]int32, 0, rowNum),
}
}
int32FieldData := insertData.Data[fieldID].(*Int32FieldData)
int32FieldData.Data = append(int32FieldData.Data, singleData...)
totalLength += len(singleData)
int32FieldData.NumRows = append(int32FieldData.NumRows, int64(len(singleData)))
insertData.Data[fieldID] = int32FieldData
case schemapb.DataType_Int64:
@ -595,15 +581,13 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &Int64FieldData{
NumRows: make([]int64, 0),
Data: make([]int64, 0, rowNum),
Data: make([]int64, 0, rowNum),
}
}
int64FieldData := insertData.Data[fieldID].(*Int64FieldData)
int64FieldData.Data = append(int64FieldData.Data, singleData...)
totalLength += len(singleData)
int64FieldData.NumRows = append(int64FieldData.NumRows, int64(len(singleData)))
insertData.Data[fieldID] = int64FieldData
case schemapb.DataType_Float:
@ -616,15 +600,13 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &FloatFieldData{
NumRows: make([]int64, 0),
Data: make([]float32, 0, rowNum),
Data: make([]float32, 0, rowNum),
}
}
floatFieldData := insertData.Data[fieldID].(*FloatFieldData)
floatFieldData.Data = append(floatFieldData.Data, singleData...)
totalLength += len(singleData)
floatFieldData.NumRows = append(floatFieldData.NumRows, int64(len(singleData)))
insertData.Data[fieldID] = floatFieldData
case schemapb.DataType_Double:
@ -637,15 +619,13 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &DoubleFieldData{
NumRows: make([]int64, 0),
Data: make([]float64, 0, rowNum),
Data: make([]float64, 0, rowNum),
}
}
doubleFieldData := insertData.Data[fieldID].(*DoubleFieldData)
doubleFieldData.Data = append(doubleFieldData.Data, singleData...)
totalLength += len(singleData)
doubleFieldData.NumRows = append(doubleFieldData.NumRows, int64(len(singleData)))
insertData.Data[fieldID] = doubleFieldData
case schemapb.DataType_String, schemapb.DataType_VarChar:
@ -658,15 +638,13 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &StringFieldData{
NumRows: make([]int64, 0),
Data: make([]string, 0, rowNum),
Data: make([]string, 0, rowNum),
}
}
stringFieldData := insertData.Data[fieldID].(*StringFieldData)
stringFieldData.Data = append(stringFieldData.Data, stringPayload...)
totalLength += len(stringPayload)
stringFieldData.NumRows = append(stringFieldData.NumRows, int64(len(stringPayload)))
insertData.Data[fieldID] = stringFieldData
case schemapb.DataType_BinaryVector:
@ -680,8 +658,7 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &BinaryVectorFieldData{
NumRows: make([]int64, 0),
Data: make([]byte, 0, rowNum*dim),
Data: make([]byte, 0, rowNum*dim),
}
}
binaryVectorFieldData := insertData.Data[fieldID].(*BinaryVectorFieldData)
@ -694,7 +671,6 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
totalLength += length
binaryVectorFieldData.NumRows = append(binaryVectorFieldData.NumRows, int64(length))
binaryVectorFieldData.Dim = dim
insertData.Data[fieldID] = binaryVectorFieldData
@ -709,8 +685,7 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
if insertData.Data[fieldID] == nil {
insertData.Data[fieldID] = &FloatVectorFieldData{
NumRows: make([]int64, 0),
Data: make([]float32, 0, rowNum*dim),
Data: make([]float32, 0, rowNum*dim),
}
}
floatVectorFieldData := insertData.Data[fieldID].(*FloatVectorFieldData)
@ -723,7 +698,6 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
}
totalLength += length
floatVectorFieldData.NumRows = append(floatVectorFieldData.NumRows, int64(length))
floatVectorFieldData.Dim = dim
insertData.Data[fieldID] = floatVectorFieldData

View File

@ -21,11 +21,12 @@ import (
"fmt"
"testing"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
const (
@ -148,54 +149,42 @@ func TestInsertCodec(t *testing.T) {
insertData1 := &InsertData{
Data: map[int64]FieldData{
RowIDField: &Int64FieldData{
NumRows: []int64{2},
Data: []int64{3, 4},
Data: []int64{3, 4},
},
TimestampField: &Int64FieldData{
NumRows: []int64{2},
Data: []int64{3, 4},
Data: []int64{3, 4},
},
BoolField: &BoolFieldData{
NumRows: []int64{2},
Data: []bool{true, false},
Data: []bool{true, false},
},
Int8Field: &Int8FieldData{
NumRows: []int64{2},
Data: []int8{3, 4},
Data: []int8{3, 4},
},
Int16Field: &Int16FieldData{
NumRows: []int64{2},
Data: []int16{3, 4},
Data: []int16{3, 4},
},
Int32Field: &Int32FieldData{
NumRows: []int64{2},
Data: []int32{3, 4},
Data: []int32{3, 4},
},
Int64Field: &Int64FieldData{
NumRows: []int64{2},
Data: []int64{3, 4},
Data: []int64{3, 4},
},
FloatField: &FloatFieldData{
NumRows: []int64{2},
Data: []float32{3, 4},
Data: []float32{3, 4},
},
DoubleField: &DoubleFieldData{
NumRows: []int64{2},
Data: []float64{3, 4},
Data: []float64{3, 4},
},
StringField: &StringFieldData{
NumRows: []int64{2},
Data: []string{"3", "4"},
Data: []string{"3", "4"},
},
BinaryVectorField: &BinaryVectorFieldData{
NumRows: []int64{2},
Data: []byte{0, 255},
Dim: 8,
Data: []byte{0, 255},
Dim: 8,
},
FloatVectorField: &FloatVectorFieldData{
NumRows: []int64{2},
Data: []float32{4, 5, 6, 7, 4, 5, 6, 7},
Dim: 4,
Data: []float32{4, 5, 6, 7, 4, 5, 6, 7},
Dim: 4,
},
},
}
@ -203,72 +192,60 @@ func TestInsertCodec(t *testing.T) {
insertData2 := &InsertData{
Data: map[int64]FieldData{
RowIDField: &Int64FieldData{
NumRows: []int64{2},
Data: []int64{1, 2},
Data: []int64{1, 2},
},
TimestampField: &Int64FieldData{
NumRows: []int64{2},
Data: []int64{1, 2},
Data: []int64{1, 2},
},
BoolField: &BoolFieldData{
NumRows: []int64{2},
Data: []bool{true, false},
Data: []bool{true, false},
},
Int8Field: &Int8FieldData{
NumRows: []int64{2},
Data: []int8{1, 2},
Data: []int8{1, 2},
},
Int16Field: &Int16FieldData{
NumRows: []int64{2},
Data: []int16{1, 2},
Data: []int16{1, 2},
},
Int32Field: &Int32FieldData{
NumRows: []int64{2},
Data: []int32{1, 2},
Data: []int32{1, 2},
},
Int64Field: &Int64FieldData{
NumRows: []int64{2},
Data: []int64{1, 2},
Data: []int64{1, 2},
},
FloatField: &FloatFieldData{
NumRows: []int64{2},
Data: []float32{1, 2},
Data: []float32{1, 2},
},
DoubleField: &DoubleFieldData{
NumRows: []int64{2},
Data: []float64{1, 2},
Data: []float64{1, 2},
},
StringField: &StringFieldData{
NumRows: []int64{2},
Data: []string{"1", "2"},
Data: []string{"1", "2"},
},
BinaryVectorField: &BinaryVectorFieldData{
NumRows: []int64{2},
Data: []byte{0, 255},
Dim: 8,
Data: []byte{0, 255},
Dim: 8,
},
FloatVectorField: &FloatVectorFieldData{
NumRows: []int64{2},
Data: []float32{0, 1, 2, 3, 0, 1, 2, 3},
Dim: 4,
Data: []float32{0, 1, 2, 3, 0, 1, 2, 3},
Dim: 4,
},
},
}
insertDataEmpty := &InsertData{
Data: map[int64]FieldData{
RowIDField: &Int64FieldData{[]int64{}, []int64{}},
TimestampField: &Int64FieldData{[]int64{}, []int64{}},
BoolField: &BoolFieldData{[]int64{}, []bool{}},
Int8Field: &Int8FieldData{[]int64{}, []int8{}},
Int16Field: &Int16FieldData{[]int64{}, []int16{}},
Int32Field: &Int32FieldData{[]int64{}, []int32{}},
Int64Field: &Int64FieldData{[]int64{}, []int64{}},
FloatField: &FloatFieldData{[]int64{}, []float32{}},
DoubleField: &DoubleFieldData{[]int64{}, []float64{}},
StringField: &StringFieldData{[]int64{}, []string{}},
BinaryVectorField: &BinaryVectorFieldData{[]int64{}, []byte{}, 8},
FloatVectorField: &FloatVectorFieldData{[]int64{}, []float32{}, 4},
RowIDField: &Int64FieldData{[]int64{}},
TimestampField: &Int64FieldData{[]int64{}},
BoolField: &BoolFieldData{[]bool{}},
Int8Field: &Int8FieldData{[]int8{}},
Int16Field: &Int16FieldData{[]int16{}},
Int32Field: &Int32FieldData{[]int32{}},
Int64Field: &Int64FieldData{[]int64{}},
FloatField: &FloatFieldData{[]float32{}},
DoubleField: &DoubleFieldData{[]float64{}},
StringField: &StringFieldData{[]string{}},
BinaryVectorField: &BinaryVectorFieldData{[]byte{}, 8},
FloatVectorField: &FloatVectorFieldData{[]float32{}, 4},
},
}
b, s, err := insertCodec.Serialize(PartitionID, SegmentID, insertDataEmpty)
@ -294,18 +271,6 @@ func TestInsertCodec(t *testing.T) {
assert.Equal(t, UniqueID(CollectionID), collID)
assert.Equal(t, UniqueID(PartitionID), partID)
assert.Equal(t, UniqueID(SegmentID), segID)
assert.Equal(t, []int64{2, 2}, resultData.Data[RowIDField].(*Int64FieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[TimestampField].(*Int64FieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[BoolField].(*BoolFieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[Int8Field].(*Int8FieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[Int16Field].(*Int16FieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[Int32Field].(*Int32FieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[Int64Field].(*Int64FieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[FloatField].(*FloatFieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[DoubleField].(*DoubleFieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[StringField].(*StringFieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[BinaryVectorField].(*BinaryVectorFieldData).NumRows)
assert.Equal(t, []int64{2, 2}, resultData.Data[FloatVectorField].(*FloatVectorFieldData).NumRows)
assert.Equal(t, []int64{1, 2, 3, 4}, resultData.Data[RowIDField].(*Int64FieldData).Data)
assert.Equal(t, []int64{1, 2, 3, 4}, resultData.Data[TimestampField].(*Int64FieldData).Data)
assert.Equal(t, []bool{true, false, true, false}, resultData.Data[BoolField].(*BoolFieldData).Data)
@ -459,59 +424,146 @@ func TestTsError(t *testing.T) {
assert.NotNil(t, err)
}
//func TestSchemaError(t *testing.T) {
// schema := &etcdpb.CollectionMeta{
// ID: CollectionID,
// CreateTime: 1,
// SegmentIDs: []int64{SegmentID},
// PartitionTags: []string{"partition_0", "partition_1"},
// Schema: &schemapb.CollectionSchema{
// Name: "schema",
// Description: "schema",
// AutoID: true,
// Fields: []*schemapb.FieldSchema{
// {
// FieldID: RowIDField,
// Name: "row_id",
// IsPrimaryKey: false,
// Description: "row_id",
// DataType: schemapb.DataType_Int64,
// },
// {
// FieldID: TimestampField,
// Name: "Timestamp",
// IsPrimaryKey: false,
// Description: "Timestamp",
// DataType: schemapb.DataType_Int64,
// },
// {
// FieldID: BoolField,
// Name: "field_bool",
// IsPrimaryKey: false,
// Description: "bool",
// DataType: 999,
// },
// },
// },
// }
// insertData := &InsertData{
// Data: map[int64]FieldData{
// RowIDField: &Int64FieldData{
// NumRows: []int64{2},
// Data: []int64{3, 4},
// },
// TimestampField: &Int64FieldData{
// NumRows: []int64{2},
// Data: []int64{3, 4},
// },
// BoolField: &BoolFieldData{
// NumRows: []int64{2},
// Data: []bool{true, false},
// },
// },
// }
// insertCodec := NewInsertCodec(schema)
// blobs, _, err := insertCodec.Serialize(PartitionID, SegmentID, insertData)
// assert.Nil(t, blobs)
// assert.NotNil(t, err)
//}
// TestMemorySize verifies FieldData.GetMemorySize for every scalar and vector
// field type, at three sizes: one row, two rows, and empty.
//
// Expected sizes are the raw element bytes for scalar fields (bool=1, int8=1,
// int16=2, int32=4, int64/double=8, float=4), 16 bytes of string-header
// overhead plus content bytes per string, and — for vector fields — the data
// bytes plus a constant 4-byte Dim overhead (visible in the empty-field cases
// at the bottom, which assert 4 rather than 0).
func TestMemorySize(t *testing.T) {
	// One row per field.
	insertData1 := &InsertData{
		Data: map[int64]FieldData{
			RowIDField: &Int64FieldData{
				Data: []int64{3},
			},
			TimestampField: &Int64FieldData{
				Data: []int64{3},
			},
			BoolField: &BoolFieldData{
				Data: []bool{true},
			},
			Int8Field: &Int8FieldData{
				Data: []int8{3},
			},
			Int16Field: &Int16FieldData{
				Data: []int16{3},
			},
			Int32Field: &Int32FieldData{
				Data: []int32{3},
			},
			Int64Field: &Int64FieldData{
				Data: []int64{3},
			},
			FloatField: &FloatFieldData{
				Data: []float32{3},
			},
			DoubleField: &DoubleFieldData{
				Data: []float64{3},
			},
			StringField: &StringFieldData{
				Data: []string{"3"},
			},
			BinaryVectorField: &BinaryVectorFieldData{
				Data: []byte{0},
				Dim:  8,
			},
			FloatVectorField: &FloatVectorFieldData{
				Data: []float32{4, 5, 6, 7},
				Dim:  4,
			},
		},
	}

	assert.Equal(t, insertData1.Data[RowIDField].GetMemorySize(), 8)
	assert.Equal(t, insertData1.Data[TimestampField].GetMemorySize(), 8)
	assert.Equal(t, insertData1.Data[BoolField].GetMemorySize(), 1)
	assert.Equal(t, insertData1.Data[Int8Field].GetMemorySize(), 1)
	assert.Equal(t, insertData1.Data[Int16Field].GetMemorySize(), 2)
	assert.Equal(t, insertData1.Data[Int32Field].GetMemorySize(), 4)
	assert.Equal(t, insertData1.Data[Int64Field].GetMemorySize(), 8)
	assert.Equal(t, insertData1.Data[FloatField].GetMemorySize(), 4)
	assert.Equal(t, insertData1.Data[DoubleField].GetMemorySize(), 8)
	// 16-byte string header + 1 content byte.
	assert.Equal(t, insertData1.Data[StringField].GetMemorySize(), 17)
	// 1 data byte + 4-byte Dim overhead.
	assert.Equal(t, insertData1.Data[BinaryVectorField].GetMemorySize(), 5)
	// Was a copy-paste duplicate of the FloatField assertion; FloatVectorField
	// is 4 floats * 4 bytes + 4-byte Dim overhead = 20.
	assert.Equal(t, insertData1.Data[FloatVectorField].GetMemorySize(), 20)

	// Two rows per field.
	insertData2 := &InsertData{
		Data: map[int64]FieldData{
			RowIDField: &Int64FieldData{
				Data: []int64{1, 2},
			},
			TimestampField: &Int64FieldData{
				Data: []int64{1, 2},
			},
			BoolField: &BoolFieldData{
				Data: []bool{true, false},
			},
			Int8Field: &Int8FieldData{
				Data: []int8{1, 2},
			},
			Int16Field: &Int16FieldData{
				Data: []int16{1, 2},
			},
			Int32Field: &Int32FieldData{
				Data: []int32{1, 2},
			},
			Int64Field: &Int64FieldData{
				Data: []int64{1, 2},
			},
			FloatField: &FloatFieldData{
				Data: []float32{1, 2},
			},
			DoubleField: &DoubleFieldData{
				Data: []float64{1, 2},
			},
			StringField: &StringFieldData{
				Data: []string{"1", "23"},
			},
			BinaryVectorField: &BinaryVectorFieldData{
				Data: []byte{0, 255},
				Dim:  8,
			},
			FloatVectorField: &FloatVectorFieldData{
				Data: []float32{0, 1, 2, 3, 0, 1, 2, 3},
				Dim:  4,
			},
		},
	}

	assert.Equal(t, insertData2.Data[RowIDField].GetMemorySize(), 16)
	assert.Equal(t, insertData2.Data[TimestampField].GetMemorySize(), 16)
	assert.Equal(t, insertData2.Data[BoolField].GetMemorySize(), 2)
	assert.Equal(t, insertData2.Data[Int8Field].GetMemorySize(), 2)
	assert.Equal(t, insertData2.Data[Int16Field].GetMemorySize(), 4)
	assert.Equal(t, insertData2.Data[Int32Field].GetMemorySize(), 8)
	assert.Equal(t, insertData2.Data[Int64Field].GetMemorySize(), 16)
	assert.Equal(t, insertData2.Data[FloatField].GetMemorySize(), 8)
	assert.Equal(t, insertData2.Data[DoubleField].GetMemorySize(), 16)
	// Two 16-byte string headers + 3 content bytes ("1" + "23").
	assert.Equal(t, insertData2.Data[StringField].GetMemorySize(), 35)
	// 2 data bytes + 4-byte Dim overhead.
	assert.Equal(t, insertData2.Data[BinaryVectorField].GetMemorySize(), 6)
	// Was a copy-paste duplicate of the FloatField assertion; FloatVectorField
	// is 8 floats * 4 bytes + 4-byte Dim overhead = 36.
	assert.Equal(t, insertData2.Data[FloatVectorField].GetMemorySize(), 36)

	// Empty fields: scalars report 0; vector fields still carry the 4-byte
	// Dim overhead.
	insertDataEmpty := &InsertData{
		Data: map[int64]FieldData{
			RowIDField:        &Int64FieldData{[]int64{}},
			TimestampField:    &Int64FieldData{[]int64{}},
			BoolField:         &BoolFieldData{[]bool{}},
			Int8Field:         &Int8FieldData{[]int8{}},
			Int16Field:        &Int16FieldData{[]int16{}},
			Int32Field:        &Int32FieldData{[]int32{}},
			Int64Field:        &Int64FieldData{[]int64{}},
			FloatField:        &FloatFieldData{[]float32{}},
			DoubleField:       &DoubleFieldData{[]float64{}},
			StringField:       &StringFieldData{[]string{}},
			BinaryVectorField: &BinaryVectorFieldData{[]byte{}, 8},
			FloatVectorField:  &FloatVectorFieldData{[]float32{}, 4},
		},
	}

	assert.Equal(t, insertDataEmpty.Data[RowIDField].GetMemorySize(), 0)
	assert.Equal(t, insertDataEmpty.Data[TimestampField].GetMemorySize(), 0)
	assert.Equal(t, insertDataEmpty.Data[BoolField].GetMemorySize(), 0)
	assert.Equal(t, insertDataEmpty.Data[Int8Field].GetMemorySize(), 0)
	assert.Equal(t, insertDataEmpty.Data[Int16Field].GetMemorySize(), 0)
	assert.Equal(t, insertDataEmpty.Data[Int32Field].GetMemorySize(), 0)
	assert.Equal(t, insertDataEmpty.Data[Int64Field].GetMemorySize(), 0)
	assert.Equal(t, insertDataEmpty.Data[FloatField].GetMemorySize(), 0)
	assert.Equal(t, insertDataEmpty.Data[DoubleField].GetMemorySize(), 0)
	assert.Equal(t, insertDataEmpty.Data[StringField].GetMemorySize(), 0)
	assert.Equal(t, insertDataEmpty.Data[BinaryVectorField].GetMemorySize(), 4)
	assert.Equal(t, insertDataEmpty.Data[FloatVectorField].GetMemorySize(), 4)
}

View File

@ -128,54 +128,42 @@ func TestDataSorter(t *testing.T) {
insertDataFirst := &InsertData{
Data: map[int64]FieldData{
0: &Int64FieldData{
NumRows: []int64{3},
Data: []int64{3, 4, 2},
Data: []int64{3, 4, 2},
},
1: &Int64FieldData{
NumRows: []int64{3},
Data: []int64{3, 4, 5},
Data: []int64{3, 4, 5},
},
100: &BoolFieldData{
NumRows: []int64{3},
Data: []bool{true, false, true},
Data: []bool{true, false, true},
},
101: &Int8FieldData{
NumRows: []int64{3},
Data: []int8{3, 4, 5},
Data: []int8{3, 4, 5},
},
102: &Int16FieldData{
NumRows: []int64{3},
Data: []int16{3, 4, 5},
Data: []int16{3, 4, 5},
},
103: &Int32FieldData{
NumRows: []int64{3},
Data: []int32{3, 4, 5},
Data: []int32{3, 4, 5},
},
104: &Int64FieldData{
NumRows: []int64{3},
Data: []int64{3, 4, 5},
Data: []int64{3, 4, 5},
},
105: &FloatFieldData{
NumRows: []int64{3},
Data: []float32{3, 4, 5},
Data: []float32{3, 4, 5},
},
106: &DoubleFieldData{
NumRows: []int64{3},
Data: []float64{3, 4, 5},
Data: []float64{3, 4, 5},
},
107: &StringFieldData{
NumRows: []int64{2},
Data: []string{"3", "4", "5"},
Data: []string{"3", "4", "5"},
},
108: &BinaryVectorFieldData{
NumRows: []int64{3},
Data: []byte{0, 255, 128},
Dim: 8,
Data: []byte{0, 255, 128},
Dim: 8,
},
109: &FloatVectorFieldData{
NumRows: []int64{3},
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23},
Dim: 8,
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23},
Dim: 8,
},
},
}
@ -244,8 +232,7 @@ func TestDataSorter_Len(t *testing.T) {
insertData := &InsertData{
Data: map[int64]FieldData{
1: &Int64FieldData{
NumRows: []int64{2},
Data: []int64{6, 4},
Data: []int64{6, 4},
},
},
}
@ -261,8 +248,7 @@ func TestDataSorter_Len(t *testing.T) {
insertData = &InsertData{
Data: map[int64]FieldData{
0: &Int8FieldData{
NumRows: []int64{2},
Data: []int8{3, 4},
Data: []int8{3, 4},
},
},
}
@ -280,8 +266,7 @@ func TestDataSorter_Less(t *testing.T) {
insertData := &InsertData{
Data: map[int64]FieldData{
1: &Int64FieldData{
NumRows: []int64{2},
Data: []int64{6, 4},
Data: []int64{6, 4},
},
},
}
@ -297,8 +282,7 @@ func TestDataSorter_Less(t *testing.T) {
insertData = &InsertData{
Data: map[int64]FieldData{
0: &Int8FieldData{
NumRows: []int64{2},
Data: []int8{3, 4},
Data: []int8{3, 4},
},
},
}
@ -314,8 +298,7 @@ func TestDataSorter_Less(t *testing.T) {
insertData = &InsertData{
Data: map[int64]FieldData{
0: &Int64FieldData{
NumRows: []int64{2},
Data: []int64{6, 4},
Data: []int64{6, 4},
},
},
}

View File

@ -185,54 +185,42 @@ func TestPrintBinlogFiles(t *testing.T) {
insertDataFirst := &InsertData{
Data: map[int64]FieldData{
0: &Int64FieldData{
NumRows: []int64{2},
Data: []int64{3, 4},
Data: []int64{3, 4},
},
1: &Int64FieldData{
NumRows: []int64{2},
Data: []int64{3, 4},
Data: []int64{3, 4},
},
100: &BoolFieldData{
NumRows: []int64{2},
Data: []bool{true, false},
Data: []bool{true, false},
},
101: &Int8FieldData{
NumRows: []int64{2},
Data: []int8{3, 4},
Data: []int8{3, 4},
},
102: &Int16FieldData{
NumRows: []int64{2},
Data: []int16{3, 4},
Data: []int16{3, 4},
},
103: &Int32FieldData{
NumRows: []int64{2},
Data: []int32{3, 4},
Data: []int32{3, 4},
},
104: &Int64FieldData{
NumRows: []int64{2},
Data: []int64{3, 4},
Data: []int64{3, 4},
},
105: &FloatFieldData{
NumRows: []int64{2},
Data: []float32{3, 4},
Data: []float32{3, 4},
},
106: &DoubleFieldData{
NumRows: []int64{2},
Data: []float64{3, 4},
Data: []float64{3, 4},
},
107: &StringFieldData{
NumRows: []int64{2},
Data: []string{"3", "4"},
Data: []string{"3", "4"},
},
108: &BinaryVectorFieldData{
NumRows: []int64{2},
Data: []byte{0, 255},
Dim: 8,
Data: []byte{0, 255},
Dim: 8,
},
109: &FloatVectorFieldData{
NumRows: []int64{2},
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7},
Dim: 8,
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7},
Dim: 8,
},
},
}
@ -240,54 +228,42 @@ func TestPrintBinlogFiles(t *testing.T) {
insertDataSecond := &InsertData{
Data: map[int64]FieldData{
0: &Int64FieldData{
NumRows: []int64{2},
Data: []int64{1, 2},
Data: []int64{1, 2},
},
1: &Int64FieldData{
NumRows: []int64{2},
Data: []int64{1, 2},
Data: []int64{1, 2},
},
100: &BoolFieldData{
NumRows: []int64{2},
Data: []bool{true, false},
Data: []bool{true, false},
},
101: &Int8FieldData{
NumRows: []int64{2},
Data: []int8{1, 2},
Data: []int8{1, 2},
},
102: &Int16FieldData{
NumRows: []int64{2},
Data: []int16{1, 2},
Data: []int16{1, 2},
},
103: &Int32FieldData{
NumRows: []int64{2},
Data: []int32{1, 2},
Data: []int32{1, 2},
},
104: &Int64FieldData{
NumRows: []int64{2},
Data: []int64{1, 2},
Data: []int64{1, 2},
},
105: &FloatFieldData{
NumRows: []int64{2},
Data: []float32{1, 2},
Data: []float32{1, 2},
},
106: &DoubleFieldData{
NumRows: []int64{2},
Data: []float64{1, 2},
Data: []float64{1, 2},
},
107: &StringFieldData{
NumRows: []int64{2},
Data: []string{"1", "2"},
Data: []string{"1", "2"},
},
108: &BinaryVectorFieldData{
NumRows: []int64{2},
Data: []byte{0, 255},
Dim: 8,
Data: []byte{0, 255},
Dim: 8,
},
109: &FloatVectorFieldData{
NumRows: []int64{2},
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7},
Dim: 8,
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7},
Dim: 8,
},
},
}

View File

@ -315,9 +315,8 @@ func RowBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *schemap
vecs := readFloatVectors(blobReaders, dim)
idata.Data[field.FieldID] = &FloatVectorFieldData{
NumRows: []int64{int64(msg.NRows())},
Data: vecs,
Dim: dim,
Data: vecs,
Dim: dim,
}
case schemapb.DataType_BinaryVector:
@ -330,39 +329,33 @@ func RowBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *schemap
vecs := readBinaryVectors(blobReaders, dim)
idata.Data[field.FieldID] = &BinaryVectorFieldData{
NumRows: []int64{int64(msg.NRows())},
Data: vecs,
Dim: dim,
Data: vecs,
Dim: dim,
}
case schemapb.DataType_Bool:
idata.Data[field.FieldID] = &BoolFieldData{
NumRows: []int64{int64(msg.NRows())},
Data: readBoolArray(blobReaders),
Data: readBoolArray(blobReaders),
}
case schemapb.DataType_Int8:
idata.Data[field.FieldID] = &Int8FieldData{
NumRows: []int64{int64(msg.NRows())},
Data: readInt8Array(blobReaders),
Data: readInt8Array(blobReaders),
}
case schemapb.DataType_Int16:
idata.Data[field.FieldID] = &Int16FieldData{
NumRows: []int64{int64(msg.NRows())},
Data: readInt16Array(blobReaders),
Data: readInt16Array(blobReaders),
}
case schemapb.DataType_Int32:
idata.Data[field.FieldID] = &Int32FieldData{
NumRows: []int64{int64(msg.NRows())},
Data: readInt32Array(blobReaders),
Data: readInt32Array(blobReaders),
}
case schemapb.DataType_Int64:
idata.Data[field.FieldID] = &Int64FieldData{
NumRows: []int64{int64(msg.NRows())},
Data: nil,
Data: nil,
}
fieldData := idata.Data[field.FieldID].(*Int64FieldData)
@ -379,14 +372,12 @@ func RowBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *schemap
case schemapb.DataType_Float:
idata.Data[field.FieldID] = &FloatFieldData{
NumRows: []int64{int64(msg.NRows())},
Data: readFloatArray(blobReaders),
Data: readFloatArray(blobReaders),
}
case schemapb.DataType_Double:
idata.Data[field.FieldID] = &DoubleFieldData{
NumRows: []int64{int64(msg.NRows())},
Data: readDoubleArray(blobReaders),
Data: readDoubleArray(blobReaders),
}
}
}
@ -418,9 +409,8 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
srcData := srcFields[field.FieldID].GetVectors().GetFloatVector().GetData()
fieldData := &FloatVectorFieldData{
NumRows: []int64{int64(msg.NRows())},
Data: make([]float32, 0, len(srcData)),
Dim: dim,
Data: make([]float32, 0, len(srcData)),
Dim: dim,
}
fieldData.Data = append(fieldData.Data, srcData...)
@ -436,9 +426,8 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
srcData := srcFields[field.FieldID].GetVectors().GetBinaryVector()
fieldData := &BinaryVectorFieldData{
NumRows: []int64{int64(msg.NRows())},
Data: make([]byte, 0, len(srcData)),
Dim: dim,
Data: make([]byte, 0, len(srcData)),
Dim: dim,
}
fieldData.Data = append(fieldData.Data, srcData...)
@ -448,8 +437,7 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
srcData := srcFields[field.FieldID].GetScalars().GetBoolData().GetData()
fieldData := &BoolFieldData{
NumRows: []int64{int64(msg.NRows())},
Data: make([]bool, 0, len(srcData)),
Data: make([]bool, 0, len(srcData)),
}
fieldData.Data = append(fieldData.Data, srcData...)
@ -459,8 +447,7 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
srcData := srcFields[field.FieldID].GetScalars().GetIntData().GetData()
fieldData := &Int8FieldData{
NumRows: []int64{int64(msg.NRows())},
Data: make([]int8, 0, len(srcData)),
Data: make([]int8, 0, len(srcData)),
}
int8SrcData := make([]int8, len(srcData))
for i := 0; i < len(srcData); i++ {
@ -474,8 +461,7 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
srcData := srcFields[field.FieldID].GetScalars().GetIntData().GetData()
fieldData := &Int16FieldData{
NumRows: []int64{int64(msg.NRows())},
Data: make([]int16, 0, len(srcData)),
Data: make([]int16, 0, len(srcData)),
}
int16SrcData := make([]int16, len(srcData))
for i := 0; i < len(srcData); i++ {
@ -489,8 +475,7 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
srcData := srcFields[field.FieldID].GetScalars().GetIntData().GetData()
fieldData := &Int32FieldData{
NumRows: []int64{int64(msg.NRows())},
Data: make([]int32, 0, len(srcData)),
Data: make([]int32, 0, len(srcData)),
}
fieldData.Data = append(fieldData.Data, srcData...)
@ -498,8 +483,7 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
case schemapb.DataType_Int64:
fieldData := &Int64FieldData{
NumRows: []int64{int64(msg.NRows())},
Data: make([]int64, 0),
Data: make([]int64, 0),
}
switch field.FieldID {
@ -523,8 +507,7 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
srcData := srcFields[field.FieldID].GetScalars().GetFloatData().GetData()
fieldData := &FloatFieldData{
NumRows: []int64{int64(msg.NRows())},
Data: make([]float32, 0, len(srcData)),
Data: make([]float32, 0, len(srcData)),
}
fieldData.Data = append(fieldData.Data, srcData...)
@ -534,8 +517,7 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
srcData := srcFields[field.FieldID].GetScalars().GetDoubleData().GetData()
fieldData := &DoubleFieldData{
NumRows: []int64{int64(msg.NRows())},
Data: make([]float64, 0, len(srcData)),
Data: make([]float64, 0, len(srcData)),
}
fieldData.Data = append(fieldData.Data, srcData...)
@ -544,8 +526,7 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
srcData := srcFields[field.FieldID].GetScalars().GetStringData().GetData()
fieldData := &StringFieldData{
NumRows: []int64{int64(msg.NumRows)},
Data: make([]string, 0, len(srcData)),
Data: make([]string, 0, len(srcData)),
}
fieldData.Data = append(fieldData.Data, srcData...)
@ -566,133 +547,113 @@ func InsertMsgToInsertData(msg *msgstream.InsertMsg, schema *schemapb.Collection
func mergeBoolField(data *InsertData, fid FieldID, field *BoolFieldData) {
if _, ok := data.Data[fid]; !ok {
fieldData := &BoolFieldData{
NumRows: []int64{0},
Data: nil,
Data: nil,
}
data.Data[fid] = fieldData
}
fieldData := data.Data[fid].(*BoolFieldData)
fieldData.Data = append(fieldData.Data, field.Data...)
fieldData.NumRows[0] += int64(field.RowNum())
}
func mergeInt8Field(data *InsertData, fid FieldID, field *Int8FieldData) {
if _, ok := data.Data[fid]; !ok {
fieldData := &Int8FieldData{
NumRows: []int64{0},
Data: nil,
Data: nil,
}
data.Data[fid] = fieldData
}
fieldData := data.Data[fid].(*Int8FieldData)
fieldData.Data = append(fieldData.Data, field.Data...)
fieldData.NumRows[0] += int64(field.RowNum())
}
func mergeInt16Field(data *InsertData, fid FieldID, field *Int16FieldData) {
if _, ok := data.Data[fid]; !ok {
fieldData := &Int16FieldData{
NumRows: []int64{0},
Data: nil,
Data: nil,
}
data.Data[fid] = fieldData
}
fieldData := data.Data[fid].(*Int16FieldData)
fieldData.Data = append(fieldData.Data, field.Data...)
fieldData.NumRows[0] += int64(field.RowNum())
}
func mergeInt32Field(data *InsertData, fid FieldID, field *Int32FieldData) {
if _, ok := data.Data[fid]; !ok {
fieldData := &Int32FieldData{
NumRows: []int64{0},
Data: nil,
Data: nil,
}
data.Data[fid] = fieldData
}
fieldData := data.Data[fid].(*Int32FieldData)
fieldData.Data = append(fieldData.Data, field.Data...)
fieldData.NumRows[0] += int64(field.RowNum())
}
func mergeInt64Field(data *InsertData, fid FieldID, field *Int64FieldData) {
if _, ok := data.Data[fid]; !ok {
fieldData := &Int64FieldData{
NumRows: []int64{0},
Data: nil,
Data: nil,
}
data.Data[fid] = fieldData
}
fieldData := data.Data[fid].(*Int64FieldData)
fieldData.Data = append(fieldData.Data, field.Data...)
fieldData.NumRows[0] += int64(field.RowNum())
}
func mergeFloatField(data *InsertData, fid FieldID, field *FloatFieldData) {
if _, ok := data.Data[fid]; !ok {
fieldData := &FloatFieldData{
NumRows: []int64{0},
Data: nil,
Data: nil,
}
data.Data[fid] = fieldData
}
fieldData := data.Data[fid].(*FloatFieldData)
fieldData.Data = append(fieldData.Data, field.Data...)
fieldData.NumRows[0] += int64(field.RowNum())
}
func mergeDoubleField(data *InsertData, fid FieldID, field *DoubleFieldData) {
if _, ok := data.Data[fid]; !ok {
fieldData := &DoubleFieldData{
NumRows: []int64{0},
Data: nil,
Data: nil,
}
data.Data[fid] = fieldData
}
fieldData := data.Data[fid].(*DoubleFieldData)
fieldData.Data = append(fieldData.Data, field.Data...)
fieldData.NumRows[0] += int64(field.RowNum())
}
func mergeStringField(data *InsertData, fid FieldID, field *StringFieldData) {
if _, ok := data.Data[fid]; !ok {
fieldData := &StringFieldData{
NumRows: []int64{0},
Data: nil,
Data: nil,
}
data.Data[fid] = fieldData
}
fieldData := data.Data[fid].(*StringFieldData)
fieldData.Data = append(fieldData.Data, field.Data...)
fieldData.NumRows[0] += int64(field.RowNum())
}
func mergeBinaryVectorField(data *InsertData, fid FieldID, field *BinaryVectorFieldData) {
if _, ok := data.Data[fid]; !ok {
fieldData := &BinaryVectorFieldData{
NumRows: []int64{0},
Data: nil,
Dim: field.Dim,
Data: nil,
Dim: field.Dim,
}
data.Data[fid] = fieldData
}
fieldData := data.Data[fid].(*BinaryVectorFieldData)
fieldData.Data = append(fieldData.Data, field.Data...)
fieldData.NumRows[0] += int64(field.RowNum())
}
func mergeFloatVectorField(data *InsertData, fid FieldID, field *FloatVectorFieldData) {
if _, ok := data.Data[fid]; !ok {
fieldData := &FloatVectorFieldData{
NumRows: []int64{0},
Data: nil,
Dim: field.Dim,
Data: nil,
Dim: field.Dim,
}
data.Data[fid] = fieldData
}
fieldData := data.Data[fid].(*FloatVectorFieldData)
fieldData.Data = append(fieldData.Data, field.Data...)
fieldData.NumRows[0] += int64(field.RowNum())
}
// MergeFieldData merge field into data.

View File

@ -68,16 +68,13 @@ func TestCheckNumRows(t *testing.T) {
assert.True(t, checkNumRows())
f1 := &Int64FieldData{
NumRows: nil,
Data: []int64{1, 2, 3},
Data: []int64{1, 2, 3},
}
f2 := &Int64FieldData{
NumRows: nil,
Data: []int64{1, 2, 3},
Data: []int64{1, 2, 3},
}
f3 := &Int64FieldData{
NumRows: nil,
Data: []int64{1, 2, 3, 4},
Data: []int64{1, 2, 3, 4},
}
assert.True(t, checkNumRows(f1, f2))
@ -88,16 +85,13 @@ func TestCheckNumRows(t *testing.T) {
func TestSortFieldDataList(t *testing.T) {
f1 := &Int16FieldData{
NumRows: nil,
Data: []int16{1, 2, 3},
Data: []int16{1, 2, 3},
}
f2 := &Int32FieldData{
NumRows: nil,
Data: []int32{4, 5, 6},
Data: []int32{4, 5, 6},
}
f3 := &Int64FieldData{
NumRows: nil,
Data: []int64{7, 8, 9},
Data: []int64{7, 8, 9},
}
ls := fieldDataList{
@ -840,54 +834,42 @@ func TestMergeInsertData(t *testing.T) {
d1 := &InsertData{
Data: map[int64]FieldData{
common.RowIDField: &Int64FieldData{
NumRows: []int64{1},
Data: []int64{1},
Data: []int64{1},
},
common.TimeStampField: &Int64FieldData{
NumRows: []int64{1},
Data: []int64{1},
Data: []int64{1},
},
BoolField: &BoolFieldData{
NumRows: []int64{1},
Data: []bool{true},
Data: []bool{true},
},
Int8Field: &Int8FieldData{
NumRows: []int64{1},
Data: []int8{1},
Data: []int8{1},
},
Int16Field: &Int16FieldData{
NumRows: []int64{1},
Data: []int16{1},
Data: []int16{1},
},
Int32Field: &Int32FieldData{
NumRows: []int64{1},
Data: []int32{1},
Data: []int32{1},
},
Int64Field: &Int64FieldData{
NumRows: []int64{1},
Data: []int64{1},
Data: []int64{1},
},
FloatField: &FloatFieldData{
NumRows: []int64{1},
Data: []float32{0},
Data: []float32{0},
},
DoubleField: &DoubleFieldData{
NumRows: []int64{1},
Data: []float64{0},
Data: []float64{0},
},
StringField: &StringFieldData{
NumRows: []int64{1},
Data: []string{"1"},
Data: []string{"1"},
},
BinaryVectorField: &BinaryVectorFieldData{
NumRows: []int64{1},
Data: []byte{0},
Dim: 8,
Data: []byte{0},
Dim: 8,
},
FloatVectorField: &FloatVectorFieldData{
NumRows: []int64{1},
Data: []float32{0},
Dim: 1,
Data: []float32{0},
Dim: 1,
},
},
Infos: nil,
@ -895,54 +877,42 @@ func TestMergeInsertData(t *testing.T) {
d2 := &InsertData{
Data: map[int64]FieldData{
common.RowIDField: &Int64FieldData{
NumRows: []int64{1},
Data: []int64{2},
Data: []int64{2},
},
common.TimeStampField: &Int64FieldData{
NumRows: []int64{1},
Data: []int64{2},
Data: []int64{2},
},
BoolField: &BoolFieldData{
NumRows: []int64{1},
Data: []bool{false},
Data: []bool{false},
},
Int8Field: &Int8FieldData{
NumRows: []int64{1},
Data: []int8{2},
Data: []int8{2},
},
Int16Field: &Int16FieldData{
NumRows: []int64{1},
Data: []int16{2},
Data: []int16{2},
},
Int32Field: &Int32FieldData{
NumRows: []int64{1},
Data: []int32{2},
Data: []int32{2},
},
Int64Field: &Int64FieldData{
NumRows: []int64{1},
Data: []int64{2},
Data: []int64{2},
},
FloatField: &FloatFieldData{
NumRows: []int64{1},
Data: []float32{0},
Data: []float32{0},
},
DoubleField: &DoubleFieldData{
NumRows: []int64{1},
Data: []float64{0},
Data: []float64{0},
},
StringField: &StringFieldData{
NumRows: []int64{1},
Data: []string{"2"},
Data: []string{"2"},
},
BinaryVectorField: &BinaryVectorFieldData{
NumRows: []int64{1},
Data: []byte{0},
Dim: 8,
Data: []byte{0},
Dim: 8,
},
FloatVectorField: &FloatVectorFieldData{
NumRows: []int64{1},
Data: []float32{0},
Dim: 1,
Data: []float32{0},
Dim: 1,
},
},
Infos: nil,
@ -952,62 +922,50 @@ func TestMergeInsertData(t *testing.T) {
f, ok := merged.Data[common.RowIDField]
assert.True(t, ok)
assert.Equal(t, []int64{2}, f.(*Int64FieldData).NumRows)
assert.Equal(t, []int64{1, 2}, f.(*Int64FieldData).Data)
f, ok = merged.Data[common.TimeStampField]
assert.True(t, ok)
assert.Equal(t, []int64{2}, f.(*Int64FieldData).NumRows)
assert.Equal(t, []int64{1, 2}, f.(*Int64FieldData).Data)
f, ok = merged.Data[BoolField]
assert.True(t, ok)
assert.Equal(t, []int64{2}, f.(*BoolFieldData).NumRows)
assert.Equal(t, []bool{true, false}, f.(*BoolFieldData).Data)
f, ok = merged.Data[Int8Field]
assert.True(t, ok)
assert.Equal(t, []int64{2}, f.(*Int8FieldData).NumRows)
assert.Equal(t, []int8{1, 2}, f.(*Int8FieldData).Data)
f, ok = merged.Data[Int16Field]
assert.True(t, ok)
assert.Equal(t, []int64{2}, f.(*Int16FieldData).NumRows)
assert.Equal(t, []int16{1, 2}, f.(*Int16FieldData).Data)
f, ok = merged.Data[Int32Field]
assert.True(t, ok)
assert.Equal(t, []int64{2}, f.(*Int32FieldData).NumRows)
assert.Equal(t, []int32{1, 2}, f.(*Int32FieldData).Data)
f, ok = merged.Data[Int64Field]
assert.True(t, ok)
assert.Equal(t, []int64{2}, f.(*Int64FieldData).NumRows)
assert.Equal(t, []int64{1, 2}, f.(*Int64FieldData).Data)
f, ok = merged.Data[FloatField]
assert.True(t, ok)
assert.Equal(t, []int64{2}, f.(*FloatFieldData).NumRows)
assert.Equal(t, []float32{0, 0}, f.(*FloatFieldData).Data)
f, ok = merged.Data[DoubleField]
assert.True(t, ok)
assert.Equal(t, []int64{2}, f.(*DoubleFieldData).NumRows)
assert.Equal(t, []float64{0, 0}, f.(*DoubleFieldData).Data)
f, ok = merged.Data[StringField]
assert.True(t, ok)
assert.Equal(t, []int64{2}, f.(*StringFieldData).NumRows)
assert.Equal(t, []string{"1", "2"}, f.(*StringFieldData).Data)
f, ok = merged.Data[BinaryVectorField]
assert.True(t, ok)
assert.Equal(t, []int64{2}, f.(*BinaryVectorFieldData).NumRows)
assert.Equal(t, []byte{0, 0}, f.(*BinaryVectorFieldData).Data)
f, ok = merged.Data[FloatVectorField]
assert.True(t, ok)
assert.Equal(t, []int64{2}, f.(*FloatVectorFieldData).NumRows)
assert.Equal(t, []float32{0, 0}, f.(*FloatVectorFieldData).Data)
}

View File

@ -88,26 +88,21 @@ func initBinlogFile(schema *etcdpb.CollectionMeta) []*Blob {
insertData := &InsertData{
Data: map[int64]FieldData{
0: &Int64FieldData{
NumRows: []int64{2},
Data: []int64{3, 4},
Data: []int64{3, 4},
},
1: &Int64FieldData{
NumRows: []int64{2},
Data: []int64{3, 4},
Data: []int64{3, 4},
},
101: &Int8FieldData{
NumRows: []int64{2},
Data: []int8{3, 4},
Data: []int8{3, 4},
},
108: &BinaryVectorFieldData{
NumRows: []int64{2},
Data: []byte{0, 255},
Dim: 8,
Data: []byte{0, 255},
Dim: 8,
},
109: &FloatVectorFieldData{
NumRows: []int64{2},
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 111, 222, 333, 444, 555, 777, 666},
Dim: 8,
Data: []float32{0, 1, 2, 3, 4, 5, 6, 7, 0, 111, 222, 333, 444, 555, 777, 666},
Dim: 8,
},
},
}

View File

@ -557,7 +557,6 @@ func (p *BinlogAdapter) getShardingListByPrimaryInt64(primaryKeys []int64,
// append the entity to primary key's FieldData
field.(*storage.Int64FieldData).Data = append(field.(*storage.Int64FieldData).Data, key)
field.(*storage.Int64FieldData).NumRows[0]++
shardList = append(shardList, int32(shardID))
}
@ -609,7 +608,6 @@ func (p *BinlogAdapter) getShardingListByPrimaryVarchar(primaryKeys []string,
// append the entity to primary key's FieldData
field.(*storage.StringFieldData).Data = append(field.(*storage.StringFieldData).Data, key)
field.(*storage.StringFieldData).NumRows[0]++
shardList = append(shardList, int32(shardID))
}
@ -773,7 +771,6 @@ func (p *BinlogAdapter) dispatchBoolToShards(data []bool, memoryData []map[stora
fields := memoryData[shardID] // initSegmentData() can ensure the existence, no need to check bound here
field := fields[fieldID] // initSegmentData() can ensure the existence, no need to check existence here
field.(*storage.BoolFieldData).Data = append(field.(*storage.BoolFieldData).Data, val)
field.(*storage.BoolFieldData).NumRows[0]++
}
return nil
@ -797,7 +794,6 @@ func (p *BinlogAdapter) dispatchInt8ToShards(data []int8, memoryData []map[stora
fields := memoryData[shardID] // initSegmentData() can ensure the existence, no need to check bound here
field := fields[fieldID] // initSegmentData() can ensure the existence, no need to check existence here
field.(*storage.Int8FieldData).Data = append(field.(*storage.Int8FieldData).Data, val)
field.(*storage.Int8FieldData).NumRows[0]++
}
return nil
@ -821,7 +817,6 @@ func (p *BinlogAdapter) dispatchInt16ToShards(data []int16, memoryData []map[sto
fields := memoryData[shardID] // initSegmentData() can ensure the existence, no need to check bound here
field := fields[fieldID] // initSegmentData() can ensure the existence, no need to check existence here
field.(*storage.Int16FieldData).Data = append(field.(*storage.Int16FieldData).Data, val)
field.(*storage.Int16FieldData).NumRows[0]++
}
return nil
@ -845,7 +840,6 @@ func (p *BinlogAdapter) dispatchInt32ToShards(data []int32, memoryData []map[sto
fields := memoryData[shardID] // initSegmentData() can ensure the existence, no need to check bound here
field := fields[fieldID] // initSegmentData() can ensure the existence, no need to check existence here
field.(*storage.Int32FieldData).Data = append(field.(*storage.Int32FieldData).Data, val)
field.(*storage.Int32FieldData).NumRows[0]++
}
return nil
@ -869,7 +863,6 @@ func (p *BinlogAdapter) dispatchInt64ToShards(data []int64, memoryData []map[sto
fields := memoryData[shardID] // initSegmentData() can ensure the existence, no need to check bound here
field := fields[fieldID] // initSegmentData() can ensure the existence, no need to check existence here
field.(*storage.Int64FieldData).Data = append(field.(*storage.Int64FieldData).Data, val)
field.(*storage.Int64FieldData).NumRows[0]++
}
return nil
@ -893,7 +886,6 @@ func (p *BinlogAdapter) dispatchFloatToShards(data []float32, memoryData []map[s
fields := memoryData[shardID] // initSegmentData() can ensure the existence, no need to check bound here
field := fields[fieldID] // initSegmentData() can ensure the existence, no need to check existence here
field.(*storage.FloatFieldData).Data = append(field.(*storage.FloatFieldData).Data, val)
field.(*storage.FloatFieldData).NumRows[0]++
}
return nil
@ -917,7 +909,6 @@ func (p *BinlogAdapter) dispatchDoubleToShards(data []float64, memoryData []map[
fields := memoryData[shardID] // initSegmentData() can ensure the existence, no need to check bound here
field := fields[fieldID] // initSegmentData() can ensure the existence, no need to check existence here
field.(*storage.DoubleFieldData).Data = append(field.(*storage.DoubleFieldData).Data, val)
field.(*storage.DoubleFieldData).NumRows[0]++
}
return nil
@ -941,7 +932,6 @@ func (p *BinlogAdapter) dispatchVarcharToShards(data []string, memoryData []map[
fields := memoryData[shardID] // initSegmentData() can ensure the existence, no need to check bound here
field := fields[fieldID] // initSegmentData() can ensure the existence, no need to check existence here
field.(*storage.StringFieldData).Data = append(field.(*storage.StringFieldData).Data, val)
field.(*storage.StringFieldData).NumRows[0]++
}
return nil
@ -983,7 +973,6 @@ func (p *BinlogAdapter) dispatchBinaryVecToShards(data []byte, dim int, memoryDa
binVecField.Data = append(binVecField.Data, val)
}
binVecField.NumRows[0]++
}
return nil
@ -1023,7 +1012,6 @@ func (p *BinlogAdapter) dispatchFloatVecToShards(data []float32, dim int, memory
val := data[dim*i+j]
floatVecField.Data = append(floatVecField.Data, val)
}
floatVecField.NumRows[0]++
}
return nil

View File

@ -54,8 +54,7 @@ func initSegmentData(collectionSchema *schemapb.CollectionSchema) map[storage.Fi
// rowID field is a hidden field with fieldID=0, it is always auto-generated by IDAllocator
// if primary key is int64 and autoID=true, primary key field is equal to rowID field
segmentData[common.RowIDField] = &storage.Int64FieldData{
Data: make([]int64, 0),
NumRows: []int64{0},
Data: make([]int64, 0),
}
for i := 0; i < len(collectionSchema.Fields); i++ {
@ -63,57 +62,47 @@ func initSegmentData(collectionSchema *schemapb.CollectionSchema) map[storage.Fi
switch schema.DataType {
case schemapb.DataType_Bool:
segmentData[schema.GetFieldID()] = &storage.BoolFieldData{
Data: make([]bool, 0),
NumRows: []int64{0},
Data: make([]bool, 0),
}
case schemapb.DataType_Float:
segmentData[schema.GetFieldID()] = &storage.FloatFieldData{
Data: make([]float32, 0),
NumRows: []int64{0},
Data: make([]float32, 0),
}
case schemapb.DataType_Double:
segmentData[schema.GetFieldID()] = &storage.DoubleFieldData{
Data: make([]float64, 0),
NumRows: []int64{0},
Data: make([]float64, 0),
}
case schemapb.DataType_Int8:
segmentData[schema.GetFieldID()] = &storage.Int8FieldData{
Data: make([]int8, 0),
NumRows: []int64{0},
Data: make([]int8, 0),
}
case schemapb.DataType_Int16:
segmentData[schema.GetFieldID()] = &storage.Int16FieldData{
Data: make([]int16, 0),
NumRows: []int64{0},
Data: make([]int16, 0),
}
case schemapb.DataType_Int32:
segmentData[schema.GetFieldID()] = &storage.Int32FieldData{
Data: make([]int32, 0),
NumRows: []int64{0},
Data: make([]int32, 0),
}
case schemapb.DataType_Int64:
segmentData[schema.GetFieldID()] = &storage.Int64FieldData{
Data: make([]int64, 0),
NumRows: []int64{0},
Data: make([]int64, 0),
}
case schemapb.DataType_BinaryVector:
dim, _ := getFieldDimension(schema)
segmentData[schema.GetFieldID()] = &storage.BinaryVectorFieldData{
Data: make([]byte, 0),
NumRows: []int64{0},
Dim: dim,
Data: make([]byte, 0),
Dim: dim,
}
case schemapb.DataType_FloatVector:
dim, _ := getFieldDimension(schema)
segmentData[schema.GetFieldID()] = &storage.FloatVectorFieldData{
Data: make([]float32, 0),
NumRows: []int64{0},
Dim: dim,
Data: make([]float32, 0),
Dim: dim,
}
case schemapb.DataType_String, schemapb.DataType_VarChar:
segmentData[schema.GetFieldID()] = &storage.StringFieldData{
Data: make([]string, 0),
NumRows: []int64{0},
Data: make([]string, 0),
}
default:
log.Error("Import util: unsupported data type", zap.String("DataType", getTypeName(schema.DataType)))
@ -158,7 +147,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
if value, ok := obj.(bool); ok {
field.(*storage.BoolFieldData).Data = append(field.(*storage.BoolFieldData).Data, value)
field.(*storage.BoolFieldData).NumRows[0]++
} else {
return fmt.Errorf("illegal value '%v' for bool type field '%s'", obj, schema.GetName())
}
@ -173,7 +161,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
return err
}
field.(*storage.FloatFieldData).Data = append(field.(*storage.FloatFieldData).Data, float32(value))
field.(*storage.FloatFieldData).NumRows[0]++
} else {
return fmt.Errorf("illegal value '%v' for float type field '%s'", obj, schema.GetName())
}
@ -188,7 +175,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
return err
}
field.(*storage.DoubleFieldData).Data = append(field.(*storage.DoubleFieldData).Data, value)
field.(*storage.DoubleFieldData).NumRows[0]++
} else {
return fmt.Errorf("illegal value '%v' for double type field '%s'", obj, schema.GetName())
}
@ -202,7 +188,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
return fmt.Errorf("failed to parse value '%v' for int8 field '%s', error: %w", num, schema.GetName(), err)
}
field.(*storage.Int8FieldData).Data = append(field.(*storage.Int8FieldData).Data, int8(value))
field.(*storage.Int8FieldData).NumRows[0]++
} else {
return fmt.Errorf("illegal value '%v' for int8 type field '%s'", obj, schema.GetName())
}
@ -216,7 +201,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
return fmt.Errorf("failed to parse value '%v' for int16 field '%s', error: %w", num, schema.GetName(), err)
}
field.(*storage.Int16FieldData).Data = append(field.(*storage.Int16FieldData).Data, int16(value))
field.(*storage.Int16FieldData).NumRows[0]++
} else {
return fmt.Errorf("illegal value '%v' for int16 type field '%s'", obj, schema.GetName())
}
@ -230,7 +214,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
return fmt.Errorf("failed to parse value '%v' for int32 field '%s', error: %w", num, schema.GetName(), err)
}
field.(*storage.Int32FieldData).Data = append(field.(*storage.Int32FieldData).Data, int32(value))
field.(*storage.Int32FieldData).NumRows[0]++
} else {
return fmt.Errorf("illegal value '%v' for int32 type field '%s'", obj, schema.GetName())
}
@ -244,7 +227,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
return fmt.Errorf("failed to parse value '%v' for int64 field '%s', error: %w", num, schema.GetName(), err)
}
field.(*storage.Int64FieldData).Data = append(field.(*storage.Int64FieldData).Data, value)
field.(*storage.Int64FieldData).NumRows[0]++
} else {
return fmt.Errorf("illegal value '%v' for int64 type field '%s'", obj, schema.GetName())
}
@ -279,7 +261,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
}
}
field.(*storage.BinaryVectorFieldData).NumRows[0]++
return nil
}
case schemapb.DataType_FloatVector:
@ -310,7 +291,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
}
}
field.(*storage.FloatVectorFieldData).NumRows[0]++
return nil
}
case schemapb.DataType_String, schemapb.DataType_VarChar:
@ -319,7 +299,6 @@ func initValidators(collectionSchema *schemapb.CollectionSchema, validators map[
validators[schema.GetFieldID()].convertFunc = func(obj interface{}, field storage.FieldData) error {
if value, ok := obj.(string); ok {
field.(*storage.StringFieldData).Data = append(field.(*storage.StringFieldData).Data, value)
field.(*storage.StringFieldData).NumRows[0]++
} else {
return fmt.Errorf("illegal value '%v' for varchar type field '%s'", obj, schema.GetName())
}

View File

@ -504,7 +504,7 @@ func Test_TryFlushBlocks(t *testing.T) {
}
blockSize := int64(1024)
maxTotalSize := int64(2048)
maxTotalSize := int64(4096)
shardNum := int32(3)
// prepare flush data, 3 shards, each shard 10 rows

View File

@ -240,7 +240,6 @@ func (v *JSONRowConsumer) Handle(rows []map[storage.FieldID]interface{}) error {
shard = hash % uint32(v.shardNum)
pkArray := v.segmentsData[shard][v.primaryKey].(*storage.StringFieldData)
pkArray.Data = append(pkArray.Data, pk)
pkArray.NumRows[0]++
} else {
// get/generate the row id
var pk int64
@ -269,13 +268,11 @@ func (v *JSONRowConsumer) Handle(rows []map[storage.FieldID]interface{}) error {
shard = hash % uint32(v.shardNum)
pkArray := v.segmentsData[shard][v.primaryKey].(*storage.Int64FieldData)
pkArray.Data = append(pkArray.Data, pk)
pkArray.NumRows[0]++
}
// set rowid field
rowIDField := v.segmentsData[shard][common.RowIDField].(*storage.Int64FieldData)
rowIDField.Data = append(rowIDField.Data, rowIDBegin+int64(i))
rowIDField.NumRows[0]++
// convert value and consume
for name, validator := range v.validators {

View File

@ -141,7 +141,6 @@ func Test_JSONRowConsumerFlush(t *testing.T) {
for j := 0; j < rowCountEachShard; j++ {
pkFieldData.Data = append(pkFieldData.Data, int64(j))
}
pkFieldData.NumRows = []int64{int64(rowCountEachShard)}
}
err = consumer.flush(true)
@ -162,7 +161,6 @@ func Test_JSONRowConsumerFlush(t *testing.T) {
for j := 0; j < rowCountEachShard; j++ {
pkFieldData.Data = append(pkFieldData.Data, int64(j))
}
pkFieldData.NumRows = []int64{int64(rowCountEachShard)}
}
err = consumer.flush(true)
assert.Nil(t, err)
@ -208,7 +206,6 @@ func Test_JSONRowConsumerHandle(t *testing.T) {
for i := 0; i < 10; i++ {
pkFieldData.Data = append(pkFieldData.Data, int64(i))
}
pkFieldData.NumRows = []int64{int64(10)}
// nil input will trigger flush
err = consumer.Handle(nil)
@ -222,7 +219,6 @@ func Test_JSONRowConsumerHandle(t *testing.T) {
for j := 0; j < rowCount; j++ {
pkFieldData.Data = append(pkFieldData.Data, int64(j))
}
pkFieldData.NumRows = []int64{int64(rowCount)}
input := make([]map[storage.FieldID]interface{}, rowCount)
for j := 0; j < rowCount; j++ {

View File

@ -444,8 +444,7 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
}
return &storage.BoolFieldData{
NumRows: []int64{int64(len(data))},
Data: data,
Data: data,
}, nil
case schemapb.DataType_Int8:
data, err := columnReader.reader.ReadInt8(rowCount)
@ -455,8 +454,7 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
}
return &storage.Int8FieldData{
NumRows: []int64{int64(len(data))},
Data: data,
Data: data,
}, nil
case schemapb.DataType_Int16:
data, err := columnReader.reader.ReadInt16(rowCount)
@ -466,8 +464,7 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
}
return &storage.Int16FieldData{
NumRows: []int64{int64(len(data))},
Data: data,
Data: data,
}, nil
case schemapb.DataType_Int32:
data, err := columnReader.reader.ReadInt32(rowCount)
@ -477,8 +474,7 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
}
return &storage.Int32FieldData{
NumRows: []int64{int64(len(data))},
Data: data,
Data: data,
}, nil
case schemapb.DataType_Int64:
data, err := columnReader.reader.ReadInt64(rowCount)
@ -488,8 +484,7 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
}
return &storage.Int64FieldData{
NumRows: []int64{int64(len(data))},
Data: data,
Data: data,
}, nil
case schemapb.DataType_Float:
data, err := columnReader.reader.ReadFloat32(rowCount)
@ -499,8 +494,7 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
}
return &storage.FloatFieldData{
NumRows: []int64{int64(len(data))},
Data: data,
Data: data,
}, nil
case schemapb.DataType_Double:
data, err := columnReader.reader.ReadFloat64(rowCount)
@ -510,8 +504,7 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
}
return &storage.DoubleFieldData{
NumRows: []int64{int64(len(data))},
Data: data,
Data: data,
}, nil
case schemapb.DataType_VarChar:
data, err := columnReader.reader.ReadString(rowCount)
@ -521,8 +514,7 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
}
return &storage.StringFieldData{
NumRows: []int64{int64(len(data))},
Data: data,
Data: data,
}, nil
case schemapb.DataType_BinaryVector:
data, err := columnReader.reader.ReadUint8(rowCount * (columnReader.dimension / 8))
@ -532,9 +524,8 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
}
return &storage.BinaryVectorFieldData{
NumRows: []int64{int64(len(data) * 8 / columnReader.dimension)},
Data: data,
Dim: columnReader.dimension,
Data: data,
Dim: columnReader.dimension,
}, nil
case schemapb.DataType_FloatVector:
// float32/float64 numpy file can be used for float vector file, 2 reasons:
@ -564,9 +555,8 @@ func (p *NumpyParser) readData(columnReader *NumpyColumnReader, rowCount int) (s
}
return &storage.FloatVectorFieldData{
NumRows: []int64{int64(len(data) / columnReader.dimension)},
Data: data,
Dim: columnReader.dimension,
Data: data,
Dim: columnReader.dimension,
}, nil
default:
log.Error("Numpy parser: unsupported data type of field", zap.Any("dataType", columnReader.dataType),
@ -583,63 +573,54 @@ func (p *NumpyParser) appendFunc(schema *schemapb.FieldSchema) func(src storage.
return func(src storage.FieldData, n int, target storage.FieldData) error {
arr := target.(*storage.BoolFieldData)
arr.Data = append(arr.Data, src.GetRow(n).(bool))
arr.NumRows[0]++
return nil
}
case schemapb.DataType_Float:
return func(src storage.FieldData, n int, target storage.FieldData) error {
arr := target.(*storage.FloatFieldData)
arr.Data = append(arr.Data, src.GetRow(n).(float32))
arr.NumRows[0]++
return nil
}
case schemapb.DataType_Double:
return func(src storage.FieldData, n int, target storage.FieldData) error {
arr := target.(*storage.DoubleFieldData)
arr.Data = append(arr.Data, src.GetRow(n).(float64))
arr.NumRows[0]++
return nil
}
case schemapb.DataType_Int8:
return func(src storage.FieldData, n int, target storage.FieldData) error {
arr := target.(*storage.Int8FieldData)
arr.Data = append(arr.Data, src.GetRow(n).(int8))
arr.NumRows[0]++
return nil
}
case schemapb.DataType_Int16:
return func(src storage.FieldData, n int, target storage.FieldData) error {
arr := target.(*storage.Int16FieldData)
arr.Data = append(arr.Data, src.GetRow(n).(int16))
arr.NumRows[0]++
return nil
}
case schemapb.DataType_Int32:
return func(src storage.FieldData, n int, target storage.FieldData) error {
arr := target.(*storage.Int32FieldData)
arr.Data = append(arr.Data, src.GetRow(n).(int32))
arr.NumRows[0]++
return nil
}
case schemapb.DataType_Int64:
return func(src storage.FieldData, n int, target storage.FieldData) error {
arr := target.(*storage.Int64FieldData)
arr.Data = append(arr.Data, src.GetRow(n).(int64))
arr.NumRows[0]++
return nil
}
case schemapb.DataType_BinaryVector:
return func(src storage.FieldData, n int, target storage.FieldData) error {
arr := target.(*storage.BinaryVectorFieldData)
arr.Data = append(arr.Data, src.GetRow(n).([]byte)...)
arr.NumRows[0]++
return nil
}
case schemapb.DataType_FloatVector:
return func(src storage.FieldData, n int, target storage.FieldData) error {
arr := target.(*storage.FloatVectorFieldData)
arr.Data = append(arr.Data, src.GetRow(n).([]float32)...)
arr.NumRows[0]++
return nil
}
case schemapb.DataType_String, schemapb.DataType_VarChar:
@ -736,8 +717,7 @@ func (p *NumpyParser) splitFieldsData(fieldsData map[storage.FieldID]storage.Fie
rowIDField, ok := fieldsData[common.RowIDField]
if !ok {
rowIDField = &storage.Int64FieldData{
Data: make([]int64, 0),
NumRows: []int64{0},
Data: make([]int64, 0),
}
fieldsData[common.RowIDField] = rowIDField
}
@ -755,8 +735,7 @@ func (p *NumpyParser) splitFieldsData(fieldsData map[storage.FieldID]storage.Fie
}
primaryDataArr := &storage.Int64FieldData{
NumRows: []int64{int64(rowCount)},
Data: make([]int64, 0, rowCount),
Data: make([]int64, 0, rowCount),
}
for i := rowIDBegin; i < rowIDEnd; i++ {
primaryDataArr.Data = append(primaryDataArr.Data, i)