Switch O_SYNC to periodic fsync

O_SYNC was added with writing TSM files to fix an issue where the
final fsync at the end cause the process to stall.  This ends up
increase disk util to much so this change switches to use multiple
fsyncs while writing the TSM file instead of O_SYNC or one large
one at the end.
pull/9204/head
Jason Wilder 2017-12-06 09:35:24 -07:00
parent fd11e200c8
commit 9c1d7d00a9
2 changed files with 35 additions and 4 deletions

View File

@ -1027,7 +1027,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
}
func (c *Compactor) write(path string, iter KeyIterator) (err error) {
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
if err != nil {
return errCompactionInProgress{err: err}
}

View File

@ -97,6 +97,10 @@ const (
// max length of a key in an index entry (measurement + tags)
maxKeyLength = (1 << (2 * 8)) - 1
// The threshold amount data written before we periodically fsync a TSM file. This helps avoid
// long pauses due to very large fsyncs at the end of writing a TSM file.
fsyncEvery = 512 * 1024 * 1024
)
var (
@ -233,7 +237,7 @@ func (e *IndexEntry) String() string {
// NewIndexWriter returns a new IndexWriter.
func NewIndexWriter() IndexWriter {
buf := bytes.NewBuffer(make([]byte, 0, 4096))
buf := bytes.NewBuffer(make([]byte, 0, 1024*1024))
return &directIndex{buf: buf, w: bufio.NewWriter(buf)}
}
@ -253,6 +257,9 @@ type indexBlock struct {
type directIndex struct {
keyCount int
size uint32
// The bytes written count of when we last fsync'd
lastSync uint32
fd *os.File
buf *bytes.Buffer
@ -377,7 +384,7 @@ func (d *directIndex) WriteTo(w io.Writer) (int64, error) {
return 0, err
}
return io.Copy(w, bufio.NewReader(d.fd))
return io.Copy(w, bufio.NewReaderSize(d.fd, 1024*1024))
}
func (d *directIndex) flush(w io.Writer) (int64, error) {
@ -435,6 +442,15 @@ func (d *directIndex) flush(w io.Writer) (int64, error) {
d.indexEntries.Type = 0
d.indexEntries.entries = d.indexEntries.entries[:0]
// If this is a disk based index and we've written more than the fsync threshold,
// fsync the data to avoid long pauses later on.
if d.fd != nil && d.size-d.lastSync > fsyncEvery {
if err := d.fd.Sync(); err != nil {
return N, err
}
d.lastSync = d.size
}
return N, nil
}
@ -486,13 +502,16 @@ type tsmWriter struct {
w *bufio.Writer
index IndexWriter
n int64
// The bytes written count of when we last fsync'd
lastSync int64
}
// NewTSMWriter returns a new TSMWriter writing to w.
func NewTSMWriter(w io.Writer) (TSMWriter, error) {
var index IndexWriter
if fw, ok := w.(*os.File); ok && !strings.HasSuffix(fw.Name(), "01.tsm.tmp") {
f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL|os.O_SYNC, 0666)
f, err := os.OpenFile(strings.TrimSuffix(fw.Name(), ".tsm.tmp")+".idx.tmp", os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
if err != nil {
return nil, err
}
@ -612,6 +631,14 @@ func (t *tsmWriter) WriteBlock(key []byte, minTime, maxTime int64, block []byte)
// Increment file position pointer (checksum + block len)
t.n += int64(n)
// fsync the file periodically to avoid long pauses with very big files.
if t.n-t.lastSync > fsyncEvery {
if err := t.sync(); err != nil {
return err
}
t.lastSync = t.n
}
if len(t.index.Entries(key)) >= maxIndexEntries {
return ErrMaxBlocksExceeded
}
@ -646,6 +673,10 @@ func (t *tsmWriter) Flush() error {
return err
}
return t.sync()
}
func (t *tsmWriter) sync() error {
if f, ok := t.wrapped.(*os.File); ok {
if err := f.Sync(); err != nil {
return err