Fix typo in func names
parent
e2b1a09ece
commit
aa00ef953a
|
@ -33,6 +33,8 @@ const (
|
|||
idleFlush
|
||||
// startupFlush indicates that we're flushing because the database is starting up
|
||||
startupFlush
|
||||
|
||||
writeBufLen = 32 << 10
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
|
@ -25,7 +25,7 @@ const (
|
|||
|
||||
WALFilePrefix = "_"
|
||||
|
||||
writeBufLen = 128 << 10 // 128kb
|
||||
defaultBufLen = 1024 << 10 // 1MB (sized for batches of 5000 points)
|
||||
)
|
||||
|
||||
// walEntry is a byte written to a wal segment file that indicates what the following compressed block contains
|
||||
|
@ -101,9 +101,6 @@ func (l *WAL) Open() error {
|
|||
}
|
||||
|
||||
func (l *WAL) WritePoints(points []models.Point) error {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
entry := &WriteWALEntry{
|
||||
Points: points,
|
||||
}
|
||||
|
@ -142,6 +139,8 @@ func (l *WAL) ClosedSegments() ([]string, error) {
|
|||
}
|
||||
|
||||
func (l *WAL) writeToLog(entry WALEntry) error {
|
||||
l.mu.RLock()
|
||||
defer l.mu.RUnlock()
|
||||
// Make sure the log has not been closed
|
||||
select {
|
||||
case <-l.closing:
|
||||
|
@ -165,9 +164,6 @@ func (l *WAL) writeToLog(entry WALEntry) error {
|
|||
}
|
||||
|
||||
func (l *WAL) Delete(keys []string) error {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
entry := &DeleteWALEntry{
|
||||
Keys: keys,
|
||||
}
|
||||
|
@ -228,7 +224,7 @@ type WALEntry interface {
|
|||
Type() walEntryType
|
||||
Encode(dst []byte) ([]byte, error)
|
||||
MarshalBinary() ([]byte, error)
|
||||
UnmarshlBinary(b []byte) error
|
||||
UnmarshalBinary(b []byte) error
|
||||
}
|
||||
|
||||
// WriteWALEntry represents a write of points.
|
||||
|
@ -248,7 +244,7 @@ func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) {
|
|||
// Make sure we have enough space in our buf before copying. If not,
|
||||
// grow the buf.
|
||||
if len(bytes)+4 > len(dst)-n {
|
||||
grow := make([]byte, writeBufLen)
|
||||
grow := make([]byte, len(bytes)*2)
|
||||
dst = append(dst, grow...)
|
||||
}
|
||||
n += copy(dst[n:], u32tob(uint32(len(bytes))))
|
||||
|
@ -260,11 +256,11 @@ func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) {
|
|||
|
||||
func (w *WriteWALEntry) MarshalBinary() ([]byte, error) {
|
||||
// Temp buffer to write marshaled points into
|
||||
b := make([]byte, writeBufLen)
|
||||
b := make([]byte, defaultBufLen)
|
||||
return w.Encode(b)
|
||||
}
|
||||
|
||||
func (w *WriteWALEntry) UnmarshlBinary(b []byte) error {
|
||||
func (w *WriteWALEntry) UnmarshalBinary(b []byte) error {
|
||||
var i int
|
||||
|
||||
for i < len(b) {
|
||||
|
@ -291,11 +287,11 @@ type DeleteWALEntry struct {
|
|||
}
|
||||
|
||||
func (w *DeleteWALEntry) MarshalBinary() ([]byte, error) {
|
||||
b := make([]byte, writeBufLen)
|
||||
b := make([]byte, defaultBufLen)
|
||||
return w.Encode(b)
|
||||
}
|
||||
|
||||
func (w *DeleteWALEntry) UnmarshlBinary(b []byte) error {
|
||||
func (w *DeleteWALEntry) UnmarshalBinary(b []byte) error {
|
||||
w.Keys = strings.Split(string(b), "\n")
|
||||
return nil
|
||||
}
|
||||
|
@ -304,7 +300,7 @@ func (w *DeleteWALEntry) Encode(dst []byte) ([]byte, error) {
|
|||
var n int
|
||||
for _, k := range w.Keys {
|
||||
if len(dst)+1 > len(dst)-n {
|
||||
grow := make([]byte, writeBufLen)
|
||||
grow := make([]byte, defaultBufLen)
|
||||
dst = append(dst, grow...)
|
||||
}
|
||||
|
||||
|
@ -325,13 +321,11 @@ func (w *DeleteWALEntry) Type() walEntryType {
|
|||
type WALSegmentWriter struct {
|
||||
w io.WriteCloser
|
||||
size int
|
||||
b []byte
|
||||
}
|
||||
|
||||
func NewWALSegmentWriter(w io.WriteCloser) *WALSegmentWriter {
|
||||
return &WALSegmentWriter{
|
||||
w: w,
|
||||
b: make([]byte, writeBufLen),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -343,7 +337,7 @@ func (w *WALSegmentWriter) Path() string {
|
|||
}
|
||||
|
||||
func (w *WALSegmentWriter) Write(e WALEntry) error {
|
||||
bytes := getBuf(writeBufLen)
|
||||
bytes := getBuf(defaultBufLen)
|
||||
defer putBuf(bytes)
|
||||
|
||||
b, err := e.Encode(bytes)
|
||||
|
@ -351,25 +345,19 @@ func (w *WALSegmentWriter) Write(e WALEntry) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Temp buf for snappy compression
|
||||
cb := getBuf(len(b))
|
||||
defer putBuf(cb)
|
||||
compressed := snappy.Encode(b, b)
|
||||
|
||||
compressed := snappy.Encode(cb, b)
|
||||
|
||||
// Temp buf for file writing
|
||||
wb := getBuf(len(compressed) + 5)
|
||||
defer putBuf(wb)
|
||||
|
||||
n := copy(wb, []byte{byte(e.Type())})
|
||||
n += copy(wb[n:], u32tob(uint32(len(compressed))))
|
||||
n += copy(wb[n:], compressed)
|
||||
|
||||
if _, err := w.w.Write(wb[:n]); err != nil {
|
||||
return fmt.Errorf("error writing to WAL: %v", err)
|
||||
if _, err := w.w.Write([]byte{byte(e.Type())}); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := w.w.Write(u32tob(uint32(len(compressed)))); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := w.w.Write(compressed); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.size += n
|
||||
w.size += len(compressed) + 5
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -405,7 +393,7 @@ func NewWALSegmentReader(r io.ReadCloser) *WALSegmentReader {
|
|||
|
||||
// Next indicates if there is a value to read
|
||||
func (r *WALSegmentReader) Next() bool {
|
||||
b := getBuf(writeBufLen)
|
||||
b := getBuf(defaultBufLen)
|
||||
defer putBuf(b)
|
||||
|
||||
// read the type and the length of the entry
|
||||
|
@ -440,7 +428,7 @@ func (r *WALSegmentReader) Next() bool {
|
|||
return true
|
||||
}
|
||||
|
||||
buf := getBuf(writeBufLen)
|
||||
buf := getBuf(defaultBufLen)
|
||||
defer putBuf(buf)
|
||||
|
||||
data, err := snappy.Decode(buf, b[:length])
|
||||
|
@ -459,7 +447,7 @@ func (r *WALSegmentReader) Next() bool {
|
|||
r.err = fmt.Errorf("unknown wal entry type: %v", entryType)
|
||||
return true
|
||||
}
|
||||
r.err = r.entry.UnmarshlBinary(data)
|
||||
r.err = r.entry.UnmarshalBinary(data)
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue