diff --git a/tsdb/engine/tsm1/log.go b/tsdb/engine/tsm1/log.go index c4cff15dc1..46b059a50f 100644 --- a/tsdb/engine/tsm1/log.go +++ b/tsdb/engine/tsm1/log.go @@ -33,6 +33,8 @@ const ( idleFlush // startupFlush indicates that we're flushing because the database is starting up startupFlush + + writeBufLen = 32 << 10 ) const ( diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index 67bb18cf3d..bb352056eb 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -25,7 +25,7 @@ const ( WALFilePrefix = "_" - writeBufLen = 128 << 10 // 128kb + defaultBufLen = 1024 << 10 // 1MB (sized for batches of 5000 points) ) // walEntry is a byte written to a wal segment file that indicates what the following compressed block contains @@ -101,9 +101,6 @@ func (l *WAL) Open() error { } func (l *WAL) WritePoints(points []models.Point) error { - l.mu.Lock() - defer l.mu.Unlock() - entry := &WriteWALEntry{ Points: points, } @@ -142,6 +139,8 @@ func (l *WAL) ClosedSegments() ([]string, error) { } func (l *WAL) writeToLog(entry WALEntry) error { + l.mu.RLock() + defer l.mu.RUnlock() // Make sure the log has not been closed select { case <-l.closing: @@ -165,9 +164,6 @@ func (l *WAL) writeToLog(entry WALEntry) error { } func (l *WAL) Delete(keys []string) error { - l.mu.Lock() - defer l.mu.Unlock() - entry := &DeleteWALEntry{ Keys: keys, } @@ -228,7 +224,7 @@ type WALEntry interface { Type() walEntryType Encode(dst []byte) ([]byte, error) MarshalBinary() ([]byte, error) - UnmarshlBinary(b []byte) error + UnmarshalBinary(b []byte) error } // WriteWALEntry represents a write of points. @@ -248,7 +244,7 @@ func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) { // Make sure we have enough space in our buf before copying. If not, // grow the buf. if len(bytes)+4 > len(dst)-n { - grow := make([]byte, writeBufLen) + grow := make([]byte, len(bytes)*2) dst = append(dst, grow...) } n += copy(dst[n:], u32tob(uint32(len(bytes)))) @@ -260,11 +256,11 @@ func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) { func (w *WriteWALEntry) MarshalBinary() ([]byte, error) { // Temp buffer to write marshaled points into - b := make([]byte, writeBufLen) + b := make([]byte, defaultBufLen) return w.Encode(b) } -func (w *WriteWALEntry) UnmarshlBinary(b []byte) error { +func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { var i int for i < len(b) { @@ -291,11 +287,11 @@ type DeleteWALEntry struct { } func (w *DeleteWALEntry) MarshalBinary() ([]byte, error) { - b := make([]byte, writeBufLen) + b := make([]byte, defaultBufLen) return w.Encode(b) } -func (w *DeleteWALEntry) UnmarshlBinary(b []byte) error { +func (w *DeleteWALEntry) UnmarshalBinary(b []byte) error { w.Keys = strings.Split(string(b), "\n") return nil } @@ -304,7 +300,7 @@ func (w *DeleteWALEntry) Encode(dst []byte) ([]byte, error) { var n int for _, k := range w.Keys { if len(dst)+1 > len(dst)-n { - grow := make([]byte, writeBufLen) + grow := make([]byte, defaultBufLen) dst = append(dst, grow...) } @@ -325,13 +321,11 @@ func (w *DeleteWALEntry) Type() walEntryType { type WALSegmentWriter struct { w io.WriteCloser size int - b []byte } func NewWALSegmentWriter(w io.WriteCloser) *WALSegmentWriter { return &WALSegmentWriter{ w: w, - b: make([]byte, writeBufLen), } } @@ -343,7 +337,7 @@ func (w *WALSegmentWriter) Path() string { } func (w *WALSegmentWriter) Write(e WALEntry) error { - bytes := getBuf(writeBufLen) + bytes := getBuf(defaultBufLen) defer putBuf(bytes) b, err := e.Encode(bytes) @@ -351,25 +345,19 @@ func (w *WALSegmentWriter) Write(e WALEntry) error { return err } - // Temp buf for snappy compression - cb := getBuf(len(b)) - defer putBuf(cb) + compressed := snappy.Encode(b, b) - compressed := snappy.Encode(cb, b) - - // Temp buf for file writing - wb := getBuf(len(compressed) + 5) - defer putBuf(wb) - - n := copy(wb, []byte{byte(e.Type())}) - n += copy(wb[n:], u32tob(uint32(len(compressed)))) - n += copy(wb[n:], compressed) - - if _, err := w.w.Write(wb[:n]); err != nil { - return fmt.Errorf("error writing to WAL: %v", err) + if _, err := w.w.Write([]byte{byte(e.Type())}); err != nil { + return err + } + if _, err := w.w.Write(u32tob(uint32(len(compressed)))); err != nil { + return err + } + if _, err := w.w.Write(compressed); err != nil { + return err } - w.size += n + w.size += len(compressed) + 5 return nil } @@ -405,7 +393,7 @@ func NewWALSegmentReader(r io.ReadCloser) *WALSegmentReader { // Next indicates if there is a value to read func (r *WALSegmentReader) Next() bool { - b := getBuf(writeBufLen) + b := getBuf(defaultBufLen) defer putBuf(b) // read the type and the length of the entry @@ -440,7 +428,7 @@ func (r *WALSegmentReader) Next() bool { return true } - buf := getBuf(writeBufLen) + buf := getBuf(defaultBufLen) defer putBuf(buf) data, err := snappy.Decode(buf, b[:length]) @@ -459,7 +447,7 @@ func (r *WALSegmentReader) Next() bool { r.err = fmt.Errorf("unknown wal entry type: %v", entryType) return true } - r.err = r.entry.UnmarshlBinary(data) + r.err = r.entry.UnmarshalBinary(data) return true }