Add WAL configuration options

pull/3717/head
Paul Dix 2015-08-18 16:59:54 -04:00
parent 30bcd3e0e4
commit 9df3b7d828
14 changed files with 142 additions and 43 deletions

View File

@ -68,6 +68,9 @@ type Server struct {
// NewServer returns a new instance of Server built from a config.
func NewServer(c *Config, version string) (*Server, error) {
// Construct base meta store and data store.
tsdbStore := tsdb.NewStore(c.Data.Dir)
tsdbStore.EngineOptions.Config = c.Data
s := &Server{
version: version,
err: make(chan error),
@ -77,7 +80,7 @@ func NewServer(c *Config, version string) (*Server, error) {
BindAddress: c.Meta.BindAddress,
MetaStore: meta.NewStore(c.Meta),
TSDBStore: tsdb.NewStore(c.Data.Dir),
TSDBStore: tsdbStore,
reportingDisabled: c.ReportingDisabled,
}
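
A minimal sketch of wiring the new [data] WAL options through the server, assuming run.NewConfig() and (*Server).Open() exist in the run package as in released 0.9.x; the version string and paths are illustrative.

package main

import (
	"log"

	"github.com/influxdb/influxdb/cmd/influxd/run"
)

func main() {
	c := run.NewConfig() // assumed defaults constructor in package run
	c.Data.Dir = "/var/opt/influxdb/data"
	c.Data.WALDir = "/var/opt/influxdb/wal" // new WAL option from this change
	c.Data.WALEnableLogging = true

	s, err := run.NewServer(c, "0.9.3")
	if err != nil {
		log.Fatal(err)
	}
	if err := s.Open(); err != nil { // Open is assumed from the run.Server API
		log.Fatal(err)
	}
}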

View File

@ -35,6 +35,7 @@ func NewServer(c *run.Config) *Server {
Server: srv,
Config: c,
}
s.TSDBStore.EngineOptions.Config = c.Data
configureLogging(&s)
return &s
}
@ -155,6 +156,7 @@ func NewConfig() *run.Config {
c.Meta.CommitTimeout = toml.Duration(5 * time.Millisecond)
c.Data.Dir = MustTempFile()
c.Data.WALDir = MustTempFile()
c.HintedHandoff.Dir = MustTempFile()

View File

@ -36,10 +36,37 @@ reporting-disabled = false
[data]
dir = "/var/opt/influxdb/data"
# The following WAL settings are for the b1 storage engine used in 0.9.2. They won't
# apply to any new shards created after upgrading to version 0.9.3 or later.
max-wal-size = 104857600 # Maximum size the WAL can reach before a flush. Defaults to 100MB.
wal-flush-interval = "10m" # Maximum time data can sit in WAL before a flush.
wal-partition-flush-delay = "2s" # The delay time between each WAL partition being flushed.
# These are the WAL settings for the storage engine >= 0.9.3
wal-dir = "/var/opt/influxdb/wal"
wal-enable-logging = true
# When a series in the WAL in-memory cache reaches this size in bytes it is marked as ready to
# flush to the index
# wal-ready-series-size = 25600
# Flush and compact a partition once this ratio of series are over the ready size
# wal-compaction-threshold = 0.6
# Force a flush and compaction if any series in a partition gets above this size in bytes
# wal-max-series-size = 2097152
# Force a flush of all series and full compaction if there have been no writes in this
# amount of time. This is useful for ensuring that shards that are cold for writes don't
# keep a bunch of data cached in memory and in the WAL.
# wal-flush-cold-interval = "10m"
# Force a partition to flush its largest series if it reaches this approximate size in
# bytes. Remember there are 5 partitions so you'll need at least 5x this amount of memory.
# The more memory you have, the bigger this can be.
# wal-partition-size-threshold = 20971520
###
### [cluster]
###
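
A rough Go sketch of how these [data] keys map onto the new tsdb.Config fields introduced below; the values mirror the commented examples above, the influxdb/toml import for the duration type is assumed, and the helper name is mine.

package dataconf

import (
	"time"

	"github.com/influxdb/influxdb/toml"
	"github.com/influxdb/influxdb/tsdb"
)

// walConfig builds a [data] config equivalent to the sample above.
func walConfig() tsdb.Config {
	c := tsdb.NewConfig() // starts from the defaults in tsdb/config.go
	c.Dir = "/var/opt/influxdb/data"

	// WAL options for the >= 0.9.3 (bz1) engine.
	c.WALDir = "/var/opt/influxdb/wal"
	c.WALEnableLogging = true
	c.WALReadySeriesSize = 25600
	c.WALCompactionThreshold = 0.6
	c.WALMaxSeriesSize = 2 * 1024 * 1024
	c.WALFlushColdInterval = toml.Duration(10 * time.Minute)
	c.WALPartitionSizeThreshold = 20 * 1024 * 1024
	return c
}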

View File

@ -16,13 +16,48 @@ const (
// DefaultWALPartitionFlushDelay is the sleep time between WAL partition flushes.
DefaultWALPartitionFlushDelay = 2 * time.Second
// tsdb/engine/wal configuration options
// DefaultReadySeriesSize of 30KB specifies when a series is eligible to be flushed
DefaultReadySeriesSize = 30 * 1024
// DefaultCompactionThreshold is the ratio of keys over the flush size at which a partition is flushed and compacted
DefaultCompactionThreshold = 0.5
// DefaultMaxSeriesSize specifies the size at which a series will be forced to flush
DefaultMaxSeriesSize = 1024 * 1024
// DefaultFlushColdInterval specifies how long after a partition has been cold
// for writes that a full flush and compaction are forced
DefaultFlushColdInterval = 20 * time.Second
// DefaultPartitionSizeThreshold specifies that when a partition gets to this size in
// memory, we should slow down writes until it gets a chance to compact.
// This will force clients to get backpressure if they're writing too fast. We need
// this because the WAL can take writes much faster than the index. So eventually
// we'll need to create backpressure, otherwise we'll fill up the memory and die.
// This number multiplied by the partition count is roughly the max possible memory
// size for the in-memory WAL cache.
DefaultPartitionSizeThreshold = 20 * 1024 * 1024 // 20MB
)
type Config struct {
Dir string `toml:"dir"`
Dir string `toml:"dir"`
// WAL config options for b1 (introduced in 0.9.2)
MaxWALSize int `toml:"max-wal-size"`
WALFlushInterval toml.Duration `toml:"wal-flush-interval"`
WALPartitionFlushDelay toml.Duration `toml:"wal-partition-flush-delay"`
// WAL configuration options for bz1 (introduced in 0.9.3)
WALDir string `toml:"wal-dir"`
WALEnableLogging bool `toml:"wal-enable-logging"`
WALReadySeriesSize int `toml:"wal-ready-series-size"`
WALCompactionThreshold float64 `toml:"wal-compaction-threshold"`
WALMaxSeriesSize int `toml:"wal-max-series-size"`
WALFlushColdInterval toml.Duration `toml:"wal-flush-cold-interval"`
WALPartitionSizeThreshold uint64 `toml:"wal-partition-size-threshold"`
}
func NewConfig() Config {
@ -30,5 +65,12 @@ func NewConfig() Config {
MaxWALSize: DefaultMaxWALSize,
WALFlushInterval: toml.Duration(DefaultWALFlushInterval),
WALPartitionFlushDelay: toml.Duration(DefaultWALPartitionFlushDelay),
WALEnableLogging: true,
WALReadySeriesSize: DefaultReadySeriesSize,
WALCompactionThreshold: DefaultCompactionThreshold,
WALMaxSeriesSize: DefaultMaxSeriesSize,
WALFlushColdInterval: toml.Duration(DefaultFlushColdInterval),
WALPartitionSizeThreshold: DefaultPartitionSizeThreshold,
}
}
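
As a worked example of the backpressure note above: the partition size threshold multiplied by the WAL partition count (5, per tsdb/engine/wal) gives a rough upper bound on the in-memory WAL cache, which is about 100MB with the defaults. The helper below is illustrative only.

package walsize

import "github.com/influxdb/influxdb/tsdb"

// approxMaxWALCacheBytes returns the rough memory bound described in the
// DefaultPartitionSizeThreshold comment: threshold * partition count.
func approxMaxWALCacheBytes(c tsdb.Config) uint64 {
	const partitionCount = 5 // mirrors wal.PartitionCount
	return c.WALPartitionSizeThreshold * partitionCount
}

// With NewConfig(): 20 * 1024 * 1024 * 5 is roughly 100MB of cached WAL data.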

View File

@ -102,6 +102,8 @@ type EngineOptions struct {
MaxWALSize int
WALFlushInterval time.Duration
WALPartitionFlushDelay time.Duration
Config Config
}
// NewEngineOptions returns the default options.
@ -111,6 +113,7 @@ func NewEngineOptions() EngineOptions {
MaxWALSize: DefaultMaxWALSize,
WALFlushInterval: DefaultWALFlushInterval,
WALPartitionFlushDelay: DefaultWALPartitionFlushDelay,
Config: NewConfig(),
}
}
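
Because EngineOptions now carries a Config, shard-level callers set the WAL directory on the options before opening, exactly as the shard tests in this change do; a hedged sketch with a made-up helper name:

package shardexample

import (
	"path/filepath"

	"github.com/influxdb/influxdb/tsdb"
)

// openShard opens a shard whose bz1 engine keeps its WAL under dir/wal.
func openShard(dir string) (*tsdb.Shard, error) {
	opts := tsdb.NewEngineOptions() // now includes Config: NewConfig()
	opts.Config.WALDir = filepath.Join(dir, "wal")

	index := tsdb.NewDatabaseIndex()
	sh := tsdb.NewShard(1, index, filepath.Join(dir, "shard"), opts)
	if err := sh.Open(); err != nil {
		return nil, err
	}
	return sh, nil
}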

View File

@ -28,10 +28,6 @@ var (
const (
// Format is the file format name of this engine.
Format = "bz1"
// WALDir is the suffix that is put on the path for
// where the WAL files should be kept for a given shard.
WALDir = "_wal"
)
func init() {
@ -72,7 +68,14 @@ type WAL interface {
// NewEngine returns a new instance of Engine.
func NewEngine(path string, opt tsdb.EngineOptions) tsdb.Engine {
// create the WAL writer under the configured WAL directory, named after the shard
w := wal.NewLog(filepath.Join(filepath.Dir(path), filepath.Base(path)+WALDir))
w := wal.NewLog(filepath.Join(opt.Config.WALDir, filepath.Base(path)))
w.ReadySeriesSize = opt.Config.WALReadySeriesSize
w.FlushColdInterval = time.Duration(opt.Config.WALFlushColdInterval)
w.MaxSeriesSize = opt.Config.WALMaxSeriesSize
w.CompactionThreshold = opt.Config.WALCompactionThreshold
w.PartitionSizeThreshold = opt.Config.WALPartitionSizeThreshold
e := &Engine{
path: path,
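
The WAL location for a bz1 shard is now the configured wal-dir plus the shard's base name, rather than a sibling "<shard>_wal" directory; a small sketch of that derivation (walPathFor is a hypothetical helper):

package walpath

import "path/filepath"

// walPathFor mirrors the path computation in NewEngine above.
func walPathFor(walDir, shardPath string) string {
	return filepath.Join(walDir, filepath.Base(shardPath))
}

// e.g. walPathFor("/var/opt/influxdb/wal", "/var/opt/influxdb/data/db/rp/1")
// yields "/var/opt/influxdb/wal/1".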

View File

@ -45,28 +45,6 @@ const (
// DefaultSegmentSize of 2MB is the size at which segment files will be rolled over
DefaultSegmentSize = 2 * 1024 * 1024
// DefaultReadySeriesSize of 30KB specifies when a series is eligible to be flushed
DefaultReadySeriesSize = 30 * 1024
// DefaultCompactionThreshold is the ratio of keys over the flush size at which a partition is flushed and compacted
DefaultCompactionThreshold = 0.5
// DefaultMaxSeriesSize specifies the size at which a series will be forced to flush
DefaultMaxSeriesSize = 1024 * 1024
// DefaultFlushColdInterval specifies how long after a partition has been cold
// for writes that a full flush and compaction are forced
DefaultFlushColdInterval = 20 * time.Second
// DefaultPartitionSizeThreshold specifies that when a partition gets to this size in
// memory, we should slow down writes until it gets a chance to compact.
// This will force clients to get backpressure if they're writing too fast. We need
// this because the WAL can take writes much faster than the index. So eventually
// we'll need to create backpressure, otherwise we'll fill up the memory and die.
// This number multiplied by the partition count is roughly the max possible memory
// size for the in-memory WAL cache.
DefaultPartitionSizeThreshold = 20 * 1024 * 1024 // 20MB
// PartitionCount is the number of partitions in the WAL
PartitionCount = 5
@ -171,6 +149,9 @@ type Log struct {
// Index is the database that series data gets flushed to once it gets compacted
// out of the WAL.
Index IndexWriter
// EnableLogging specifies if detailed logs should be output
EnableLogging bool
}
// IndexWriter is an interface for the indexed database the WAL flushes data to
@ -188,12 +169,12 @@ func NewLog(path string) *Log {
// these options should be overridden by any options in the config
LogOutput: os.Stderr,
FlushColdInterval: DefaultFlushColdInterval,
FlushColdInterval: tsdb.DefaultFlushColdInterval,
SegmentSize: DefaultSegmentSize,
MaxSeriesSize: DefaultMaxSeriesSize,
CompactionThreshold: DefaultCompactionThreshold,
PartitionSizeThreshold: DefaultPartitionSizeThreshold,
ReadySeriesSize: DefaultReadySeriesSize,
MaxSeriesSize: tsdb.DefaultMaxSeriesSize,
CompactionThreshold: tsdb.DefaultCompactionThreshold,
PartitionSizeThreshold: tsdb.DefaultPartitionSizeThreshold,
ReadySeriesSize: tsdb.DefaultReadySeriesSize,
partitionCount: PartitionCount,
flushCheckInterval: defaultFlushCheckInterval,
}
@ -217,6 +198,7 @@ func (l *Log) Open() error {
if err != nil {
return err
}
p.enableLogging = l.EnableLogging
l.partitions[uint8(i)] = p
}
if err := l.openPartitionFiles(); err != nil {
@ -671,8 +653,8 @@ func (l *Log) partition(key []byte) *Partition {
if p == nil {
if p, err := NewPartition(id, l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.FlushColdInterval, l.Index); err != nil {
panic(err)
} else {
p.enableLogging = l.EnableLogging
l.partitions[id] = p
}
}
@ -715,6 +697,8 @@ type Partition struct {
// be flushed because it has been idle for writes.
flushColdInterval time.Duration
lastWriteTime time.Time
enableLogging bool
}
func NewPartition(id uint8, path string, segmentSize int64, sizeThreshold uint64, readySeriesSize int, flushColdInterval time.Duration, index IndexWriter) (*Partition, error) {
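
For reference, the engine-side wiring shown earlier, collected into one hedged helper: NewLog starts from the tsdb defaults and the caller copies the WAL* config fields over it. Propagating WALEnableLogging into EnableLogging is an assumption; the visible hunks only show the size and threshold fields being copied.

package walexample

import (
	"time"

	"github.com/influxdb/influxdb/tsdb"
	"github.com/influxdb/influxdb/tsdb/engine/wal"
)

// newLogFromConfig builds a WAL log and applies the configurable options.
func newLogFromConfig(path string, c tsdb.Config) *wal.Log {
	w := wal.NewLog(path)
	w.ReadySeriesSize = c.WALReadySeriesSize
	w.FlushColdInterval = time.Duration(c.WALFlushColdInterval)
	w.MaxSeriesSize = c.WALMaxSeriesSize
	w.CompactionThreshold = c.WALCompactionThreshold
	w.PartitionSizeThreshold = c.WALPartitionSizeThreshold
	w.EnableLogging = c.WALEnableLogging // assumed wiring for the new flag
	return w
}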

View File

@ -334,7 +334,7 @@ func TestWAL_CompactAfterPercentageThreshold(t *testing.T) {
buf.WriteString(fmt.Sprintf("cpu,host=A,region=useast3 value=%.3f %d\n", rand.Float64(), i))
// ensure that, as a whole, it's not ready for flushing yet
if log.partitions[1].shouldFlush(DefaultMaxSeriesSize, DefaultCompactionThreshold) != noFlush {
if log.partitions[1].shouldFlush(tsdb.DefaultMaxSeriesSize, tsdb.DefaultCompactionThreshold) != noFlush {
t.Fatal("expected partition 1 to return false from shouldFlush")
}
}
@ -354,7 +354,7 @@ func TestWAL_CompactAfterPercentageThreshold(t *testing.T) {
}
// ensure it is marked as needing a flush because of the threshold
if log.partitions[1].shouldFlush(DefaultMaxSeriesSize, DefaultCompactionThreshold) != thresholdFlush {
if log.partitions[1].shouldFlush(tsdb.DefaultMaxSeriesSize, tsdb.DefaultCompactionThreshold) != thresholdFlush {
t.Fatal("expected partition 1 to return true from shouldFlush")
}
@ -451,7 +451,7 @@ func TestWAL_CompactAfterTimeWithoutWrite(t *testing.T) {
time.Sleep(700 * time.Millisecond)
// ensure that, as a whole, it's not ready for flushing yet
if f := log.partitions[1].shouldFlush(DefaultMaxSeriesSize, DefaultCompactionThreshold); f != noFlush {
if f := log.partitions[1].shouldFlush(tsdb.DefaultMaxSeriesSize, tsdb.DefaultCompactionThreshold); f != noFlush {
t.Fatalf("expected partition 1 to return noFlush from shouldFlush %v", f)
}

View File

@ -5,6 +5,7 @@ import (
"io/ioutil"
"math"
"os"
"path/filepath"
"testing"
"time"
@ -947,6 +948,8 @@ func testStore() *tsdb.Store {
path, _ := ioutil.TempDir("", "")
store := tsdb.NewStore(path)
store.EngineOptions.Config.WALDir = filepath.Join(path, "wal")
err := store.Open()
if err != nil {
panic(err)

View File

@ -6,6 +6,7 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"strings"
"testing"
@ -494,7 +495,9 @@ func TestShardMapper_LocalMapperTagSets(t *testing.T) {
func mustCreateShard(dir string) *tsdb.Shard {
tmpShard := path.Join(dir, "shard")
index := tsdb.NewDatabaseIndex()
sh := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(dir, "wal")
sh := tsdb.NewShard(1, index, tmpShard, opts)
if err := sh.Open(); err != nil {
panic(fmt.Sprintf("error opening shard: %s", err.Error()))
}

View File

@ -54,7 +54,9 @@ func TestWritePointsAndExecuteQuery(t *testing.T) {
}
store.Close()
conf := store.EngineOptions.Config
store = tsdb.NewStore(store.Path())
store.EngineOptions.Config = conf
if err := store.Open(); err != nil {
t.Fatalf(err.Error())
}
@ -84,7 +86,9 @@ func TestWritePointsAndExecuteQuery_Update(t *testing.T) {
// Restart store.
store.Close()
conf := store.EngineOptions.Config
store = tsdb.NewStore(store.Path())
store.EngineOptions.Config = conf
if err := store.Open(); err != nil {
t.Fatalf(err.Error())
}
@ -145,7 +149,9 @@ func TestDropSeriesStatement(t *testing.T) {
}
store.Close()
conf := store.EngineOptions.Config
store = tsdb.NewStore(store.Path())
store.EngineOptions.Config = conf
store.Open()
executor.Store = store
@ -215,7 +221,9 @@ func TestDropMeasurementStatement(t *testing.T) {
validateDrop()
store.Close()
conf := store.EngineOptions.Config
store = tsdb.NewStore(store.Path())
store.EngineOptions.Config = conf
store.Open()
executor.Store = store
validateDrop()
@ -279,7 +287,9 @@ func TestDropDatabase(t *testing.T) {
}
store.Close()
conf := store.EngineOptions.Config
store = tsdb.NewStore(store.Path())
store.EngineOptions.Config = conf
store.Open()
executor.Store = store
executor.ShardMapper = &testShardMapper{store: store}
@ -344,6 +354,8 @@ func testStoreAndExecutor() (*tsdb.Store, *tsdb.QueryExecutor) {
path, _ := ioutil.TempDir("", "")
store := tsdb.NewStore(path)
store.EngineOptions.Config.WALDir = filepath.Join(path, "wal")
err := store.Open()
if err != nil {
panic(err)

View File

@ -20,7 +20,10 @@ func TestShardWriteAndIndex(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
index := tsdb.NewDatabaseIndex()
sh := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
sh := tsdb.NewShard(1, index, tmpShard, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
@ -66,7 +69,7 @@ func TestShardWriteAndIndex(t *testing.T) {
sh.Close()
index = tsdb.NewDatabaseIndex()
sh = tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
sh = tsdb.NewShard(1, index, tmpShard, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
@ -87,7 +90,10 @@ func TestShardWriteAddNewField(t *testing.T) {
tmpShard := path.Join(tmpDir, "shard")
index := tsdb.NewDatabaseIndex()
sh := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
sh := tsdb.NewShard(1, index, tmpShard, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}

View File

@ -14,9 +14,12 @@ import (
)
func NewStore(path string) *Store {
opts := NewEngineOptions()
opts.Config = NewConfig()
return &Store{
path: path,
EngineOptions: NewEngineOptions(),
EngineOptions: opts,
Logger: log.New(os.Stderr, "[store] ", log.LstdFlags),
}
}
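
Since NewStore now seeds EngineOptions.Config with NewConfig(), callers only need to point the WAL at a writable directory, as the store tests below do; a minimal sketch (openStore is a made-up helper):

package storeexample

import (
	"path/filepath"

	"github.com/influxdb/influxdb/tsdb"
)

// openStore opens a store whose shards keep their WAL under dir/wal.
func openStore(dir string) (*tsdb.Store, error) {
	s := tsdb.NewStore(dir)
	s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal")
	if err := s.Open(); err != nil {
		return nil, err
	}
	return s, nil
}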

View File

@ -22,6 +22,7 @@ func TestStoreOpen(t *testing.T) {
}
s := tsdb.NewStore(dir)
s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal")
if err := s.Open(); err != nil {
t.Fatalf("Store.Open() failed: %v", err)
}
@ -49,6 +50,7 @@ func TestStoreOpenShard(t *testing.T) {
}
s := tsdb.NewStore(dir)
s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal")
if err := s.Open(); err != nil {
t.Fatalf("Store.Open() failed: %v", err)
}
@ -82,6 +84,7 @@ func TestStoreOpenShardCreateDelete(t *testing.T) {
}
s := tsdb.NewStore(dir)
s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal")
if err := s.Open(); err != nil {
t.Fatalf("Store.Open() failed: %v", err)
}
@ -129,6 +132,7 @@ func TestStoreOpenNotDatabaseDir(t *testing.T) {
}
s := tsdb.NewStore(dir)
s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal")
if err := s.Open(); err != nil {
t.Fatalf("Store.Open() failed: %v", err)
}
@ -159,6 +163,7 @@ func TestStoreOpenNotRPDir(t *testing.T) {
}
s := tsdb.NewStore(dir)
s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal")
if err := s.Open(); err != nil {
t.Fatalf("Store.Open() failed: %v", err)
}
@ -195,6 +200,7 @@ func TestStoreOpenShardBadShardPath(t *testing.T) {
}
s := tsdb.NewStore(dir)
s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal")
if err := s.Open(); err != nil {
t.Fatalf("Store.Open() failed: %v", err)
}
@ -221,6 +227,7 @@ func TestStoreEnsureSeriesPersistedInNewShards(t *testing.T) {
defer os.RemoveAll(dir)
s := tsdb.NewStore(dir)
s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal")
if err := s.Open(); err != nil {
t.Fatalf("Store.Open() failed: %v", err)
}
@ -255,6 +262,7 @@ func TestStoreEnsureSeriesPersistedInNewShards(t *testing.T) {
s.Close()
s = tsdb.NewStore(dir)
s.EngineOptions.Config.WALDir = filepath.Join(dir, "wal")
if err := s.Open(); err != nil {
t.Fatalf("Store.Open() failed: %v", err)
}