mirror of https://github.com/milvus-io/milvus.git
pr: https://github.com/milvus-io/milvus/pull/28642 issue https://github.com/milvus-io/milvus/issues/28588 Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com> Co-authored-by: Enwei Jiao <enwei.jiao@zilliz.com>pull/29324/head
parent
d34ada83cb
commit
90847fbcfd
|
@ -59,13 +59,13 @@ func NewKafkaClientInstanceWithConfigMap(config kafka.ConfigMap, extraConsumerCo
|
|||
func NewKafkaClientInstanceWithConfig(ctx context.Context, config *paramtable.KafkaConfig) (*kafkaClient, error) {
|
||||
kafkaConfig := getBasicConfig(config.Address.GetValue())
|
||||
|
||||
// connection setup timeout, default as 30000ms
|
||||
// connection setup timeout, default as 30000ms, available range is [1000, 2147483647]
|
||||
if deadline, ok := ctx.Deadline(); ok {
|
||||
if deadline.Before(time.Now()) {
|
||||
return nil, errors.New("context timeout when new kafka client")
|
||||
}
|
||||
timeout := time.Until(deadline).Milliseconds()
|
||||
kafkaConfig.SetKey("socket.connection.setup.timeout.ms", timeout)
|
||||
// timeout := time.Until(deadline).Milliseconds()
|
||||
// kafkaConfig.SetKey("socket.connection.setup.timeout.ms", timeout)
|
||||
}
|
||||
|
||||
if (config.SaslUsername.GetValue() == "" && config.SaslPassword.GetValue() != "") ||
|
||||
|
|
Loading…
Reference in New Issue