enhance: add type info for payload writer error message and add log when querynode find new collection (#32522)

relate: https://github.com/milvus-io/milvus/issues/32668

---------

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
pull/32818/head
aoiasd 2024-05-07 14:45:29 +08:00 committed by GitHub
parent 29f3cb692b
commit 31dca3249e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 339 additions and 43 deletions

View File

@ -93,6 +93,7 @@ func (m *collectionManager) PutOrRef(collectionID int64, schema *schemapb.Collec
return
}
log.Info("put new collection", zap.Int64("collectionID", collectionID), zap.Any("schema", schema))
collection := NewCollection(collectionID, schema, meta, loadMeta)
collection.Ref(1)
m.collections[collectionID] = collection

View File

@ -181,11 +181,11 @@ func (w *NativePayloadWriter) AddDataToPayload(data interface{}, dim ...int) err
func (w *NativePayloadWriter) AddBoolToPayload(data []bool) error {
if w.finished {
return errors.New("can't append data to finished writer")
return errors.New("can't append data to finished bool payload")
}
if len(data) == 0 {
return errors.New("can't add empty msgs into payload")
return errors.New("can't add empty msgs into bool payload")
}
builder, ok := w.builder.(*array.BooleanBuilder)
@ -199,16 +199,16 @@ func (w *NativePayloadWriter) AddBoolToPayload(data []bool) error {
func (w *NativePayloadWriter) AddByteToPayload(data []byte) error {
if w.finished {
return errors.New("can't append data to finished writer")
return errors.New("can't append data to finished byte payload")
}
if len(data) == 0 {
return errors.New("can't add empty msgs into payload")
return errors.New("can't add empty msgs into byte payload")
}
builder, ok := w.builder.(*array.Int8Builder)
if !ok {
return errors.New("failed to cast ArrayBuilder")
return errors.New("failed to cast ByteBuilder")
}
builder.Reserve(len(data))
@ -221,16 +221,16 @@ func (w *NativePayloadWriter) AddByteToPayload(data []byte) error {
func (w *NativePayloadWriter) AddInt8ToPayload(data []int8) error {
if w.finished {
return errors.New("can't append data to finished writer")
return errors.New("can't append data to finished int8 payload")
}
if len(data) == 0 {
return errors.New("can't add empty msgs into payload")
return errors.New("can't add empty msgs into int8 payload")
}
builder, ok := w.builder.(*array.Int8Builder)
if !ok {
return errors.New("failed to cast ArrayBuilder")
return errors.New("failed to cast Int8Builder")
}
builder.AppendValues(data, nil)
@ -239,16 +239,16 @@ func (w *NativePayloadWriter) AddInt8ToPayload(data []int8) error {
func (w *NativePayloadWriter) AddInt16ToPayload(data []int16) error {
if w.finished {
return errors.New("can't append data to finished writer")
return errors.New("can't append data to finished int16 payload")
}
if len(data) == 0 {
return errors.New("can't add empty msgs into payload")
return errors.New("can't add empty msgs into int64 payload")
}
builder, ok := w.builder.(*array.Int16Builder)
if !ok {
return errors.New("failed to cast ArrayBuilder")
return errors.New("failed to cast Int16Builder")
}
builder.AppendValues(data, nil)
@ -257,16 +257,16 @@ func (w *NativePayloadWriter) AddInt16ToPayload(data []int16) error {
func (w *NativePayloadWriter) AddInt32ToPayload(data []int32) error {
if w.finished {
return errors.New("can't append data to finished writer")
return errors.New("can't append data to finished int32 payload")
}
if len(data) == 0 {
return errors.New("can't add empty msgs into payload")
return errors.New("can't add empty msgs into int32 payload")
}
builder, ok := w.builder.(*array.Int32Builder)
if !ok {
return errors.New("failed to cast ArrayBuilder")
return errors.New("failed to cast Int32Builder")
}
builder.AppendValues(data, nil)
@ -275,16 +275,16 @@ func (w *NativePayloadWriter) AddInt32ToPayload(data []int32) error {
func (w *NativePayloadWriter) AddInt64ToPayload(data []int64) error {
if w.finished {
return errors.New("can't append data to finished writer")
return errors.New("can't append data to finished int64 payload")
}
if len(data) == 0 {
return errors.New("can't add empty msgs into payload")
return errors.New("can't add empty msgs into int64 payload")
}
builder, ok := w.builder.(*array.Int64Builder)
if !ok {
return errors.New("failed to cast ArrayBuilder")
return errors.New("failed to cast Int64Builder")
}
builder.AppendValues(data, nil)
@ -293,16 +293,16 @@ func (w *NativePayloadWriter) AddInt64ToPayload(data []int64) error {
func (w *NativePayloadWriter) AddFloatToPayload(data []float32) error {
if w.finished {
return errors.New("can't append data to finished writer")
return errors.New("can't append data to finished float payload")
}
if len(data) == 0 {
return errors.New("can't add empty msgs into payload")
return errors.New("can't add empty msgs into float payload")
}
builder, ok := w.builder.(*array.Float32Builder)
if !ok {
return errors.New("failed to cast ArrayBuilder")
return errors.New("failed to cast FloatBuilder")
}
builder.AppendValues(data, nil)
@ -311,16 +311,16 @@ func (w *NativePayloadWriter) AddFloatToPayload(data []float32) error {
func (w *NativePayloadWriter) AddDoubleToPayload(data []float64) error {
if w.finished {
return errors.New("can't append data to finished writer")
return errors.New("can't append data to finished double payload")
}
if len(data) == 0 {
return errors.New("can't add empty msgs into payload")
return errors.New("can't add empty msgs into double payload")
}
builder, ok := w.builder.(*array.Float64Builder)
if !ok {
return errors.New("failed to cast ArrayBuilder")
return errors.New("failed to cast DoubleBuilder")
}
builder.AppendValues(data, nil)
@ -329,12 +329,12 @@ func (w *NativePayloadWriter) AddDoubleToPayload(data []float64) error {
func (w *NativePayloadWriter) AddOneStringToPayload(data string) error {
if w.finished {
return errors.New("can't append data to finished writer")
return errors.New("can't append data to finished string payload")
}
builder, ok := w.builder.(*array.StringBuilder)
if !ok {
return errors.New("failed to cast ArrayBuilder")
return errors.New("failed to cast StringBuilder")
}
builder.Append(data)
@ -344,7 +344,7 @@ func (w *NativePayloadWriter) AddOneStringToPayload(data string) error {
func (w *NativePayloadWriter) AddOneArrayToPayload(data *schemapb.ScalarField) error {
if w.finished {
return errors.New("can't append data to finished writer")
return errors.New("can't append data to finished array payload")
}
bytes, err := proto.Marshal(data)
@ -354,7 +354,7 @@ func (w *NativePayloadWriter) AddOneArrayToPayload(data *schemapb.ScalarField) e
builder, ok := w.builder.(*array.BinaryBuilder)
if !ok {
return errors.New("failed to cast ArrayBuilder")
return errors.New("failed to cast BinaryBuilder")
}
builder.Append(bytes)
@ -364,12 +364,12 @@ func (w *NativePayloadWriter) AddOneArrayToPayload(data *schemapb.ScalarField) e
func (w *NativePayloadWriter) AddOneJSONToPayload(data []byte) error {
if w.finished {
return errors.New("can't append data to finished writer")
return errors.New("can't append data to finished json payload")
}
builder, ok := w.builder.(*array.BinaryBuilder)
if !ok {
return errors.New("failed to cast ArrayBuilder")
return errors.New("failed to cast JsonBuilder")
}
builder.Append(data)
@ -379,16 +379,16 @@ func (w *NativePayloadWriter) AddOneJSONToPayload(data []byte) error {
func (w *NativePayloadWriter) AddBinaryVectorToPayload(data []byte, dim int) error {
if w.finished {
return errors.New("can't append data to finished writer")
return errors.New("can't append data to finished binary vector payload")
}
if len(data) == 0 {
return errors.New("can't add empty msgs into payload")
return errors.New("can't add empty msgs into binary vector payload")
}
builder, ok := w.builder.(*array.FixedSizeBinaryBuilder)
if !ok {
return errors.New("failed to cast ArrayBuilder")
return errors.New("failed to cast BinaryVectorBuilder")
}
byteLength := dim / 8
@ -403,16 +403,16 @@ func (w *NativePayloadWriter) AddBinaryVectorToPayload(data []byte, dim int) err
func (w *NativePayloadWriter) AddFloatVectorToPayload(data []float32, dim int) error {
if w.finished {
return errors.New("can't append data to finished writer")
return errors.New("can't append data to finished float vector payload")
}
if len(data) == 0 {
return errors.New("can't add empty msgs into payload")
return errors.New("can't add empty msgs into float vector payload")
}
builder, ok := w.builder.(*array.FixedSizeBinaryBuilder)
if !ok {
return errors.New("failed to cast ArrayBuilder")
return errors.New("failed to cast FloatVectorBuilder")
}
byteLength := dim * 4
@ -434,16 +434,16 @@ func (w *NativePayloadWriter) AddFloatVectorToPayload(data []float32, dim int) e
func (w *NativePayloadWriter) AddFloat16VectorToPayload(data []byte, dim int) error {
if w.finished {
return errors.New("can't append data to finished writer")
return errors.New("can't append data to finished float16 payload")
}
if len(data) == 0 {
return errors.New("can't add empty msgs into payload")
return errors.New("can't add empty msgs into float16 payload")
}
builder, ok := w.builder.(*array.FixedSizeBinaryBuilder)
if !ok {
return errors.New("failed to cast ArrayBuilder")
return errors.New("failed to cast Float16Builder")
}
byteLength := dim * 2
@ -459,16 +459,16 @@ func (w *NativePayloadWriter) AddFloat16VectorToPayload(data []byte, dim int) er
func (w *NativePayloadWriter) AddBFloat16VectorToPayload(data []byte, dim int) error {
if w.finished {
return errors.New("can't append data to finished writer")
return errors.New("can't append data to finished BFloat16 payload")
}
if len(data) == 0 {
return errors.New("can't add empty msgs into payload")
return errors.New("can't add empty msgs into BFloat16 payload")
}
builder, ok := w.builder.(*array.FixedSizeBinaryBuilder)
if !ok {
return errors.New("failed to cast ArrayBuilder")
return errors.New("failed to cast BFloat16Builder")
}
byteLength := dim * 2
@ -484,11 +484,11 @@ func (w *NativePayloadWriter) AddBFloat16VectorToPayload(data []byte, dim int) e
func (w *NativePayloadWriter) AddSparseFloatVectorToPayload(data *SparseFloatVectorFieldData) error {
if w.finished {
return errors.New("can't append data to finished writer")
return errors.New("can't append data to finished sparse float vector payload")
}
builder, ok := w.builder.(*array.BinaryBuilder)
if !ok {
return errors.New("failed to cast BinaryBuilder")
return errors.New("failed to cast SparseFloatVectorBuilder")
}
length := len(data.SparseFloatArray.Contents)
builder.Reserve(length)

View File

@ -0,0 +1,295 @@
package storage
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
)
func TestPayloadWriter_Failed(t *testing.T) {
t.Run("Test Bool", func(t *testing.T) {
w, err := NewPayloadWriter(schemapb.DataType_Bool)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddBoolToPayload([]bool{})
require.Error(t, err)
err = w.FinishPayloadWriter()
require.NoError(t, err)
err = w.AddBoolToPayload([]bool{false})
require.Error(t, err)
w, err = NewPayloadWriter(schemapb.DataType_Float)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddBoolToPayload([]bool{false})
require.Error(t, err)
})
t.Run("Test Byte", func(t *testing.T) {
w, err := NewPayloadWriter(schemapb.DataType_Int8)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddByteToPayload([]byte{})
require.Error(t, err)
err = w.FinishPayloadWriter()
require.NoError(t, err)
err = w.AddByteToPayload([]byte{0})
require.Error(t, err)
w, err = NewPayloadWriter(schemapb.DataType_Float)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddByteToPayload([]byte{0})
require.Error(t, err)
})
t.Run("Test Int8", func(t *testing.T) {
w, err := NewPayloadWriter(schemapb.DataType_Int8)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddInt8ToPayload([]int8{})
require.Error(t, err)
err = w.FinishPayloadWriter()
require.NoError(t, err)
err = w.AddInt8ToPayload([]int8{0})
require.Error(t, err)
w, err = NewPayloadWriter(schemapb.DataType_Float)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddInt8ToPayload([]int8{0})
require.Error(t, err)
})
t.Run("Test Int16", func(t *testing.T) {
w, err := NewPayloadWriter(schemapb.DataType_Int16)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddInt16ToPayload([]int16{})
require.Error(t, err)
err = w.FinishPayloadWriter()
require.NoError(t, err)
err = w.AddInt16ToPayload([]int16{0})
require.Error(t, err)
w, err = NewPayloadWriter(schemapb.DataType_Float)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddInt16ToPayload([]int16{0})
require.Error(t, err)
})
t.Run("Test Int32", func(t *testing.T) {
w, err := NewPayloadWriter(schemapb.DataType_Int32)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddInt32ToPayload([]int32{})
require.Error(t, err)
err = w.FinishPayloadWriter()
require.NoError(t, err)
err = w.AddInt32ToPayload([]int32{0})
require.Error(t, err)
w, err = NewPayloadWriter(schemapb.DataType_Float)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddInt32ToPayload([]int32{0})
require.Error(t, err)
})
t.Run("Test Int64", func(t *testing.T) {
w, err := NewPayloadWriter(schemapb.DataType_Int64)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddInt64ToPayload([]int64{})
require.Error(t, err)
err = w.FinishPayloadWriter()
require.NoError(t, err)
err = w.AddInt64ToPayload([]int64{0})
require.Error(t, err)
w, err = NewPayloadWriter(schemapb.DataType_Float)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddInt64ToPayload([]int64{0})
require.Error(t, err)
})
t.Run("Test Float", func(t *testing.T) {
w, err := NewPayloadWriter(schemapb.DataType_Float)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddFloatToPayload([]float32{})
require.Error(t, err)
err = w.FinishPayloadWriter()
require.NoError(t, err)
err = w.AddFloatToPayload([]float32{0})
require.Error(t, err)
w, err = NewPayloadWriter(schemapb.DataType_Int64)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddFloatToPayload([]float32{0})
require.Error(t, err)
})
t.Run("Test Double", func(t *testing.T) {
w, err := NewPayloadWriter(schemapb.DataType_Double)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddDoubleToPayload([]float64{})
require.Error(t, err)
err = w.FinishPayloadWriter()
require.NoError(t, err)
err = w.AddDoubleToPayload([]float64{0})
require.Error(t, err)
w, err = NewPayloadWriter(schemapb.DataType_Int64)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddDoubleToPayload([]float64{0})
require.Error(t, err)
})
t.Run("Test String", func(t *testing.T) {
w, err := NewPayloadWriter(schemapb.DataType_String)
require.Nil(t, err)
require.NotNil(t, w)
err = w.FinishPayloadWriter()
require.NoError(t, err)
err = w.AddOneStringToPayload("test")
require.Error(t, err)
w, err = NewPayloadWriter(schemapb.DataType_Int64)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddOneStringToPayload("test")
require.Error(t, err)
})
t.Run("Test Array", func(t *testing.T) {
w, err := NewPayloadWriter(schemapb.DataType_Array)
require.Nil(t, err)
require.NotNil(t, w)
err = w.FinishPayloadWriter()
require.NoError(t, err)
err = w.AddOneArrayToPayload(&schemapb.ScalarField{})
require.Error(t, err)
w, err = NewPayloadWriter(schemapb.DataType_Int64)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddOneArrayToPayload(&schemapb.ScalarField{})
require.Error(t, err)
})
t.Run("Test Json", func(t *testing.T) {
w, err := NewPayloadWriter(schemapb.DataType_JSON)
require.Nil(t, err)
require.NotNil(t, w)
err = w.FinishPayloadWriter()
require.NoError(t, err)
err = w.AddOneJSONToPayload([]byte{0, 1})
require.Error(t, err)
w, err = NewPayloadWriter(schemapb.DataType_Int64)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddOneJSONToPayload([]byte{0, 1})
require.Error(t, err)
})
t.Run("Test BinaryVector", func(t *testing.T) {
w, err := NewPayloadWriter(schemapb.DataType_BinaryVector, 8)
require.Nil(t, err)
require.NotNil(t, w)
data := make([]byte, 8)
for i := 0; i < 8; i++ {
data[i] = 1
}
err = w.FinishPayloadWriter()
require.NoError(t, err)
err = w.AddBinaryVectorToPayload(data, 8)
require.Error(t, err)
w, err = NewPayloadWriter(schemapb.DataType_Int64)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddBinaryVectorToPayload(data, 8)
require.Error(t, err)
})
t.Run("Test FloatVector", func(t *testing.T) {
w, err := NewPayloadWriter(schemapb.DataType_FloatVector, 8)
require.Nil(t, err)
require.NotNil(t, w)
data := make([]float32, 8)
for i := 0; i < 8; i++ {
data[i] = 1
}
err = w.AddFloatToPayload([]float32{})
require.Error(t, err)
err = w.FinishPayloadWriter()
require.NoError(t, err)
err = w.AddFloatToPayload(data)
require.Error(t, err)
w, err = NewPayloadWriter(schemapb.DataType_Int64)
require.Nil(t, err)
require.NotNil(t, w)
err = w.AddFloatToPayload(data)
require.Error(t, err)
})
}