diff --git a/cmd/influx_tsm/b1/reader.go b/cmd/influx_tsm/b1/reader.go index ad8fcad524..383699958e 100644 --- a/cmd/influx_tsm/b1/reader.go +++ b/cmd/influx_tsm/b1/reader.go @@ -15,6 +15,13 @@ const DefaultChunkSize = 1000 var NoFieldsFiltered uint64 +var excludedBuckets = map[string]bool{ + "fields": true, + "meta": true, + "series": true, + "wal": true, +} + // Reader is used to read all data from a b1 shard. type Reader struct { path string @@ -27,7 +34,6 @@ type Reader struct { keyBuf string valuesBuf []tsm1.Value - series map[string]*tsdb.Series fields map[string]*tsdb.MeasurementFields codecs map[string]*tsdb.FieldCodec @@ -38,7 +44,6 @@ type Reader struct { func NewReader(path string) *Reader { return &Reader{ path: path, - series: make(map[string]*tsdb.Series), fields: make(map[string]*tsdb.MeasurementFields), codecs: make(map[string]*tsdb.FieldCodec), } @@ -71,32 +76,31 @@ func (r *Reader) Open() error { return err } - // Load series - if err := r.db.View(func(tx *bolt.Tx) error { - meta := tx.Bucket([]byte("series")) - c := meta.Cursor() + seriesSet := make(map[string]bool) - for k, v := c.First(); k != nil; k, v = c.Next() { - series := &tsdb.Series{} - if err := series.UnmarshalBinary(v); err != nil { - return err + // ignore series index and find all series in this shard + if err := r.db.View(func(tx *bolt.Tx) error { + tx.ForEach(func(name []byte, _ *bolt.Bucket) error { + key := string(name) + if !excludedBuckets[key] { + seriesSet[key] = true } - r.series[string(k)] = series - } + return nil + }) return nil }); err != nil { return err } - // Create cursor for each field of each series. r.tx, err = r.db.Begin(false) if err != nil { return err } - for s := range r.series { + // Create cursor for each field of each series. + for s := range seriesSet { measurement := tsdb.MeasurementFromSeriesKey(s) - fields := r.fields[tsdb.MeasurementFromSeriesKey(s)] + fields := r.fields[measurement] if fields == nil { atomic.AddUint64(&NoFieldsFiltered, 1) continue diff --git a/cmd/influx_tsm/bz1/reader.go b/cmd/influx_tsm/bz1/reader.go index 760b0bcbff..90ecc2f457 100644 --- a/cmd/influx_tsm/bz1/reader.go +++ b/cmd/influx_tsm/bz1/reader.go @@ -31,7 +31,6 @@ type Reader struct { keyBuf string valuesBuf []tsm.Value - series map[string]*tsdb.Series fields map[string]*tsdb.MeasurementFields codecs map[string]*tsdb.FieldCodec @@ -42,7 +41,6 @@ type Reader struct { func NewReader(path string) *Reader { return &Reader{ path: path, - series: make(map[string]*tsdb.Series), fields: make(map[string]*tsdb.MeasurementFields), codecs: make(map[string]*tsdb.FieldCodec), ChunkSize: DefaultChunkSize, @@ -58,6 +56,8 @@ func (r *Reader) Open() error { } r.db = db + seriesSet := make(map[string]bool) + if err := r.db.View(func(tx *bolt.Tx) error { var data []byte @@ -66,20 +66,20 @@ func (r *Reader) Open() error { // No data in this shard. return nil } - buf := meta.Get([]byte("series")) - if buf == nil { - // No data in this shard. + + pointsBucket := tx.Bucket([]byte("points")) + if pointsBucket == nil { return nil } - data, err = snappy.Decode(nil, buf) - if err != nil { - return err - } - if err := json.Unmarshal(data, &r.series); err != nil { + + if err := pointsBucket.ForEach(func(key, _ []byte) error { + seriesSet[string(key)] = true + return nil + }); err != nil { return err } - buf = meta.Get([]byte("fields")) + buf := meta.Get([]byte("fields")) if buf == nil { // No data in this shard. return nil @@ -102,15 +102,15 @@ func (r *Reader) Open() error { r.codecs[k] = tsdb.NewFieldCodec(v.Fields) } - // Create cursor for each field of each series. r.tx, err = r.db.Begin(false) if err != nil { return err } - for s := range r.series { + // Create cursor for each field of each series. + for s := range seriesSet { measurement := tsdb.MeasurementFromSeriesKey(s) - fields := r.fields[tsdb.MeasurementFromSeriesKey(s)] + fields := r.fields[measurement] if fields == nil { atomic.AddUint64(&NoFieldsFiltered, 1) continue diff --git a/cmd/influx_tsm/main.go b/cmd/influx_tsm/main.go index 55a538d9c7..25b4523563 100644 --- a/cmd/influx_tsm/main.go +++ b/cmd/influx_tsm/main.go @@ -42,7 +42,7 @@ The backed-up files must be removed manually, generally after starting up the node again to make sure all of data has been converted correctly. To restore a backup: - Shut down the node, remove the converted directory, and + Shut down the node, remove the converted directory, and copy the backed-up directory to the original location.` type options struct { @@ -54,7 +54,6 @@ type options struct { Parallel bool SkipBackup bool UpdateInterval time.Duration - // Quiet bool } func (o *options) Parse() error { @@ -67,7 +66,6 @@ func (o *options) Parse() error { fs.BoolVar(&opts.Parallel, "parallel", false, "Perform parallel conversion. (up to GOMAXPROCS shards at once)") fs.BoolVar(&opts.SkipBackup, "nobackup", false, "Disable database backups. Not recommended.") fs.StringVar(&opts.BackupPath, "backup", "", "The location to backup up the current databases. Must not be within the data directory.") - // fs.BoolVar(&opts.Quiet, "quiet", false, "Suppresses the regular status updates.") fs.StringVar(&opts.DebugAddr, "debug", "", "If set, http debugging endpoints will be enabled on the given address") fs.DurationVar(&opts.UpdateInterval, "interval", 5*time.Second, "How often status updates are printed.") fs.Usage = func() {