mirror of https://github.com/milvus-io/milvus.git
Fix compaction generates wrong insert binlogs (#12204)
This PR:
- changes Get() to GetRow() in BinlogIterator
- removes Get() and Length() from FieldData
- adds unit tests in BinlogIterator for FloatVector and BinaryVector
- fixes a log bug

Fixes: #12146
Signed-off-by: yangxuan <xuan.yang@zilliz.com>
parent 37c331bf65
commit 6bf0c4326d
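The core of the change sits in interface2FieldData (datanode compaction): each element of content for a vector field is one whole row, so the old per-element type assertions (c.(float32), c.(byte)) produced wrong insert binlogs. Before the first two hunks below, here is a minimal, self-contained sketch of the corrected row-oriented conversion; the function name and the standalone setup are invented for illustration, only the assert/flatten/Dim pattern mirrors the diff:

package main

import (
	"errors"
	"fmt"
)

// errTransferType mirrors the error name used in the diff; the message here is invented.
var errTransferType = errors.New("cannot transfer interface to field data")

// floatVectorsToColumn is a hypothetical stand-in for the DataType_FloatVector
// branch of interface2FieldData: every element of content is one row
// ([]float32), rows are flattened into one column, and the dimension is
// recovered from the total length.
func floatVectorsToColumn(content []interface{}, numRows int64) ([]float32, int, error) {
	flat := []float32{}
	for _, c := range content {
		row, ok := c.([]float32) // the old code asserted c.(float32): one scalar per row
		if !ok {
			return nil, 0, errTransferType
		}
		flat = append(flat, row...)
	}
	dim := len(flat) / int(numRows)
	return flat, dim, nil
}

func main() {
	rows := []interface{}{
		[]float32{1, 2, 3, 4},
		[]float32{5, 6, 7, 8},
	}
	flat, dim, err := floatVectorsToColumn(rows, 2)
	fmt.Println(flat, dim, err) // [1 2 3 4 5 6 7 8] 4 <nil>
}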
@@ -571,15 +571,15 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
     case schemapb.DataType_FloatVector:
         var data = &storage.FloatVectorFieldData{
             NumRows: numOfRows,
-            Data:    make([]float32, 0, len(content)),
+            Data:    []float32{},
         }

         for _, c := range content {
-            r, ok := c.(float32)
+            r, ok := c.([]float32)
             if !ok {
                 return nil, errTransferType
             }
-            data.Data = append(data.Data, r)
+            data.Data = append(data.Data, r...)
         }

         data.Dim = len(data.Data) / int(numRows)
@@ -588,15 +588,15 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}
     case schemapb.DataType_BinaryVector:
         var data = &storage.BinaryVectorFieldData{
             NumRows: numOfRows,
-            Data:    make([]byte, 0, len(content)),
+            Data:    []byte{},
         }

         for _, c := range content {
-            r, ok := c.(byte)
+            r, ok := c.([]byte)
             if !ok {
                 return nil, errTransferType
             }
-            data.Data = append(data.Data, r)
+            data.Data = append(data.Data, r...)
         }

         data.Dim = len(data.Data) * 8 / int(numRows)
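For BinaryVector the same row-oriented handling applies, with one extra wrinkle: each byte packs 8 dimensions, hence the * 8 in the Dim calculation above. A tiny standalone illustration with made-up values:

package main

import "fmt"

func main() {
	// Two rows of 16-dimensional binary vectors: 16 bits = 2 bytes per row.
	packed := []byte{0xAA, 0x0F, 0x55, 0xF0} // 4 bytes in total, values arbitrary
	numRows := 2
	dim := len(packed) * 8 / numRows
	fmt.Println(dim) // 16
}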
@@ -76,8 +76,8 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
         {true, schemapb.DataType_Int64, []interface{}{int64(1), int64(2)}, "valid int64"},
         {true, schemapb.DataType_Float, []interface{}{float32(1), float32(2)}, "valid float32"},
         {true, schemapb.DataType_Double, []interface{}{float64(1), float64(2)}, "valid float64"},
-        {true, schemapb.DataType_FloatVector, []interface{}{float32(1), float32(2)}, "valid floatvector"},
-        {true, schemapb.DataType_BinaryVector, []interface{}{byte(255), byte(1)}, "valid binaryvector"},
+        {true, schemapb.DataType_FloatVector, []interface{}{[]float32{1.0, 2.0}}, "valid floatvector"},
+        {true, schemapb.DataType_BinaryVector, []interface{}{[]byte{255}}, "valid binaryvector"},
         {false, schemapb.DataType_Bool, []interface{}{1, 2}, "invalid bool"},
         {false, schemapb.DataType_Int8, []interface{}{nil, nil}, "invalid int8"},
         {false, schemapb.DataType_Int16, []interface{}{nil, nil}, "invalid int16"},
@@ -95,7 +95,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
             if test.isvalid {
                 fd, err := interface2FieldData(test.tp, test.content, 2)
                 assert.NoError(t, err)
-                assert.Equal(t, 2, fd.Length())
+                assert.Equal(t, 2, fd.RowNum())
             } else {
                 fd, err := interface2FieldData(test.tp, test.content, 2)
                 assert.Error(t, err)
@@ -109,7 +109,7 @@ func (loader *indexLoader) getIndexBinlog(indexPath []string) ([][]byte, indexPa
     var indexName string
     indexCodec := storage.NewIndexFileBinlogCodec()
     for _, p := range indexPath {
-        log.Debug("", zap.String("load path", fmt.Sprintln(indexPath)))
+        log.Debug("", zap.String("load path", fmt.Sprintln(p)))
         indexPiece, err := loader.kv.Load(p)
         if err != nil {
             return nil, nil, "", err
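The "log bug" from the description is the line above: the loop logged the whole indexPath slice on every iteration instead of the element p it was about to load. A standalone sketch of the corrected behaviour, using go.uber.org/zap directly rather than Milvus's log wrapper (the paths are made up):

package main

import (
	"fmt"

	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	indexPath := []string{"index/segment1/part0", "index/segment1/part1"}
	for _, p := range indexPath {
		// Before the fix the entire slice was stringified on every pass;
		// now each iteration logs only the path it is loading.
		logger.Debug("", zap.String("load path", fmt.Sprintln(p)))
	}
}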
@@ -83,13 +83,13 @@ func (itr *InsertBinlogIterator) Next() (interface{}, error) {

     m := make(map[FieldID]interface{})
     for fieldID, fieldData := range itr.data.Data {
-        m[fieldID] = fieldData.Get(itr.pos)
+        m[fieldID] = fieldData.GetRow(itr.pos)
     }

     v := &Value{
-        ID:        itr.data.Data[rootcoord.RowIDField].Get(itr.pos).(int64),
-        Timestamp: itr.data.Data[rootcoord.TimeStampField].Get(itr.pos).(int64),
-        PK:        itr.data.Data[itr.PKfieldID].Get(itr.pos).(int64),
+        ID:        itr.data.Data[rootcoord.RowIDField].GetRow(itr.pos).(int64),
+        Timestamp: itr.data.Data[rootcoord.TimeStampField].GetRow(itr.pos).(int64),
+        PK:        itr.data.Data[itr.PKfieldID].GetRow(itr.pos).(int64),
         IsDeleted: false,
         Value:     m,
     }
@@ -107,7 +107,7 @@ func (itr *InsertBinlogIterator) hasNext() bool {
     if !ok {
         return false
     }
-    return itr.pos < itr.data.Data[rootcoord.RowIDField].Length()
+    return itr.pos < itr.data.Data[rootcoord.RowIDField].RowNum()
 }

 func (itr *InsertBinlogIterator) isDisposed() bool {
@@ -26,18 +26,49 @@ func generateTestData(t *testing.T, num int) []*Blob {
         {FieldID: rootcoord.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
         {FieldID: rootcoord.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64},
         {FieldID: 101, Name: "int32", DataType: schemapb.DataType_Int32},
+        {FieldID: 102, Name: "floatVector", DataType: schemapb.DataType_FloatVector},
+        {FieldID: 103, Name: "binaryVector", DataType: schemapb.DataType_BinaryVector},
     }}
     insertCodec := NewInsertCodec(&etcdpb.CollectionMeta{ID: 1, Schema: schema})

-    data := &InsertData{Data: map[FieldID]FieldData{rootcoord.TimeStampField: &Int64FieldData{Data: []int64{}}, rootcoord.RowIDField: &Int64FieldData{Data: []int64{}}, 101: &Int32FieldData{Data: []int32{}}}}
+    var (
+        field0   []int64
+        field1   []int64
+        field101 []int32
+        field102 []float32
+        field103 []byte
+    )

     for i := 1; i <= num; i++ {
-        field0 := data.Data[rootcoord.TimeStampField].(*Int64FieldData)
-        field0.Data = append(field0.Data, int64(i))
-        field1 := data.Data[rootcoord.RowIDField].(*Int64FieldData)
-        field1.Data = append(field1.Data, int64(i))
-        field2 := data.Data[101].(*Int32FieldData)
-        field2.Data = append(field2.Data, int32(i))
+        field0 = append(field0, int64(i))
+        field1 = append(field1, int64(i))
+        field101 = append(field101, int32(i))
+
+        f102 := make([]float32, 8)
+        for j := range f102 {
+            f102[j] = float32(i)
+        }
+
+        field102 = append(field102, f102...)
+        field103 = append(field103, byte(i))
     }

+    data := &InsertData{Data: map[FieldID]FieldData{
+        rootcoord.RowIDField:     &Int64FieldData{Data: field0},
+        rootcoord.TimeStampField: &Int64FieldData{Data: field1},
+        101:                      &Int32FieldData{Data: field101},
+        102: &FloatVectorFieldData{
+            NumRows: []int64{int64(num)},
+            Data:    field102,
+            Dim:     8,
+        },
+        103: &BinaryVectorFieldData{
+            NumRows: []int64{int64(num)},
+            Data:    field103,
+            Dim:     8,
+        },
+    }}
+
     blobs, _, err := insertCodec.Serialize(1, 1, data)
     assert.Nil(t, err)
     return blobs
@@ -74,12 +105,24 @@ func TestInsertlogIterator(t *testing.T) {
             v, err := itr.Next()
             assert.Nil(t, err)
             value := v.(*Value)

+            f102 := make([]float32, 8)
+            for j := range f102 {
+                f102[j] = float32(i)
+            }
+
             expected := &Value{
                 int64(i),
                 int64(i),
                 int64(i),
                 false,
-                map[FieldID]interface{}{rootcoord.TimeStampField: int64(i), rootcoord.RowIDField: int64(i), 101: int32(i)},
+                map[FieldID]interface{}{
+                    rootcoord.TimeStampField: int64(i),
+                    rootcoord.RowIDField:     int64(i),
+                    101:                      int32(i),
+                    102:                      f102,
+                    103:                      []byte{byte(i)},
+                },
             }
             assert.EqualValues(t, expected, value)
         }
@@ -120,12 +163,24 @@ func TestMergeIterator(t *testing.T) {
             v, err := itr.Next()
             assert.Nil(t, err)
             value := v.(*Value)
+            f102 := make([]float32, 8)
+            for j := range f102 {
+                f102[j] = float32(i)
+            }
+
             expected := &Value{
                 int64(i),
                 int64(i),
                 int64(i),
                 false,
-                map[FieldID]interface{}{rootcoord.TimeStampField: int64(i), rootcoord.RowIDField: int64(i), 101: int32(i)},
+                map[FieldID]interface{}{
+                    rootcoord.TimeStampField: int64(i),
+                    rootcoord.RowIDField:     int64(i),
+                    101:                      int32(i),
+                    102:                      f102,
+                    103:                      []byte{byte(i)},
+                },
             }
             assert.EqualValues(t, expected, value)
         }
@@ -144,12 +198,23 @@ func TestMergeIterator(t *testing.T) {
         itr := NewMergeIterator(iterators)

         for i := 1; i <= 3; i++ {
+            f102 := make([]float32, 8)
+            for j := range f102 {
+                f102[j] = float32(i)
+            }
+
             expected := &Value{
                 int64(i),
                 int64(i),
                 int64(i),
                 false,
-                map[FieldID]interface{}{rootcoord.TimeStampField: int64(i), rootcoord.RowIDField: int64(i), 101: int32(i)},
+                map[FieldID]interface{}{
+                    rootcoord.TimeStampField: int64(i),
+                    rootcoord.RowIDField:     int64(i),
+                    101:                      int32(i),
+                    102:                      f102,
+                    103:                      []byte{byte(i)},
+                },
             }
             for j := 0; j < 2; j++ {
                 assert.True(t, itr.HasNext())
@@ -101,8 +101,6 @@ func (b Blob) GetValue() []byte {
 }

 type FieldData interface {
-    Length() int
-    Get(i int) interface{}
     GetMemorySize() int
     RowNum() int
     GetRow(i int) interface{}
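With Length() and Get() removed, FieldData only exposes row-oriented accessors, which is what makes GetRow safe for vector fields: a "row" of a FloatVectorFieldData is a whole []float32, not a single element. A simplified, hypothetical implementation to show the intended semantics (the real storage package may differ in details such as how NumRows is tracked):

package main

import "fmt"

// FieldData matches the trimmed interface from the diff above.
type FieldData interface {
	GetMemorySize() int
	RowNum() int
	GetRow(i int) interface{}
}

// floatVectorColumn is a hypothetical stand-in for storage.FloatVectorFieldData.
type floatVectorColumn struct {
	Data []float32 // all rows flattened into one slice
	Dim  int
}

func (c *floatVectorColumn) GetMemorySize() int { return 4 * len(c.Data) }

// RowNum counts vectors, not float32 elements.
func (c *floatVectorColumn) RowNum() int { return len(c.Data) / c.Dim }

// GetRow returns one whole vector, so iterator code can treat scalar and
// vector fields uniformly.
func (c *floatVectorColumn) GetRow(i int) interface{} {
	return c.Data[i*c.Dim : (i+1)*c.Dim]
}

func main() {
	var col FieldData = &floatVectorColumn{Data: []float32{1, 1, 2, 2, 3, 3}, Dim: 2}
	fmt.Println(col.RowNum())  // 3
	fmt.Println(col.GetRow(1)) // [2 2]
}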
@@ -151,28 +149,6 @@ type FloatVectorFieldData struct {
     Dim     int
 }

-func (data *BoolFieldData) Length() int         { return len(data.Data) }
-func (data *Int8FieldData) Length() int         { return len(data.Data) }
-func (data *Int16FieldData) Length() int        { return len(data.Data) }
-func (data *Int32FieldData) Length() int        { return len(data.Data) }
-func (data *Int64FieldData) Length() int        { return len(data.Data) }
-func (data *FloatFieldData) Length() int        { return len(data.Data) }
-func (data *DoubleFieldData) Length() int       { return len(data.Data) }
-func (data *StringFieldData) Length() int       { return len(data.Data) }
-func (data *BinaryVectorFieldData) Length() int { return len(data.Data) }
-func (data *FloatVectorFieldData) Length() int  { return len(data.Data) }
-
-func (data *BoolFieldData) Get(i int) interface{}         { return data.Data[i] }
-func (data *Int8FieldData) Get(i int) interface{}         { return data.Data[i] }
-func (data *Int16FieldData) Get(i int) interface{}        { return data.Data[i] }
-func (data *Int32FieldData) Get(i int) interface{}        { return data.Data[i] }
-func (data *Int64FieldData) Get(i int) interface{}        { return data.Data[i] }
-func (data *FloatFieldData) Get(i int) interface{}        { return data.Data[i] }
-func (data *DoubleFieldData) Get(i int) interface{}       { return data.Data[i] }
-func (data *StringFieldData) Get(i int) interface{}       { return data.Data[i] }
-func (data *BinaryVectorFieldData) Get(i int) interface{} { return data.Data[i] }
-func (data *FloatVectorFieldData) Get(i int) interface{}  { return data.Data[i] }
-
 func (data *BoolFieldData) RowNum() int  { return len(data.Data) }
 func (data *Int8FieldData) RowNum() int  { return len(data.Data) }
 func (data *Int16FieldData) RowNum() int { return len(data.Data) }