Implemment WAL write/delete functions

pull/4845/head
Jason Wilder 2015-11-18 15:42:48 -07:00
parent afc0d5bfb9
commit e2b1a09ece
2 changed files with 160 additions and 63 deletions

View File

@ -12,7 +12,6 @@ import (
"sync"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/tsdb"
"github.com/golang/snappy"
)
@ -42,13 +41,13 @@ var ErrWALClosed = fmt.Errorf("WAL closed")
var bufPool sync.Pool
type WAL struct {
mu sync.RWMutex
path string
// write variables
writeLock sync.Mutex
currentSegmentID int
currentSegmentFile *os.File
currentSegmentSize int
currentSegmentID int
currentSegmentWriter *WALSegmentWriter
// cache and flush variables
closing chan struct{}
@ -60,9 +59,6 @@ type WAL struct {
// SegmentSize is the file size at which a segment file will be rotated
SegmentSize int
// MaxMemorySizeThreshold specifies the limit at which writes to the WAL should be rejected
MaxMemorySizeThreshold int
// LoggingEnabled specifies if detailed logs should be output
LoggingEnabled bool
}
@ -72,21 +68,27 @@ func NewWAL(path string) *WAL {
path: path,
// these options should be overriden by any options in the config
LogOutput: os.Stderr,
SegmentSize: DefaultSegmentSize,
MaxMemorySizeThreshold: tsdb.DefaultMaxMemorySizeThreshold,
logger: log.New(os.Stderr, "[tsm1wal] ", log.LstdFlags),
closing: make(chan struct{}),
LogOutput: os.Stderr,
SegmentSize: DefaultSegmentSize,
logger: log.New(os.Stderr, "[tsm1wal] ", log.LstdFlags),
closing: make(chan struct{}),
}
}
// Path returns the path the log was initialized with.
func (l *WAL) Path() string { return l.path }
func (l *WAL) Path() string {
l.mu.RLock()
defer l.mu.RUnlock()
return l.path
}
// Open opens and initializes the Log. Will recover from previous unclosed shutdowns
func (l *WAL) Open() error {
l.mu.Lock()
defer l.mu.Unlock()
if l.LoggingEnabled {
l.logger.Printf("tsm1 WAL starting with %d max memory size threshold\n", l.MaxMemorySizeThreshold)
l.logger.Printf("tsm1 WAL starting with %d segment size\n", l.SegmentSize)
l.logger.Printf("tsm1 WAL writing to %s\n", l.path)
}
if err := os.MkdirAll(l.path, 0777); err != nil {
@ -99,6 +101,9 @@ func (l *WAL) Open() error {
}
func (l *WAL) WritePoints(points []models.Point) error {
l.mu.Lock()
defer l.mu.Unlock()
entry := &WriteWALEntry{
Points: points,
}
@ -110,10 +115,33 @@ func (l *WAL) WritePoints(points []models.Point) error {
return nil
}
func (l *WAL) writeToLog(entry WALEntry) error {
l.writeLock.Lock()
defer l.writeLock.Unlock()
func (l *WAL) ClosedSegments() ([]string, error) {
l.mu.RLock()
defer l.mu.RUnlock()
// Not loading files from disk so nothing to do
if l.path == "" {
return nil, nil
}
files, err := l.segmentFileNames()
if err != nil {
return nil, err
}
var names []string
for _, fn := range files {
// Skip the active segment
if l.currentSegmentWriter != nil && fn == l.currentSegmentWriter.Path() {
continue
}
names = append(names, fn)
}
return names, nil
}
func (l *WAL) writeToLog(entry WALEntry) error {
// Make sure the log has not been closed
select {
case <-l.closing:
@ -121,7 +149,7 @@ func (l *WAL) writeToLog(entry WALEntry) error {
default:
}
if l.currentSegmentFile == nil || l.currentSegmentSize > DefaultSegmentSize {
if l.currentSegmentWriter == nil || l.currentSegmentWriter.Size() > DefaultSegmentSize {
if err := l.newSegmentFile(); err != nil {
// A drop database or RP call could trigger this error if writes were in-flight
// when the drop statement executes.
@ -129,59 +157,37 @@ func (l *WAL) writeToLog(entry WALEntry) error {
}
}
bytes, err := entry.MarshalBinary()
if err != nil {
return fmt.Errorf("error marshaling WAL entry: %v", err)
if err := l.currentSegmentWriter.Write(entry); err != nil {
return fmt.Errorf("error writing WAL entry: %v", err)
}
b := append([]byte{byte(entry.Type())})
b = append(b, u32tob(uint32(len(bytes)))...)
b = append(b, bytes...)
if _, err := l.currentSegmentFile.Write(b); err != nil {
return fmt.Errorf("error writing to WAL: %v", err)
}
l.currentSegmentSize += 5 + len(b)
return l.currentSegmentFile.Sync()
return l.currentSegmentWriter.Sync()
}
func (l *WAL) DeleteMeasurement(measurement string, keys []string) error {
d := &deleteData{MeasurementName: measurement, Keys: keys}
err := l.writeDeleteEntry(d)
if err != nil {
func (l *WAL) Delete(keys []string) error {
l.mu.Lock()
defer l.mu.Unlock()
entry := &DeleteWALEntry{
Keys: keys,
}
if err := l.writeToLog(entry); err != nil {
return err
}
return nil
}
func (l *WAL) writeDeleteEntry(d *deleteData) error {
panic("not implemented")
// js, err := json.Marshal(d)
// if err != nil {
// return err
// }
// data := snappy.Encode(nil, js)
// return l.writeToLog(deleteEntry, data)
}
func (l *WAL) DeleteSeries(keys []string) error {
return l.writeDeleteEntry(&deleteData{Keys: keys})
}
// Close will finish any flush that is currently in process and close file handles
func (l *WAL) Close() error {
l.writeLock.Lock()
defer l.writeLock.Unlock()
l.mu.Lock()
defer l.mu.Unlock()
// Close, but don't set to nil so future goroutines can still be signaled
close(l.closing)
if l.currentSegmentFile != nil {
l.currentSegmentFile.Close()
l.currentSegmentFile = nil
if l.currentSegmentWriter != nil {
l.currentSegmentWriter.Close()
}
return nil
@ -200,8 +206,8 @@ func (l *WAL) segmentFileNames() ([]string, error) {
// newSegmentFile will close the current segment file and open a new one, updating bookkeeping info on the log
func (l *WAL) newSegmentFile() error {
l.currentSegmentID++
if l.currentSegmentFile != nil {
if err := l.currentSegmentFile.Close(); err != nil {
if l.currentSegmentWriter != nil {
if err := l.currentSegmentWriter.Close(); err != nil {
return err
}
}
@ -211,8 +217,7 @@ func (l *WAL) newSegmentFile() error {
if err != nil {
return err
}
l.currentSegmentSize = 0
l.currentSegmentFile = ff
l.currentSegmentWriter = NewWALSegmentWriter(ff)
return nil
}
@ -330,6 +335,13 @@ func NewWALSegmentWriter(w io.WriteCloser) *WALSegmentWriter {
}
}
func (w *WALSegmentWriter) Path() string {
if f, ok := w.w.(*os.File); ok {
return f.Name()
}
return ""
}
func (w *WALSegmentWriter) Write(e WALEntry) error {
bytes := getBuf(writeBufLen)
defer putBuf(bytes)
@ -359,8 +371,7 @@ func (w *WALSegmentWriter) Write(e WALEntry) error {
w.size += n
// TODO: Move this up to the WAL
return w.Sync()
return nil
}
// Sync flushes the file systems in-memory copy of recently written data to disk.

View File

@ -236,6 +236,92 @@ func TestWALWriter_WritePointsDelete_Multiple(t *testing.T) {
}
}
func TestWAL_ClosedSegments(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
w := tsm1.NewWAL(dir)
if err := w.Open(); err != nil {
t.Fatalf("error opening WAL: %v", err)
}
files, err := w.ClosedSegments()
if err != nil {
t.Fatalf("error getting closed segments: %v", err)
}
if got, exp := len(files), 0; got != exp {
t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp)
}
if err := w.WritePoints([]models.Point{
parsePoint("cpu,host=A value=1.1 1000000000"),
}); err != nil {
t.Fatalf("error writing points: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("error closing wal: %v", err)
}
// Re-open the WAL
w = tsm1.NewWAL(dir)
defer w.Close()
if err := w.Open(); err != nil {
t.Fatalf("error opening WAL: %v", err)
}
files, err = w.ClosedSegments()
if err != nil {
t.Fatalf("error getting closed segments: %v", err)
}
if got, exp := len(files), 1; got != exp {
t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp)
}
}
func TestWAL_Delete(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
w := tsm1.NewWAL(dir)
if err := w.Open(); err != nil {
t.Fatalf("error opening WAL: %v", err)
}
files, err := w.ClosedSegments()
if err != nil {
t.Fatalf("error getting closed segments: %v", err)
}
if got, exp := len(files), 0; got != exp {
t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp)
}
if err := w.Delete([]string{"cpu"}); err != nil {
t.Fatalf("error writing points: %v", err)
}
if err := w.Close(); err != nil {
t.Fatalf("error closing wal: %v", err)
}
// Re-open the WAL
w = tsm1.NewWAL(dir)
defer w.Close()
if err := w.Open(); err != nil {
t.Fatalf("error opening WAL: %v", err)
}
files, err = w.ClosedSegments()
if err != nil {
t.Fatalf("error getting closed segments: %v", err)
}
if got, exp := len(files), 1; got != exp {
t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp)
}
}
func BenchmarkWALSegmentWriter(b *testing.B) {
points := make([]models.Point, 5000)
for i := range points {