diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index acc888bc2c..67cab267ce 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -1027,7 +1027,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([ } func (c *Compactor) write(path string, iter KeyIterator) (err error) { - fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666) + fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666) if err != nil { return errCompactionInProgress{err: err} } diff --git a/tsdb/engine/tsm1/writer.go b/tsdb/engine/tsm1/writer.go index b00ec3c4af..146a9fea17 100644 --- a/tsdb/engine/tsm1/writer.go +++ b/tsdb/engine/tsm1/writer.go @@ -97,6 +97,10 @@ const ( // max length of a key in an index entry (measurement + tags) maxKeyLength = (1 << (2 * 8)) - 1 + + // The threshold amount of data written before we periodically fsync a TSM file. This helps avoid + // long pauses due to very large fsyncs at the end of writing a TSM file. + fsyncEvery = 512 * 1024 * 1024 ) var ( @@ -233,7 +237,7 @@ func (e *IndexEntry) String() string { // NewIndexWriter returns a new IndexWriter. 
func NewIndexWriter() IndexWriter { - buf := bytes.NewBuffer(make([]byte, 0, 4096)) + buf := bytes.NewBuffer(make([]byte, 0, 1024*1024)) return &directIndex{buf: buf, w: bufio.NewWriter(buf)} } @@ -253,6 +257,9 @@ type indexBlock struct { type directIndex struct { keyCount int size uint32 + + // The bytes written count of when we last fsync'd + lastSync uint32 fd *os.File buf *bytes.Buffer @@ -377,7 +384,7 @@ func (d *directIndex) WriteTo(w io.Writer) (int64, error) { return 0, err } - return io.Copy(w, bufio.NewReader(d.fd)) + return io.Copy(w, bufio.NewReaderSize(d.fd, 1024*1024)) } func (d *directIndex) flush(w io.Writer) (int64, error) { @@ -435,6 +442,15 @@ func (d *directIndex) flush(w io.Writer) (int64, error) { d.indexEntries.Type = 0 d.indexEntries.entries = d.indexEntries.entries[:0] + // If this is a disk based index and we've written more than the fsync threshold, + // fsync the data to avoid long pauses later on. + if d.fd != nil && d.size-d.lastSync > fsyncEvery { + if err := d.fd.Sync(); err != nil { + return N, err + } + d.lastSync = d.size + } + return N, nil } @@ -486,13 +502,16 @@ type tsmWriter struct { w *bufio.Writer index IndexWriter n int64 + + // The bytes written count of when we last fsync'd + lastSync int64 } // NewTSMWriter returns a new TSMWriter writing to w. func NewTSMWriter(w io.Writer) (TSMWriter, error) { var index IndexWriter if fw, ok := w.(*os.File); ok && !strings.HasSuffix(fw.Name(), "01.tsm.tmp") { - f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666) + f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666) if err != nil { return nil, err } @@ -612,6 +631,14 @@ func (t *tsmWriter) WriteBlock(key []byte, minTime, maxTime int64, block []byte) // Increment file position pointer (checksum + block len) t.n += int64(n) + // fsync the file periodically to avoid long pauses with very big files. 
+ if t.n-t.lastSync > fsyncEvery { + if err := t.sync(); err != nil { + return err + } + t.lastSync = t.n + } + if len(t.index.Entries(key)) >= maxIndexEntries { return ErrMaxBlocksExceeded } @@ -646,6 +673,10 @@ func (t *tsmWriter) Flush() error { return err } + return t.sync() +} + +func (t *tsmWriter) sync() error { if f, ok := t.wrapped.(*os.File); ok { if err := f.Sync(); err != nil { return err