Fix data race in WAL
This commit fixes a data race in the WAL, which can occur when writes and deletes are being executed concurrently. The WAL uses a buffer pool of `[]byte` when reading the WAL. WAL entries are unmarshaled into these buffers and passed along to the relevant methods handling the different types of entry (write, delete etc). In the case of deletes, the keys that need to be deleted were being stored for later processing, however these keys were part of the backing array of initial buffer from the pool. As such, those keys could be written to at a future time when handling other parts of the WAL.pull/9580/head
parent
8116cded0c
commit
0fc7643d59
|
@ -901,7 +901,14 @@ func (w *DeleteWALEntry) MarshalBinary() ([]byte, error) {
|
|||
|
||||
// UnmarshalBinary deserializes the byte slice into w.
|
||||
func (w *DeleteWALEntry) UnmarshalBinary(b []byte) error {
|
||||
w.Keys = bytes.Split(b, []byte("\n"))
|
||||
if len(b) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// b originates from a pool. Copy what needs to be retained.
|
||||
buf := make([]byte, len(b))
|
||||
copy(buf, b)
|
||||
w.Keys = bytes.Split(buf, []byte("\n"))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -977,7 +984,11 @@ func (w *DeleteRangeWALEntry) UnmarshalBinary(b []byte) error {
|
|||
if i+sz > len(b) {
|
||||
return ErrWALCorrupt
|
||||
}
|
||||
w.Keys = append(w.Keys, b[i:i+sz])
|
||||
|
||||
// b originates from a pool. Copy what needs to be retained.
|
||||
buf := make([]byte, sz)
|
||||
copy(buf, b[i:i+sz])
|
||||
w.Keys = append(w.Keys, buf)
|
||||
i += sz
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -4,9 +4,11 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/snappy"
|
||||
"github.com/influxdata/influxdb/pkg/slices"
|
||||
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
|
||||
)
|
||||
|
||||
|
@ -685,6 +687,51 @@ func TestWriteWALSegment_UnmarshalBinary_WriteWALCorrupt(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestDeleteWALEntry_UnmarshalBinary(t *testing.T) {
|
||||
examples := []struct {
|
||||
In []string
|
||||
Out [][]byte
|
||||
}{
|
||||
{
|
||||
In: []string{""},
|
||||
Out: nil,
|
||||
},
|
||||
{
|
||||
In: []string{"foo"},
|
||||
Out: [][]byte{[]byte("foo")},
|
||||
},
|
||||
{
|
||||
In: []string{"foo", "bar"},
|
||||
Out: [][]byte{[]byte("foo"), []byte("bar")},
|
||||
},
|
||||
{
|
||||
In: []string{"foo", "bar", "z", "abc"},
|
||||
Out: [][]byte{[]byte("foo"), []byte("bar"), []byte("z"), []byte("abc")},
|
||||
},
|
||||
{
|
||||
In: []string{"foo", "bar", "z", "a"},
|
||||
Out: [][]byte{[]byte("foo"), []byte("bar"), []byte("z"), []byte("a")},
|
||||
},
|
||||
}
|
||||
|
||||
for i, example := range examples {
|
||||
w := &tsm1.DeleteWALEntry{Keys: slices.StringsToBytes(example.In...)}
|
||||
b, err := w.MarshalBinary()
|
||||
if err != nil {
|
||||
t.Fatalf("[example %d] unexpected error, got %v", i, err)
|
||||
}
|
||||
|
||||
out := &tsm1.DeleteWALEntry{}
|
||||
if err := out.UnmarshalBinary(b); err != nil {
|
||||
t.Fatalf("[example %d] %v", i, err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(example.Out, out.Keys) {
|
||||
t.Errorf("[example %d] got %v, expected %v", i, out.Keys, example.Out)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteWALSegment_UnmarshalBinary_DeleteWALCorrupt(t *testing.T) {
|
||||
w := &tsm1.DeleteWALEntry{
|
||||
Keys: [][]byte{[]byte("foo"), []byte("bar")},
|
||||
|
|
Loading…
Reference in New Issue