diff --git a/tsdb/engine/tsm1/tsm1.go b/tsdb/engine/tsm1/tsm1.go
index b75d894f65..aac359c19d 100644
--- a/tsdb/engine/tsm1/tsm1.go
+++ b/tsdb/engine/tsm1/tsm1.go
@@ -13,6 +13,7 @@ import (
 	"path/filepath"
 	"reflect"
 	"sort"
+	"strings"
 	"sync"
 	"syscall"
 	"time"
@@ -47,6 +48,10 @@ const (
 	// are removed after the file has been synced and is safe for use. If a file
 	// has an associated checkpoint file, it wasn't safely written and both should be removed
 	CheckpointExtension = "check"
+
+	// keyFieldSeparator separates the series key from the field name in the composite key
+	// that identifies a specific field in series
+	keyFieldSeparator = "#!~#"
 )
 
 type TimePrecision uint8
@@ -115,8 +120,12 @@ type Engine struct {
 	lastCompactionTime time.Time
 
 	// deletes is a map of keys that are deleted, but haven't yet been
-	// compacted and flushed
-	deletes map[uint64]bool
+	// compacted and flushed. They map the ID to the corresponding key
+	deletes map[uint64]string
+
+	// deleteMeasurements is a map of the measurements that are deleted
+	// but haven't yet been compacted and flushed
+	deleteMeasurements map[string]bool
 
 	collisionsLock sync.RWMutex
 	collisions     map[string]uint64
@@ -240,7 +249,8 @@ func (e *Engine) Open() error {
 		return err
 	}
 
-	e.deletes = make(map[uint64]bool)
+	e.deletes = make(map[uint64]string)
+	e.deleteMeasurements = make(map[string]bool)
 
 	// mark the last compaction as now so it doesn't try to compact while
 	// flushing the WAL on load
@@ -278,6 +288,7 @@ func (e *Engine) Close() error {
 	e.currentFileID = 0
 	e.collisions = nil
 	e.deletes = nil
+	e.deleteMeasurements = nil
 	return nil
 }
 
@@ -440,10 +451,16 @@ func (e *Engine) MarkDeletes(keys []string) {
 	e.filesLock.Lock()
 	defer e.filesLock.Unlock()
 	for _, k := range keys {
-		e.deletes[e.keyToID(k)] = true
+		e.deletes[e.keyToID(k)] = k
 	}
 }
 
+func (e *Engine) MarkMeasurementDelete(name string) {
+	e.filesLock.Lock()
+	defer e.filesLock.Unlock()
+	e.deleteMeasurements[name] = true
+}
+
 // filesAndLock returns the data files that match the given range and
 // ensures that the write lock will hold for the entire range
 func (e *Engine) filesAndLock(min, max int64) (a dataFiles, lockStart, lockEnd int64) {
@@ -1166,17 +1183,66 @@ func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) erro
 func (e *Engine) flushDeletes() error {
 	e.writeLock.LockRange(math.MinInt64, math.MaxInt64)
 	defer e.writeLock.UnlockRange(math.MinInt64, math.MaxInt64)
+	e.metaLock.Lock()
+	defer e.metaLock.Unlock()
+
+	measurements := make(map[string]bool)
+	deletes := make(map[uint64]string)
+	e.filesLock.RLock()
+	for name, _ := range e.deleteMeasurements {
+		measurements[name] = true
+	}
+	for id, key := range e.deletes {
+		deletes[id] = key
+	}
+	e.filesLock.RUnlock()
+
+	// if we're deleting measurements, rewrite the field data
+	if len(measurements) > 0 {
+		fields, err := e.readFields()
+		if err != nil {
+			return err
+		}
+		for name, _ := range measurements {
+			delete(fields, name)
+		}
+		if err := e.writeFields(fields); err != nil {
+			return err
+		}
+	}
+
+	series, err := e.readSeries()
+	if err != nil {
+		return err
+	}
+	for _, key := range deletes {
+		seriesName, _ := seriesAndFieldFromCompositeKey(key)
+		delete(series, seriesName)
+	}
+	if err := e.writeSeries(series); err != nil {
+		return err
+	}
+
+	// now remove the raw time series data from the data files
 	files := e.copyFilesCollection()
 	newFiles := make(dataFiles, 0, len(files))
 	for _, f := range files {
 		newFiles = append(newFiles, e.writeNewFileExcludeDeletes(f))
 	}
 
+	// update the delete map and files
 	e.filesLock.Lock()
 	defer e.filesLock.Unlock()
+
 	e.files = newFiles
-	e.deletes = make(map[uint64]bool)
+
+	// remove the things we've deleted from the map
+	for name, _ := range measurements {
+		delete(e.deleteMeasurements, name)
+	}
+	for id, _ := range deletes {
+		delete(e.deletes, id)
+	}
 
 	e.deletesPending.Add(1)
 	go func() {
@@ -1288,7 +1354,7 @@ func (e *Engine) keysWithFields(fields map[string]*tsdb.MeasurementFields, keys
 		mf := fields[measurement]
 		if mf != nil {
 			for _, f := range mf.Fields {
-				a = append(a, seriesFieldKey(k, f.Name))
+				a = append(a, SeriesFieldKey(k, f.Name))
 			}
 		}
 
@@ -1296,7 +1362,7 @@ func (e *Engine) keysWithFields(fields map[string]*tsdb.MeasurementFields, keys
 		mf = e.WAL.measurementFieldsCache[measurement]
 		if mf != nil {
 			for _, f := range mf.Fields {
-				a = append(a, seriesFieldKey(k, f.Name))
+				a = append(a, SeriesFieldKey(k, f.Name))
 			}
 		}
 	}
@@ -1305,30 +1371,23 @@ func (e *Engine) keysWithFields(fields map[string]*tsdb.MeasurementFields, keys
 }
 
 // DeleteSeries deletes the series from the engine.
-func (e *Engine) DeleteSeries(keys []string) error {
+func (e *Engine) DeleteSeries(seriesKeys []string) error {
+	e.metaLock.Lock()
+	defer e.metaLock.Unlock()
+
 	fields, err := e.readFields()
 	if err != nil {
 		return err
 	}
 
-	keyFields := e.keysWithFields(fields, keys)
-
-	return e.deleteKeyFields(keyFields)
-}
-
-func (e *Engine) deleteKeyFields(keyFields []string) error {
-	err := e.WAL.DeleteSeries(keyFields)
-	if err != nil {
-		return err
-	}
+	keyFields := e.keysWithFields(fields, seriesKeys)
 	e.filesLock.Lock()
 	defer e.filesLock.Unlock()
-
-	for _, k := range keyFields {
-		e.deletes[e.keyToID(k)] = true
+	for _, key := range keyFields {
+		e.deletes[e.keyToID(key)] = key
 	}
 
-	return nil
+	return e.WAL.DeleteSeries(keyFields)
 }
 
 // DeleteMeasurement deletes a measurement and all related series.
@@ -1336,24 +1395,23 @@ func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error {
 	e.metaLock.Lock()
 	defer e.metaLock.Unlock()
 
-	// remove the field data from the index
 	fields, err := e.readFields()
 	if err != nil {
 		return err
 	}
 
+	// mark the measurement, series keys and the fields for deletion on the next flush
+	// also serves as a tombstone for any queries that come in before the flush
 	keyFields := e.keysWithFields(fields, seriesKeys)
+	e.filesLock.Lock()
+	defer e.filesLock.Unlock()
 
-	delete(fields, name)
-
-	if err := e.writeFields(fields); err != nil {
-		return err
+	e.deleteMeasurements[name] = true
+	for _, k := range keyFields {
+		e.deletes[e.keyToID(k)] = k
 	}
 
-	e.WAL.DropMeasurementFields(name)
-
-	// now delete all the measurement's series
-	return e.deleteKeyFields(keyFields)
+	return e.WAL.DeleteMeasurement(name, seriesKeys)
 }
 
 // SeriesCount returns the number of series buckets on the shard.
@@ -1416,7 +1474,7 @@ func (e *Engine) keyToID(key string) uint64 {
 }
 
 func (e *Engine) keyAndFieldToID(series, field string) uint64 {
-	key := seriesFieldKey(series, field)
+	key := SeriesFieldKey(series, field)
 	return e.keyToID(key)
 }
 
@@ -1892,9 +1950,17 @@ func hashSeriesField(key string) uint64 {
 	return h.Sum64()
 }
 
-// seriesFieldKey combine a series key and field name for a unique string to be hashed to a numeric ID
-func seriesFieldKey(seriesKey, field string) string {
-	return seriesKey + "#" + field
+// SeriesFieldKey combine a series key and field name for a unique string to be hashed to a numeric ID
+func SeriesFieldKey(seriesKey, field string) string {
+	return seriesKey + keyFieldSeparator + field
+}
+
+func seriesAndFieldFromCompositeKey(key string) (string, string) {
+	parts := strings.Split(key, keyFieldSeparator)
+	if len(parts) != 0 {
+		return parts[0], strings.Join(parts[1:], keyFieldSeparator)
+	}
+	return parts[0], parts[1]
 }
 
 type uint64slice []uint64
diff --git a/tsdb/engine/tsm1/tx.go b/tsdb/engine/tsm1/tx.go
index d5f31110d1..54653c2872 100644
--- a/tsdb/engine/tsm1/tx.go
+++ b/tsdb/engine/tsm1/tx.go
@@ -19,7 +19,7 @@ func (t *tx) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascend
 	// don't add the overhead of the multifield cursor if we only have one field
 	if len(fields) == 1 {
 		id := t.engine.keyAndFieldToID(series, fields[0])
-		isDeleted := t.engine.deletes[id]
+		_, isDeleted := t.engine.deletes[id]
 
 		var indexCursor tsdb.Cursor
 		if isDeleted {
@@ -37,7 +37,7 @@ func (t *tx) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascend
 	cursorFields := make([]string, 0)
 	for _, field := range fields {
 		id := t.engine.keyAndFieldToID(series, field)
-		isDeleted := t.engine.deletes[id]
+		_, isDeleted := t.engine.deletes[id]
 
 		var indexCursor tsdb.Cursor
 		if isDeleted {
diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go
index 9cc2a2fd8f..4f6607d02a 100644
--- a/tsdb/engine/tsm1/wal.go
+++ b/tsdb/engine/tsm1/wal.go
@@ -119,6 +119,7 @@ type Log struct {
 type IndexWriter interface {
 	Write(valuesByKey map[string]Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
 	MarkDeletes(keys []string)
+	MarkMeasurementDelete(name string)
 }
 
 func NewLog(path string) *Log {
@@ -168,7 +169,7 @@ func (l *Log) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascen
 	if len(fields) != 1 {
 		panic("wal cursor should only ever be called with 1 field")
 	}
-	ck := seriesFieldKey(series, fields[0])
+	ck := SeriesFieldKey(series, fields[0])
 	values := l.cache[ck]
 
 	// if we're in the middle of a flush, combine the previous cache
@@ -268,7 +269,7 @@ func (l *Log) addToCache(points []models.Point, fields map[string]*tsdb.Measurem
 
 	for _, p := range points {
 		for name, value := range p.Fields() {
-			k := seriesFieldKey(string(p.Key()), name)
+			k := SeriesFieldKey(string(p.Key()), name)
 			v := NewValue(p.Time(), value)
 			cacheValues := l.cache[k]
 
@@ -388,11 +389,16 @@ func (l *Log) readFileToCache(fileName string) error {
 			}
 			l.addToCache(nil, nil, series, false)
 		case deleteEntry:
-			var keys []string
-			if err := json.Unmarshal(data, &keys); err != nil {
+			d := &deleteData{}
+			if err := json.Unmarshal(data, &d); err != nil {
 				return err
 			}
-			l.Index.MarkDeletes(keys)
+			l.Index.MarkDeletes(d.Keys)
+			l.Index.MarkMeasurementDelete(d.MeasurementName)
+			l.deleteKeysFromCache(d.Keys)
+			if d.MeasurementName != "" {
+				l.deleteMeasurementFromCache(d.MeasurementName)
+			}
 		}
 	}
 }
@@ -431,27 +437,62 @@ func (l *Log) Flush() error {
 	return l.flush(idleFlush)
 }
 
-func (l *Log) DropMeasurementFields(measurement string) {
-	l.cacheLock.Lock()
-	defer l.cacheLock.Unlock()
-	delete(l.measurementFieldsCache, measurement)
-}
-
-func (l *Log) DeleteSeries(keys []string) error {
-	l.cacheLock.Lock()
-	for _, k := range keys {
-		delete(l.cache, k)
-	}
-	l.cacheLock.Unlock()
-
-	b, err := json.Marshal(keys)
+func (l *Log) DeleteMeasurement(measurement string, keys []string) error {
+	d := &deleteData{MeasurementName: measurement, Keys: keys}
+	err := l.writeDeleteEntry(d)
 	if err != nil {
 		return err
 	}
-	cb := snappy.Encode(nil, b)
+	l.deleteKeysFromCache(keys)
+	l.deleteMeasurementFromCache(measurement)
 
-	return l.writeToLog(deleteEntry, cb)
+	return nil
+}
+
+func (l *Log) deleteMeasurementFromCache(name string) {
+	l.cacheLock.Lock()
+	defer l.cacheLock.Unlock()
+	delete(l.measurementFieldsCache, name)
+}
+
+func (l *Log) writeDeleteEntry(d *deleteData) error {
+	js, err := json.Marshal(d)
+	if err != nil {
+		return err
+	}
+	data := snappy.Encode(nil, js)
+	return l.writeToLog(deleteEntry, data)
+}
+
+func (l *Log) DeleteSeries(keys []string) error {
+	l.deleteKeysFromCache(keys)
+
+	return l.writeDeleteEntry(&deleteData{Keys: keys})
+}
+
+func (l *Log) deleteKeysFromCache(keys []string) {
+	seriesKeys := make(map[string]bool)
+	for _, k := range keys {
+		series, _ := seriesAndFieldFromCompositeKey(k)
+		seriesKeys[series] = true
+	}
+
+	l.cacheLock.Lock()
+	defer l.cacheLock.Unlock()
+
+	for _, k := range keys {
+		delete(l.cache, k)
+	}
+
+	// now remove any of these that are marked for creation
+	var seriesCreate []*tsdb.SeriesCreate
+	for _, sc := range l.seriesToCreateCache {
+		if _, ok := seriesKeys[sc.Series.Key]; !ok {
+			seriesCreate = append(seriesCreate, sc)
+		}
+	}
+	l.seriesToCreateCache = seriesCreate
 }
 
 // Close will finish any flush that is currently in process and close file handles
@@ -731,6 +772,13 @@ func (c *walCursor) nextReverse() Value {
 	return c.cache[c.position]
 }
 
+// deleteData holds the information for a delete entry
+type deleteData struct {
+	// MeasurementName will be empty for deletes that are only against series
+	MeasurementName string
+	Keys            []string
+}
+
 // idFromFileName parses the segment file ID from its name
 func idFromFileName(name string) (int, error) {
 	parts := strings.Split(filepath.Base(name), ".")
diff --git a/tsdb/engine/tsm1/wal_test.go b/tsdb/engine/tsm1/wal_test.go
index dfc5cda8cc..9df191c7a7 100644
--- a/tsdb/engine/tsm1/wal_test.go
+++ b/tsdb/engine/tsm1/wal_test.go
@@ -111,11 +111,11 @@ func TestWAL_TestWriteQueryOpen(t *testing.T) {
 		t.Fatalf("failed to open: %s", err.Error())
 	}
 
-	if len(vals["cpu,host=A#value"]) != 2 {
+	if len(vals[tsm1.SeriesFieldKey("cpu,host=A", "value")]) != 2 {
 		t.Fatal("expected host A values to flush to index on open")
 	}
 
-	if len(vals["cpu,host=B#value"]) != 1 {
+	if len(vals[tsm1.SeriesFieldKey("cpu,host=B", "value")]) != 1 {
 		t.Fatal("expected host B values to flush to index on open")
 	}
 
@@ -174,3 +174,5 @@ func (m *MockIndexWriter) Write(valuesByKey map[string]tsm1.Values, measurementF
 }
 
 func (m *MockIndexWriter) MarkDeletes(keys []string) {}
+
+func (m *MockIndexWriter) MarkMeasurementDelete(name string) {}
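For reference, the composite-key scheme this patch introduces can be exercised on its own. The following standalone sketch copies SeriesFieldKey and seriesAndFieldFromCompositeKey as they appear in the diff (the package name and the main driver are illustrative and not part of the change) and shows the round trip that flushDeletes, deleteKeysFromCache, and the updated WAL test rely on:

package main

import (
	"fmt"
	"strings"
)

// keyFieldSeparator matches the constant added to tsm1.go above.
const keyFieldSeparator = "#!~#"

// seriesFieldKey joins a series key and a field name into the composite key
// used for hashing and cache lookups (exported as SeriesFieldKey in the patch).
func seriesFieldKey(seriesKey, field string) string {
	return seriesKey + keyFieldSeparator + field
}

// seriesAndFieldFromCompositeKey splits a composite key back into the series
// key and the field name, as in the patch. strings.Split always returns at
// least one element, so the first branch is the one that runs: a key without
// the separator yields the whole key as the series and an empty field name,
// and any extra separators inside the field name are rejoined.
func seriesAndFieldFromCompositeKey(key string) (string, string) {
	parts := strings.Split(key, keyFieldSeparator)
	if len(parts) != 0 {
		return parts[0], strings.Join(parts[1:], keyFieldSeparator)
	}
	return parts[0], parts[1]
}

func main() {
	ck := seriesFieldKey("cpu,host=A", "value")
	fmt.Println(ck) // cpu,host=A#!~#value

	series, field := seriesAndFieldFromCompositeKey(ck)
	fmt.Println(series, field) // cpu,host=A value
}

The multi-character "#!~#" separator also appears to be why the old single "#" separator was replaced: it is far less likely to occur inside a measurement, tag, or field name, so the series key recovered in flushDeletes stays unambiguous.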