fix: Fix SparseFloatVector data parse error for parquet (#33187)

Issue: #22837

Signed-off-by: Cai Yudong <yudong.cai@zilliz.com>
pull/33259/head
Cai Yudong 2024-05-21 15:09:39 +08:00 committed by GitHub
parent 2d6f12d48b
commit cb480d17c8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 164 additions and 13 deletions

View File

@ -291,7 +291,7 @@ func ReadSparseFloatVectorData(pcr *FieldReader, count int64) (any, error) {
for _, str := range data.([]string) {
rowVec, err := typeutil.CreateSparseFloatRowFromJSON([]byte(str))
if err != nil {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("Invalid JSON string for SparseFloatVector: '%s'", str))
return nil, merr.WrapErrImportFailed(fmt.Sprintf("Invalid JSON string for SparseFloatVector: '%s', err = %v", str, err))
}
byteArr = append(byteArr, rowVec)
elemCount := len(rowVec) / 8

View File

@ -1550,12 +1550,31 @@ func CreateSparseFloatRowFromMap(input map[string]interface{}) ([]byte, error) {
return nil, fmt.Errorf("empty JSON input")
}
// try format1
indices, ok1 := input["indices"].([]uint32)
values, ok2 := input["values"].([]float32)
jsonIndices, ok1 := input["indices"].([]interface{})
jsonValues, ok2 := input["values"].([]interface{})
// try format2
if !ok1 && !ok2 {
if ok1 && ok2 {
// try format1
for _, v1 := range jsonIndices {
if num1, suc1 := v1.(int); suc1 {
indices = append(indices, uint32(num1))
} else {
if num2, suc2 := v1.(float64); suc2 && num2 == float64(int(num2)) {
indices = append(indices, uint32(num2))
} else {
return nil, fmt.Errorf("invalid index type: %v(%s)", v1, reflect.TypeOf(v1))
}
}
}
for _, v2 := range jsonValues {
if num, ok := v2.(float64); ok {
values = append(values, float32(num))
} else {
return nil, fmt.Errorf("invalid value type: %s", reflect.TypeOf(v2))
}
}
} else if !ok1 && !ok2 {
// try format2
for k, v := range input {
idx, err := strconv.ParseUint(k, 0, 32)
if err != nil {
@ -1578,7 +1597,7 @@ func CreateSparseFloatRowFromMap(input map[string]interface{}) ([]byte, error) {
indices = append(indices, uint32(idx))
values = append(values, float32(val))
}
} else if ok1 != ok2 {
} else {
return nil, fmt.Errorf("invalid JSON input")
}

View File

@ -2120,39 +2120,45 @@ func TestValidateSparseFloatRows(t *testing.T) {
func TestParseJsonSparseFloatRow(t *testing.T) {
t.Run("valid row 1", func(t *testing.T) {
row := map[string]interface{}{"indices": []uint32{1, 3, 5}, "values": []float32{1.0, 2.0, 3.0}}
row := map[string]interface{}{"indices": []interface{}{1, 3, 5}, "values": []interface{}{1.0, 2.0, 3.0}}
res, err := CreateSparseFloatRowFromMap(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), res)
})
t.Run("valid row 2", func(t *testing.T) {
row := map[string]interface{}{"indices": []uint32{3, 1, 5}, "values": []float32{1.0, 2.0, 3.0}}
row := map[string]interface{}{"indices": []interface{}{3, 1, 5}, "values": []interface{}{1.0, 2.0, 3.0}}
res, err := CreateSparseFloatRowFromMap(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{2.0, 1.0, 3.0}), res)
})
t.Run("invalid row 1", func(t *testing.T) {
row := map[string]interface{}{"indices": []uint32{1, 3, 5}, "values": []float32{1.0, 2.0}}
row := map[string]interface{}{"indices": []interface{}{1, 3, 5}, "values": []interface{}{1.0, 2.0}}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid row 2", func(t *testing.T) {
row := map[string]interface{}{"indices": []uint32{1}, "values": []float32{1.0, 2.0}}
row := map[string]interface{}{"indices": []interface{}{1}, "values": []interface{}{1.0, 2.0}}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid row 3", func(t *testing.T) {
row := map[string]interface{}{"indices": []uint32{}, "values": []float32{}}
row := map[string]interface{}{"indices": []interface{}{}, "values": []interface{}{}}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid row 4", func(t *testing.T) {
row := map[string]interface{}{"indices": []uint32{3}, "values": []float32{-0.2}}
row := map[string]interface{}{"indices": []interface{}{3}, "values": []interface{}{-0.2}}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid row 5", func(t *testing.T) {
row := map[string]interface{}{"indices": []interface{}{3.1}, "values": []interface{}{0.2}}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
@ -2206,4 +2212,130 @@ func TestParseJsonSparseFloatRow(t *testing.T) {
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
t.Run("invalid dict row 7", func(t *testing.T) {
row := map[string]interface{}{"1.1": 1.0, "3": 2.0, "5": 3.0}
_, err := CreateSparseFloatRowFromMap(row)
assert.Error(t, err)
})
}
func TestParseJsonSparseFloatRowBytes(t *testing.T) {
t.Run("valid row 1", func(t *testing.T) {
row := []byte(`{"indices":[1,3,5],"values":[1.0,2.0,3.0]}`)
res, err := CreateSparseFloatRowFromJSON(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), res)
})
t.Run("valid row 2", func(t *testing.T) {
row := []byte(`{"indices":[3,1,5],"values":[1.0,2.0,3.0]}`)
res, err := CreateSparseFloatRowFromJSON(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{2.0, 1.0, 3.0}), res)
})
t.Run("invalid row 1", func(t *testing.T) {
row := []byte(`{"indices":[1,3,5],"values":[1.0,2.0,3.0`)
_, err := CreateSparseFloatRowFromJSON(row)
assert.Error(t, err)
})
t.Run("invalid row 2", func(t *testing.T) {
row := []byte(`{"indices":[1,3,5],"values":[1.0,2.0]`)
_, err := CreateSparseFloatRowFromJSON(row)
assert.Error(t, err)
})
t.Run("invalid row 3", func(t *testing.T) {
row := []byte(`{"indices":[1],"values":[1.0,2.0]`)
_, err := CreateSparseFloatRowFromJSON(row)
assert.Error(t, err)
})
t.Run("invalid row 4", func(t *testing.T) {
row := []byte(`{"indices":[],"values":[]`)
_, err := CreateSparseFloatRowFromJSON(row)
assert.Error(t, err)
})
t.Run("invalid row 5", func(t *testing.T) {
row := []byte(`{"indices":[-3],"values":[0.2]`)
_, err := CreateSparseFloatRowFromJSON(row)
assert.Error(t, err)
})
t.Run("invalid row 6", func(t *testing.T) {
row := []byte(`{"indices":[3],"values":[-0.2]`)
_, err := CreateSparseFloatRowFromJSON(row)
assert.Error(t, err)
})
t.Run("invalid row 7", func(t *testing.T) {
row := []byte(`{"indices": []interface{}{3.1}, "values": []interface{}{0.2}}`)
_, err := CreateSparseFloatRowFromJSON(row)
assert.Error(t, err)
})
t.Run("valid dict row 1", func(t *testing.T) {
row := []byte(`{"1": 1.0, "3": 2.0, "5": 3.0}`)
res, err := CreateSparseFloatRowFromJSON(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), res)
})
t.Run("valid dict row 2", func(t *testing.T) {
row := []byte(`{"3": 1.0, "1": 2.0, "5": 3.0}`)
res, err := CreateSparseFloatRowFromJSON(row)
assert.NoError(t, err)
assert.Equal(t, CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{2.0, 1.0, 3.0}), res)
})
t.Run("invalid dict row 1", func(t *testing.T) {
row := []byte(`{"a": 1.0, "3": 2.0, "5": 3.0}`)
_, err := CreateSparseFloatRowFromJSON(row)
assert.Error(t, err)
})
t.Run("invalid dict row 2", func(t *testing.T) {
row := []byte(`{"1": "a", "3": 2.0, "5": 3.0}`)
_, err := CreateSparseFloatRowFromJSON(row)
assert.Error(t, err)
})
t.Run("invalid dict row 3", func(t *testing.T) {
row := []byte(`{"1": "1.0", "3": 2.0, "5": 3.0}`)
_, err := CreateSparseFloatRowFromJSON(row)
assert.Error(t, err)
})
t.Run("invalid dict row 4", func(t *testing.T) {
row := []byte(`{"1": 1.0, "3": 2.0, "5": }`)
_, err := CreateSparseFloatRowFromJSON(row)
assert.Error(t, err)
})
t.Run("invalid dict row 5", func(t *testing.T) {
row := []byte(`{"-1": 1.0, "3": 2.0, "5": 3.0}`)
_, err := CreateSparseFloatRowFromJSON(row)
assert.Error(t, err)
})
t.Run("invalid dict row 6", func(t *testing.T) {
row := []byte(`{"1": -1.0, "3": 2.0, "5": 3.0}`)
_, err := CreateSparseFloatRowFromJSON(row)
assert.Error(t, err)
})
t.Run("invalid dict row 7", func(t *testing.T) {
row := []byte(`{}`)
_, err := CreateSparseFloatRowFromJSON(row)
assert.Error(t, err)
})
t.Run("invalid dict row 8", func(t *testing.T) {
row := []byte(`{"1.1": 1.0, "3": 2.0, "5": 3.0}`)
_, err := CreateSparseFloatRowFromJSON(row)
assert.Error(t, err)
})
}