influxdb/tx.go

package influxdb

import (
	"fmt"
	"sync"
	"time"

	"github.com/boltdb/bolt"
	"github.com/influxdb/influxdb/influxql"
)

// tx represents a transaction that spans multiple shard data stores.
// This transaction will open and close all data stores atomically.
type tx struct {
	mu     sync.Mutex
	server *Server
	opened bool
	now    time.Time

	itrs []*shardIterator // shard iterators
}

// newTx returns a new initialized tx.
func newTx(server *Server) *tx {
	return &tx{
		server: server,
		now:    time.Now(),
	}
}

// SetNow sets the current time for the transaction.
func (tx *tx) SetNow(now time.Time) { tx.now = now }

// Open opens a read-only transaction on all data stores atomically.
func (tx *tx) Open() error {
	tx.mu.Lock()
	defer tx.mu.Unlock()

	// Mark transaction as open.
	tx.opened = true

	// Open each iterator individually. If any fail, close the transaction and error out.
	for _, itr := range tx.itrs {
		if err := itr.open(); err != nil {
			_ = tx.close()
			return err
		}
	}

	return nil
}

// Close closes all data store transactions atomically.
func (tx *tx) Close() error {
	tx.mu.Lock()
	defer tx.mu.Unlock()
	return tx.close()
}
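
// close marks the transaction as closed and closes all shard iterators.
// The caller must hold tx.mu.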
func (tx *tx) close() error {
	// Mark transaction as closed.
	tx.opened = false

	for _, itr := range tx.itrs {
		_ = itr.close()
	}

	return nil
}

// CreateIterators returns the iterators for a simple select statement.
func (tx *tx) CreateIterators(stmt *influxql.SelectStatement) ([]influxql.Iterator, error) {
	// Parse the source segments.
	database, policyName, measurement, err := splitIdent(stmt.Source.(*influxql.Measurement).Name)
	if err != nil {
		return nil, err
	}

	// Grab time range from statement.
	tmin, tmax := influxql.TimeRange(stmt.Condition)
	if tmin.IsZero() {
		tmin = time.Unix(0, 1)
	}
	if tmax.IsZero() {
		tmax = tx.now
	}

	// Find database and retention policy.
	db := tx.server.databases[database]
	if db == nil {
		return nil, ErrDatabaseNotFound
	}
	rp := db.policies[policyName]
	if rp == nil {
		return nil, ErrRetentionPolicyNotFound
	}

	// Find shard groups within time range.
	var shardGroups []*ShardGroup
	for _, group := range rp.shardGroups {
		if timeBetweenInclusive(group.StartTime, tmin, tmax) || timeBetweenInclusive(group.EndTime, tmin, tmax) {
			shardGroups = append(shardGroups, group)
		}
	}
	if len(shardGroups) == 0 {
		return nil, nil
	}

	// Normalize dimensions to extract the interval.
	_, dimensions, err := stmt.Dimensions.Normalize()
	if err != nil {
		return nil, err
	}

	// Find measurement.
	m, err := tx.server.measurement(database, measurement)
	if err != nil {
		return nil, err
	}
	if m == nil {
		return nil, ErrMeasurementNotFound
	}

	// Find field.
	fieldName := stmt.Fields[0].Expr.(*influxql.VarRef).Val
	f := m.FieldByName(fieldName)
	if f == nil {
		return nil, fmt.Errorf("field not found: %s", fieldName)
	}

	tagSets := m.tagSets(stmt, dimensions)

	// Create an iterator for every shard.
	var itrs []influxql.Iterator
	for tag, set := range tagSets {
		for _, group := range shardGroups {
			// TODO: only create iterators for the shards we actually have to hit in a group
			for _, sh := range group.Shards {
				// create a series cursor for each unique series id
				cursors := make([]*seriesCursor, 0, len(set))
				for id, cond := range set {
					cursors = append(cursors, &seriesCursor{id: id, condition: cond, decoder: m})
				}

				// create the shard iterator that will map over all series for the shard
				itr := &shardIterator{
					fieldName: f.Name,
					fieldID:   f.ID,
					tags:      tag,
					db:        sh.store,
					cursors:   cursors,
					tmin:      tmin.UnixNano(),
					tmax:      tmax.UnixNano(),
				}

				// Add to tx so the bolt transaction can be opened/closed.
				tx.itrs = append(tx.itrs, itr)

				itrs = append(itrs, itr)
			}
		}
	}

	return itrs, nil
}

// splitIdent splits an identifier into its database, policy, and measurement parts.
func splitIdent(s string) (db, rp, m string, err error) {
	a, err := influxql.SplitIdent(s)
	if err != nil {
		return "", "", "", err
	} else if len(a) != 3 {
		return "", "", "", fmt.Errorf("invalid ident, expected 3 segments: %q", s)
	}
	return a[0], a[1], a[2], nil
}

// shardIterator represents an iterator for traversing over all series in a single shard.
type shardIterator struct {
	fieldName string
	fieldID   uint8
	tags      string // encoded dimensional tag values

	cursors   []*seriesCursor
	keyValues []keyValue

	db  *bolt.DB // shard data store
	txn *bolt.Tx // read transaction on the shard data store

	tmin, tmax int64
}
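
// open begins a read-only bolt transaction on the shard's data store, opens a
// cursor for each series bucket, and reads the first key/value pair from each cursor.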
func (i *shardIterator) open() error {
	// Open the data store
	txn, err := i.db.Begin(false)
	if err != nil {
		return err
	}
	i.txn = txn

	// Open cursors for each series id
	for _, c := range i.cursors {
		b := i.txn.Bucket(u32tob(c.id))
		if b == nil {
			continue
		}

		c.cur = b.Cursor()
	}

	i.keyValues = make([]keyValue, len(i.cursors))
	for j, cur := range i.cursors {
		i.keyValues[j].key, i.keyValues[j].value = cur.Next(i.fieldName, i.fieldID, i.tmin, i.tmax)
	}

	return nil
}
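
// close rolls back the iterator's read-only bolt transaction.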
func (i *shardIterator) close() error {
	_ = i.txn.Rollback()
	return nil
}

func (i *shardIterator) Tags() string { return i.tags }
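
// Next returns the pending key/value pair with the smallest timestamp across
// all series cursors and advances the cursor it came from. A zero key means
// every cursor is exhausted for the time range.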
func (i *shardIterator) Next() (key int64, value interface{}) {
	// Find the cursor holding the smallest pending key within the time range.
	min := -1
	for ind, kv := range i.keyValues {
		if kv.key != 0 && kv.key < i.tmax && (min == -1 || kv.key < i.keyValues[min].key) {
			min = ind
		}
	}

	// if min is -1 we've exhausted all cursors for the given time range
	if min == -1 {
		return 0, nil
	}

	kv := i.keyValues[min]
	key = kv.key
	value = kv.value

	// Advance the cursor we just read from.
	i.keyValues[min].key, i.keyValues[min].value = i.cursors[min].Next(i.fieldName, i.fieldID, i.tmin, i.tmax)

	return key, value
}
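
// keyValue holds the most recently read key and decoded value for a series cursor.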
type keyValue struct {
	key   int64
	value interface{}
}
type fieldDecoder interface {
	DecodeField(b []byte, fieldID uint8) interface{}
}
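
// seriesCursor walks the stored points of a single series within a shard.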
type seriesCursor struct {
	id          uint32
	condition   influxql.Expr
	cur         *bolt.Cursor
	initialized bool
	decoder     fieldDecoder
}
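
// Next returns the next key/value pair for the series that falls within
// [tmin, tmax], has a value for the requested field, and satisfies the
// cursor's condition. A zero key means the series is exhausted.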
func (c *seriesCursor) Next(fieldName string, fieldID uint8, tmin, tmax int64) (key int64, value interface{}) {
	// TODO: clean this up when we make it so series ids are only queried against the shards they exist in.
	// Right now we query for all series ids on a query against each shard, even if that shard may not have the
	// data, so cur could be nil.
	if c.cur == nil {
		return 0, nil
	}

	for {
		var k, v []byte
		if !c.initialized {
			k, v = c.cur.Seek(u64tob(uint64(tmin)))
			c.initialized = true
		} else {
			k, v = c.cur.Next()
		}

		// Exit if there is no more data.
		if k == nil {
			return 0, nil
		}

		// Unmarshal the key & decode the requested field value.
		key, value = int64(btou64(k)), c.decoder.DecodeField(v, fieldID)

		// Exit if we've moved past the end of the time range.
		if key > tmax {
			return 0, nil
		}

		// Skip to the next point if it has no value for this field.
		if value == nil {
			continue
		}

		// Evaluate condition. Move to the next key/value if non-true.
		if c.condition != nil {
			if ok, _ := influxql.Eval(c.condition, map[string]interface{}{fieldName: value}).(bool); !ok {
				continue
			}
		}

		return key, value
	}
}