storage: add predicate tracking to the WAL

pull/13371/head
Jeff Wendling 2019-04-29 10:12:57 -06:00 committed by Jeff Wendling
parent 4fb7bf1730
commit e10939b8af
3 changed files with 38 additions and 23 deletions

View File

@ -520,7 +520,7 @@ func (e *Engine) DeleteBucketRange(orgID, bucketID platform.ID, min, max int64)
} }
// Add the delete to the WAL to be replayed if there is a crash or shutdown. // Add the delete to the WAL to be replayed if there is a crash or shutdown.
if _, err := e.wal.DeleteBucketRange(orgID, bucketID, min, max); err != nil { if _, err := e.wal.DeleteBucketRange(orgID, bucketID, min, max, nil); err != nil {
return err return err
} }

View File

@ -519,16 +519,17 @@ func (l *WAL) CloseSegment() error {
// DeleteBucketRange deletes the data inside of the bucket between the two times, returning // DeleteBucketRange deletes the data inside of the bucket between the two times, returning
// the segment ID for the operation. // the segment ID for the operation.
func (l *WAL) DeleteBucketRange(orgID, bucketID influxdb.ID, min, max int64) (int, error) { func (l *WAL) DeleteBucketRange(orgID, bucketID influxdb.ID, min, max int64, pred []byte) (int, error) {
if !l.enabled { if !l.enabled {
return -1, nil return -1, nil
} }
entry := &DeleteBucketRangeWALEntry{ entry := &DeleteBucketRangeWALEntry{
OrgID: orgID, OrgID: orgID,
BucketID: bucketID, BucketID: bucketID,
Min: min, Min: min,
Max: max, Max: max,
Predicate: pred,
} }
id, err := l.writeToLog(entry) id, err := l.writeToLog(entry)
@ -992,9 +993,10 @@ func (w *WriteWALEntry) Type() WalEntryType {
// DeleteBucketRangeWALEntry represents the deletion of data in a bucket. // DeleteBucketRangeWALEntry represents the deletion of data in a bucket.
type DeleteBucketRangeWALEntry struct { type DeleteBucketRangeWALEntry struct {
OrgID influxdb.ID OrgID influxdb.ID
BucketID influxdb.ID BucketID influxdb.ID
Min, Max int64 Min, Max int64
Predicate []byte
} }
// MarshalBinary returns a binary representation of the entry in a new byte slice. // MarshalBinary returns a binary representation of the entry in a new byte slice.
@ -1005,7 +1007,7 @@ func (w *DeleteBucketRangeWALEntry) MarshalBinary() ([]byte, error) {
// UnmarshalBinary deserializes the byte slice into w. // UnmarshalBinary deserializes the byte slice into w.
func (w *DeleteBucketRangeWALEntry) UnmarshalBinary(b []byte) error { func (w *DeleteBucketRangeWALEntry) UnmarshalBinary(b []byte) error {
if len(b) != 2*influxdb.IDLength+16 { if len(b) < 2*influxdb.IDLength+16 {
return ErrWALCorrupt return ErrWALCorrupt
} }
@ -1017,13 +1019,19 @@ func (w *DeleteBucketRangeWALEntry) UnmarshalBinary(b []byte) error {
} }
w.Min = int64(binary.BigEndian.Uint64(b[2*influxdb.IDLength : 2*influxdb.IDLength+8])) w.Min = int64(binary.BigEndian.Uint64(b[2*influxdb.IDLength : 2*influxdb.IDLength+8]))
w.Max = int64(binary.BigEndian.Uint64(b[2*influxdb.IDLength+8 : 2*influxdb.IDLength+16])) w.Max = int64(binary.BigEndian.Uint64(b[2*influxdb.IDLength+8 : 2*influxdb.IDLength+16]))
w.Predicate = b[2*influxdb.IDLength+16:]
// Maintain backwards compatability where no predicate bytes means nil
if len(w.Predicate) == 0 {
w.Predicate = nil
}
return nil return nil
} }
// MarshalSize returns the number of bytes the entry takes when marshaled. // MarshalSize returns the number of bytes the entry takes when marshaled.
func (w *DeleteBucketRangeWALEntry) MarshalSize() int { func (w *DeleteBucketRangeWALEntry) MarshalSize() int {
return 2*influxdb.IDLength + 16 return 2*influxdb.IDLength + 16 + len(w.Predicate)
} }
// Encode converts the entry into a byte stream using b if it is large enough. // Encode converts the entry into a byte stream using b if it is large enough.
@ -1047,6 +1055,7 @@ func (w *DeleteBucketRangeWALEntry) Encode(b []byte) ([]byte, error) {
copy(b[influxdb.IDLength:], bucketID) copy(b[influxdb.IDLength:], bucketID)
binary.BigEndian.PutUint64(b[2*influxdb.IDLength:], uint64(w.Min)) binary.BigEndian.PutUint64(b[2*influxdb.IDLength:], uint64(w.Min))
binary.BigEndian.PutUint64(b[2*influxdb.IDLength+8:], uint64(w.Max)) binary.BigEndian.PutUint64(b[2*influxdb.IDLength+8:], uint64(w.Max))
copy(b[2*influxdb.IDLength+16:], w.Predicate)
return b[:sz], nil return b[:sz], nil
} }

View File

@ -222,10 +222,11 @@ func TestWALWriter_DeleteBucketRange(t *testing.T) {
w := NewWALSegmentWriter(f) w := NewWALSegmentWriter(f)
entry := &DeleteBucketRangeWALEntry{ entry := &DeleteBucketRangeWALEntry{
OrgID: influxdb.ID(1), OrgID: influxdb.ID(1),
BucketID: influxdb.ID(2), BucketID: influxdb.ID(2),
Min: 3, Min: 3,
Max: 4, Max: 4,
Predicate: []byte("predicate"),
} }
if err := w.Write(mustMarshalEntry(entry)); err != nil { if err := w.Write(mustMarshalEntry(entry)); err != nil {
@ -449,10 +450,14 @@ func TestWriteWALSegment_UnmarshalBinary_WriteWALCorrupt(t *testing.T) {
func TestDeleteBucketRangeWALEntry_UnmarshalBinary(t *testing.T) { func TestDeleteBucketRangeWALEntry_UnmarshalBinary(t *testing.T) {
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
in := &DeleteBucketRangeWALEntry{ in := &DeleteBucketRangeWALEntry{
OrgID: influxdb.ID(rand.Int63()) + 1, OrgID: influxdb.ID(rand.Int63()) + 1,
BucketID: influxdb.ID(rand.Int63()) + 1, BucketID: influxdb.ID(rand.Int63()) + 1,
Min: rand.Int63(), Min: rand.Int63(),
Max: rand.Int63(), Max: rand.Int63(),
Predicate: make([]byte, rand.Intn(100)),
}
if len(in.Predicate) == 0 {
in.Predicate = nil
} }
b, err := in.MarshalBinary() b, err := in.MarshalBinary()
@ -473,10 +478,11 @@ func TestDeleteBucketRangeWALEntry_UnmarshalBinary(t *testing.T) {
func TestWriteWALSegment_UnmarshalBinary_DeleteBucketRangeWALCorrupt(t *testing.T) { func TestWriteWALSegment_UnmarshalBinary_DeleteBucketRangeWALCorrupt(t *testing.T) {
w := &DeleteBucketRangeWALEntry{ w := &DeleteBucketRangeWALEntry{
OrgID: influxdb.ID(1), OrgID: influxdb.ID(1),
BucketID: influxdb.ID(2), BucketID: influxdb.ID(2),
Min: 3, Min: 3,
Max: 4, Max: 4,
Predicate: []byte("predicate"),
} }
b, err := w.MarshalBinary() b, err := w.MarshalBinary()