feat: add multiple UDP writers (#23909)

Co-authored-by: Carlos Peon Costa <carlospec@inditex.com>
pull/24036/head
carlospeon 2022-12-19 22:46:42 +01:00 committed by GitHub
parent 144aca1f7f
commit 19d83dcad9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 17 additions and 4 deletions

View File

@ -570,6 +570,9 @@
# UDP Read buffer size, 0 means OS default. UDP listener will fail if set above OS max.
# read-buffer = 0
# Number of parallel writers that will be started.
# writers = 1
###
### [continuous_queries]
###

View File

@ -41,6 +41,9 @@ const (
// Linux: sudo sysctl -w net.core.rmem_max=<read-buffer>
// BSD/Darwin: sudo sysctl -w kern.ipc.maxsockbuf=<read-buffer>
DefaultReadBuffer = 0
// DefaultWriters is the default number of writers.
DefaultWriters = 1
)
// Config holds various configuration settings for the UDP listener.
@ -55,6 +58,7 @@ type Config struct {
ReadBuffer int `toml:"read-buffer"`
BatchTimeout toml.Duration `toml:"batch-timeout"`
Precision string `toml:"precision"`
Writers int `toml:"writers"`
}
// NewConfig returns a new instance of Config with defaults.
@ -66,6 +70,7 @@ func NewConfig() Config {
BatchSize: DefaultBatchSize,
BatchPending: DefaultBatchPending,
BatchTimeout: toml.Duration(DefaultBatchTimeout),
Writers: DefaultWriters,
}
}
@ -91,6 +96,9 @@ func (c *Config) WithDefaults() *Config {
if d.ReadBuffer == 0 {
d.ReadBuffer = DefaultReadBuffer
}
if d.Writers == 0 {
d.Writers = DefaultWriters
}
return &d
}
@ -100,7 +108,7 @@ type Configs []Config
// Diagnostics returns one set of diagnostics for all of the Configs.
func (c Configs) Diagnostics() (*diagnostics.Diagnostics, error) {
d := &diagnostics.Diagnostics{
Columns: []string{"enabled", "bind-address", "database", "retention-policy", "batch-size", "batch-pending", "batch-timeout", "precision"},
Columns: []string{"enabled", "bind-address", "database", "retention-policy", "batch-size", "batch-pending", "batch-timeout", "precision", "writers"},
}
for _, cc := range c {
@ -109,7 +117,7 @@ func (c Configs) Diagnostics() (*diagnostics.Diagnostics, error) {
continue
}
r := []interface{}{true, cc.BindAddress, cc.Database, cc.RetentionPolicy, cc.BatchSize, cc.BatchPending, cc.BatchTimeout, cc.Precision}
r := []interface{}{true, cc.BindAddress, cc.Database, cc.RetentionPolicy, cc.BatchSize, cc.BatchPending, cc.BatchTimeout, cc.Precision, cc.Writers}
d.AddRow(r)
}

View File

@ -117,10 +117,12 @@ func (s *Service) Open() (err error) {
s.Logger.Info("Started listening on UDP", zap.String("addr", s.config.BindAddress))
s.wg.Add(3)
s.wg.Add(2 + s.config.Writers)
go s.serve()
go s.parser()
go s.writer()
for i := 0; i < s.config.Writers; i++ {
go s.writer()
}
return nil
}