2016-12-28 23:15:31 +00:00
|
|
|
// Package bz1 reads data from bz1 shards.
|
2016-02-10 18:30:52 +00:00
|
|
|
package bz1 // import "github.com/influxdata/influxdb/cmd/influx_tsm/bz1"
|
2015-12-26 22:13:54 +00:00
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"encoding/binary"
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
2016-02-29 16:14:16 +00:00
|
|
|
"math"
|
2015-12-26 22:13:54 +00:00
|
|
|
"sort"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/boltdb/bolt"
|
|
|
|
"github.com/golang/snappy"
|
2016-02-29 16:14:16 +00:00
|
|
|
"github.com/influxdata/influxdb/cmd/influx_tsm/stats"
|
2016-02-10 17:26:18 +00:00
|
|
|
"github.com/influxdata/influxdb/cmd/influx_tsm/tsdb"
|
2016-02-29 16:14:16 +00:00
|
|
|
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
|
2015-12-26 22:13:54 +00:00
|
|
|
)
|
|
|
|
|
2016-02-24 16:39:27 +00:00
|
|
|
// DefaultChunkSize is the size of chunks read from the bz1 shard
|
2015-12-26 22:13:54 +00:00
|
|
|
const DefaultChunkSize = 1000 // used by NewReader when the requested chunk size is <= 0
|
|
|
|
|
|
|
|
// Reader is used to read all data from a bz1 shard.
|
|
|
|
type Reader struct {
|
|
|
|
path string
|
|
|
|
db *bolt.DB
|
|
|
|
tx *bolt.Tx
|
|
|
|
|
|
|
|
cursors []*cursor
|
|
|
|
currCursor int
|
|
|
|
|
2016-03-30 16:48:23 +00:00
|
|
|
keyBuf string
|
|
|
|
values []tsm1.Value
|
|
|
|
valuePos int
|
2015-12-26 22:13:54 +00:00
|
|
|
|
|
|
|
fields map[string]*tsdb.MeasurementFields
|
|
|
|
codecs map[string]*tsdb.FieldCodec
|
|
|
|
|
2016-02-29 16:14:16 +00:00
|
|
|
stats *stats.Stats
|
2015-12-26 22:13:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewReader returns a reader for the bz1 shard at path.
|
2016-02-29 16:14:16 +00:00
|
|
|
func NewReader(path string, stats *stats.Stats, chunkSize int) *Reader {
|
|
|
|
r := &Reader{
|
|
|
|
path: path,
|
|
|
|
fields: make(map[string]*tsdb.MeasurementFields),
|
|
|
|
codecs: make(map[string]*tsdb.FieldCodec),
|
|
|
|
stats: stats,
|
|
|
|
}
|
|
|
|
|
|
|
|
if chunkSize <= 0 {
|
|
|
|
chunkSize = DefaultChunkSize
|
2015-12-26 22:13:54 +00:00
|
|
|
}
|
2016-02-29 16:14:16 +00:00
|
|
|
|
2016-03-30 16:48:23 +00:00
|
|
|
r.values = make([]tsm1.Value, chunkSize)
|
2016-02-29 16:14:16 +00:00
|
|
|
|
|
|
|
return r
|
2015-12-26 22:13:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Open opens the reader.
|
|
|
|
func (r *Reader) Open() error {
|
|
|
|
// Open underlying storage.
|
|
|
|
db, err := bolt.Open(r.path, 0666, &bolt.Options{Timeout: 1 * time.Second})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
r.db = db
|
|
|
|
|
2016-02-13 02:31:19 +00:00
|
|
|
seriesSet := make(map[string]bool)
|
|
|
|
|
2015-12-26 22:13:54 +00:00
|
|
|
if err := r.db.View(func(tx *bolt.Tx) error {
|
|
|
|
var data []byte
|
|
|
|
|
|
|
|
meta := tx.Bucket([]byte("meta"))
|
|
|
|
if meta == nil {
|
|
|
|
// No data in this shard.
|
|
|
|
return nil
|
|
|
|
}
|
2016-02-13 02:31:19 +00:00
|
|
|
|
|
|
|
pointsBucket := tx.Bucket([]byte("points"))
|
|
|
|
if pointsBucket == nil {
|
2015-12-26 22:13:54 +00:00
|
|
|
return nil
|
|
|
|
}
|
2016-02-13 02:31:19 +00:00
|
|
|
|
|
|
|
if err := pointsBucket.ForEach(func(key, _ []byte) error {
|
|
|
|
seriesSet[string(key)] = true
|
|
|
|
return nil
|
|
|
|
}); err != nil {
|
2015-12-26 22:13:54 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2016-02-13 02:31:19 +00:00
|
|
|
buf := meta.Get([]byte("fields"))
|
2015-12-26 22:13:54 +00:00
|
|
|
if buf == nil {
|
|
|
|
// No data in this shard.
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
data, err = snappy.Decode(nil, buf)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := json.Unmarshal(data, &r.fields); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Build the codec for each measurement.
|
|
|
|
for k, v := range r.fields {
|
|
|
|
r.codecs[k] = tsdb.NewFieldCodec(v.Fields)
|
|
|
|
}
|
|
|
|
|
|
|
|
r.tx, err = r.db.Begin(false)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2016-02-13 02:31:19 +00:00
|
|
|
// Create cursor for each field of each series.
|
|
|
|
for s := range seriesSet {
|
2015-12-26 22:13:54 +00:00
|
|
|
measurement := tsdb.MeasurementFromSeriesKey(s)
|
2016-02-13 02:31:19 +00:00
|
|
|
fields := r.fields[measurement]
|
2016-01-06 19:39:07 +00:00
|
|
|
if fields == nil {
|
2016-02-29 16:14:16 +00:00
|
|
|
r.stats.IncrFiltered()
|
2016-01-06 19:39:07 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
for _, f := range fields.Fields {
|
2015-12-26 22:13:54 +00:00
|
|
|
c := newCursor(r.tx, s, f.Name, r.codecs[measurement])
|
2016-01-05 23:18:55 +00:00
|
|
|
if c == nil {
|
|
|
|
continue
|
|
|
|
}
|
2015-12-26 22:13:54 +00:00
|
|
|
c.SeekTo(0)
|
|
|
|
r.cursors = append(r.cursors, c)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
sort.Sort(cursors(r.cursors))
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Next returns whether there is any more data to be read.
|
|
|
|
func (r *Reader) Next() bool {
|
2016-02-29 16:14:16 +00:00
|
|
|
r.valuePos = 0
|
|
|
|
OUTER:
|
2015-12-26 22:13:54 +00:00
|
|
|
for {
|
2016-02-29 16:14:16 +00:00
|
|
|
if r.currCursor >= len(r.cursors) {
|
2015-12-26 22:13:54 +00:00
|
|
|
// All cursors drained. No more data remains.
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
cc := r.cursors[r.currCursor]
|
2016-02-29 16:14:16 +00:00
|
|
|
r.keyBuf = tsm1.SeriesFieldKey(cc.series, cc.field)
|
|
|
|
|
|
|
|
for {
|
|
|
|
k, v := cc.Next()
|
|
|
|
if k == -1 {
|
|
|
|
// Go to next cursor and try again.
|
|
|
|
r.currCursor++
|
|
|
|
if r.valuePos == 0 {
|
|
|
|
// The previous cursor had no data. Instead of returning
|
|
|
|
// just go immediately to the next cursor.
|
|
|
|
continue OUTER
|
|
|
|
}
|
|
|
|
// There is some data available. Indicate that it should be read.
|
|
|
|
return true
|
2015-12-26 22:13:54 +00:00
|
|
|
}
|
|
|
|
|
2016-02-29 16:14:16 +00:00
|
|
|
if f, ok := v.(float64); ok {
|
|
|
|
if math.IsInf(f, 0) {
|
|
|
|
r.stats.AddPointsRead(1)
|
|
|
|
r.stats.IncrInf()
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
if math.IsNaN(f) {
|
|
|
|
r.stats.AddPointsRead(1)
|
|
|
|
r.stats.IncrNaN()
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-03-30 16:48:23 +00:00
|
|
|
r.values[r.valuePos] = tsm1.NewValue(k, v)
|
2016-02-29 16:14:16 +00:00
|
|
|
r.valuePos++
|
|
|
|
|
|
|
|
if r.valuePos >= len(r.values) {
|
|
|
|
return true
|
|
|
|
}
|
2015-12-26 22:13:54 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Read returns the next chunk of data in the shard, converted to tsm1 values. Data is
|
|
|
|
// emitted completely for every field, in every series, before the next field is processed.
|
|
|
|
// Data from Read() adheres to the requirements for writing to tsm1 shards
|
2016-02-29 16:14:16 +00:00
|
|
|
func (r *Reader) Read() (string, []tsm1.Value, error) {
|
2016-03-30 16:48:23 +00:00
|
|
|
return r.keyBuf, r.values[:r.valuePos], nil
|
2015-12-26 22:13:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Close closes the reader.
|
|
|
|
func (r *Reader) Close() error {
|
2016-02-17 19:20:28 +00:00
|
|
|
r.tx.Rollback()
|
|
|
|
return r.db.Close()
|
2015-12-26 22:13:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// cursor provides ordered iteration across a series.
|
|
|
|
type cursor struct {
|
|
|
|
cursor *bolt.Cursor
|
|
|
|
buf []byte // uncompressed buffer
|
|
|
|
off int // buffer offset
|
|
|
|
fieldIndices []int
|
|
|
|
index int
|
|
|
|
|
|
|
|
series string
|
|
|
|
field string
|
|
|
|
dec *tsdb.FieldCodec
|
|
|
|
|
|
|
|
keyBuf int64
|
|
|
|
valBuf interface{}
|
|
|
|
}
|
|
|
|
|
|
|
|
// newCursor returns an instance of a bz1 cursor.
|
|
|
|
func newCursor(tx *bolt.Tx, series string, field string, dec *tsdb.FieldCodec) *cursor {
|
|
|
|
// Retrieve points bucket. Ignore if there is no bucket.
|
|
|
|
b := tx.Bucket([]byte("points")).Bucket([]byte(series))
|
|
|
|
if b == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return &cursor{
|
|
|
|
cursor: b.Cursor(),
|
|
|
|
series: series,
|
|
|
|
field: field,
|
|
|
|
dec: dec,
|
|
|
|
keyBuf: -2,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Seek moves the cursor to a position.
|
|
|
|
func (c *cursor) SeekTo(seek int64) {
|
2016-02-04 22:05:21 +00:00
|
|
|
var seekBytes [8]byte
|
|
|
|
binary.BigEndian.PutUint64(seekBytes[:], uint64(seek))
|
2015-12-26 22:13:54 +00:00
|
|
|
|
|
|
|
// Move cursor to appropriate block and set to buffer.
|
2016-02-04 22:05:21 +00:00
|
|
|
k, v := c.cursor.Seek(seekBytes[:])
|
2015-12-26 22:13:54 +00:00
|
|
|
if v == nil { // get the last block, it might have this time
|
|
|
|
_, v = c.cursor.Last()
|
2016-02-04 22:05:21 +00:00
|
|
|
} else if seek < int64(binary.BigEndian.Uint64(k)) { // the seek key is less than this block, go back one and check
|
2015-12-26 22:13:54 +00:00
|
|
|
_, v = c.cursor.Prev()
|
|
|
|
|
|
|
|
// if the previous block max time is less than the seek value, reset to where we were originally
|
2016-02-04 22:05:21 +00:00
|
|
|
if v == nil || seek > int64(binary.BigEndian.Uint64(v[0:8])) {
|
|
|
|
_, v = c.cursor.Seek(seekBytes[:])
|
2015-12-26 22:13:54 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
c.setBuf(v)
|
|
|
|
|
|
|
|
// Read current block up to seek position.
|
2016-02-04 22:05:21 +00:00
|
|
|
c.seekBuf(seekBytes[:])
|
2015-12-26 22:13:54 +00:00
|
|
|
|
|
|
|
// Return current entry.
|
|
|
|
c.keyBuf, c.valBuf = c.read()
|
|
|
|
}
|
|
|
|
|
|
|
|
// seekBuf moves the cursor to a position within the current buffer.
|
|
|
|
func (c *cursor) seekBuf(seek []byte) (key, value []byte) {
|
|
|
|
for {
|
|
|
|
// Slice off the current entry.
|
|
|
|
buf := c.buf[c.off:]
|
|
|
|
|
|
|
|
// Exit if current entry's timestamp is on or after the seek.
|
|
|
|
if len(buf) == 0 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
if bytes.Compare(buf[0:8], seek) != -1 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
c.off += entryHeaderSize + entryDataSize(buf)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Next returns the next key/value pair from the cursor. If there are no values
|
|
|
|
// remaining, -1 is returned.
|
|
|
|
func (c *cursor) Next() (int64, interface{}) {
|
|
|
|
for {
|
|
|
|
k, v := func() (int64, interface{}) {
|
|
|
|
if c.keyBuf != -2 {
|
|
|
|
k, v := c.keyBuf, c.valBuf
|
|
|
|
c.keyBuf = -2
|
|
|
|
return k, v
|
|
|
|
}
|
|
|
|
|
|
|
|
// Ignore if there is no buffer.
|
|
|
|
if len(c.buf) == 0 {
|
|
|
|
return -1, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Move forward to next entry.
|
|
|
|
c.off += entryHeaderSize + entryDataSize(c.buf[c.off:])
|
|
|
|
|
|
|
|
// If no items left then read first item from next block.
|
|
|
|
if c.off >= len(c.buf) {
|
|
|
|
_, v := c.cursor.Next()
|
|
|
|
c.setBuf(v)
|
|
|
|
}
|
|
|
|
|
|
|
|
return c.read()
|
|
|
|
}()
|
|
|
|
|
|
|
|
if k != -1 && v == nil {
|
|
|
|
// There is a point in the series at the next timestamp,
|
|
|
|
// but not for this cursor's field. Go to the next point.
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
return k, v
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// setBuf saves a compressed block to the buffer.
|
|
|
|
func (c *cursor) setBuf(block []byte) {
|
|
|
|
// Clear if the block is empty.
|
|
|
|
if len(block) == 0 {
|
|
|
|
c.buf, c.off, c.fieldIndices, c.index = c.buf[0:0], 0, c.fieldIndices[0:0], 0
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Otherwise decode block into buffer.
|
|
|
|
// Skip over the first 8 bytes since they are the max timestamp.
|
|
|
|
buf, err := snappy.Decode(nil, block[8:])
|
|
|
|
if err != nil {
|
|
|
|
c.buf = c.buf[0:0]
|
|
|
|
fmt.Printf("block decode error: %s\n", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
c.buf, c.off = buf, 0
|
|
|
|
}
|
|
|
|
|
|
|
|
// read reads the current key and value from the current block.
|
|
|
|
func (c *cursor) read() (key int64, value interface{}) {
|
|
|
|
// Return nil if the offset is at the end of the buffer.
|
|
|
|
if c.off >= len(c.buf) {
|
|
|
|
return -1, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Otherwise read the current entry.
|
|
|
|
buf := c.buf[c.off:]
|
|
|
|
dataSize := entryDataSize(buf)
|
|
|
|
|
|
|
|
return tsdb.DecodeKeyValue(c.field, c.dec, buf[0:8], buf[entryHeaderSize:entryHeaderSize+dataSize])
|
|
|
|
}
|
|
|
|
|
|
|
|
// Sort bz1 cursors in correct order for writing to TSM files.
|
|
|
|
|
|
|
|
type cursors []*cursor
|
|
|
|
|
|
|
|
func (a cursors) Len() int { return len(a) }
|
|
|
|
func (a cursors) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
|
|
func (a cursors) Less(i, j int) bool {
|
2016-02-29 16:14:16 +00:00
|
|
|
if a[i].series == a[j].series {
|
|
|
|
return a[i].field < a[j].field
|
|
|
|
}
|
|
|
|
return a[i].series < a[j].series
|
2015-12-26 22:13:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// entryHeaderSize is the number of bytes required for the header.
|
|
|
|
const entryHeaderSize = 8 + 4 // 8 bytes passed on as the entry key, then the 4-byte data length
|
|
|
|
|
|
|
|
// entryDataSize returns the size of an entry's data field, in bytes.
|
|
|
|
func entryDataSize(v []byte) int {
	// The length is a big-endian uint32 stored in bytes 8 through 11 of the entry.
	size := binary.BigEndian.Uint32(v[8:12])
	return int(size)
}
|