influx_tsm: ignore series index and convert all points

A case (#5606) was found in which a large amount of data unexpectedly disappeared from a
database following a TSM conversion.

The proximate cause was an inconsistency between the root Bolt DB bucket list
and the metadata in the "series" bucket of the same shard. There were apparently valid
series in Bolt DB buckets that were no longer referenced by the metadata
in the "series" bucket - so-called orphaned series. Because the conversion
process iterated only over the series listed in that metadata, it dropped the
orphaned series from the converted shards. The result was the unexpected removal of
data from the TSM shards - data that had previously been accessible
(despite the metadata inconsistency) in the b1 shards.
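
To make the failure mode concrete: an orphaned series can be detected by listing a
shard's top-level Bolt buckets and comparing them against the keys recorded in the
"series" bucket. The sketch below does exactly that; it is illustrative only, and
assumes the github.com/boltdb/bolt package, a hypothetical shard path, and the same
set of non-series bucket names that the fix excludes.

    package main

    import (
        "fmt"
        "log"

        "github.com/boltdb/bolt"
    )

    // Buckets that hold shard metadata rather than series data.
    // This set mirrors the excludedBuckets map introduced by the fix below.
    var nonSeriesBuckets = map[string]bool{
        "fields": true,
        "meta":   true,
        "series": true,
        "wal":    true,
    }

    func main() {
        // Hypothetical path to a b1 shard file.
        db, err := bolt.Open("/var/lib/influxdb/data/mydb/default/1", 0600, nil)
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        if err := db.View(func(tx *bolt.Tx) error {
            // Collect the series keys recorded in the "series" metadata bucket.
            indexed := make(map[string]bool)
            if meta := tx.Bucket([]byte("series")); meta != nil {
                if err := meta.ForEach(func(k, _ []byte) error {
                    indexed[string(k)] = true
                    return nil
                }); err != nil {
                    return err
                }
            }
            // Any other top-level bucket missing from the index is orphaned.
            return tx.ForEach(func(name []byte, _ *bolt.Bucket) error {
                if key := string(name); !nonSeriesBuckets[key] && !indexed[key] {
                    fmt.Println("orphaned series:", key)
                }
                return nil
            })
        }); err != nil {
            log.Fatal(err)
        }
    }

Run against an affected shard, any series printed here is data that the pre-fix
converter would silently have dropped.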

The root cause of the metadata inconsistency was a failure, in versions prior to
v0.9.3 (more precisely, versions without 3348dab), to update the "series" bucket with
series that had been created in previous shards during the life of the same influxd
process instance.

This fix is required to avoid data loss during TSM conversions of shards that were
created by versions of influxd that do not include 3348dab (e.g. versions prior to v0.9.3).

Analysis-by: Jon Seymour <jon@wildducktheories.com>
Branch: pull/5704/head
Author: Joe LeGasse
Date:   2016-02-13 13:31:19 +11:00
Parent: 1b8fc0c154
Commit: 9af4894c1c

3 changed files with 34 additions and 32 deletions

File 1 of 3 (the b1 shard Reader):

@@ -15,6 +15,13 @@ const DefaultChunkSize = 1000
 
 var NoFieldsFiltered uint64
 
+var excludedBuckets = map[string]bool{
+    "fields": true,
+    "meta":   true,
+    "series": true,
+    "wal":    true,
+}
+
 // Reader is used to read all data from a b1 shard.
 type Reader struct {
     path string
@@ -27,7 +34,6 @@ type Reader struct {
     keyBuf    string
     valuesBuf []tsm1.Value
 
-    series map[string]*tsdb.Series
     fields map[string]*tsdb.MeasurementFields
     codecs map[string]*tsdb.FieldCodec
 
@@ -38,7 +44,6 @@ type Reader struct {
 func NewReader(path string) *Reader {
     return &Reader{
         path:   path,
-        series: make(map[string]*tsdb.Series),
         fields: make(map[string]*tsdb.MeasurementFields),
         codecs: make(map[string]*tsdb.FieldCodec),
     }
@@ -71,32 +76,31 @@ func (r *Reader) Open() error {
         return err
     }
 
-    // Load series
+    seriesSet := make(map[string]bool)
+
+    // ignore series index and find all series in this shard
     if err := r.db.View(func(tx *bolt.Tx) error {
-        meta := tx.Bucket([]byte("series"))
-        c := meta.Cursor()
-        for k, v := c.First(); k != nil; k, v = c.Next() {
-            series := &tsdb.Series{}
-            if err := series.UnmarshalBinary(v); err != nil {
-                return err
+        tx.ForEach(func(name []byte, _ *bolt.Bucket) error {
+            key := string(name)
+            if !excludedBuckets[key] {
+                seriesSet[key] = true
             }
-            r.series[string(k)] = series
-        }
+            return nil
+        })
         return nil
     }); err != nil {
         return err
     }
 
-    // Create cursor for each field of each series.
     r.tx, err = r.db.Begin(false)
     if err != nil {
         return err
     }
-    for s := range r.series {
+
+    // Create cursor for each field of each series.
+    for s := range seriesSet {
         measurement := tsdb.MeasurementFromSeriesKey(s)
-        fields := r.fields[tsdb.MeasurementFromSeriesKey(s)]
+        fields := r.fields[measurement]
         if fields == nil {
             atomic.AddUint64(&NoFieldsFiltered, 1)
             continue
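
The bucket scan above is sufficient for b1 shards because each series' points are
stored in a top-level bucket named by its series key (for example cpu,host=server01),
so once the four metadata buckets are excluded, the bucket list is the series list.
The measurement lookup in the loop then follows the usual series-key convention; here
is a simplified sketch of that convention (an assumption for illustration, not the
actual tsdb.MeasurementFromSeriesKey implementation, which also handles escaping):

    import "strings"

    // measurementFromSeriesKey returns the measurement portion of a series
    // key such as "cpu,host=server01": everything before the first comma.
    // Simplified: the real implementation also copes with escaped commas.
    func measurementFromSeriesKey(key string) string {
        if i := strings.Index(key, ","); i >= 0 {
            return key[:i]
        }
        return key
    }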

File 2 of 3 (the second shard format's Reader):

@@ -31,7 +31,6 @@ type Reader struct {
     keyBuf    string
     valuesBuf []tsm.Value
 
-    series map[string]*tsdb.Series
     fields map[string]*tsdb.MeasurementFields
     codecs map[string]*tsdb.FieldCodec
 
@@ -42,7 +41,6 @@ type Reader struct {
 func NewReader(path string) *Reader {
     return &Reader{
         path:      path,
-        series:    make(map[string]*tsdb.Series),
         fields:    make(map[string]*tsdb.MeasurementFields),
         codecs:    make(map[string]*tsdb.FieldCodec),
         ChunkSize: DefaultChunkSize,
@@ -58,6 +56,8 @@ func (r *Reader) Open() error {
     }
     r.db = db
 
+    seriesSet := make(map[string]bool)
+
     if err := r.db.View(func(tx *bolt.Tx) error {
         var data []byte
 
@@ -66,20 +66,20 @@ func (r *Reader) Open() error {
             // No data in this shard.
             return nil
         }
 
-        buf := meta.Get([]byte("series"))
-        if buf == nil {
-            // No data in this shard.
+        pointsBucket := tx.Bucket([]byte("points"))
+        if pointsBucket == nil {
             return nil
         }
 
-        data, err = snappy.Decode(nil, buf)
-        if err != nil {
-            return err
-        }
-        if err := json.Unmarshal(data, &r.series); err != nil {
+        if err := pointsBucket.ForEach(func(key, _ []byte) error {
+            seriesSet[string(key)] = true
+            return nil
+        }); err != nil {
             return err
         }
 
-        buf = meta.Get([]byte("fields"))
+        buf := meta.Get([]byte("fields"))
         if buf == nil {
             // No data in this shard.
             return nil
@@ -102,15 +102,15 @@ func (r *Reader) Open() error {
         r.codecs[k] = tsdb.NewFieldCodec(v.Fields)
     }
 
-    // Create cursor for each field of each series.
     r.tx, err = r.db.Begin(false)
     if err != nil {
         return err
     }
-    for s := range r.series {
+
+    // Create cursor for each field of each series.
+    for s := range seriesSet {
         measurement := tsdb.MeasurementFromSeriesKey(s)
-        fields := r.fields[tsdb.MeasurementFromSeriesKey(s)]
+        fields := r.fields[measurement]
         if fields == nil {
             atomic.AddUint64(&NoFieldsFiltered, 1)
             continue
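
This second reader handles a shard format in which points are not stored one bucket
per series: all points live under a single "points" bucket whose keys are series keys,
while the series index is kept as a snappy-compressed JSON entry in the "meta" bucket.
The fix accordingly derives the series set from the "points" keys instead of trusting
that index. A standalone sketch of the same scan (bucket names are taken from the diff
above; db is an opened *bolt.DB, as in the earlier sketch):

    seriesSet := make(map[string]bool)
    if err := db.View(func(tx *bolt.Tx) error {
        points := tx.Bucket([]byte("points"))
        if points == nil {
            return nil // no data in this shard
        }
        // Every key in "points" is a series key; the values (the compressed
        // point blocks) are not needed to discover the series.
        return points.ForEach(func(key, _ []byte) error {
            seriesSet[string(key)] = true
            return nil
        })
    }); err != nil {
        log.Fatal(err)
    }

Either way, the principle is the same as in the b1 reader: derive the series list from
where the points actually are, not from a separately maintained index that may have
drifted.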

File 3 of 3 (the conversion tool's command-line options):

@@ -42,7 +42,7 @@ The backed-up files must be removed manually, generally after starting up the
 node again to make sure all of data has been converted correctly.
 
 To restore a backup:
     Shut down the node, remove the converted directory, and
     copy the backed-up directory to the original location.`
 
 type options struct {
@@ -54,7 +54,6 @@ type options struct {
     Parallel       bool
     SkipBackup     bool
     UpdateInterval time.Duration
-    // Quiet          bool
 }
 
 func (o *options) Parse() error {
@@ -67,7 +66,6 @@ func (o *options) Parse() error {
     fs.BoolVar(&opts.Parallel, "parallel", false, "Perform parallel conversion. (up to GOMAXPROCS shards at once)")
     fs.BoolVar(&opts.SkipBackup, "nobackup", false, "Disable database backups. Not recommended.")
     fs.StringVar(&opts.BackupPath, "backup", "", "The location to backup up the current databases. Must not be within the data directory.")
-    // fs.BoolVar(&opts.Quiet, "quiet", false, "Suppresses the regular status updates.")
     fs.StringVar(&opts.DebugAddr, "debug", "", "If set, http debugging endpoints will be enabled on the given address")
     fs.DurationVar(&opts.UpdateInterval, "interval", 5*time.Second, "How often status updates are printed.")
     fs.Usage = func() {