influxdb/cmd/influx_tsm/b1/reader.go

243 lines
5.5 KiB
Go
Raw Normal View History

2016-02-10 18:30:52 +00:00
package b1 // import "github.com/influxdata/influxdb/cmd/influx_tsm/b1"
2015-12-26 22:13:54 +00:00
import (
"encoding/binary"
"sort"
"sync/atomic"
2015-12-26 22:13:54 +00:00
"time"
"github.com/boltdb/bolt"
"github.com/influxdata/influxdb/cmd/influx_tsm/tsdb"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
2015-12-26 22:13:54 +00:00
)
// DefaultChunkSize is the size of chunks read from the b1 shard
2015-12-26 22:13:54 +00:00
const DefaultChunkSize = 1000
// NoFieldsFiltered is the number of nil fields filtered
var NoFieldsFiltered uint64
var excludedBuckets = map[string]bool{
"fields": true,
"meta": true,
"series": true,
"wal": true,
}
2015-12-26 22:13:54 +00:00
// Reader is used to read all data from a b1 shard.
type Reader struct {
path string
db *bolt.DB
tx *bolt.Tx
cursors []*cursor
currCursor int
keyBuf string
valuesBuf []tsm1.Value
fields map[string]*tsdb.MeasurementFields
codecs map[string]*tsdb.FieldCodec
ChunkSize int
}
// NewReader returns a reader for the b1 shard at path.
func NewReader(path string) *Reader {
return &Reader{
path: path,
fields: make(map[string]*tsdb.MeasurementFields),
codecs: make(map[string]*tsdb.FieldCodec),
}
}
// Open opens the reader.
func (r *Reader) Open() error {
// Open underlying storage.
db, err := bolt.Open(r.path, 0666, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
return err
}
r.db = db
// Load fields.
if err := r.db.View(func(tx *bolt.Tx) error {
meta := tx.Bucket([]byte("fields"))
c := meta.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
mf := &tsdb.MeasurementFields{}
if err := mf.UnmarshalBinary(v); err != nil {
return err
}
r.fields[string(k)] = mf
r.codecs[string(k)] = tsdb.NewFieldCodec(mf.Fields)
}
return nil
}); err != nil {
return err
}
seriesSet := make(map[string]bool)
2015-12-26 22:13:54 +00:00
// ignore series index and find all series in this shard
if err := r.db.View(func(tx *bolt.Tx) error {
tx.ForEach(func(name []byte, _ *bolt.Bucket) error {
key := string(name)
if !excludedBuckets[key] {
seriesSet[key] = true
2015-12-26 22:13:54 +00:00
}
return nil
})
2015-12-26 22:13:54 +00:00
return nil
}); err != nil {
return err
}
r.tx, err = r.db.Begin(false)
if err != nil {
return err
}
// Create cursor for each field of each series.
for s := range seriesSet {
2015-12-26 22:13:54 +00:00
measurement := tsdb.MeasurementFromSeriesKey(s)
fields := r.fields[measurement]
2016-01-06 19:39:07 +00:00
if fields == nil {
atomic.AddUint64(&NoFieldsFiltered, 1)
2016-01-06 19:39:07 +00:00
continue
}
for _, f := range fields.Fields {
2015-12-26 22:13:54 +00:00
c := newCursor(r.tx, s, f.Name, r.codecs[measurement])
c.SeekTo(0)
r.cursors = append(r.cursors, c)
}
}
sort.Sort(cursors(r.cursors))
return nil
}
// Next returns whether any data remains to be read. It must be called before
// the next call to Read().
func (r *Reader) Next() bool {
for {
if r.currCursor == len(r.cursors) {
// All cursors drained. No more data remains.
return false
}
cc := r.cursors[r.currCursor]
k, v := cc.Next()
if k == -1 {
// Go to next cursor and try again.
r.currCursor++
if len(r.valuesBuf) == 0 {
// The previous cursor had no data. Instead of returning
// just go immediately to the next cursor.
continue
}
// There is some data available. Indicate that it should be read.
return true
}
r.keyBuf = tsm1.SeriesFieldKey(cc.series, cc.field)
r.valuesBuf = append(r.valuesBuf, tsdb.ConvertToValue(k, v))
if len(r.valuesBuf) == r.ChunkSize {
return true
}
}
}
// Read returns the next chunk of data in the shard, converted to tsm1 values. Data is
// emitted completely for every field, in every series, before the next field is processed.
// Data from Read() adheres to the requirements for writing to tsm1 shards
func (r *Reader) Read() (string, []tsm1.Value, error) {
defer func() {
r.valuesBuf = nil
}()
return r.keyBuf, r.valuesBuf, nil
}
// Close closes the reader.
func (r *Reader) Close() error {
r.tx.Rollback()
return r.db.Close()
2015-12-26 22:13:54 +00:00
}
// cursor provides ordered iteration across a series.
type cursor struct {
// Bolt cursor and readahead buffer.
cursor *bolt.Cursor
keyBuf int64
valBuf interface{}
series string
field string
dec *tsdb.FieldCodec
}
// Cursor returns an iterator for a key over a single field.
func newCursor(tx *bolt.Tx, series string, field string, dec *tsdb.FieldCodec) *cursor {
cur := &cursor{
keyBuf: -2,
series: series,
field: field,
dec: dec,
}
// Retrieve series bucket.
b := tx.Bucket([]byte(series))
if b != nil {
cur.cursor = b.Cursor()
}
return cur
}
// Seek moves the cursor to a position.
func (c cursor) SeekTo(seek int64) {
var seekBytes [8]byte
binary.BigEndian.PutUint64(seekBytes[:], uint64(seek))
k, v := c.cursor.Seek(seekBytes[:])
2015-12-26 22:13:54 +00:00
c.keyBuf, c.valBuf = tsdb.DecodeKeyValue(c.field, c.dec, k, v)
}
// Next returns the next key/value pair from the cursor.
func (c *cursor) Next() (key int64, value interface{}) {
for {
k, v := func() (int64, interface{}) {
if c.keyBuf != -2 {
k, v := c.keyBuf, c.valBuf
c.keyBuf = -2
return k, v
}
k, v := c.cursor.Next()
if k == nil {
return -1, nil
}
return tsdb.DecodeKeyValue(c.field, c.dec, k, v)
}()
if k != -1 && v == nil {
// There is a point in the series at the next timestamp,
// but not for this cursor's field. Go to the next point.
continue
}
return k, v
}
}
// Sort b1 cursors in correct order for writing to TSM files.
type cursors []*cursor
func (a cursors) Len() int { return len(a) }
func (a cursors) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a cursors) Less(i, j int) bool {
return tsm1.SeriesFieldKey(a[i].series, a[i].field) < tsm1.SeriesFieldKey(a[j].series, a[j].field)
}