Fix deletes not kept if shutdown before flush on tsm1

pull/4308/head
Paul Dix 2015-10-05 17:21:07 -04:00
parent a0841c4508
commit ae36c57110
4 changed files with 176 additions and 60 deletions

View File

@@ -13,6 +13,7 @@ import (
"path/filepath"
"reflect"
"sort"
"strings"
"sync"
"syscall"
"time"
@@ -47,6 +48,10 @@ const (
// are removed after the file has been synced and is safe for use. If a file
// has an associated checkpoint file, it wasn't safely written and both should be removed
CheckpointExtension = "check"
// keyFieldSeparator separates the series key from the field name in the composite key
// that identifies a specific field in a series
keyFieldSeparator = "#!~#"
)
type TimePrecision uint8
@@ -115,8 +120,12 @@ type Engine struct {
lastCompactionTime time.Time
// deletes is a map of keys that are deleted, but haven't yet been
// compacted and flushed
deletes map[uint64]bool
// compacted and flushed. They map the ID to the corresponding key
deletes map[uint64]string
// deleteMeasurements is a map of the measurements that are deleted
// but haven't yet been compacted and flushed
deleteMeasurements map[string]bool
collisionsLock sync.RWMutex
collisions map[string]uint64
@@ -240,7 +249,8 @@ func (e *Engine) Open() error {
return err
}
e.deletes = make(map[uint64]bool)
e.deletes = make(map[uint64]string)
e.deleteMeasurements = make(map[string]bool)
// mark the last compaction as now so it doesn't try to compact while
// flushing the WAL on load
@@ -278,6 +288,7 @@ func (e *Engine) Close() error {
e.currentFileID = 0
e.collisions = nil
e.deletes = nil
e.deleteMeasurements = nil
return nil
}
@@ -440,10 +451,16 @@ func (e *Engine) MarkDeletes(keys []string) {
e.filesLock.Lock()
defer e.filesLock.Unlock()
for _, k := range keys {
e.deletes[e.keyToID(k)] = true
e.deletes[e.keyToID(k)] = k
}
}
func (e *Engine) MarkMeasurementDelete(name string) {
e.filesLock.Lock()
defer e.filesLock.Unlock()
e.deleteMeasurements[name] = true
}
// filesAndLock returns the data files that match the given range and
// ensures that the write lock will hold for the entire range
func (e *Engine) filesAndLock(min, max int64) (a dataFiles, lockStart, lockEnd int64) {
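MarkDeletes now stores the original composite key instead of a bare boolean, because flushDeletes (below) needs the key to prune the series index; until that flush runs, the map doubles as a query-time tombstone, which the tx.Cursor changes further down consult. A minimal sketch of the tombstone pattern, using hypothetical names (tombstoneSet, mark, isDeleted) rather than the engine's actual types:

    package main

    import (
        "fmt"
        "sync"
    )

    // tombstoneSet tracks deleted IDs until a flush rewrites the data
    // files. The value keeps the original composite key so the flush
    // can also strip the entry from the on-disk series index.
    type tombstoneSet struct {
        mu      sync.RWMutex
        deleted map[uint64]string
    }

    func (t *tombstoneSet) mark(id uint64, key string) {
        t.mu.Lock()
        defer t.mu.Unlock()
        t.deleted[id] = key
    }

    // isDeleted is what a cursor would consult before serving values.
    func (t *tombstoneSet) isDeleted(id uint64) bool {
        t.mu.RLock()
        defer t.mu.RUnlock()
        _, ok := t.deleted[id]
        return ok
    }

    func main() {
        ts := &tombstoneSet{deleted: make(map[uint64]string)}
        ts.mark(42, "cpu,host=A#!~#value")
        fmt.Println(ts.isDeleted(42)) // true
    }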
@@ -1166,17 +1183,66 @@ func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) erro
func (e *Engine) flushDeletes() error {
e.writeLock.LockRange(math.MinInt64, math.MaxInt64)
defer e.writeLock.UnlockRange(math.MinInt64, math.MaxInt64)
e.metaLock.Lock()
defer e.metaLock.Unlock()
measurements := make(map[string]bool)
deletes := make(map[uint64]string)
e.filesLock.RLock()
for name := range e.deleteMeasurements {
measurements[name] = true
}
for id, key := range e.deletes {
deletes[id] = key
}
e.filesLock.RUnlock()
// if we're deleting measurements, rewrite the field data
if len(measurements) > 0 {
fields, err := e.readFields()
if err != nil {
return err
}
for name := range measurements {
delete(fields, name)
}
if err := e.writeFields(fields); err != nil {
return err
}
}
series, err := e.readSeries()
if err != nil {
return err
}
for _, key := range deletes {
seriesName, _ := seriesAndFieldFromCompositeKey(key)
delete(series, seriesName)
}
if err := e.writeSeries(series); err != nil {
return err
}
// now remove the raw time series data from the data files
files := e.copyFilesCollection()
newFiles := make(dataFiles, 0, len(files))
for _, f := range files {
newFiles = append(newFiles, e.writeNewFileExcludeDeletes(f))
}
// update the delete map and files
e.filesLock.Lock()
defer e.filesLock.Unlock()
e.files = newFiles
e.deletes = make(map[uint64]bool)
// remove the things we've deleted from the map
for name := range measurements {
delete(e.deleteMeasurements, name)
}
for id := range deletes {
delete(e.deletes, id)
}
e.deletesPending.Add(1)
go func() {
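The rewritten flushDeletes snapshots both maps under the read lock, does the slow file rewrites, and afterwards deletes only the snapshotted entries rather than replacing the maps wholesale, so a tombstone added while the flush is running survives into the next flush. A stripped-down sketch of that snapshot-then-clear pattern (hypothetical names, not the engine's code):

    package main

    import (
        "fmt"
        "sync"
    )

    var (
        mu      sync.RWMutex
        pending = map[uint64]string{1: "cpu,host=A#!~#value"}
    )

    func flush(process func(map[uint64]string)) {
        // 1. snapshot under the read lock
        mu.RLock()
        snap := make(map[uint64]string, len(pending))
        for id, key := range pending {
            snap[id] = key
        }
        mu.RUnlock()

        // 2. do the slow work (file rewrites) without holding the lock
        process(snap)

        // 3. remove only what was snapshotted; anything added while
        // flushing stays pending for the next flush
        mu.Lock()
        for id := range snap {
            delete(pending, id)
        }
        mu.Unlock()
    }

    func main() {
        flush(func(m map[uint64]string) { fmt.Println("flushing", len(m), "deletes") })
        fmt.Println("still pending:", len(pending))
    }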
@@ -1288,7 +1354,7 @@ func (e *Engine) keysWithFields(fields map[string]*tsdb.MeasurementFields, keys
mf := fields[measurement]
if mf != nil {
for _, f := range mf.Fields {
a = append(a, seriesFieldKey(k, f.Name))
a = append(a, SeriesFieldKey(k, f.Name))
}
}
@@ -1296,7 +1362,7 @@ func (e *Engine) keysWithFields(fields map[string]*tsdb.MeasurementFields, keys
mf = e.WAL.measurementFieldsCache[measurement]
if mf != nil {
for _, f := range mf.Fields {
a = append(a, seriesFieldKey(k, f.Name))
a = append(a, SeriesFieldKey(k, f.Name))
}
}
}
@@ -1305,30 +1371,23 @@ func (e *Engine) keysWithFields(fields map[string]*tsdb.MeasurementFields, keys
}
// DeleteSeries deletes the series from the engine.
func (e *Engine) DeleteSeries(keys []string) error {
func (e *Engine) DeleteSeries(seriesKeys []string) error {
e.metaLock.Lock()
defer e.metaLock.Unlock()
fields, err := e.readFields()
if err != nil {
return err
}
keyFields := e.keysWithFields(fields, keys)
return e.deleteKeyFields(keyFields)
}
func (e *Engine) deleteKeyFields(keyFields []string) error {
err := e.WAL.DeleteSeries(keyFields)
if err != nil {
return err
}
keyFields := e.keysWithFields(fields, seriesKeys)
e.filesLock.Lock()
defer e.filesLock.Unlock()
for _, k := range keyFields {
e.deletes[e.keyToID(k)] = true
for _, key := range keyFields {
e.deletes[e.keyToID(key)] = key
}
return nil
return e.WAL.DeleteSeries(keyFields)
}
// DeleteMeasurement deletes a measurement and all related series.
@@ -1336,24 +1395,23 @@ func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error {
e.metaLock.Lock()
defer e.metaLock.Unlock()
// remove the field data from the index
fields, err := e.readFields()
if err != nil {
return err
}
// mark the measurement, series keys, and fields for deletion on the next flush;
// this also serves as a tombstone for any queries that come in before the flush
keyFields := e.keysWithFields(fields, seriesKeys)
e.filesLock.Lock()
defer e.filesLock.Unlock()
delete(fields, name)
if err := e.writeFields(fields); err != nil {
return err
e.deleteMeasurements[name] = true
for _, k := range keyFields {
e.deletes[e.keyToID(k)] = k
}
e.WAL.DropMeasurementFields(name)
// now delete all the measurement's series
return e.deleteKeyFields(keyFields)
return e.WAL.DeleteMeasurement(name, seriesKeys)
}
// SeriesCount returns the number of series buckets on the shard.
@@ -1416,7 +1474,7 @@ func (e *Engine) keyToID(key string) uint64 {
}
func (e *Engine) keyAndFieldToID(series, field string) uint64 {
key := seriesFieldKey(series, field)
key := SeriesFieldKey(series, field)
return e.keyToID(key)
}
@@ -1892,9 +1950,17 @@ func hashSeriesField(key string) uint64 {
return h.Sum64()
}
// seriesFieldKey combine a series key and field name for a unique string to be hashed to a numeric ID
func seriesFieldKey(seriesKey, field string) string {
return seriesKey + "#" + field
// SeriesFieldKey combines a series key and field name into a unique string to be hashed to a numeric ID
func SeriesFieldKey(seriesKey, field string) string {
return seriesKey + keyFieldSeparator + field
}
func seriesAndFieldFromCompositeKey(key string) (string, string) {
parts := strings.SplitN(key, keyFieldSeparator, 2)
if len(parts) < 2 {
// no separator present; the whole key is the series name
return parts[0], ""
}
return parts[0], parts[1]
}
type uint64slice []uint64
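SeriesFieldKey is exported now that the WAL and tests call it, and the separator becomes the dedicated constant "#!~#" rather than a bare "#", a character that can legally appear inside a series key. A standalone sketch of the round trip, reimplementing both helpers outside the package:

    package main

    import (
        "fmt"
        "strings"
    )

    const keyFieldSeparator = "#!~#"

    // seriesFieldKey mirrors SeriesFieldKey: series key + separator + field.
    func seriesFieldKey(seriesKey, field string) string {
        return seriesKey + keyFieldSeparator + field
    }

    // splitKey mirrors seriesAndFieldFromCompositeKey: split on the
    // first occurrence of the separator.
    func splitKey(key string) (series, field string) {
        parts := strings.SplitN(key, keyFieldSeparator, 2)
        if len(parts) < 2 {
            return parts[0], ""
        }
        return parts[0], parts[1]
    }

    func main() {
        k := seriesFieldKey("cpu,host=A", "value")
        fmt.Println(k) // cpu,host=A#!~#value
        s, f := splitKey(k)
        fmt.Println(s, f) // cpu,host=A value
    }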

View File

@@ -19,7 +19,7 @@ func (t *tx) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascend
// don't add the overhead of the multifield cursor if we only have one field
if len(fields) == 1 {
id := t.engine.keyAndFieldToID(series, fields[0])
isDeleted := t.engine.deletes[id]
_, isDeleted := t.engine.deletes[id]
var indexCursor tsdb.Cursor
if isDeleted {
@@ -37,7 +37,7 @@ func (t *tx) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascend
cursorFields := make([]string, 0)
for _, field := range fields {
id := t.engine.keyAndFieldToID(series, field)
isDeleted := t.engine.deletes[id]
_, isDeleted := t.engine.deletes[id]
var indexCursor tsdb.Cursor
if isDeleted {
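Since deletes is now a map[uint64]string, an absent ID yields the zero value "" rather than false, so these cursor checks switch to the comma-ok form. A tiny illustration of the difference:

    package main

    import "fmt"

    func main() {
        // With map[uint64]bool the zero value doubles as "not deleted":
        flags := map[uint64]bool{7: true}
        fmt.Println(flags[8]) // false, no ok-check needed

        // With map[uint64]string the zero value "" can't signal
        // membership, so presence needs the comma-ok form:
        keys := map[uint64]string{7: "cpu,host=A#!~#value"}
        _, ok := keys[7]
        fmt.Println(ok) // true
    }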

View File

@@ -119,6 +119,7 @@ type Log struct {
type IndexWriter interface {
Write(valuesByKey map[string]Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
MarkDeletes(keys []string)
MarkMeasurementDelete(name string)
}
func NewLog(path string) *Log {
@@ -168,7 +169,7 @@ func (l *Log) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascen
if len(fields) != 1 {
panic("wal cursor should only ever be called with 1 field")
}
ck := seriesFieldKey(series, fields[0])
ck := SeriesFieldKey(series, fields[0])
values := l.cache[ck]
// if we're in the middle of a flush, combine the previous cache
@@ -268,7 +269,7 @@ func (l *Log) addToCache(points []models.Point, fields map[string]*tsdb.Measurem
for _, p := range points {
for name, value := range p.Fields() {
k := seriesFieldKey(string(p.Key()), name)
k := SeriesFieldKey(string(p.Key()), name)
v := NewValue(p.Time(), value)
cacheValues := l.cache[k]
@@ -388,11 +389,16 @@ func (l *Log) readFileToCache(fileName string) error {
}
l.addToCache(nil, nil, series, false)
case deleteEntry:
var keys []string
if err := json.Unmarshal(data, &keys); err != nil {
d := &deleteData{}
if err := json.Unmarshal(data, &d); err != nil {
return err
}
l.Index.MarkDeletes(keys)
l.Index.MarkDeletes(d.Keys)
l.Index.MarkMeasurementDelete(d.MeasurementName)
l.deleteKeysFromCache(d.Keys)
if d.MeasurementName != "" {
l.deleteMeasurementFromCache(d.MeasurementName)
}
}
}
}
@@ -431,27 +437,62 @@ func (l *Log) Flush() error {
return l.flush(idleFlush)
}
func (l *Log) DropMeasurementFields(measurement string) {
l.cacheLock.Lock()
defer l.cacheLock.Unlock()
delete(l.measurementFieldsCache, measurement)
}
func (l *Log) DeleteSeries(keys []string) error {
l.cacheLock.Lock()
for _, k := range keys {
delete(l.cache, k)
}
l.cacheLock.Unlock()
b, err := json.Marshal(keys)
func (l *Log) DeleteMeasurement(measurement string, keys []string) error {
d := &deleteData{MeasurementName: measurement, Keys: keys}
err := l.writeDeleteEntry(d)
if err != nil {
return err
}
cb := snappy.Encode(nil, b)
l.deleteKeysFromCache(keys)
l.deleteMeasurementFromCache(measurement)
return l.writeToLog(deleteEntry, cb)
return nil
}
func (l *Log) deleteMeasurementFromCache(name string) {
l.cacheLock.Lock()
defer l.cacheLock.Unlock()
delete(l.measurementFieldsCache, name)
}
func (l *Log) writeDeleteEntry(d *deleteData) error {
js, err := json.Marshal(d)
if err != nil {
return err
}
data := snappy.Encode(nil, js)
return l.writeToLog(deleteEntry, data)
}
func (l *Log) DeleteSeries(keys []string) error {
l.deleteKeysFromCache(keys)
return l.writeDeleteEntry(&deleteData{Keys: keys})
}
func (l *Log) deleteKeysFromCache(keys []string) {
seriesKeys := make(map[string]bool)
for _, k := range keys {
series, _ := seriesAndFieldFromCompositeKey(k)
seriesKeys[series] = true
}
l.cacheLock.Lock()
defer l.cacheLock.Unlock()
for _, k := range keys {
delete(l.cache, k)
}
// now remove any of these that are marked for creation
var seriesCreate []*tsdb.SeriesCreate
for _, sc := range l.seriesToCreateCache {
if _, ok := seriesKeys[sc.Series.Key]; !ok {
seriesCreate = append(seriesCreate, sc)
}
}
l.seriesToCreateCache = seriesCreate
}
// Close will finish any flush that is currently in process and close file handles
@@ -731,6 +772,13 @@ func (c *walCursor) nextReverse() Value {
return c.cache[c.position]
}
// deleteData holds the information for a delete entry
type deleteData struct {
// MeasurementName will be empty for deletes that are only against series
MeasurementName string
Keys []string
}
// idFromFileName parses the segment file ID from its name
func idFromFileName(name string) (int, error) {
parts := strings.Split(filepath.Base(name), ".")
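On disk, a delete entry is the deleteData struct JSON-marshalled and then snappy-compressed (writeDeleteEntry above); readFileToCache reverses both steps on replay before re-marking the deletes. A round-trip sketch of that encoding, assuming the github.com/golang/snappy package for the snappy calls shown above:

    package main

    import (
        "encoding/json"
        "fmt"

        "github.com/golang/snappy"
    )

    // deleteData mirrors the WAL's delete entry payload.
    type deleteData struct {
        MeasurementName string // empty for series-only deletes
        Keys            []string
    }

    func main() {
        d := &deleteData{MeasurementName: "cpu", Keys: []string{"cpu,host=A#!~#value"}}

        // write path: JSON, then snappy, then append to the segment file
        js, err := json.Marshal(d)
        if err != nil {
            panic(err)
        }
        compressed := snappy.Encode(nil, js)

        // replay path: decompress, then unmarshal, then re-mark deletes
        data, err := snappy.Decode(nil, compressed)
        if err != nil {
            panic(err)
        }
        var out deleteData
        if err := json.Unmarshal(data, &out); err != nil {
            panic(err)
        }
        fmt.Printf("%+v\n", out)
    }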

View File

@@ -111,11 +111,11 @@ func TestWAL_TestWriteQueryOpen(t *testing.T) {
t.Fatalf("failed to open: %s", err.Error())
}
if len(vals["cpu,host=A#value"]) != 2 {
if len(vals[tsm1.SeriesFieldKey("cpu,host=A", "value")]) != 2 {
t.Fatal("expected host A values to flush to index on open")
}
if len(vals["cpu,host=B#value"]) != 1 {
if len(vals[tsm1.SeriesFieldKey("cpu,host=B", "value")]) != 1 {
t.Fatal("expected host B values to flush to index on open")
}
@@ -174,3 +174,5 @@ func (m *MockIndexWriter) Write(valuesByKey map[string]tsm1.Values, measurementF
}
func (m *MockIndexWriter) MarkDeletes(keys []string) {}
func (m *MockIndexWriter) MarkMeasurementDelete(name string) {}