WAL returns current segment ID on write and delete

pull/4911/head
Philip O'Toole 2015-11-25 12:23:10 -08:00
parent 45502d288f
commit 8e7dc3bef9
3 changed files with 23 additions and 16 deletions

View File

@ -120,7 +120,8 @@ func (e *DevEngine) WritePoints(points []models.Point, measurementFieldsToSave m
}
}
return e.WAL.WritePoints(values)
_, err := e.WAL.WritePoints(values)
return err
}
// DeleteSeries deletes the series from the engine.

View File

@ -103,16 +103,20 @@ func (l *WAL) Open() error {
return nil
}
func (l *WAL) WritePoints(values map[string][]Value) error {
// WritePoints writes the given points to the WAL. Returns the WAL segment ID to
// which the points were written. If an error is returned the segment ID should
// be ignored.
func (l *WAL) WritePoints(values map[string][]Value) (int, error) {
entry := &WriteWALEntry{
Values: values,
}
if err := l.writeToLog(entry); err != nil {
return err
id, err := l.writeToLog(entry)
if err != nil {
return -1, err
}
return nil
return id, nil
}
func (l *WAL) ClosedSegments() ([]string, error) {
@ -146,29 +150,29 @@ func (l *WAL) ClosedSegments() ([]string, error) {
return names, nil
}
func (l *WAL) writeToLog(entry WALEntry) error {
func (l *WAL) writeToLog(entry WALEntry) (int, error) {
l.mu.RLock()
// Make sure the log has not been closed
select {
case <-l.closing:
l.mu.RUnlock()
return ErrWALClosed
return -1, ErrWALClosed
default:
}
l.mu.RUnlock()
if err := l.rollSegment(); err != nil {
return fmt.Errorf("error rolling WAL segment: %v", err)
return -1, fmt.Errorf("error rolling WAL segment: %v", err)
}
l.mu.RLock()
defer l.mu.RUnlock()
if err := l.currentSegmentWriter.Write(entry); err != nil {
return fmt.Errorf("error writing WAL entry: %v", err)
return -1, fmt.Errorf("error writing WAL entry: %v", err)
}
return l.currentSegmentWriter.Sync()
return l.currentSegmentID, l.currentSegmentWriter.Sync()
}
func (l *WAL) rollSegment() error {
@ -189,15 +193,17 @@ func (l *WAL) rollSegment() error {
return nil
}
func (l *WAL) Delete(keys []string) error {
// Delete deletes the given keys, returning the segment ID for the operation.
func (l *WAL) Delete(keys []string) (int, error) {
entry := &DeleteWALEntry{
Keys: keys,
}
if err := l.writeToLog(entry); err != nil {
return err
id, err := l.writeToLog(entry)
if err != nil {
return -1, err
}
return nil
return id, nil
}
// Close will finish any flush that is currently in process and close file handles

View File

@ -277,7 +277,7 @@ func TestWAL_ClosedSegments(t *testing.T) {
t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp)
}
if err := w.WritePoints(map[string][]tsm1.Value{
if _, err := w.WritePoints(map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{
tsm1.NewValue(time.Unix(1, 0), 1.1),
},
@ -323,7 +323,7 @@ func TestWAL_Delete(t *testing.T) {
t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp)
}
if err := w.Delete([]string{"cpu"}); err != nil {
if _, err := w.Delete([]string{"cpu"}); err != nil {
t.Fatalf("error writing points: %v", err)
}