Merge pull request #9204 from influxdata/jw-tsm-sync
Fix higher disk utilization regression
pull/9211/head
commit
f250b64721
12
CHANGELOG.md
12
CHANGELOG.md
|
@ -21,6 +21,18 @@
|
|||
- [#9144](https://github.com/influxdata/influxdb/issues/9144): Fix query compilation so multiple nested distinct calls are allowed
|
||||
- [#8789](https://github.com/influxdata/influxdb/issues/8789): Fix CLI to allow quoted database names in use statement
|
||||
|
||||
## v1.4.3 [unreleased]
|
||||
|
||||
### Configuration Changes
|
||||
|
||||
#### `[data]` Section
|
||||
|
||||
The default value for `cache-snapshot-memory-size` has been changed from `25m` to `256m`.
|
||||
|
||||
### Bugfixes
|
||||
|
||||
- [#9201](https://github.com/influxdata/influxdb/issues/9201): Fix higher disk i/o utilization
|
||||
|
||||
## v1.4.2 [2017-11-15]
|
||||
|
||||
Refer to the 1.4.0 breaking changes section if `influxd` fails to start with an `incompatible tsi1 index MANIFEST` error.
|
||||
|
|
|
@ -79,7 +79,7 @@
|
|||
# snapshot the cache and write it to a TSM file, freeing up memory
|
||||
# Valid size suffixes are k, m, or g (case insensitive, 1024 = 1k).
|
||||
# Values without a size suffix are in bytes.
|
||||
# cache-snapshot-memory-size = "25m"
|
||||
# cache-snapshot-memory-size = "256m"
|
||||
|
||||
# CacheSnapshotWriteColdDuration is the length of time at
|
||||
# which the engine will snapshot the cache and write it to
|
||||
|
|
|
@ -26,7 +26,7 @@ const (
|
|||
|
||||
// DefaultCacheSnapshotMemorySize is the size at which the engine will
|
||||
// snapshot the cache and write it to a TSM file, freeing up memory
|
||||
DefaultCacheSnapshotMemorySize = 25 * 1024 * 1024 // 25MB
|
||||
DefaultCacheSnapshotMemorySize = 256 * 1024 * 1024 // 256MB
|
||||
|
||||
// DefaultCacheSnapshotWriteColdDuration is the length of time at which
|
||||
// the engine will snapshot the cache and write it to a new TSM file if
|
||||
|
|
|
@ -263,16 +263,9 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
|
|||
minGenerations = level + 1
|
||||
}
|
||||
|
||||
// Each compaction group should run against 4 generations. For level 1, since these
|
||||
// can get created much more quickly, bump the grouping to 8 to keep file counts lower.
|
||||
groupSize := 4
|
||||
if level == 1 {
|
||||
groupSize = 8
|
||||
}
|
||||
|
||||
var cGroups []CompactionGroup
|
||||
for _, group := range levelGroups {
|
||||
for _, chunk := range group.chunk(groupSize) {
|
||||
for _, chunk := range group.chunk(4) {
|
||||
var cGroup CompactionGroup
|
||||
var hasTombstones bool
|
||||
for _, gen := range chunk {
|
||||
|
@ -1027,16 +1020,28 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
|
|||
}
|
||||
|
||||
func (c *Compactor) write(path string, iter KeyIterator) (err error) {
|
||||
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
|
||||
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
|
||||
if err != nil {
|
||||
return errCompactionInProgress{err: err}
|
||||
}
|
||||
|
||||
// Create the write for the new TSM file.
|
||||
w, err := NewTSMWriter(fd)
|
||||
if err != nil {
|
||||
return err
|
||||
var w TSMWriter
|
||||
|
||||
// Use a disk based TSM buffer if it looks like we might create a big index
|
||||
// in memory.
|
||||
if iter.EstimatedIndexSize() > 64*1024*1024 {
|
||||
w, err = NewTSMWriterWithDiskBuffer(fd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
w, err = NewTSMWriter(fd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
closeErr := w.Close()
|
||||
if err == nil {
|
||||
|
@ -1145,6 +1150,10 @@ type KeyIterator interface {
|
|||
|
||||
// Err returns any errors encountered during iteration.
|
||||
Err() error
|
||||
|
||||
// EstimatedIndexSize returns the estimated size of the index that would
|
||||
// be required to store all the series and entries in the KeyIterator.
|
||||
EstimatedIndexSize() int
|
||||
}
|
||||
|
||||
// tsmKeyIterator implements the KeyIterator for set of TSMReaders. Iteration produces
|
||||
|
@ -1278,6 +1287,14 @@ func (k *tsmKeyIterator) hasMergedValues() bool {
|
|||
len(k.mergedBooleanValues) > 0
|
||||
}
|
||||
|
||||
func (k *tsmKeyIterator) EstimatedIndexSize() int {
|
||||
var size uint32
|
||||
for _, r := range k.readers {
|
||||
size += r.IndexSize()
|
||||
}
|
||||
return int(size) / len(k.readers)
|
||||
}
|
||||
|
||||
// Next returns true if there are any values remaining in the iterator.
|
||||
func (k *tsmKeyIterator) Next() bool {
|
||||
RETRY:
|
||||
|
@ -1516,6 +1533,11 @@ func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIte
|
|||
return cki
|
||||
}
|
||||
|
||||
func (c *cacheKeyIterator) EstimatedIndexSize() int {
|
||||
// We return 0 here since we already have all the entries in memory to write an index.
|
||||
return 0
|
||||
}
|
||||
|
||||
func (c *cacheKeyIterator) encode() {
|
||||
concurrency := runtime.GOMAXPROCS(0)
|
||||
n := len(c.ready)
|
||||
|
|
|
@ -1792,14 +1792,6 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {
|
|||
Path: "08-01.tsm1",
|
||||
Size: 1 * 1024 * 1024,
|
||||
},
|
||||
tsm1.FileStat{
|
||||
Path: "09-01.tsm1",
|
||||
Size: 1 * 1024 * 1024,
|
||||
},
|
||||
tsm1.FileStat{
|
||||
Path: "10-01.tsm1",
|
||||
Size: 1 * 1024 * 1024,
|
||||
},
|
||||
}
|
||||
|
||||
cp := tsm1.NewDefaultPlanner(
|
||||
|
@ -1810,8 +1802,8 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) {
|
|||
}, tsdb.DefaultCompactFullWriteColdDuration,
|
||||
)
|
||||
|
||||
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}
|
||||
expFiles2 := []tsm1.FileStat{data[8], data[9]}
|
||||
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
|
||||
expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]}
|
||||
|
||||
tsm := cp.PlanLevel(1)
|
||||
if exp, got := len(expFiles1), len(tsm[0]); got != exp {
|
||||
|
@ -1887,8 +1879,8 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) {
|
|||
}, tsdb.DefaultCompactFullWriteColdDuration,
|
||||
)
|
||||
|
||||
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}
|
||||
expFiles2 := []tsm1.FileStat{data[8], data[9]}
|
||||
expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]}
|
||||
expFiles2 := []tsm1.FileStat{data[4], data[5], data[6], data[7]}
|
||||
|
||||
tsm := cp.PlanLevel(1)
|
||||
if exp, got := len(expFiles1), len(tsm[0]); got != exp {
|
||||
|
|
|
@ -1509,7 +1509,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool {
|
|||
}
|
||||
|
||||
func (e *Engine) compact(quit <-chan struct{}) {
|
||||
t := time.NewTicker(time.Second)
|
||||
t := time.NewTicker(5 * time.Second)
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
|
|
|
@ -97,6 +97,10 @@ const (
|
|||
|
||||
// max length of a key in an index entry (measurement + tags)
|
||||
maxKeyLength = (1 << (2 * 8)) - 1
|
||||
|
||||
// The threshold amount of data written before we periodically fsync a TSM file. This helps avoid
|
||||
// long pauses due to very large fsyncs at the end of writing a TSM file.
|
||||
fsyncEvery = 512 * 1024 * 1024
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -233,7 +237,7 @@ func (e *IndexEntry) String() string {
|
|||
|
||||
// NewIndexWriter returns a new IndexWriter.
|
||||
func NewIndexWriter() IndexWriter {
|
||||
buf := bytes.NewBuffer(make([]byte, 0, 4096))
|
||||
buf := bytes.NewBuffer(make([]byte, 0, 1024*1024))
|
||||
return &directIndex{buf: buf, w: bufio.NewWriter(buf)}
|
||||
}
|
||||
|
||||
|
@ -253,6 +257,9 @@ type indexBlock struct {
|
|||
type directIndex struct {
|
||||
keyCount int
|
||||
size uint32
|
||||
|
||||
// The bytes written count of when we last fsync'd
|
||||
lastSync uint32
|
||||
fd *os.File
|
||||
buf *bytes.Buffer
|
||||
|
||||
|
@ -377,7 +384,7 @@ func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
|
|||
return 0, err
|
||||
}
|
||||
|
||||
return io.Copy(w, bufio.NewReader(d.fd))
|
||||
return io.Copy(w, bufio.NewReaderSize(d.fd, 1024*1024))
|
||||
}
|
||||
|
||||
func (d *directIndex) flush(w io.Writer) (int64, error) {
|
||||
|
@ -435,6 +442,15 @@ func (d *directIndex) flush(w io.Writer) (int64, error) {
|
|||
d.indexEntries.Type = 0
|
||||
d.indexEntries.entries = d.indexEntries.entries[:0]
|
||||
|
||||
// If this is a disk based index and we've written more than the fsync threshold,
|
||||
// fsync the data to avoid long pauses later on.
|
||||
if d.fd != nil && d.size-d.lastSync > fsyncEvery {
|
||||
if err := d.fd.Sync(); err != nil {
|
||||
return N, err
|
||||
}
|
||||
d.lastSync = d.size
|
||||
}
|
||||
|
||||
return N, nil
|
||||
|
||||
}
|
||||
|
@ -486,18 +502,30 @@ type tsmWriter struct {
|
|||
w *bufio.Writer
|
||||
index IndexWriter
|
||||
n int64
|
||||
|
||||
// The bytes written count of when we last fsync'd
|
||||
lastSync int64
|
||||
}
|
||||
|
||||
// NewTSMWriter returns a new TSMWriter writing to w.
|
||||
func NewTSMWriter(w io.Writer) (TSMWriter, error) {
|
||||
index := NewIndexWriter()
|
||||
return &tsmWriter{wrapped: w, w: bufio.NewWriterSize(w, 1024*1024), index: index}, nil
|
||||
}
|
||||
|
||||
// NewTSMWriterWithDiskBuffer returns a new TSMWriter writing to w and will use a disk
|
||||
// based buffer for the TSM index if possible.
|
||||
func NewTSMWriterWithDiskBuffer(w io.Writer) (TSMWriter, error) {
|
||||
var index IndexWriter
|
||||
if fw, ok := w.(*os.File); ok && !strings.HasSuffix(fw.Name(), "01.tsm.tmp") {
|
||||
f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
|
||||
// Make sure is a File so we can write the temp index alongside it.
|
||||
if fw, ok := w.(*os.File); ok {
|
||||
f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
index = NewDiskIndexWriter(f)
|
||||
} else {
|
||||
// w is not a file, just use an inmem index
|
||||
index = NewIndexWriter()
|
||||
}
|
||||
|
||||
|
@ -612,6 +640,14 @@ func (t *tsmWriter) WriteBlock(key []byte, minTime, maxTime int64, block []byte)
|
|||
// Increment file position pointer (checksum + block len)
|
||||
t.n += int64(n)
|
||||
|
||||
// fsync the file periodically to avoid long pauses with very big files.
|
||||
if t.n-t.lastSync > fsyncEvery {
|
||||
if err := t.sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
t.lastSync = t.n
|
||||
}
|
||||
|
||||
if len(t.index.Entries(key)) >= maxIndexEntries {
|
||||
return ErrMaxBlocksExceeded
|
||||
}
|
||||
|
@ -646,6 +682,10 @@ func (t *tsmWriter) Flush() error {
|
|||
return err
|
||||
}
|
||||
|
||||
return t.sync()
|
||||
}
|
||||
|
||||
func (t *tsmWriter) sync() error {
|
||||
if f, ok := t.wrapped.(*os.File); ok {
|
||||
if err := f.Sync(); err != nil {
|
||||
return err
|
||||
|
|
|
@ -169,6 +169,12 @@ func (s *Store) loadShards() error {
|
|||
lim := s.EngineOptions.Config.MaxConcurrentCompactions
|
||||
if lim == 0 {
|
||||
lim = runtime.GOMAXPROCS(0) / 2 // Default to 50% of cores for compactions
|
||||
|
||||
// On systems with more cores, cap at 4 to reduce disk utilization
|
||||
if lim > 4 {
|
||||
lim = 4
|
||||
}
|
||||
|
||||
if lim < 1 {
|
||||
lim = 1
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue