fix: iaccurate size estimation for encoded array data (#36373)

issue: #36029

Signed-off-by: jaime <yun.zhang@zilliz.com>
pull/36470/head
jaime 2024-09-24 14:51:14 +08:00 committed by GitHub
parent 350dde666d
commit 52cce4de58
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 29 additions and 17 deletions

View File

@ -179,9 +179,9 @@ pulsar:
port: 6650 # Port of Pulsar service.
webport: 80 # Web port of of Pulsar service. If you connect direcly without proxy, should use 8080.
# The maximum size of each message in Pulsar. Unit: Byte.
# By default, Pulsar can transmit at most 5 MB of data in a single message. When the size of inserted data is greater than this value, proxy fragments the data into multiple messages to ensure that they can be transmitted correctly.
# By default, Pulsar can transmit at most 2MB of data in a single message. When the size of inserted data is greater than this value, proxy fragments the data into multiple messages to ensure that they can be transmitted correctly.
# If the corresponding parameter in Pulsar remains unchanged, increasing this configuration will cause Milvus to fail, and reducing it produces no advantage.
maxMessageSize: 5242880
maxMessageSize: 2097152
# Pulsar can be provisioned for specific tenants with appropriate capacity allocated to the tenant.
# To share a Pulsar instance among multiple Milvus instances, you can change this to an Pulsar tenant rather than the default one for each Milvus instance before you start them. However, if you do not want Pulsar multi-tenancy, you are advised to change msgChannel.chanNamePrefix.cluster to the different value.
tenant: public

View File

@ -178,7 +178,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
}
// set field ID to insert field data
err = fillFieldIDBySchema(it.insertMsg.GetFieldsData(), schema.CollectionSchema)
err = fillFieldPropertiesBySchema(it.insertMsg.GetFieldsData(), schema.CollectionSchema)
if err != nil {
log.Info("set fieldID to fieldData failed",
zap.Error(err))

View File

@ -198,7 +198,7 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error {
return merr.WrapErrAsInputErrorWhen(err, merr.ErrParameterInvalid)
}
// set field ID to insert field data
err = fillFieldIDBySchema(it.upsertMsg.InsertMsg.GetFieldsData(), it.schema.CollectionSchema)
err = fillFieldPropertiesBySchema(it.upsertMsg.InsertMsg.GetFieldsData(), it.schema.CollectionSchema)
if err != nil {
log.Warn("insert set fieldID to fieldData failed when upsert",
zap.Error(err))

View File

@ -894,8 +894,8 @@ func autoGenDynamicFieldData(data [][]byte) *schemapb.FieldData {
}
}
// fillFieldIDBySchema set fieldID to fieldData according FieldSchemas
func fillFieldIDBySchema(columns []*schemapb.FieldData, schema *schemapb.CollectionSchema) error {
// fillFieldPropertiesBySchema set fieldID to fieldData according FieldSchemas
func fillFieldPropertiesBySchema(columns []*schemapb.FieldData, schema *schemapb.CollectionSchema) error {
fieldName2Schema := make(map[string]*schemapb.FieldSchema)
expectColumnNum := 0
@ -915,6 +915,16 @@ func fillFieldIDBySchema(columns []*schemapb.FieldData, schema *schemapb.Collect
if fieldSchema, ok := fieldName2Schema[fieldData.FieldName]; ok {
fieldData.FieldId = fieldSchema.FieldID
fieldData.Type = fieldSchema.DataType
// Set the ElementType because it may not be set in the insert request.
if fieldData.Type == schemapb.DataType_Array {
fd, ok := fieldData.Field.(*schemapb.FieldData_Scalars)
if !ok {
return fmt.Errorf("field convert FieldData_Scalars fail in fieldData, fieldName: %s,"+
" collectionName:%s", fieldData.FieldName, schema.Name)
}
fd.Scalars.GetArrayData().ElementType = fieldSchema.ElementType
}
} else {
return fmt.Errorf("fieldName %v not exist in collection schema", fieldData.FieldName)
}

View File

@ -738,7 +738,7 @@ func TestFillFieldIDBySchema(t *testing.T) {
}
// length mismatch
assert.Error(t, fillFieldIDBySchema(columns, schema))
assert.Error(t, fillFieldPropertiesBySchema(columns, schema))
schema = &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
@ -748,7 +748,7 @@ func TestFillFieldIDBySchema(t *testing.T) {
},
},
}
assert.NoError(t, fillFieldIDBySchema(columns, schema))
assert.NoError(t, fillFieldPropertiesBySchema(columns, schema))
assert.Equal(t, "TestFillFieldIDBySchema", columns[0].FieldName)
assert.Equal(t, schemapb.DataType_Int64, columns[0].Type)
assert.Equal(t, int64(1), columns[0].FieldId)

View File

@ -33,12 +33,10 @@ import (
)
const (
// SuggestPulsarMaxMessageSize defines the maximum size of Pulsar message.
SuggestPulsarMaxMessageSize = 5 * 1024 * 1024
defaultEtcdLogLevel = "info"
defaultEtcdLogPath = "stdout"
KafkaProducerConfigPrefix = "kafka.producer."
KafkaConsumerConfigPrefix = "kafka.consumer."
defaultEtcdLogLevel = "info"
defaultEtcdLogPath = "stdout"
KafkaProducerConfigPrefix = "kafka.producer."
KafkaConsumerConfigPrefix = "kafka.consumer."
)
// ServiceParam is used to quickly and easily access all basic service configurations.
@ -682,9 +680,9 @@ Default value applies when Pulsar is running on the same network with Milvus.`,
p.MaxMessageSize = ParamItem{
Key: "pulsar.maxMessageSize",
Version: "2.0.0",
DefaultValue: strconv.Itoa(SuggestPulsarMaxMessageSize),
DefaultValue: "2097152",
Doc: `The maximum size of each message in Pulsar. Unit: Byte.
By default, Pulsar can transmit at most 5 MB of data in a single message. When the size of inserted data is greater than this value, proxy fragments the data into multiple messages to ensure that they can be transmitted correctly.
By default, Pulsar can transmit at most 2MB of data in a single message. When the size of inserted data is greater than this value, proxy fragments the data into multiple messages to ensure that they can be transmitted correctly.
If the corresponding parameter in Pulsar remains unchanged, increasing this configuration will cause Milvus to fail, and reducing it produces no advantage.`,
Export: true,
}

View File

@ -96,7 +96,7 @@ func TestServiceParam(t *testing.T) {
{
assert.NotEqual(t, SParams.PulsarCfg.Address.GetValue(), "")
t.Logf("pulsar address = %s", SParams.PulsarCfg.Address.GetValue())
assert.Equal(t, SParams.PulsarCfg.MaxMessageSize.GetAsInt(), SuggestPulsarMaxMessageSize)
assert.Equal(t, SParams.PulsarCfg.MaxMessageSize.GetAsInt(), 2097152)
}
address := "pulsar://localhost:6650"

View File

@ -196,6 +196,8 @@ func CalcColumnSize(column *schemapb.FieldData) int {
for _, str := range column.GetScalars().GetJsonData().GetData() {
res += len(str)
}
default:
panic("Unknown data type:" + column.Type.String())
}
return res
}
@ -244,6 +246,8 @@ func EstimateEntitySize(fieldsData []*schemapb.FieldData, rowOffset int) (int, e
// counting only the size of the vector data, ignoring other
// bytes used in proto.
res += len(vec.Contents[rowOffset])
default:
panic("Unknown data type:" + fs.GetType().String())
}
}
return res, nil