diff --git a/storage/engine.go b/storage/engine.go index beba265947..9793d45002 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -520,7 +520,7 @@ func (e *Engine) DeleteBucketRange(orgID, bucketID platform.ID, min, max int64) } // Add the delete to the WAL to be replayed if there is a crash or shutdown. - if _, err := e.wal.DeleteBucketRange(orgID, bucketID, min, max); err != nil { + if _, err := e.wal.DeleteBucketRange(orgID, bucketID, min, max, nil); err != nil { return err } diff --git a/storage/wal/wal.go b/storage/wal/wal.go index fc8fb33703..1c71f9757a 100644 --- a/storage/wal/wal.go +++ b/storage/wal/wal.go @@ -519,16 +519,17 @@ func (l *WAL) CloseSegment() error { // DeleteBucketRange deletes the data inside of the bucket between the two times, returning // the segment ID for the operation. -func (l *WAL) DeleteBucketRange(orgID, bucketID influxdb.ID, min, max int64) (int, error) { +func (l *WAL) DeleteBucketRange(orgID, bucketID influxdb.ID, min, max int64, pred []byte) (int, error) { if !l.enabled { return -1, nil } entry := &DeleteBucketRangeWALEntry{ - OrgID: orgID, - BucketID: bucketID, - Min: min, - Max: max, + OrgID: orgID, + BucketID: bucketID, + Min: min, + Max: max, + Predicate: pred, } id, err := l.writeToLog(entry) @@ -992,9 +993,10 @@ func (w *WriteWALEntry) Type() WalEntryType { // DeleteBucketRangeWALEntry represents the deletion of data in a bucket. type DeleteBucketRangeWALEntry struct { - OrgID influxdb.ID - BucketID influxdb.ID - Min, Max int64 + OrgID influxdb.ID + BucketID influxdb.ID + Min, Max int64 + Predicate []byte } // MarshalBinary returns a binary representation of the entry in a new byte slice. @@ -1005,7 +1007,7 @@ func (w *DeleteBucketRangeWALEntry) MarshalBinary() ([]byte, error) { // UnmarshalBinary deserializes the byte slice into w. func (w *DeleteBucketRangeWALEntry) UnmarshalBinary(b []byte) error { - if len(b) != 2*influxdb.IDLength+16 { + if len(b) < 2*influxdb.IDLength+16 { return ErrWALCorrupt } @@ -1017,13 +1019,19 @@ func (w *DeleteBucketRangeWALEntry) UnmarshalBinary(b []byte) error { } w.Min = int64(binary.BigEndian.Uint64(b[2*influxdb.IDLength : 2*influxdb.IDLength+8])) w.Max = int64(binary.BigEndian.Uint64(b[2*influxdb.IDLength+8 : 2*influxdb.IDLength+16])) + w.Predicate = b[2*influxdb.IDLength+16:] + + // Maintain backwards compatability where no predicate bytes means nil + if len(w.Predicate) == 0 { + w.Predicate = nil + } return nil } // MarshalSize returns the number of bytes the entry takes when marshaled. func (w *DeleteBucketRangeWALEntry) MarshalSize() int { - return 2*influxdb.IDLength + 16 + return 2*influxdb.IDLength + 16 + len(w.Predicate) } // Encode converts the entry into a byte stream using b if it is large enough. @@ -1047,6 +1055,7 @@ func (w *DeleteBucketRangeWALEntry) Encode(b []byte) ([]byte, error) { copy(b[influxdb.IDLength:], bucketID) binary.BigEndian.PutUint64(b[2*influxdb.IDLength:], uint64(w.Min)) binary.BigEndian.PutUint64(b[2*influxdb.IDLength+8:], uint64(w.Max)) + copy(b[2*influxdb.IDLength+16:], w.Predicate) return b[:sz], nil } diff --git a/storage/wal/wal_test.go b/storage/wal/wal_test.go index 0beedf0166..2ccd2ca044 100644 --- a/storage/wal/wal_test.go +++ b/storage/wal/wal_test.go @@ -222,10 +222,11 @@ func TestWALWriter_DeleteBucketRange(t *testing.T) { w := NewWALSegmentWriter(f) entry := &DeleteBucketRangeWALEntry{ - OrgID: influxdb.ID(1), - BucketID: influxdb.ID(2), - Min: 3, - Max: 4, + OrgID: influxdb.ID(1), + BucketID: influxdb.ID(2), + Min: 3, + Max: 4, + Predicate: []byte("predicate"), } if err := w.Write(mustMarshalEntry(entry)); err != nil { @@ -449,10 +450,14 @@ func TestWriteWALSegment_UnmarshalBinary_WriteWALCorrupt(t *testing.T) { func TestDeleteBucketRangeWALEntry_UnmarshalBinary(t *testing.T) { for i := 0; i < 1000; i++ { in := &DeleteBucketRangeWALEntry{ - OrgID: influxdb.ID(rand.Int63()) + 1, - BucketID: influxdb.ID(rand.Int63()) + 1, - Min: rand.Int63(), - Max: rand.Int63(), + OrgID: influxdb.ID(rand.Int63()) + 1, + BucketID: influxdb.ID(rand.Int63()) + 1, + Min: rand.Int63(), + Max: rand.Int63(), + Predicate: make([]byte, rand.Intn(100)), + } + if len(in.Predicate) == 0 { + in.Predicate = nil } b, err := in.MarshalBinary() @@ -473,10 +478,11 @@ func TestDeleteBucketRangeWALEntry_UnmarshalBinary(t *testing.T) { func TestWriteWALSegment_UnmarshalBinary_DeleteBucketRangeWALCorrupt(t *testing.T) { w := &DeleteBucketRangeWALEntry{ - OrgID: influxdb.ID(1), - BucketID: influxdb.ID(2), - Min: 3, - Max: 4, + OrgID: influxdb.ID(1), + BucketID: influxdb.ID(2), + Min: 3, + Max: 4, + Predicate: []byte("predicate"), } b, err := w.MarshalBinary()