diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 808c1c5a3a..a98371c378 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -179,9 +179,9 @@ pulsar: port: 6650 # Port of Pulsar service. webport: 80 # Web port of of Pulsar service. If you connect direcly without proxy, should use 8080. # The maximum size of each message in Pulsar. Unit: Byte. - # By default, Pulsar can transmit at most 5 MB of data in a single message. When the size of inserted data is greater than this value, proxy fragments the data into multiple messages to ensure that they can be transmitted correctly. + # By default, Pulsar can transmit at most 2MB of data in a single message. When the size of inserted data is greater than this value, proxy fragments the data into multiple messages to ensure that they can be transmitted correctly. # If the corresponding parameter in Pulsar remains unchanged, increasing this configuration will cause Milvus to fail, and reducing it produces no advantage. - maxMessageSize: 5242880 + maxMessageSize: 2097152 # Pulsar can be provisioned for specific tenants with appropriate capacity allocated to the tenant. # To share a Pulsar instance among multiple Milvus instances, you can change this to an Pulsar tenant rather than the default one for each Milvus instance before you start them. However, if you do not want Pulsar multi-tenancy, you are advised to change msgChannel.chanNamePrefix.cluster to the different value. 
tenant: public diff --git a/internal/proxy/task_insert.go b/internal/proxy/task_insert.go index be46569b66..fd86fc9d3c 100644 --- a/internal/proxy/task_insert.go +++ b/internal/proxy/task_insert.go @@ -178,7 +178,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error { } // set field ID to insert field data - err = fillFieldIDBySchema(it.insertMsg.GetFieldsData(), schema.CollectionSchema) + err = fillFieldPropertiesBySchema(it.insertMsg.GetFieldsData(), schema.CollectionSchema) if err != nil { log.Info("set fieldID to fieldData failed", zap.Error(err)) diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index ed82be44bb..825323c1d3 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -198,7 +198,7 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error { return merr.WrapErrAsInputErrorWhen(err, merr.ErrParameterInvalid) } // set field ID to insert field data - err = fillFieldIDBySchema(it.upsertMsg.InsertMsg.GetFieldsData(), it.schema.CollectionSchema) + err = fillFieldPropertiesBySchema(it.upsertMsg.InsertMsg.GetFieldsData(), it.schema.CollectionSchema) if err != nil { log.Warn("insert set fieldID to fieldData failed when upsert", zap.Error(err)) diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 9f07d7b5c9..498370c0e7 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -894,8 +894,8 @@ func autoGenDynamicFieldData(data [][]byte) *schemapb.FieldData { } } -// fillFieldIDBySchema set fieldID to fieldData according FieldSchemas -func fillFieldIDBySchema(columns []*schemapb.FieldData, schema *schemapb.CollectionSchema) error { +// fillFieldPropertiesBySchema sets the field ID, data type and, for array fields, the element type of each fieldData according to the collection's FieldSchemas +func fillFieldPropertiesBySchema(columns []*schemapb.FieldData, schema *schemapb.CollectionSchema) error { fieldName2Schema := make(map[string]*schemapb.FieldSchema) expectColumnNum := 0 @@ -915,6 +915,16 @@ func fillFieldIDBySchema(columns 
[]*schemapb.FieldData, schema *schemapb.Collect if fieldSchema, ok := fieldName2Schema[fieldData.FieldName]; ok { fieldData.FieldId = fieldSchema.FieldID fieldData.Type = fieldSchema.DataType + + // Set the ElementType because it may not be set in the insert request. Reject payloads that carry no array data so the assignment below cannot dereference a nil ArrayData. + if fieldData.Type == schemapb.DataType_Array { + fd, ok := fieldData.Field.(*schemapb.FieldData_Scalars) + if !ok || fd.Scalars.GetArrayData() == nil { + return fmt.Errorf("field convert FieldData_Scalars fail in fieldData, fieldName: %s,"+ + " collectionName:%s", fieldData.FieldName, schema.Name) + } + fd.Scalars.GetArrayData().ElementType = fieldSchema.ElementType + } } else { return fmt.Errorf("fieldName %v not exist in collection schema", fieldData.FieldName) } diff --git a/internal/proxy/util_test.go b/internal/proxy/util_test.go index 5ff4ee0e3f..6936de3c41 100644 --- a/internal/proxy/util_test.go +++ b/internal/proxy/util_test.go @@ -738,7 +738,7 @@ func TestFillFieldIDBySchema(t *testing.T) { } // length mismatch - assert.Error(t, fillFieldIDBySchema(columns, schema)) + assert.Error(t, fillFieldPropertiesBySchema(columns, schema)) schema = &schemapb.CollectionSchema{ Fields: []*schemapb.FieldSchema{ { @@ -748,7 +748,7 @@ func TestFillFieldIDBySchema(t *testing.T) { }, }, } - assert.NoError(t, fillFieldIDBySchema(columns, schema)) + assert.NoError(t, fillFieldPropertiesBySchema(columns, schema)) assert.Equal(t, "TestFillFieldIDBySchema", columns[0].FieldName) assert.Equal(t, schemapb.DataType_Int64, columns[0].Type) assert.Equal(t, int64(1), columns[0].FieldId) diff --git a/pkg/util/paramtable/service_param.go b/pkg/util/paramtable/service_param.go index 152e7e7046..d38d7f8988 100644 --- a/pkg/util/paramtable/service_param.go +++ b/pkg/util/paramtable/service_param.go @@ -33,12 +33,10 @@ import ( ) const ( - // SuggestPulsarMaxMessageSize defines the maximum size of Pulsar message. 
- SuggestPulsarMaxMessageSize = 5 * 1024 * 1024 - defaultEtcdLogLevel = "info" - defaultEtcdLogPath = "stdout" - KafkaProducerConfigPrefix = "kafka.producer." - KafkaConsumerConfigPrefix = "kafka.consumer." + defaultEtcdLogLevel = "info" + defaultEtcdLogPath = "stdout" + KafkaProducerConfigPrefix = "kafka.producer." + KafkaConsumerConfigPrefix = "kafka.consumer." ) // ServiceParam is used to quickly and easily access all basic service configurations. @@ -682,9 +680,9 @@ Default value applies when Pulsar is running on the same network with Milvus.`, p.MaxMessageSize = ParamItem{ Key: "pulsar.maxMessageSize", Version: "2.0.0", - DefaultValue: strconv.Itoa(SuggestPulsarMaxMessageSize), + DefaultValue: "2097152", Doc: `The maximum size of each message in Pulsar. Unit: Byte. -By default, Pulsar can transmit at most 5 MB of data in a single message. When the size of inserted data is greater than this value, proxy fragments the data into multiple messages to ensure that they can be transmitted correctly. +By default, Pulsar can transmit at most 2MB of data in a single message. When the size of inserted data is greater than this value, proxy fragments the data into multiple messages to ensure that they can be transmitted correctly. 
If the corresponding parameter in Pulsar remains unchanged, increasing this configuration will cause Milvus to fail, and reducing it produces no advantage.`, Export: true, } diff --git a/pkg/util/paramtable/service_param_test.go b/pkg/util/paramtable/service_param_test.go index acbca2d8eb..60eaf9edfc 100644 --- a/pkg/util/paramtable/service_param_test.go +++ b/pkg/util/paramtable/service_param_test.go @@ -96,7 +96,7 @@ func TestServiceParam(t *testing.T) { { assert.NotEqual(t, SParams.PulsarCfg.Address.GetValue(), "") t.Logf("pulsar address = %s", SParams.PulsarCfg.Address.GetValue()) - assert.Equal(t, SParams.PulsarCfg.MaxMessageSize.GetAsInt(), SuggestPulsarMaxMessageSize) + assert.Equal(t, SParams.PulsarCfg.MaxMessageSize.GetAsInt(), 2097152) } address := "pulsar://localhost:6650" diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go index cdeef109da..2a3f54c612 100644 --- a/pkg/util/typeutil/schema.go +++ b/pkg/util/typeutil/schema.go @@ -196,6 +196,8 @@ func CalcColumnSize(column *schemapb.FieldData) int { for _, str := range column.GetScalars().GetJsonData().GetData() { res += len(str) } + default: + panic("Unknown data type:" + column.Type.String()) } return res } @@ -244,6 +246,8 @@ func EstimateEntitySize(fieldsData []*schemapb.FieldData, rowOffset int) (int, e // counting only the size of the vector data, ignoring other // bytes used in proto. res += len(vec.Contents[rowOffset]) + default: + panic("Unknown data type:" + fs.GetType().String()) } } return res, nil