Allow WAL inter-flush time to be configurable
parent
569d23c269
commit
ca86fa2633
|
@ -83,6 +83,7 @@ func NewServer(c *Config, version string) (*Server, error) {
|
|||
// Copy TSDB configuration.
|
||||
s.TSDBStore.MaxWALSize = c.Data.MaxWALSize
|
||||
s.TSDBStore.WALFlushInterval = time.Duration(c.Data.WALFlushInterval)
|
||||
s.TSDBStore.WALPartitionFlushDelay = time.Duration(c.Data.WALPartitionFlushDelay)
|
||||
|
||||
// Initialize query executor.
|
||||
s.QueryExecutor = tsdb.NewQueryExecutor(s.TSDBStore)
|
||||
|
|
|
@ -13,17 +13,22 @@ const (
|
|||
// DefaultWALFlushInterval is the frequency the WAL will get flushed if
|
||||
// it doesn't reach its size threshold.
|
||||
DefaultWALFlushInterval = 10 * time.Minute
|
||||
|
||||
// DefaultWALPartitionFlushDelay is the sleep time between WAL partition flushes.
|
||||
DefaultWALPartitionFlushDelay = 2 * time.Second
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Dir string `toml:"dir"`
|
||||
MaxWALSize int `toml:"max-wal-size"`
|
||||
WALFlushInterval toml.Duration `toml:"wal-flush-interval"`
|
||||
Dir string `toml:"dir"`
|
||||
MaxWALSize int `toml:"max-wal-size"`
|
||||
WALFlushInterval toml.Duration `toml:"wal-flush-interval"`
|
||||
WALPartitionFlushDelay toml.Duration `toml:"wal-partition-flush-delay"`
|
||||
}
|
||||
|
||||
func NewConfig() Config {
|
||||
return Config{
|
||||
MaxWALSize: DefaultMaxWALSize,
|
||||
WALFlushInterval: toml.Duration(DefaultWALFlushInterval),
|
||||
MaxWALSize: DefaultMaxWALSize,
|
||||
WALFlushInterval: toml.Duration(DefaultWALFlushInterval),
|
||||
WALPartitionFlushDelay: toml.Duration(DefaultWALPartitionFlushDelay),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -69,8 +69,9 @@ type Shard struct {
|
|||
logger *log.Logger
|
||||
|
||||
// The maximum size and time thresholds for flushing the WAL.
|
||||
MaxWALSize int
|
||||
WALFlushInterval time.Duration
|
||||
MaxWALSize int
|
||||
WALFlushInterval time.Duration
|
||||
WALPartitionFlushDelay time.Duration
|
||||
|
||||
// The writer used by the logger.
|
||||
LogOutput io.Writer
|
||||
|
@ -84,8 +85,9 @@ func NewShard(index *DatabaseIndex, path string) *Shard {
|
|||
flush: make(chan struct{}, 1),
|
||||
measurementFields: make(map[string]*measurementFields),
|
||||
|
||||
MaxWALSize: DefaultMaxWALSize,
|
||||
WALFlushInterval: DefaultWALFlushInterval,
|
||||
MaxWALSize: DefaultMaxWALSize,
|
||||
WALFlushInterval: DefaultWALFlushInterval,
|
||||
WALPartitionFlushDelay: DefaultWALPartitionFlushDelay,
|
||||
|
||||
LogOutput: os.Stderr,
|
||||
}
|
||||
|
@ -372,7 +374,7 @@ func (s *Shard) Flush() error {
|
|||
}
|
||||
|
||||
// Wait momentarily so other threads can process.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
time.Sleep(s.WALPartitionFlushDelay)
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
|
|
|
@ -142,6 +142,7 @@ func TestShard_Autoflush(t *testing.T) {
|
|||
sh := NewShard(NewDatabaseIndex(), filepath.Join(path, "shard"))
|
||||
sh.MaxWALSize = 1024 // 1KB
|
||||
sh.WALFlushInterval = 1 * time.Hour
|
||||
sh.WALPartitionFlushDelay = 1 * time.Millisecond
|
||||
if err := sh.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -160,7 +161,7 @@ func TestShard_Autoflush(t *testing.T) {
|
|||
}
|
||||
|
||||
// Wait for autoflush.
|
||||
time.Sleep(1 * time.Second)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Make sure we have series buckets created outside the WAL.
|
||||
if n, err := sh.SeriesCount(); err != nil {
|
||||
|
@ -179,6 +180,7 @@ func TestShard_Autoflush_FlushInterval(t *testing.T) {
|
|||
sh := NewShard(NewDatabaseIndex(), filepath.Join(path, "shard"))
|
||||
sh.MaxWALSize = 10 * 1024 * 1024 // 10MB
|
||||
sh.WALFlushInterval = 100 * time.Millisecond
|
||||
sh.WALPartitionFlushDelay = 1 * time.Millisecond
|
||||
if err := sh.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -197,7 +199,7 @@ func TestShard_Autoflush_FlushInterval(t *testing.T) {
|
|||
}
|
||||
|
||||
// Wait for time-based flush.
|
||||
time.Sleep(1 * time.Second)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Make sure we have series buckets created outside the WAL.
|
||||
if n, err := sh.SeriesCount(); err != nil {
|
||||
|
|
|
@ -16,10 +16,11 @@ import (
|
|||
|
||||
func NewStore(path string) *Store {
|
||||
return &Store{
|
||||
path: path,
|
||||
MaxWALSize: DefaultMaxWALSize,
|
||||
WALFlushInterval: DefaultWALFlushInterval,
|
||||
Logger: log.New(os.Stderr, "[store] ", log.LstdFlags),
|
||||
path: path,
|
||||
MaxWALSize: DefaultMaxWALSize,
|
||||
WALFlushInterval: DefaultWALFlushInterval,
|
||||
WALPartitionFlushDelay: DefaultWALPartitionFlushDelay,
|
||||
Logger: log.New(os.Stderr, "[store] ", log.LstdFlags),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -34,8 +35,9 @@ type Store struct {
|
|||
databaseIndexes map[string]*DatabaseIndex
|
||||
shards map[uint64]*Shard
|
||||
|
||||
MaxWALSize int
|
||||
WALFlushInterval time.Duration
|
||||
MaxWALSize int
|
||||
WALFlushInterval time.Duration
|
||||
WALPartitionFlushDelay time.Duration
|
||||
|
||||
Logger *log.Logger
|
||||
}
|
||||
|
@ -104,6 +106,7 @@ func (s *Store) newShard(index *DatabaseIndex, path string) *Shard {
|
|||
sh := NewShard(index, path)
|
||||
sh.MaxWALSize = s.MaxWALSize
|
||||
sh.WALFlushInterval = s.WALFlushInterval
|
||||
sh.WALPartitionFlushDelay = s.WALPartitionFlushDelay
|
||||
return sh
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue