Add multishard iteration.

pull/1369/head
Ben Johnson 2015-01-26 21:14:07 -05:00 committed by Paul Dix
parent 7c4a79248d
commit 385687b8b4
4 changed files with 103 additions and 53 deletions

server.go

@@ -92,8 +92,8 @@ type Server struct {
 	databases map[string]*database // databases by name
 	users     map[string]*User     // user by name
-	shards           map[uint64]*Shard // shards by shard id
-	shardsBySeriesID map[uint32]*Shard // shards by series id
+	shards           map[uint64]*Shard   // shards by shard id
+	shardsBySeriesID map[uint32][]*Shard // shards by series id
 }

 // NewServer returns a new instance of Server.
@@ -106,7 +106,7 @@ func NewServer() *Server {
 		users:            make(map[string]*User),
 		shards:           make(map[uint64]*Shard),
-		shardsBySeriesID: make(map[uint32]*Shard),
+		shardsBySeriesID: make(map[uint32][]*Shard),
 	}
 }
@@ -1286,7 +1286,6 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
 		// Find appropriate shard within the shard group.
 		sh := g.ShardBySeriesID(seriesID)
-		s.shardsBySeriesID[seriesID] = sh

 		// Ignore requests that have no values.
 		if len(values) == 0 {
@@ -1390,6 +1389,9 @@ func (s *Server) applyWriteSeries(m *messaging.Message) error {
 		return err
 	}

+	// Add to lookup.
+	s.addShardBySeriesID(sh, c.SeriesID)
+
 	// Encode the values into a binary format.
 	data := marshalValues(rawValues)
@@ -1415,6 +1417,9 @@ func (s *Server) applyWriteRawSeries(m *messaging.Message) error {
 	seriesID, timestamp := unmarshalPointHeader(m.Data[:pointHeaderSize])
 	data := m.Data[pointHeaderSize:]

+	// Add to lookup.
+	s.addShardBySeriesID(sh, seriesID)
+
 	// TODO: Enable some way to specify if the data should be overwritten
 	overwrite := true
@@ -1422,6 +1427,15 @@ func (s *Server) applyWriteRawSeries(m *messaging.Message) error {
 	return sh.writeSeries(seriesID, timestamp, data, overwrite)
 }

+func (s *Server) addShardBySeriesID(sh *Shard, seriesID uint32) {
+	for _, other := range s.shardsBySeriesID[seriesID] {
+		if other.ID == sh.ID {
+			return
+		}
+	}
+	s.shardsBySeriesID[seriesID] = append(s.shardsBySeriesID[seriesID], sh)
+}
+
 func (s *Server) createSeriesIfNotExists(database, name string, tags map[string]string) (uint32, error) {
 	// Try to find series locally first.
 	s.mu.RLock()
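Taken together, the server.go changes turn the series lookup into a one-to-many index: a series now maps to every shard it has been written to, and `addShardBySeriesID` keeps each slice duplicate-free with a linear scan over shard IDs. A minimal standalone sketch of the same idiom; the `Shard` type here is a bare stand-in with only an ID, not the real influxdb type:

```go
package main

import "fmt"

// Shard is a stand-in for influxdb's shard type; only the ID matters here.
type Shard struct{ ID uint64 }

// index maps a series ID to every shard that holds data for it,
// mirroring Server.shardsBySeriesID after this commit.
var index = make(map[uint32][]*Shard)

// addShardBySeriesID appends sh to the series' shard list unless a shard
// with the same ID is already tracked, like the method in the diff above.
func addShardBySeriesID(sh *Shard, seriesID uint32) {
	for _, other := range index[seriesID] {
		if other.ID == sh.ID {
			return // already tracked for this series
		}
	}
	index[seriesID] = append(index[seriesID], sh)
}

func main() {
	a, b := &Shard{ID: 1}, &Shard{ID: 2}
	addShardBySeriesID(a, 100)
	addShardBySeriesID(b, 100)
	addShardBySeriesID(a, 100) // repeat write to the same shard; ignored
	fmt.Println(len(index[100])) // 2
}
```

The linear scan is reasonable here since a series typically spans only a handful of shards, one per shard group it has data in.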

server_test.go

@@ -938,7 +938,7 @@ func OpenDefaultServer(client influxdb.MessagingClient) *Server {
 	s := OpenServer(client)
 	if err := s.CreateDatabase("db"); err != nil {
 		panic(err.Error())
-	} else if err = s.CreateRetentionPolicy("db", &influxdb.RetentionPolicy{Name: "raw"}); err != nil {
+	} else if err = s.CreateRetentionPolicy("db", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour}); err != nil {
 		panic(err.Error())
 	} else if err = s.SetDefaultRetentionPolicy("db", "raw"); err != nil {
 		panic(err.Error())

tx.go

@@ -1,6 +1,7 @@
 package influxdb

 import (
+	"bytes"
 	"errors"
 	"fmt"
 	"sync"
@@ -56,16 +57,18 @@ func (tx *tx) Open() error {
 	// Open cursors on every series iterator.
 	for _, itr := range tx.itrs {
-		txn := tx.txns[itr.shardID]
+		for _, shardID := range itr.shardIDs {
+			txn := tx.txns[shardID]

-		// Retrieve series bucket.
-		b := txn.Bucket(u32tob(itr.seriesID))
-		if b == nil {
-			continue
+			// Retrieve series bucket.
+			b := txn.Bucket(u32tob(itr.seriesID))
+			if b == nil {
+				continue
+			}
+
+			// Add cursor.
+			itr.cur.cursors = append(itr.cur.cursors, b.Cursor())
 		}
-
-		// Create cursor.
-		itr.cur = b.Cursor()
 	}

 	return nil
@@ -191,19 +194,23 @@ func (tx *tx) createExprIterators(m *Measurement, fieldID uint8, interval time.D
 			seriesID:  seriesID,
 			fieldID:   fieldID,
 			condition: expr,
-			tmax:      tmax.UnixNano(),
+			cur: &multiCursor{
+				kmin: u64tob(uint64(tmin.UnixNano())),
+				kmax: u64tob(uint64(tmax.UnixNano())),
+			},
 		}

-		// Lookup shard.
-		sh := tx.server.shardsBySeriesID[seriesID]
-		if sh != nil {
-			itr.shardID = sh.ID
-			tx.dbs[sh.ID] = sh.store
+		// Lookup shards.
+		shards := tx.server.shardsBySeriesID[seriesID]
+		for _, sh := range shards {
+			itr.shardIDs = append(itr.shardIDs, sh.ID)
+			if tx.dbs[sh.ID] == nil {
+				tx.dbs[sh.ID] = sh.store
+			}
 		}

-		itrs = append(itrs, itr)
+		// Track series iterators on tx so their cursors can be opened.
+		itrs = append(itrs, itr)
 		tx.itrs = append(tx.itrs, itr)
 	}
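The new `kmin`/`kmax` bounds are byte slices rather than int64 timestamps because Bolt cursors compare keys lexicographically. That only works if `u64tob` encodes timestamps big-endian, so byte order agrees with numeric order; a quick sketch of the property, with `u64tob` reimplemented here on the assumption that it is a plain big-endian encoder:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// u64tob encodes v as an 8-byte big-endian key; big-endian makes
// lexicographic byte order match numeric order, which is what lets
// multiCursor bound its scan with bytes.Compare below.
func u64tob(v uint64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, v)
	return b
}

func main() {
	tmin, t, tmax := uint64(10), uint64(20), uint64(30)
	fmt.Println(bytes.Compare(u64tob(t), u64tob(tmin)) >= 0) // true: t >= tmin
	fmt.Println(bytes.Compare(u64tob(t), u64tob(tmax)) <= 0) // true: t <= tmax
}
```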
@@ -248,46 +255,64 @@ func splitIdent(s string) (db, rp, m string, err error) {
 // seriesIterator represents an iterator for traversing over a single series.
 type seriesIterator struct {
-	shardID     uint64
-	seriesID    uint32
-	fieldID     uint8
-	tags        string // encoded dimensional tag values
-	condition   influxql.Expr
-	tmin, tmax  int64
-	initialized bool
-	cur         *bolt.Cursor
+	seriesID  uint32
+	fieldID   uint8
+	tags      string // encoded dimensional tag values
+	condition influxql.Expr
+	shardIDs  []uint64
+	cur       *multiCursor
 }

 func (i *seriesIterator) Tags() string { return i.tags }

 func (i *seriesIterator) Next() (key int64, value interface{}) {
-	if i.cur == nil {
-		return 0, nil
-	}
-
-	// If not run yet then seek to the first key.
-	// Otherwise retrieve next key.
-	var k, v []byte
-	if !i.initialized {
-		k, v = i.cur.Seek(u64tob(uint64(i.tmin)))
-		i.initialized = true
-	} else {
-		k, v = i.cur.Next()
-	}
-
-	// Ignore if there's no more data.
+	k, v := i.cur.Next()
 	if k == nil {
 		return 0, nil
 	}

-	// Read timestamp. Ignore if greater than tmax.
+	// Read timestamp & field value.
 	key = int64(btou64(k))
-	if i.tmax != 0 && key > i.tmax {
-		return 0, nil
-	}
-
-	// Read field value.
 	value = unmarshalValue(v, i.fieldID)
+	warn("!", key, value)
 	return key, value
 }
+
+type multiCursor struct {
+	cursors     []*bolt.Cursor
+	index       int
+	initialized bool
+	kmin, kmax  []byte // min/max keys
+}
+
+func (c *multiCursor) Next() (key, value []byte) {
+	for {
+		// Exit if no more cursors remain.
+		if c.index >= len(c.cursors) {
+			return nil, nil
+		}
+
+		// Move to the first or next key.
+		if !c.initialized {
+			key, value = c.cursors[c.index].Seek(c.kmin)
+			c.initialized = true
+		} else {
+			key, value = c.cursors[c.index].Next()
+		}
+
+		// Clear key if it's above max allowed.
+		if bytes.Compare(key, c.kmax) == 1 {
+			key, value = nil, nil
+		}
+
+		// If there is no key then increment the index and retry.
+		if key == nil {
+			c.index++
+			c.initialized = false
+			continue
+		}
+
+		return key, value
+	}
+}
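Note that `multiCursor` concatenates rather than merge-sorts: it seeks cursor 0 to `kmin`, drains it until a key passes `kmax` or runs out, then resets `initialized` and moves on to cursor 1, and so on. That presumably suffices because each shard covers its own time range, so per-shard key runs do not interleave. A self-contained sketch of the same loop, using a slice-backed `cursor` interface as an illustrative stand-in for `*bolt.Cursor`:

```go
package main

import (
	"bytes"
	"fmt"
)

// cursor stands in for *bolt.Cursor with the two methods multiCursor uses.
type cursor interface {
	Seek(min []byte) (key, value []byte)
	Next() (key, value []byte)
}

// sliceCursor walks sorted key/value pairs held in memory.
type sliceCursor struct {
	keys, vals [][]byte
	i          int
}

func (c *sliceCursor) Seek(min []byte) ([]byte, []byte) {
	for c.i = 0; c.i < len(c.keys); c.i++ {
		if bytes.Compare(c.keys[c.i], min) >= 0 {
			return c.keys[c.i], c.vals[c.i]
		}
	}
	return nil, nil
}

func (c *sliceCursor) Next() ([]byte, []byte) {
	if c.i++; c.i >= len(c.keys) {
		return nil, nil
	}
	return c.keys[c.i], c.vals[c.i]
}

// multiCursor mirrors the commit's type: drain each cursor in turn,
// bounded below by kmin (via Seek) and above by kmax (via Compare).
type multiCursor struct {
	cursors     []cursor
	index       int
	initialized bool
	kmin, kmax  []byte
}

func (c *multiCursor) Next() (key, value []byte) {
	for {
		if c.index >= len(c.cursors) {
			return nil, nil // all cursors exhausted
		}
		if !c.initialized {
			key, value = c.cursors[c.index].Seek(c.kmin)
			c.initialized = true
		} else {
			key, value = c.cursors[c.index].Next()
		}
		if key != nil && bytes.Compare(key, c.kmax) == 1 {
			key, value = nil, nil // past kmax; treat this cursor as done
		}
		if key == nil {
			c.index++ // advance to the next shard's cursor and retry
			c.initialized = false
			continue
		}
		return key, value
	}
}

func main() {
	a := &sliceCursor{keys: [][]byte{{1}, {2}}, vals: [][]byte{{10}, {20}}}
	b := &sliceCursor{keys: [][]byte{{3}, {9}}, vals: [][]byte{{30}, {90}}}
	mc := &multiCursor{cursors: []cursor{a, b}, kmin: []byte{1}, kmax: []byte{3}}
	for k, v := mc.Next(); k != nil; k, v = mc.Next() {
		fmt.Println(k, v) // [1] [10], [2] [20], [3] [30]; {9} is past kmax
	}
}
```

If shard key ranges could overlap, a heap-based merge would be needed to keep output ordered; the simple concatenation here relies on them being disjoint.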

tx_test.go

@@ -1,6 +1,7 @@
 package influxdb_test

 import (
+	"reflect"
 	"sort"
 	"testing"
@@ -17,7 +18,7 @@ func TestTx_CreateIterators(t *testing.T) {
 	s.MustWriteSeries("db", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east", "host": "serverA"}, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Values: map[string]interface{}{"value": float64(100)}}})
 	s.MustWriteSeries("db", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east", "host": "serverA"}, Timestamp: mustParseTime("2000-01-01T00:00:10Z"), Values: map[string]interface{}{"value": float64(90)}}})
 	s.MustWriteSeries("db", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east", "host": "serverA"}, Timestamp: mustParseTime("2000-01-01T00:00:20Z"), Values: map[string]interface{}{"value": float64(80)}}})
-	s.MustWriteSeries("db", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east", "host": "serverA"}, Timestamp: mustParseTime("2000-01-01T00:00:30Z"), Values: map[string]interface{}{"value": float64(60)}}})
+	s.MustWriteSeries("db", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east", "host": "serverA"}, Timestamp: mustParseTime("2000-01-01T00:00:30Z"), Values: map[string]interface{}{"value": float64(70)}}})

 	// Write to us-west
 	s.MustWriteSeries("db", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-west", "host": "serverB"}, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Values: map[string]interface{}{"value": float64(1)}}})
@@ -52,8 +53,18 @@ func TestTx_CreateIterators(t *testing.T) {
 	defer tx.Close()

 	// Iterate over each one.
-	data := slurp(itrs)
-	warnf("> %v", data)
+	if data := slurp(itrs); !reflect.DeepEqual(data, []keyValue{
+		{key: 946684800000000000, value: float64(100)},
+		{key: 946684800000000000, value: float64(2000)},
+		{key: 946684800000000000, value: float64(1)},
+		{key: 946684800000000000, value: float64(2)},
+		{key: 946684800000000000, value: float64(1000)},
+		{key: 946684810000000000, value: float64(90)},
+		{key: 946684820000000000, value: float64(80)},
+		{key: 946684830000000000, value: float64(70)},
+	}) {
+		t.Fatalf("unexpected data: %#v", data)
+	}
 }

 func slurp(itrs []influxql.Iterator) []keyValue {