feat: add `--storage-write-timeout` flag to set write request timeouts (#22617)
parent
a01d42db37
commit
2795c600c1
|
@ -456,6 +456,12 @@ func (o *InfluxdOpts) BindCliOpts() []cli.Opt {
|
||||||
},
|
},
|
||||||
|
|
||||||
// storage configuration
|
// storage configuration
|
||||||
|
{
|
||||||
|
DestP: &o.StorageConfig.WriteTimeout,
|
||||||
|
Flag: "storage-write-timeout",
|
||||||
|
Default: o.StorageConfig.WriteTimeout,
|
||||||
|
Desc: "The max amount of time the engine will spend completing a write request before cancelling with a timeout.",
|
||||||
|
},
|
||||||
{
|
{
|
||||||
DestP: &o.StorageConfig.Data.WALFsyncDelay,
|
DestP: &o.StorageConfig.Data.WALFsyncDelay,
|
||||||
Flag: "storage-wal-fsync-delay",
|
Flag: "storage-wal-fsync-delay",
|
||||||
|
|
|
@ -1,14 +1,20 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/v2/tsdb"
|
"github.com/influxdata/influxdb/v2/tsdb"
|
||||||
"github.com/influxdata/influxdb/v2/v1/services/precreator"
|
"github.com/influxdata/influxdb/v2/v1/services/precreator"
|
||||||
"github.com/influxdata/influxdb/v2/v1/services/retention"
|
"github.com/influxdata/influxdb/v2/v1/services/retention"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// DefaultWriteTimeout is the default timeout for a complete write to succeed.
|
||||||
|
const DefaultWriteTimeout = 10 * time.Second
|
||||||
|
|
||||||
// Config holds the configuration for an Engine.
|
// Config holds the configuration for an Engine.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Data tsdb.Config
|
Data tsdb.Config
|
||||||
|
WriteTimeout time.Duration
|
||||||
|
|
||||||
RetentionService retention.Config
|
RetentionService retention.Config
|
||||||
PrecreatorConfig precreator.Config
|
PrecreatorConfig precreator.Config
|
||||||
|
@ -18,6 +24,7 @@ type Config struct {
|
||||||
func NewConfig() Config {
|
func NewConfig() Config {
|
||||||
return Config{
|
return Config{
|
||||||
Data: tsdb.NewConfig(),
|
Data: tsdb.NewConfig(),
|
||||||
|
WriteTimeout: DefaultWriteTimeout,
|
||||||
RetentionService: retention.NewConfig(),
|
RetentionService: retention.NewConfig(),
|
||||||
PrecreatorConfig: precreator.NewConfig(),
|
PrecreatorConfig: precreator.NewConfig(),
|
||||||
}
|
}
|
||||||
|
|
|
@ -127,7 +127,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
|
||||||
e.tsdbStore.EngineOptions.EngineVersion = c.Data.Engine
|
e.tsdbStore.EngineOptions.EngineVersion = c.Data.Engine
|
||||||
e.tsdbStore.EngineOptions.IndexVersion = c.Data.Index
|
e.tsdbStore.EngineOptions.IndexVersion = c.Data.Index
|
||||||
|
|
||||||
pw := coordinator.NewPointsWriter()
|
pw := coordinator.NewPointsWriter(c.WriteTimeout)
|
||||||
pw.TSDBStore = e.tsdbStore
|
pw.TSDBStore = e.tsdbStore
|
||||||
pw.MetaClient = e.metaClient
|
pw.MetaClient = e.metaClient
|
||||||
e.pointsWriter = pw
|
e.pointsWriter = pw
|
||||||
|
|
|
@ -3,15 +3,10 @@
|
||||||
package coordinator
|
package coordinator
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/v2/toml"
|
"github.com/influxdata/influxdb/v2/toml"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// DefaultWriteTimeout is the default timeout for a complete write to succeed.
|
|
||||||
DefaultWriteTimeout = 10 * time.Second
|
|
||||||
|
|
||||||
// DefaultMaxConcurrentQueries is the maximum number of running queries.
|
// DefaultMaxConcurrentQueries is the maximum number of running queries.
|
||||||
// A value of zero will make the maximum query limit unlimited.
|
// A value of zero will make the maximum query limit unlimited.
|
||||||
DefaultMaxConcurrentQueries = 0
|
DefaultMaxConcurrentQueries = 0
|
||||||
|
@ -27,7 +22,6 @@ const (
|
||||||
|
|
||||||
// Config represents the configuration for the coordinator service.
|
// Config represents the configuration for the coordinator service.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
WriteTimeout toml.Duration `toml:"write-timeout"`
|
|
||||||
MaxConcurrentQueries int `toml:"max-concurrent-queries"`
|
MaxConcurrentQueries int `toml:"max-concurrent-queries"`
|
||||||
LogQueriesAfter toml.Duration `toml:"log-queries-after"`
|
LogQueriesAfter toml.Duration `toml:"log-queries-after"`
|
||||||
MaxSelectPointN int `toml:"max-select-point"`
|
MaxSelectPointN int `toml:"max-select-point"`
|
||||||
|
@ -38,7 +32,6 @@ type Config struct {
|
||||||
// NewConfig returns an instance of Config with defaults.
|
// NewConfig returns an instance of Config with defaults.
|
||||||
func NewConfig() Config {
|
func NewConfig() Config {
|
||||||
return Config{
|
return Config{
|
||||||
WriteTimeout: toml.Duration(DefaultWriteTimeout),
|
|
||||||
MaxConcurrentQueries: DefaultMaxConcurrentQueries,
|
MaxConcurrentQueries: DefaultMaxConcurrentQueries,
|
||||||
MaxSelectPointN: DefaultMaxSelectPointN,
|
MaxSelectPointN: DefaultMaxSelectPointN,
|
||||||
MaxSelectSeriesN: DefaultMaxSelectSeriesN,
|
MaxSelectSeriesN: DefaultMaxSelectSeriesN,
|
||||||
|
|
|
@ -1,24 +0,0 @@
|
||||||
package coordinator_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/BurntSushi/toml"
|
|
||||||
"github.com/influxdata/influxdb/v2/v1/coordinator"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestConfig_Parse(t *testing.T) {
|
|
||||||
// Parse configuration.
|
|
||||||
var c coordinator.Config
|
|
||||||
if _, err := toml.Decode(`
|
|
||||||
write-timeout = "20s"
|
|
||||||
`, &c); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Validate configuration.
|
|
||||||
if time.Duration(c.WriteTimeout) != 20*time.Second {
|
|
||||||
t.Fatalf("unexpected write timeout s: %s", c.WriteTimeout)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -85,10 +85,10 @@ func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPointsWriter returns a new instance of PointsWriter for a node.
|
// NewPointsWriter returns a new instance of PointsWriter for a node.
|
||||||
func NewPointsWriter() *PointsWriter {
|
func NewPointsWriter(writeTimeout time.Duration) *PointsWriter {
|
||||||
return &PointsWriter{
|
return &PointsWriter{
|
||||||
closing: make(chan struct{}),
|
closing: make(chan struct{}),
|
||||||
WriteTimeout: DefaultWriteTimeout,
|
WriteTimeout: writeTimeout,
|
||||||
Logger: zap.NewNop(),
|
Logger: zap.NewNop(),
|
||||||
stats: &WriteStatistics{},
|
stats: &WriteStatistics{},
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,7 +83,7 @@ func TestPointsWriter_MapShards_AlterShardDuration(t *testing.T) {
|
||||||
return &sg, nil
|
return &sg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
c := coordinator.NewPointsWriter()
|
c := coordinator.NewPointsWriter(time.Second)
|
||||||
c.MetaClient = ms
|
c.MetaClient = ms
|
||||||
|
|
||||||
pr := &coordinator.WritePointsRequest{
|
pr := &coordinator.WritePointsRequest{
|
||||||
|
@ -161,7 +161,7 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) {
|
||||||
panic("should not get here")
|
panic("should not get here")
|
||||||
}
|
}
|
||||||
|
|
||||||
c := coordinator.NewPointsWriter()
|
c := coordinator.NewPointsWriter(time.Second)
|
||||||
c.MetaClient = ms
|
c.MetaClient = ms
|
||||||
defer c.Close()
|
defer c.Close()
|
||||||
pr := &coordinator.WritePointsRequest{
|
pr := &coordinator.WritePointsRequest{
|
||||||
|
@ -217,7 +217,7 @@ func TestPointsWriter_MapShards_Invalid(t *testing.T) {
|
||||||
return &rp.ShardGroups[0], nil
|
return &rp.ShardGroups[0], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
c := coordinator.NewPointsWriter()
|
c := coordinator.NewPointsWriter(time.Second)
|
||||||
c.MetaClient = ms
|
c.MetaClient = ms
|
||||||
defer c.Close()
|
defer c.Close()
|
||||||
pr := &coordinator.WritePointsRequest{
|
pr := &coordinator.WritePointsRequest{
|
||||||
|
@ -338,7 +338,7 @@ func TestPointsWriter_WritePoints(t *testing.T) {
|
||||||
return subPoints
|
return subPoints
|
||||||
}
|
}
|
||||||
|
|
||||||
c := coordinator.NewPointsWriter()
|
c := coordinator.NewPointsWriter(time.Second)
|
||||||
c.MetaClient = ms
|
c.MetaClient = ms
|
||||||
c.TSDBStore = store
|
c.TSDBStore = store
|
||||||
c.AddWriteSubscriber(sub.Points())
|
c.AddWriteSubscriber(sub.Points())
|
||||||
|
@ -414,7 +414,7 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) {
|
||||||
return subPoints
|
return subPoints
|
||||||
}
|
}
|
||||||
|
|
||||||
c := coordinator.NewPointsWriter()
|
c := coordinator.NewPointsWriter(time.Second)
|
||||||
c.MetaClient = ms
|
c.MetaClient = ms
|
||||||
c.TSDBStore = store
|
c.TSDBStore = store
|
||||||
c.AddWriteSubscriber(sub.Points())
|
c.AddWriteSubscriber(sub.Points())
|
||||||
|
|
Loading…
Reference in New Issue