Merge pull request #1264 from influxdb/in_mem_tag_index

Add in memory tag and metastore index
pull/1287/head
Paul Dix 2015-01-02 14:28:25 -05:00
commit 7743310432
5 changed files with 1516 additions and 326 deletions

726
database.go Normal file
View File

@ -0,0 +1,726 @@
package influxdb
import (
"encoding/json"
"regexp"
"sort"
"strings"
"time"
)
// database is a collection of retention policies and shards. It also has methods
// for keeping an in memory index of all the measurements, series, and tags in the database.
// Methods on this struct aren't goroutine safe. They assume that the server is handling
// any locking to make things safe.
type database struct {
	name                   string                      // database name
	policies               map[string]*RetentionPolicy // retention policies by name
	shards                 map[uint64]*Shard           // shards by id
	defaultRetentionPolicy string                      // name of the default retention policy

	// in memory indexing structures
	measurements map[string]*Measurement // measurement name to object and index
	series       map[uint32]*Series      // map series id to the Series object
	names        []string                // sorted list of the measurement names
}
// newDatabase returns an empty database with every lookup map initialized.
func newDatabase() *database {
	db := &database{}
	db.policies = make(map[string]*RetentionPolicy)
	db.shards = make(map[uint64]*Shard)
	db.measurements = make(map[string]*Measurement)
	db.series = make(map[uint32]*Series)
	db.names = make([]string, 0)
	return db
}
// shardByTimestamp returns the shard that owns the given timestamp for seriesID.
// It returns ErrRetentionPolicyNotFound when the named policy does not exist.
func (db *database) shardByTimestamp(policy string, seriesID uint32, timestamp time.Time) (*Shard, error) {
	p, ok := db.policies[policy]
	if !ok || p == nil {
		return nil, ErrRetentionPolicyNotFound
	}
	return p.shardByTimestamp(seriesID, timestamp), nil
}
// shardsByTimestamp returns every shard in the named policy that owns timestamp.
// It returns ErrRetentionPolicyNotFound when the named policy does not exist.
func (db *database) shardsByTimestamp(policy string, timestamp time.Time) ([]*Shard, error) {
	p, ok := db.policies[policy]
	if !ok || p == nil {
		return nil, ErrRetentionPolicyNotFound
	}
	return p.shardsByTimestamp(timestamp), nil
}
// timeBetweenInclusive returns true if t is between min and max, inclusive.
func timeBetweenInclusive(t, min, max time.Time) bool {
return (t.Equal(min) || t.After(min)) && (t.Equal(max) || t.Before(max))
}
// MarshalJSON encodes a database into a JSON-encoded byte slice.
func (db *database) MarshalJSON() ([]byte, error) {
	// Build the intermediate wire representation from the in-memory maps.
	o := databaseJSON{
		Name:                   db.name,
		DefaultRetentionPolicy: db.defaultRetentionPolicy,
	}
	for _, rp := range db.policies {
		o.Policies = append(o.Policies, rp)
	}
	for _, sh := range db.shards {
		o.Shards = append(o.Shards, sh)
	}
	return json.Marshal(&o)
}
// UnmarshalJSON decodes a JSON-encoded byte slice to a database.
func (db *database) UnmarshalJSON(data []byte) error {
	var o databaseJSON
	if err := json.Unmarshal(data, &o); err != nil {
		return err
	}
	db.name = o.Name
	db.defaultRetentionPolicy = o.DefaultRetentionPolicy
	// Rebuild the policy lookup keyed by name.
	db.policies = make(map[string]*RetentionPolicy, len(o.Policies))
	for _, rp := range o.Policies {
		db.policies[rp.Name] = rp
	}
	// Rebuild the shard lookup keyed by id.
	db.shards = make(map[uint64]*Shard, len(o.Shards))
	for _, sh := range o.Shards {
		db.shards[sh.ID] = sh
	}
	return nil
}
// databaseJSON represents the JSON-serialization format for a database.
// It is the intermediate form used by database.MarshalJSON / UnmarshalJSON.
type databaseJSON struct {
	Name                   string             `json:"name,omitempty"`
	DefaultRetentionPolicy string             `json:"defaultRetentionPolicy,omitempty"`
	Policies               []*RetentionPolicy `json:"policies,omitempty"`
	Shards                 []*Shard           `json:"shards,omitempty"`
}
// Measurement represents a collection of time series in a database. It also contains in memory
// structures for indexing tags. These structures are accessed through private methods on the Measurement
// object. Generally these methods are only accessed from Index, which is responsible for ensuring
// go routine safe access.
type Measurement struct {
	Name   string    `json:"name,omitempty"`
	Fields []*Fields `json:"fields,omitempty"`

	// in memory index fields
	series              map[string]*Series              // sorted tagset string to the series object
	seriesByID          map[uint32]*Series              // lookup table for series by their id
	measurement         *Measurement                    // NOTE(review): never assigned anywhere in this file — confirm it is needed (database.Measurements reads it)
	seriesByTagKeyValue map[string]map[string]SeriesIDs // map from tag key to value to sorted set of series ids
	ids                 SeriesIDs                       // sorted list of series IDs in this measurement
}
// NewMeasurement allocates a Measurement with an empty field list and
// initialized index structures.
func NewMeasurement(name string) *Measurement {
	m := &Measurement{Name: name}
	m.Fields = make([]*Fields, 0)
	m.series = make(map[string]*Series)
	m.seriesByID = make(map[uint32]*Series)
	m.seriesByTagKeyValue = make(map[string]map[string]SeriesIDs)
	m.ids = make(SeriesIDs, 0)
	return m
}
// addSeries indexes s under this measurement. It returns false when the series
// id is already present, true when the series was added.
func (m *Measurement) addSeries(s *Series) bool {
	if _, dup := m.seriesByID[s.ID]; dup {
		return false
	}
	// Register the series by id and by its canonical tagset key.
	m.seriesByID[s.ID] = s
	m.series[string(marshalTags(s.Tags))] = s
	// Keep m.ids sorted. A new series normally carries the highest id seen so
	// far, so only sort when the appended id is out of order.
	m.ids = append(m.ids, s.ID)
	if n := len(m.ids); n > 1 && m.ids[n-1] < m.ids[n-2] {
		sort.Sort(m.ids)
	}
	// Index the series id under every tag key/value pair it carries.
	for key, value := range s.Tags {
		valueMap, ok := m.seriesByTagKeyValue[key]
		if !ok {
			valueMap = make(map[string]SeriesIDs)
			m.seriesByTagKeyValue[key] = valueMap
		}
		ids := append(valueMap[value], s.ID)
		// Same optimization as above: the new id is usually the largest.
		if n := len(ids); n > 1 && ids[n-1] < ids[n-2] {
			sort.Sort(ids)
		}
		valueMap[value] = ids
	}
	return true
}
// seriesByTags returns the series whose tagset exactly matches tags, or nil.
func (m *Measurement) seriesByTags(tags map[string]string) *Series {
	key := string(marshalTags(tags))
	return m.series[key]
}
// seriesIDs returns the sorted set of series ids in this measurement that
// match a single tag filter.
func (m *Measurement) seriesIDs(filter *TagFilter) (ids SeriesIDs) {
	values := m.seriesByTagKeyValue[filter.Key]
	if values == nil {
		return
	}
	// Regex filters match the pattern against every distinct value of the key.
	if filter.Regex != nil {
		for k, v := range values {
			if filter.Regex.MatchString(k) {
				if ids == nil {
					ids = v
				} else {
					ids = ids.Union(v)
				}
			}
		}
		if filter.Not {
			ids = m.ids.Reject(ids)
		}
		return
	}
	// "key != ''" (value is not null): every series that carries this tag key.
	// A series has exactly one value per key, so the per-value id sets are
	// disjoint and the answer is their union. (The previous code called
	// Intersect and discarded the result, which effectively returned only the
	// first value set encountered in random map-iteration order.)
	if filter.Not && filter.Value == "" {
		for _, v := range values {
			if ids == nil {
				ids = v
			} else {
				ids = ids.Union(v)
			}
		}
		return
	}
	// Exact key/value match.
	ids = SeriesIDs(values[filter.Value])
	// A NOT filter keeps every series in the measurement except those matched.
	if filter.Not {
		ids = m.ids.Reject(ids)
	}
	return
}
// tagValues returns the set of distinct values stored under the given tag key.
func (m *Measurement) tagValues(key string) TagValues {
	out := make(map[string]bool, len(m.seriesByTagKeyValue[key]))
	for value := range m.seriesByTagKeyValue[key] {
		out[value] = true
	}
	return TagValues(out)
}
// Measurements represents a list of measurements.
type Measurements []*Measurement

// Field represents a series field.
type Field struct {
	ID   uint8     `json:"id,omitempty"`
	Name string    `json:"name,omitempty"`
	Type FieldType `json:"field"` // NOTE(review): tag reads "field" rather than "type" — confirm intentional
}

// FieldType identifies the data type stored in a field.
type FieldType int

// Supported field data types.
const (
	Int64 FieldType = iota
	Float64
	String
	Boolean
	Binary
)

// Fields represents a list of fields.
type Fields []*Field
// Series belong to a Measurement and represent unique time series in a database
type Series struct {
	ID   uint32            // unique series id
	Tags map[string]string // tag key/value pairs identifying this series

	measurement *Measurement // parent measurement; set by database.addSeriesToIndex
}
// RetentionPolicy represents a policy for creating new shards in a database and how long they're kept around for.
type RetentionPolicy struct {
	// Unique name within database. Required.
	Name string

	// Length of time to keep data around
	Duration time.Duration

	ReplicaN uint32 // presumably the replication factor per shard — TODO confirm
	SplitN   uint32 // presumably the number of shards per shard group — TODO confirm

	Shards []*Shard // shards owned by this policy
}
// NewRetentionPolicy returns a new instance of RetentionPolicy with defaults set.
func NewRetentionPolicy(name string) *RetentionPolicy {
	rp := &RetentionPolicy{Name: name}
	rp.Duration = DefaultShardRetention
	rp.ReplicaN = DefaultReplicaN
	rp.SplitN = DefaultSplitN
	return rp
}
// shardByTimestamp returns the shard in the space that owns a given timestamp
// for a given series id, or nil when no shard covers the timestamp.
func (rp *RetentionPolicy) shardByTimestamp(seriesID uint32, timestamp time.Time) *Shard {
	candidates := rp.shardsByTimestamp(timestamp)
	if len(candidates) == 0 {
		return nil
	}
	// Distribute series across the matching shards by id.
	return candidates[int(seriesID)%len(candidates)]
}
// shardsByTimestamp returns every shard whose [StartTime, EndTime] range
// contains the given timestamp.
func (rp *RetentionPolicy) shardsByTimestamp(timestamp time.Time) []*Shard {
	matched := make([]*Shard, 0, rp.SplitN)
	for _, sh := range rp.Shards {
		if timeBetweenInclusive(timestamp, sh.StartTime, sh.EndTime) {
			matched = append(matched, sh)
		}
	}
	return matched
}
// MarshalJSON encodes a retention policy to a JSON-encoded byte slice.
// NOTE(review): Shards are not serialized here even though retentionPolicyJSON
// carries them and UnmarshalJSON reads them — confirm this asymmetry is intended.
func (rp *RetentionPolicy) MarshalJSON() ([]byte, error) {
	var o retentionPolicyJSON
	o.Name = rp.Name
	o.Duration = rp.Duration
	o.ReplicaN = rp.ReplicaN
	o.SplitN = rp.SplitN
	return json.Marshal(&o)
}
// UnmarshalJSON decodes a JSON-encoded byte slice to a retention policy.
func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error {
	var o retentionPolicyJSON
	if err := json.Unmarshal(data, &o); err != nil {
		return err
	}
	// Copy the decoded fields onto the receiver.
	rp.Name = o.Name
	rp.ReplicaN = o.ReplicaN
	rp.SplitN = o.SplitN
	rp.Duration = o.Duration
	rp.Shards = o.Shards
	return nil
}
// retentionPolicyJSON represents an intermediate struct for JSON marshaling.
type retentionPolicyJSON struct {
	Name     string        `json:"name"`
	ReplicaN uint32        `json:"replicaN,omitempty"`
	SplitN   uint32        `json:"splitN,omitempty"`
	Duration time.Duration `json:"duration,omitempty"`
	Shards   []*Shard      `json:"shards,omitempty"` // read by UnmarshalJSON; not written by MarshalJSON
}
// RetentionPolicies represents a list of shard policies.
type RetentionPolicies []*RetentionPolicy

// Shards returns the shards of every policy, concatenated in policy order.
func (rps RetentionPolicies) Shards() []*Shard {
	var all []*Shard
	for _, policy := range rps {
		all = append(all, policy.Shards...)
	}
	return all
}
// TagFilter represents a tag filter when looking up other tags or measurements.
type TagFilter struct {
	Not   bool           // negate the match
	Key   string         // tag key to filter on
	Value string         // exact value to match; with Not set, an empty Value means "key is not null"
	Regex *regexp.Regexp // when set, matched against tag values instead of Value
}
// SeriesIDs is a convenience type for sorting, checking equality, and doing union and
// intersection of collections of series ids.
type SeriesIDs []uint32

// sort.Interface implementation.
func (p SeriesIDs) Len() int           { return len(p) }
func (p SeriesIDs) Less(i, j int) bool { return p[i] < p[j] }
func (p SeriesIDs) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }

// Equals reports whether a and other contain the same ids in the same order.
// Both collections are assumed to be sorted. This is by design, no touchy!
func (a SeriesIDs) Equals(other SeriesIDs) bool {
	if len(a) != len(other) {
		return false
	}
	for i := range other {
		if a[i] != other[i] {
			return false
		}
	}
	return true
}

// Intersect returns a new collection of series ids in sorted order that is the
// intersection of the two. The two collections must already be sorted.
func (a SeriesIDs) Intersect(other SeriesIDs) SeriesIDs {
	l, r := a, other
	// Iterate through the shorter collection; the intersection cannot be larger.
	if len(r) < len(l) {
		l, r = r, l
	}
	// Both inputs are sorted, so advance whichever cursor holds the smaller id.
	// BUG FIX: the original loop checked only i < len(l) and indexed r[j] out of
	// range whenever r was exhausted before l (e.g. l={10}, r={1,2} panicked).
	var i, j int
	ids := make([]uint32, 0, len(l))
	for i < len(l) && j < len(r) {
		switch {
		case l[i] == r[j]:
			ids = append(ids, l[i])
			i++
			j++
		case l[i] < r[j]:
			i++
		default:
			j++
		}
	}
	return SeriesIDs(ids)
}

// Union returns a new collection of series ids in sorted order that is the
// union of the two. The two collections must already be sorted.
func (l SeriesIDs) Union(r SeriesIDs) SeriesIDs {
	ids := make([]uint32, 0, len(l)+len(r))
	var i, j int
	for i < len(l) && j < len(r) {
		switch {
		case l[i] == r[j]:
			ids = append(ids, l[i])
			i++
			j++
		case l[i] < r[j]:
			ids = append(ids, l[i])
			i++
		default:
			ids = append(ids, r[j])
			j++
		}
	}
	// Append whatever remains of the side that was not exhausted.
	ids = append(ids, l[i:]...)
	ids = append(ids, r[j:]...)
	return ids
}

// Reject returns a new collection of series ids in sorted order with the passed
// in set removed from the original. This is useful for the NOT operator.
// The two collections must already be sorted.
func (l SeriesIDs) Reject(r SeriesIDs) SeriesIDs {
	var i, j int
	ids := make([]uint32, 0, len(l))
	for i < len(l) && j < len(r) {
		switch {
		case l[i] == r[j]:
			i++
			j++
		case l[i] < r[j]:
			ids = append(ids, l[i])
			i++
		default:
			j++
		}
	}
	// Everything left in l is not in r.
	ids = append(ids, l[i:]...)
	return SeriesIDs(ids)
}
// addSeriesToIndex adds the series for the given measurement to the index.
// Returns false if already present.
func (d *database) addSeriesToIndex(measurementName string, s *Series) bool {
	// A series id that is already indexed has been added before.
	if _, ok := d.series[s.ID]; ok {
		return false
	}
	// Get or create the measurement, then register the series both globally
	// and inside the measurement's own index.
	m := d.createMeasurementIfNotExists(measurementName)
	s.measurement = m
	d.series[s.ID] = s
	// TODO: add this series to the global tag index
	return m.addSeries(s)
}
// createMeasurementIfNotExists returns the existing measurement for name, or
// creates, indexes, and returns a new one.
func (d *database) createMeasurementIfNotExists(name string) *Measurement {
	if m, ok := d.measurements[name]; ok && m != nil {
		return m
	}
	m := NewMeasurement(name)
	d.measurements[name] = m
	// Keep the cached list of measurement names sorted.
	d.names = append(d.names, name)
	sort.Strings(d.names)
	return m
}
// AddField adds a field to the measurement name. Returns false if already present.
// TODO: not yet implemented; calling it always panics.
func (d *database) AddField(name string, f *Field) bool {
	// The unreachable "return false" after the panic was removed (go vet).
	panic("not implemented")
}
// MeasurementsBySeriesIDs returns the unique set of Measurements that own the
// passed in series ids. Ids not present in the index are skipped.
func (d *database) MeasurementsBySeriesIDs(seriesIDs SeriesIDs) []*Measurement {
	set := make(map[*Measurement]bool)
	for _, id := range seriesIDs {
		// Guard against unknown ids: the original dereferenced d.series[id]
		// unconditionally and panicked on a missing series.
		s := d.series[id]
		if s == nil {
			continue
		}
		set[s.measurement] = true
	}
	measurements := make([]*Measurement, 0, len(set))
	for m := range set {
		measurements = append(measurements, m)
	}
	return measurements
}
// SeriesIDs returns an array of series ids for the given measurements and filters to be applied to all.
// Filters are equivalent to an AND operation. If you want to do an OR, get the series IDs for one set,
// then get the series IDs for another set and use the SeriesIDs.Union to combine the two.
// NOTE(review): with no filters this returns the ids of EVERY measurement in the
// database and ignores names entirely — confirm callers rely on this behavior.
func (d *database) SeriesIDs(names []string, filters []*TagFilter) SeriesIDs {
	ids := SeriesIDs(make([]uint32, 0))
	if len(filters) == 0 {
		for _, m := range d.measurements {
			ids = ids.Union(m.ids)
		}
		return ids
	}
	for _, name := range names {
		ids = ids.Union(d.seriesIDsByName(name, filters))
	}
	return ids
}
// TagKeys returns a sorted array of unique tag keys for the given measurements.
// If an empty or nil slice is passed in, the tag keys for the entire database will be returned.
func (d *database) TagKeys(names []string) []string {
	if len(names) == 0 {
		names = d.names
	}
	// Collect the union of keys across the named measurements.
	seen := make(map[string]bool)
	for _, name := range names {
		m := d.measurements[name]
		if m == nil {
			continue
		}
		for key := range m.seriesByTagKeyValue {
			seen[key] = true
		}
	}
	keys := make([]string, 0, len(seen))
	for key := range seen {
		keys = append(keys, key)
	}
	sort.Strings(keys)
	return keys
}
// TagValues returns a map of unique tag values for the given measurements and key with the given filters applied.
// Call .ToSlice() on the result to convert it into a sorted slice of strings.
// Filters are equivalent to an AND operation. If you want to do an OR, get the tag values for one set,
// then get the tag values for another set and do a union of the two.
func (d *database) TagValues(names []string, key string, filters []*TagFilter) TagValues {
	// Without filters, just merge the value sets of each named measurement.
	if len(filters) == 0 {
		values := TagValues(make(map[string]bool))
		for _, name := range names {
			if m := d.measurements[name]; m != nil {
				values.Union(m.tagValues(key))
			}
		}
		return values
	}
	// With filters, resolve the matching series first and collect their values.
	seriesIDs := d.SeriesIDs(names, filters)
	return d.tagValuesBySeries(key, seriesIDs)
}
// tagValuesBySeries returns the unique values of the given tag key across a
// collection of series. Unknown series ids are skipped.
func (d *database) tagValuesBySeries(key string, seriesIDs SeriesIDs) TagValues {
	values := make(map[string]bool)
	for _, id := range seriesIDs {
		series := d.series[id]
		if series == nil {
			continue
		}
		if value, ok := series.Tags[key]; ok {
			values[value] = true
		}
	}
	return TagValues(values)
}
// TagValues is a set of unique tag values, keyed by value.
// Receiver names were unified to "tv" (the original mixed "t" and "l",
// which Go code-review guidelines advise against).
type TagValues map[string]bool

// ToSlice returns the values as a sorted slice of strings.
func (tv TagValues) ToSlice() []string {
	a := make([]string, 0, len(tv))
	for v := range tv {
		a = append(a, v)
	}
	sort.Strings(a)
	return a
}

// Union will modify the receiver by merging in the passed in values.
func (tv TagValues) Union(other TagValues) {
	for v := range other {
		tv[v] = true
	}
}

// Intersect will modify the receiver by keeping only the values that also
// exist in other. (Deleting while ranging a map is safe in Go.)
func (tv TagValues) Intersect(other TagValues) {
	for v := range tv {
		if !other[v] {
			delete(tv, v)
		}
	}
}
// seriesIDsByName is the same as SeriesIDs, but for a specific measurement.
func (d *database) seriesIDsByName(name string, filters []*TagFilter) SeriesIDs {
	idx := d.measurements[name]
	if idx == nil {
		return nil
	}
	// Guard: the collapse loop below indexes idsPerFilter[0], which panicked on
	// an empty filter list. No filters means every series in the measurement.
	if len(filters) == 0 {
		return idx.ids
	}
	// Evaluate each filter independently...
	idsPerFilter := make([]SeriesIDs, len(filters))
	for i, filter := range filters {
		idsPerFilter[i] = idx.seriesIDs(filter)
	}
	// ...then AND the results together.
	allIDs := idsPerFilter[0]
	for i := 1; i < len(idsPerFilter); i++ {
		allIDs = allIDs.Intersect(idsPerFilter[i])
	}
	return allIDs
}
// MeasurementBySeriesID returns the Measurement that is the parent of the
// given series id, or nil when the id is not indexed.
func (d *database) MeasurementBySeriesID(id uint32) *Measurement {
	s := d.series[id]
	if s == nil {
		return nil
	}
	return s.measurement
}
// MeasurementAndSeries returns the Measurement and the Series for a given
// measurement name and tag set. Both are nil when the measurement is unknown.
func (d *database) MeasurementAndSeries(name string, tags map[string]string) (*Measurement, *Series) {
	m, ok := d.measurements[name]
	if !ok || m == nil {
		return nil, nil
	}
	return m, m.seriesByTags(tags)
}
// SeriesByID returns the Series that has the given id, or nil if it is not indexed.
func (d *database) SeriesByID(id uint32) *Series {
	return d.series[id]
}
// Measurements returns all measurements in the database.
// NOTE(review): the filters argument is currently ignored — confirm intent.
func (d *database) Measurements(filters []*TagFilter) []*Measurement {
	measurements := make([]*Measurement, 0, len(d.measurements))
	for _, m := range d.measurements {
		// The original appended m.measurement — a self-referential field that
		// is never assigned — so callers received a slice of nils. Append the
		// measurement itself instead.
		measurements = append(measurements, m)
	}
	return measurements
}
// Names returns all measurement names in sorted order.
// The returned slice is the index's internal cache; callers must not modify it.
func (d *database) Names() []string {
	return d.names
}
// DropSeries will clear the index of all references to a series.
// TODO: not yet implemented; calling it always panics.
func (d *database) DropSeries(id uint32) {
	panic("not implemented")
}

// DropMeasurement will clear the index of all references to a measurement and its child series.
// TODO: not yet implemented; calling it always panics.
func (d *database) DropMeasurement(name string) {
	panic("not implemented")
}
// marshalTags converts a tag set to a deterministic byte key for map lookups.
// The format is "k1|k2|...|v1|v2|..." — all keys in sorted order, then the
// values in the same key order. (The original appended the values onto the
// keys slice while ranging over it, which worked only because range snapshots
// the slice length; this version makes the layout explicit. The output is
// byte-identical, so existing in-memory keys remain valid.)
func marshalTags(tags map[string]string) []byte {
	// Pull out the keys and sort them for a canonical ordering.
	keys := make([]string, 0, len(tags))
	for k := range tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	// Keys first, then values in key-sorted order.
	parts := make([]string, 0, 2*len(keys))
	parts = append(parts, keys...)
	for _, k := range keys {
		parts = append(parts, tags[k])
	}
	return []byte(strings.Join(parts, "|"))
}

683
database_test.go Normal file
View File

@ -0,0 +1,683 @@
package influxdb
import (
"reflect"
"regexp"
"sort"
"testing"
)
// Ensure that the index will return a sorted array of measurement names.
func TestDatabase_Names(t *testing.T) {
	db := databaseWithFixtureData()
	got := db.Names()
	want := []string{"another_thing", "cpu_load", "key_count", "queue_depth"}
	if !reflect.DeepEqual(got, want) {
		t.Fatalf("Names not equal:\n got: %s\n exp: %s", got, want)
	}
}
// Ensure that we can get the measurement by the series ID.
func TestDatabase_MeasurementBySeriesID(t *testing.T) {
	idx := newDatabase()
	m := &Measurement{
		Name: "cpu_load",
	}
	s := &Series{
		ID:   uint32(1),
		Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"},
	}
	// add it and see if we can look it up
	idx.addSeriesToIndex(m.Name, s)
	mm := idx.MeasurementBySeriesID(uint32(1))
	// compare via JSON so only exported fields are considered
	if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
		t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
	}
	// now test that we can add another
	s = &Series{
		ID:   uint32(2),
		Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}
	idx.addSeriesToIndex(m.Name, s)
	mm = idx.MeasurementBySeriesID(uint32(2))
	if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
		t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
	}
	// the first series should still resolve to the same measurement
	mm = idx.MeasurementBySeriesID(uint32(1))
	if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
		t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
	}
}
// Ensure that we can get an array of unique measurements by a collection of series IDs.
func TestDatabase_MeasurementsBySeriesIDs(t *testing.T) {
	db := databaseWithFixtureData()
	// Series 1 belongs to cpu_load, series 4 to key_count.
	ids := SeriesIDs{1, 4}
	names := make([]string, 0)
	for _, m := range db.MeasurementsBySeriesIDs(ids) {
		names = append(names, m.Name)
	}
	// Result order is map-random, so sort before comparing.
	sort.Strings(names)
	expected := []string{"cpu_load", "key_count"}
	if !reflect.DeepEqual(names, expected) {
		t.Fatalf("wrong measurements:\n exp: %s\n got: %s", expected, names)
	}
}
// Ensure that we can get the series object by the series ID.
func TestDatabase_SeriesBySeriesID(t *testing.T) {
	db := newDatabase()
	s := &Series{
		ID:   uint32(2),
		Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"},
	}
	db.addSeriesToIndex("foo", s)
	got := db.SeriesByID(uint32(2))
	// Compare via JSON since Series contains unexported index fields.
	if string(mustMarshalJSON(s)) != string(mustMarshalJSON(got)) {
		t.Fatalf("series not equal:\n%s\n%s", s, got)
	}
}
// Ensure that we can get the measurement and series objects out based on measurement and tags.
func TestDatabase_MeasurementAndSeries(t *testing.T) {
	idx := newDatabase()
	m := &Measurement{
		Name: "cpu_load",
	}
	s := &Series{
		ID:   uint32(1),
		Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"},
	}
	// add it and see if we can look it up by name and tags
	idx.addSeriesToIndex(m.Name, s)
	mm, ss := idx.MeasurementAndSeries(m.Name, s.Tags)
	// compare via JSON so only exported fields are considered
	if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
		t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
	} else if string(mustMarshalJSON(s)) != string(mustMarshalJSON(ss)) {
		t.Fatalf("series not equal:\n%s\n%s", s, ss)
	}
	// now test that we can add another series under the same measurement
	s = &Series{
		ID:   uint32(2),
		Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}
	idx.addSeriesToIndex(m.Name, s)
	mm, ss = idx.MeasurementAndSeries(m.Name, s.Tags)
	if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
		t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
	} else if string(mustMarshalJSON(s)) != string(mustMarshalJSON(ss)) {
		t.Fatalf("series not equal:\n%s\n%s", s, ss)
	}
}
// Ensure that we can get the series IDs for measurements without any filters.
func TestDatabase_SeriesIDs(t *testing.T) {
	idx := newDatabase()
	s := &Series{
		ID:   uint32(1),
		Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"}}
	// add it and see if we can look it up
	added := idx.addSeriesToIndex("cpu_load", s)
	if !added {
		t.Fatal("couldn't add series")
	}
	// test that we can't add it again
	added = idx.addSeriesToIndex("cpu_load", s)
	if added {
		t.Fatal("shoulnd't be able to add duplicate series")
	}
	// now test that we can add another
	s = &Series{
		ID:   uint32(2),
		Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}
	added = idx.addSeriesToIndex("cpu_load", s)
	if !added {
		t.Fatalf("couldn't add series")
	}
	l := idx.SeriesIDs([]string{"cpu_load"}, nil)
	r := []uint32{1, 2}
	if !l.Equals(r) {
		t.Fatalf("series IDs not the same:\n%s\n%s", l, r)
	}
	// now add another in a different measurement
	s = &Series{
		ID:   uint32(3),
		Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}
	added = idx.addSeriesToIndex("network_in", s)
	if !added {
		t.Fatalf("couldn't add series")
	}
	// NOTE(review): only "cpu_load" is queried, yet id 3 (network_in) is
	// expected, because database.SeriesIDs ignores names when no filters are
	// given — confirm that behavior is intended.
	l = idx.SeriesIDs([]string{"cpu_load"}, nil)
	r = []uint32{1, 2, 3}
	if !l.Equals(r) {
		t.Fatalf("series IDs not the same:\n%s\n%s", l, r)
	}
}
// Ensure SeriesIDs applies exact, NOT, and regex tag filters across measurements.
func TestDatabase_SeriesIDsWhereTagFilter(t *testing.T) {
	idx := databaseWithFixtureData()
	var tests = []struct {
		names   []string
		filters []*TagFilter
		result  []uint32
	}{
		// match against no tags
		{
			names:  []string{"cpu_load", "redis"},
			result: []uint32{1, 2, 3, 4, 5, 6, 7, 8},
		},
		// match against all tags
		{
			names: []string{"cpu_load"},
			filters: []*TagFilter{
				{Key: "host", Value: "servera.influx.com"},
				{Key: "region", Value: "uswest"},
			},
			result: []uint32{1},
		},
		// match against one tag
		{
			names: []string{"cpu_load"},
			filters: []*TagFilter{
				{Key: "region", Value: "uswest"},
			},
			result: []uint32{1, 2},
		},
		// match against one tag, single result
		{
			names: []string{"cpu_load"},
			filters: []*TagFilter{
				{Key: "host", Value: "servera.influx.com"},
			},
			result: []uint32{1},
		},
		// query against tag key that doesn't exist returns empty
		{
			names: []string{"cpu_load"},
			filters: []*TagFilter{
				{Key: "foo", Value: "bar"},
			},
			result: []uint32{},
		},
		// query against tag value that doesn't exist returns empty
		{
			names: []string{"cpu_load"},
			filters: []*TagFilter{
				{Key: "host", Value: "foo"},
			},
			result: []uint32{},
		},
		// query against a tag NOT value
		{
			names: []string{"key_count"},
			filters: []*TagFilter{
				{Key: "region", Value: "useast", Not: true},
			},
			result: []uint32{3},
		},
		// query against a tag NOT null: every queue_depth series carrying the
		// "app" key. The previous expectation {6} depended on random map
		// iteration order inside Measurement.seriesIDs (it discarded the result
		// of Intersect) and only passed intermittently; the semantically
		// correct result is {6, 7}.
		{
			names: []string{"queue_depth"},
			filters: []*TagFilter{
				{Key: "app", Value: "", Not: true},
			},
			result: []uint32{6, 7},
		},
		// query against a tag value and another tag NOT value
		{
			names: []string{"queue_depth"},
			filters: []*TagFilter{
				{Key: "name", Value: "high priority"},
				{Key: "app", Value: "paultown", Not: true},
			},
			result: []uint32{5, 7},
		},
		// query against a tag value matching regex
		{
			names: []string{"queue_depth"},
			filters: []*TagFilter{
				{Key: "app", Regex: regexp.MustCompile("paul.*")},
			},
			result: []uint32{6, 7},
		},
		// query against a tag value matching regex and other tag value matching value
		{
			names: []string{"queue_depth"},
			filters: []*TagFilter{
				{Key: "name", Value: "high priority"},
				{Key: "app", Regex: regexp.MustCompile("paul.*")},
			},
			result: []uint32{6, 7},
		},
		// query against a tag value NOT matching regex
		{
			names: []string{"queue_depth"},
			filters: []*TagFilter{
				{Key: "app", Regex: regexp.MustCompile("paul.*"), Not: true},
			},
			result: []uint32{5},
		},
		// query against a tag value NOT matching regex and other tag value matching value
		{
			names: []string{"queue_depth"},
			filters: []*TagFilter{
				{Key: "app", Regex: regexp.MustCompile("paul.*"), Not: true},
				{Key: "name", Value: "high priority"},
			},
			result: []uint32{5},
		},
		// query against multiple measurements
		{
			names: []string{"cpu_load", "key_count"},
			filters: []*TagFilter{
				{Key: "region", Value: "uswest"},
			},
			result: []uint32{1, 2, 3},
		},
	}
	for i, tt := range tests {
		r := idx.SeriesIDs(tt.names, tt.filters)
		expectedIDs := SeriesIDs(tt.result)
		if !r.Equals(expectedIDs) {
			t.Fatalf("%d: filters: %s: result mismatch:\n exp=%s\n got=%s", i, mustMarshalJSON(tt.filters), mustMarshalJSON(expectedIDs), mustMarshalJSON(r))
		}
	}
}
// Ensure TagKeys returns the sorted union of tag keys for the named
// measurements, or for the whole database when names is empty.
func TestDatabase_TagKeys(t *testing.T) {
	idx := databaseWithFixtureData()
	var tests = []struct {
		names  []string
		result []string
	}{
		// nil names: keys across every measurement in the database
		{
			names:  nil,
			result: []string{"a", "app", "host", "name", "region", "service"},
		},
		// a single measurement
		{
			names:  []string{"cpu_load"},
			result: []string{"host", "region"},
		},
		// union across two measurements
		{
			names:  []string{"key_count", "queue_depth"},
			result: []string{"app", "host", "name", "region", "service"},
		},
	}
	for i, tt := range tests {
		r := idx.TagKeys(tt.names)
		if !reflect.DeepEqual(r, tt.result) {
			t.Fatalf("%d: names: %s: result mismatch:\n exp=%s\n got=%s", i, tt.names, tt.result, r)
		}
	}
}
// Ensure TagValues returns the distinct values of a tag key for the named
// measurements, honoring any tag filters.
func TestDatabase_TagValuesWhereTagFilter(t *testing.T) {
	idx := databaseWithFixtureData()
	var tests = []struct {
		names   []string
		key     string
		filters []*TagFilter
		result  []string
	}{
		// get the tag values across multiple measurements
		// NOTE(review): the case this comment describes is missing — consider adding one.
		// get the tag values for a single measurement
		{
			names:  []string{"key_count"},
			key:    "region",
			result: []string{"useast", "uswest"},
		},
		// get the tag values for a single measurement with where filter
		{
			names: []string{"key_count"},
			key:   "region",
			filters: []*TagFilter{
				&TagFilter{Key: "host", Value: "serverc.influx.com"},
			},
			result: []string{"uswest"},
		},
		// get the tag values for a single measurement with a not where filter
		{
			names: []string{"key_count"},
			key:   "region",
			filters: []*TagFilter{
				&TagFilter{Key: "host", Value: "serverc.influx.com", Not: true},
			},
			result: []string{"useast"},
		},
		// get the tag values for a single measurement with multiple where filters
		{
			names: []string{"key_count"},
			key:   "region",
			filters: []*TagFilter{
				&TagFilter{Key: "host", Value: "serverc.influx.com"},
				&TagFilter{Key: "service", Value: "redis"},
			},
			result: []string{"uswest"},
		},
		// get the tag values for a single measurement with regex filter
		{
			names: []string{"queue_depth"},
			key:   "name",
			filters: []*TagFilter{
				&TagFilter{Key: "app", Regex: regexp.MustCompile("paul.*")},
			},
			result: []string{"high priority"},
		},
		// get the tag values for a single measurement with a not regex filter
		{
			names: []string{"key_count"},
			key:   "region",
			filters: []*TagFilter{
				&TagFilter{Key: "host", Regex: regexp.MustCompile("serverd.*"), Not: true},
			},
			result: []string{"uswest"},
		},
	}
	for i, tt := range tests {
		r := idx.TagValues(tt.names, tt.key, tt.filters).ToSlice()
		if !reflect.DeepEqual(r, tt.result) {
			t.Fatalf("%d: filters: %s: result mismatch:\n exp=%s\n got=%s", i, mustMarshalJSON(tt.filters), tt.result, r)
		}
	}
}
// Pending: database.DropSeries is not implemented yet.
func TestDatabase_DropSeries(t *testing.T) {
	t.Skip("pending")
}

// Pending: database.DropMeasurement is not implemented yet.
func TestDatabase_DropMeasurement(t *testing.T) {
	t.Skip("pending")
}

// Pending: field key lookup is not implemented yet (see database.AddField).
func TestDatabase_FieldKeys(t *testing.T) {
	t.Skip("pending")
}
// databaseWithFixtureData returns a populated Index for use in many of the
// filtering tests. It returns nil if any series fails to be added.
func databaseWithFixtureData() *database {
	idx := newDatabase()
	fixtures := []struct {
		name   string
		series *Series
	}{
		{"cpu_load", &Series{ID: uint32(1), Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"}}},
		{"cpu_load", &Series{ID: uint32(2), Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}},
		{"key_count", &Series{ID: uint32(3), Tags: map[string]string{"host": "serverc.influx.com", "region": "uswest", "service": "redis"}}},
		{"key_count", &Series{ID: uint32(4), Tags: map[string]string{"host": "serverd.influx.com", "region": "useast", "service": "redis"}}},
		{"queue_depth", &Series{ID: uint32(5), Tags: map[string]string{"name": "high priority"}}},
		{"queue_depth", &Series{ID: uint32(6), Tags: map[string]string{"name": "high priority", "app": "paultown"}}},
		{"queue_depth", &Series{ID: uint32(7), Tags: map[string]string{"name": "high priority", "app": "paulcountry"}}},
		{"another_thing", &Series{ID: uint32(8), Tags: map[string]string{"a": "b"}}},
	}
	for _, f := range fixtures {
		if !idx.addSeriesToIndex(f.name, f.series) {
			return nil
		}
	}
	return idx
}
// Ensure SeriesIDs.Intersect returns the sorted intersection of two sorted id sets.
func TestDatabase_SeriesIDsIntersect(t *testing.T) {
	var tests = []struct {
		expected []uint32
		left     []uint32
		right    []uint32
	}{
		// both sets empty
		{
			expected: []uint32{},
			left:     []uint32{},
			right:    []uint32{},
		},
		// right set empty
		{
			expected: []uint32{},
			left:     []uint32{uint32(1)},
			right:    []uint32{},
		},
		// left set empty
		{
			expected: []uint32{},
			left:     []uint32{},
			right:    []uint32{uint32(1)},
		},
		// both sides same size
		{
			expected: []uint32{uint32(1), uint32(4)},
			left:     []uint32{uint32(1), uint32(2), uint32(4), uint32(5)},
			right:    []uint32{uint32(1), uint32(3), uint32(4), uint32(7)},
		},
		// left side bigger
		{
			expected: []uint32{uint32(2)},
			left:     []uint32{uint32(1), uint32(2), uint32(3)},
			right:    []uint32{uint32(2)},
		},
		// right side bigger
		{
			expected: []uint32{uint32(4), uint32(8)},
			left:     []uint32{uint32(2), uint32(3), uint32(4), uint32(8)},
			right:    []uint32{uint32(1), uint32(4), uint32(7), uint32(8), uint32(9)},
		},
		// NOTE(review): no case exhausts the longer side before the shorter one
		// (e.g. left={10}, right={1,2}); that input indexes out of range in the
		// current Intersect implementation — worth adding a case once fixed.
	}
	for i, tt := range tests {
		a := SeriesIDs(tt.left).Intersect(tt.right)
		if !a.Equals(tt.expected) {
			t.Fatalf("%d: %s intersect %s: result mismatch:\n exp=%s\n got=%s", i, SeriesIDs(tt.left), SeriesIDs(tt.right), SeriesIDs(tt.expected), SeriesIDs(a))
		}
	}
}
func TestDatabase_SeriesIDsUnion(t *testing.T) {
	// Each case lists two ID sets and the expected union.
	cases := []struct {
		a, b []uint32
		want []uint32
	}{
		// both sets empty
		{a: []uint32{}, b: []uint32{}, want: []uint32{}},
		// right set empty
		{a: []uint32{1}, b: []uint32{}, want: []uint32{1}},
		// left set empty
		{a: []uint32{}, b: []uint32{1}, want: []uint32{1}},
		// both sides same size
		{a: []uint32{1, 2, 4, 5}, b: []uint32{1, 3, 4, 7}, want: []uint32{1, 2, 3, 4, 5, 7}},
		// left side bigger
		{a: []uint32{1, 2, 3}, b: []uint32{2}, want: []uint32{1, 2, 3}},
		// right side bigger
		{a: []uint32{2, 3, 4, 8}, b: []uint32{1, 4, 7, 8, 9}, want: []uint32{1, 2, 3, 4, 7, 8, 9}},
	}

	for i, c := range cases {
		got := SeriesIDs(c.a).Union(c.b)
		if !got.Equals(c.want) {
			t.Fatalf("%d: %s union %s: result mismatch:\n exp=%s\n got=%s", i, SeriesIDs(c.a), SeriesIDs(c.b), SeriesIDs(c.want), got)
		}
	}
}
func TestDatabase_SeriesIDsReject(t *testing.T) {
	// Each case lists two ID sets and the expected left-minus-right result.
	cases := []struct {
		a, b []uint32
		want []uint32
	}{
		// both sets empty
		{a: []uint32{}, b: []uint32{}, want: []uint32{}},
		// right set empty
		{a: []uint32{1}, b: []uint32{}, want: []uint32{1}},
		// left set empty
		{a: []uint32{}, b: []uint32{1}, want: []uint32{}},
		// both sides same size
		{a: []uint32{1, 2, 4, 5}, b: []uint32{1, 3, 4, 7}, want: []uint32{2, 5}},
		// left side bigger
		{a: []uint32{1, 2, 3}, b: []uint32{2}, want: []uint32{1, 3}},
		// right side bigger
		{a: []uint32{2, 3, 4, 8}, b: []uint32{1, 4, 7, 8, 9}, want: []uint32{2, 3}},
	}

	for i, c := range cases {
		got := SeriesIDs(c.a).Reject(c.b)
		if !got.Equals(c.want) {
			t.Fatalf("%d: %s reject %s: result mismatch:\n exp=%s\n got=%s", i, SeriesIDs(c.a), SeriesIDs(c.b), SeriesIDs(c.want), got)
		}
	}
}

View File

@ -2,8 +2,6 @@ package influxdb
import (
"encoding/binary"
"sort"
"strings"
"time"
"unsafe"
@ -165,97 +163,50 @@ func (tx *metatx) deleteDatabase(name string) error {
return tx.Bucket([]byte("Databases")).DeleteBucket([]byte(name))
}
// returns a unique series id by database, name and tags. Returns ErrSeriesNotFound
func (tx *metatx) seriesID(database, name string, tags map[string]string) (uint32, error) {
// get the bucket that holds series data for the database
b := tx.Bucket([]byte("Databases")).Bucket([]byte(database))
if b == nil {
return uint32(0), ErrDatabaseNotFound
}
// get the bucket that holds tag data for the series name
b = b.Bucket([]byte("TagBytesToID")).Bucket([]byte(name))
if b == nil {
return uint32(0), ErrSeriesNotFound
}
// look up the id of the tagset
tagBytes := tagsToBytes(tags)
v := b.Get(tagBytes)
if v == nil {
return uint32(0), ErrSeriesNotFound
}
// the value is the bytes for a uint32, return it
return *(*uint32)(unsafe.Pointer(&v[0])), nil
}
// sets the series id for the database, name, and tags.
func (tx *metatx) createSeriesIfNotExists(database, name string, tags map[string]string) error {
func (tx *metatx) createSeries(database, name string, tags map[string]string) (*Series, error) {
// create the buckets to store tag indexes for the series and give it a unique ID in the DB
db := tx.Bucket([]byte("Databases")).Bucket([]byte(database))
t := db.Bucket([]byte("TagBytesToID"))
t := db.Bucket([]byte("Series"))
b, err := t.CreateBucketIfNotExists([]byte(name))
if err != nil {
return err
return nil, err
}
// give the series a unique ID
id, _ := t.NextSequence()
tagBytes := tagsToBytes(tags)
idBytes := make([]byte, 4)
*(*uint32)(unsafe.Pointer(&idBytes[0])) = uint32(id)
if err := b.Put(tagBytes, idBytes); err != nil {
return err
}
// store the tag map for the series
b, err = db.Bucket([]byte("Series")).CreateBucketIfNotExists([]byte(name))
if err != nil {
return err
return nil, err
}
s := &Series{ID: uint32(id), Tags: tags}
return b.Put(idBytes, mustMarshalJSON(s))
idBytes := make([]byte, 4)
*(*uint32)(unsafe.Pointer(&idBytes[0])) = uint32(id)
if err := b.Put(idBytes, mustMarshalJSON(s)); err != nil {
return nil, err
}
return s, nil
}
// series returns all the measurements and series in a database
func (tx *metatx) measurements(database string) []*Measurement {
// loops through all the measurements and series in a database
func (tx *metatx) indexDatabase(db *database) {
// get the bucket that holds series data for the database
b := tx.Bucket([]byte("Databases")).Bucket([]byte(database)).Bucket([]byte("Series"))
measurements := make([]*Measurement, 0)
b := tx.Bucket([]byte("Databases")).Bucket([]byte(db.name)).Bucket([]byte("Series"))
c := b.Cursor()
for k, _ := c.First(); k != nil; k, _ = c.Next() {
mc := b.Bucket(k).Cursor()
m := &Measurement{Name: string(k), Series: make([]*Series, 0)}
name := string(k)
for id, v := mc.First(); id != nil; id, v = mc.Next() {
var s *Series
mustUnmarshalJSON(v, &s)
m.Series = append(m.Series, s)
db.addSeriesToIndex(name, s)
}
measurements = append(measurements, m)
}
return measurements
}
// used to convert the tag set to bytes for use as a key in bolt
func tagsToBytes(tags map[string]string) []byte {
s := make([]string, 0, len(tags))
// pull out keys to sort
for k := range tags {
s = append(s, k)
}
sort.Strings(s)
// now append on the key values in key sorted order
for _, k := range s {
s = append(s, tags[k])
}
return []byte(strings.Join(s, "|"))
}
// user returns a user from the metastore by name.

300
server.go
View File

@ -3,6 +3,7 @@ package influxdb
import (
"encoding/json"
"fmt"
"log"
"net/url"
"os"
"path/filepath"
@ -194,6 +195,16 @@ func (s *Server) load() error {
for sh := range db.shards {
s.databasesByShard[sh] = db
}
// load the index
log.Printf("Loading metadata index for %s\n", db.name)
err := s.meta.view(func(tx *metatx) error {
tx.indexDatabase(db)
return nil
})
if err != nil {
return err
}
}
// Load users.
@ -1021,9 +1032,23 @@ func (s *Server) applyCreateSeriesIfNotExists(m *messaging.Message) error {
return ErrDatabaseNotFound
}
return s.meta.mustUpdate(func(tx *metatx) error {
return tx.createSeriesIfNotExists(db.name, c.Name, c.Tags)
if _, series := db.MeasurementAndSeries(c.Name, c.Tags); series != nil {
return nil
}
// save to the metastore and add it to the in memory index
var series *Series
err := s.meta.mustUpdate(func(tx *metatx) error {
var err error
series, err = tx.createSeries(db.name, c.Name, c.Tags)
return err
})
if err != nil {
return err
}
db.addSeriesToIndex(c.Name, series)
return nil
}
type createSeriesIfNotExistsCommand struct {
@ -1088,20 +1113,16 @@ func (s *Server) applyWriteSeries(m *messaging.Message) error {
return sh.writeSeries(overwrite, m.Data)
}
// seriesID returns the unique id of a series and tagset and a bool indicating if it was found
func (s *Server) seriesID(database, name string, tags map[string]string) (id uint32) {
s.meta.view(func(tx *metatx) error {
id, _ = tx.seriesID(database, name, tags)
return nil
})
return
}
func (s *Server) createSeriesIfNotExists(database, name string, tags map[string]string) (uint32, error) {
// Try to find series locally first.
if id := s.seriesID(database, name, tags); id != 0 {
return id, nil
s.mu.RLock()
idx := s.databases[database]
if _, series := idx.MeasurementAndSeries(name, tags); series != nil {
s.mu.RUnlock()
return series.ID, nil
}
// release the read lock so the broadcast can actually go through and acquire the write lock
s.mu.RUnlock()
// If it doesn't exist then create a message and broadcast.
c := &createSeriesIfNotExistsCommand{Database: database, Name: name, Tags: tags}
@ -1111,19 +1132,35 @@ func (s *Server) createSeriesIfNotExists(database, name string, tags map[string]
}
// Lookup series again.
id := s.seriesID(database, name, tags)
if id == 0 {
_, series := idx.MeasurementAndSeries(name, tags)
if series == nil {
return 0, ErrSeriesNotFound
}
return id, nil
return series.ID, nil
}
func (s *Server) Measurements(database string) (a Measurements) {
s.meta.view(func(tx *metatx) error {
a = tx.measurements(database)
func (s *Server) MeasurementNames(database string) []string {
s.mu.RLock()
defer s.mu.RUnlock()
db := s.databases[database]
if db == nil {
return nil
})
return
}
return db.names
}
func (s *Server) MeasurementSeriesIDs(database, measurement string) SeriesIDs {
s.mu.RLock()
defer s.mu.RUnlock()
db := s.databases[database]
if db == nil {
return nil
}
return db.SeriesIDs([]string{measurement}, nil)
}
// processor runs in a separate goroutine and processes all incoming broker messages.
@ -1204,227 +1241,6 @@ func (p dataNodes) Len() int { return len(p) }
func (p dataNodes) Less(i, j int) bool { return p[i].ID < p[j].ID }
func (p dataNodes) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// database represents a collection of retention policies.
type database struct {
name string
policies map[string]*RetentionPolicy // retention policies by name
shards map[uint64]*Shard // shards by id
defaultRetentionPolicy string
}
// newDatabase returns an instance of database.
func newDatabase() *database {
return &database{
policies: make(map[string]*RetentionPolicy),
shards: make(map[uint64]*Shard),
}
}
// shardByTimestamp returns a shard that owns a given timestamp.
func (db *database) shardByTimestamp(policy string, id uint32, timestamp time.Time) (*Shard, error) {
p := db.policies[policy]
if p == nil {
return nil, ErrRetentionPolicyNotFound
}
return p.shardByTimestamp(id, timestamp), nil
}
// shardsByTimestamp returns all shards that own a given timestamp.
func (db *database) shardsByTimestamp(policy string, timestamp time.Time) ([]*Shard, error) {
p := db.policies[policy]
if p == nil {
return nil, ErrRetentionPolicyNotFound
}
return p.shardsByTimestamp(timestamp), nil
}
// timeBetweenInclusive returns true if t is between min and max, inclusive.
func timeBetweenInclusive(t, min, max time.Time) bool {
return (t.Equal(min) || t.After(min)) && (t.Equal(max) || t.Before(max))
}
// MarshalJSON encodes a database into a JSON-encoded byte slice.
func (db *database) MarshalJSON() ([]byte, error) {
	// Copy over properties to intermediate type.
	var o databaseJSON
	o.Name = db.name
	o.DefaultRetentionPolicy = db.defaultRetentionPolicy
	// Flatten the policy and shard maps into slices. Map iteration order is
	// random, so the order of the resulting JSON arrays is not deterministic.
	for _, rp := range db.policies {
		o.Policies = append(o.Policies, rp)
	}
	for _, s := range db.shards {
		o.Shards = append(o.Shards, s)
	}
	return json.Marshal(&o)
}
// UnmarshalJSON decodes a JSON-encoded byte slice to a database.
func (db *database) UnmarshalJSON(data []byte) error {
	// Decode into intermediate type.
	var o databaseJSON
	if err := json.Unmarshal(data, &o); err != nil {
		return err
	}
	// Copy over properties from intermediate type.
	db.name = o.Name
	db.defaultRetentionPolicy = o.DefaultRetentionPolicy
	// Copy shard policies, keyed by policy name.
	db.policies = make(map[string]*RetentionPolicy)
	for _, rp := range o.Policies {
		db.policies[rp.Name] = rp
	}
	// Copy shards, keyed by shard ID.
	db.shards = make(map[uint64]*Shard)
	for _, s := range o.Shards {
		db.shards[s.ID] = s
	}
	// NOTE(review): no in-memory index state is rebuilt here — presumably
	// that happens separately after load; confirm against callers.
	return nil
}
// databaseJSON represents the JSON-serialization format for a database.
type databaseJSON struct {
	Name                   string             `json:"name,omitempty"`
	DefaultRetentionPolicy string             `json:"defaultRetentionPolicy,omitempty"`
	Policies               []*RetentionPolicy `json:"policies,omitempty"`
	Shards                 []*Shard           `json:"shards,omitempty"`
}

// Measurement represents a collection of time series in a database
type Measurement struct {
	Name   string    `json:"name,omitempty"`
	Series []*Series `json:"series,omitempty"`
	// NOTE(review): []*Fields is a slice of pointers to slices of *Field;
	// this looks like it was meant to be []*Field — confirm intent.
	Fields []*Fields `json:"fields,omitempty"`
}

// Measurements is a list of measurements.
type Measurements []*Measurement

// String returns the measurement rendered as JSON, panicking on marshal failure.
func (m Measurement) String() string { return string(mustMarshalJSON(m)) }

// Field represents a series field.
type Field struct {
	ID   uint8  `json:"id,omitempty"`
	Name string `json:"name,omitempty"`
	// NOTE(review): the JSON tag here is "field" rather than "type" —
	// confirm this is intentional.
	Type FieldType `json:"field"`
}

// FieldType identifies the data type stored in a field's values.
type FieldType int

// Supported field value types.
const (
	Int64 FieldType = iota
	Float64
	String
	Boolean
	Binary
)

// Fields represents a list of fields.
type Fields []*Field

// Series belong to a Measurement and represent unique time series in a database
type Series struct {
	ID   uint32            // unique series id within the database
	Tags map[string]string // tag key/value pairs identifying this series
}

// RetentionPolicy represents a policy for creating new shards in a database and how long they're kept around for.
type RetentionPolicy struct {
	// Unique name within database. Required.
	Name string

	// Length of time to keep data around
	Duration time.Duration

	ReplicaN uint32   // replication factor for shards in this policy
	SplitN   uint32   // shard split count; used as a capacity hint when collecting shards
	Shards   []*Shard // shards owned by this policy
}
// NewRetentionPolicy returns a new instance of RetentionPolicy with defaults set
// for the replication factor, split count, and retention duration. The Shards
// slice starts empty.
func NewRetentionPolicy(name string) *RetentionPolicy {
	return &RetentionPolicy{
		Name:     name,
		ReplicaN: DefaultReplicaN,
		SplitN:   DefaultSplitN,
		Duration: DefaultShardRetention,
	}
}
// shardByTimestamp returns the shard in the space that owns a given timestamp for a given series id.
// Returns nil if the shard does not exist.
func (rp *RetentionPolicy) shardByTimestamp(id uint32, timestamp time.Time) *Shard {
	shards := rp.shardsByTimestamp(timestamp)
	if len(shards) > 0 {
		// Distribute series across the shards covering this timestamp by
		// taking the series id modulo the shard count.
		return shards[int(id)%len(shards)]
	}
	return nil
}
// shardsByTimestamp returns every shard in the policy whose
// [StartTime, EndTime] window contains the given timestamp (boundaries
// inclusive, per timeBetweenInclusive).
func (rp *RetentionPolicy) shardsByTimestamp(timestamp time.Time) []*Shard {
	// SplitN is used only as a capacity hint here.
	shards := make([]*Shard, 0, rp.SplitN)
	for _, s := range rp.Shards {
		if timeBetweenInclusive(timestamp, s.StartTime, s.EndTime) {
			shards = append(shards, s)
		}
	}
	return shards
}
// MarshalJSON encodes a retention policy to a JSON-encoded byte slice.
// NOTE(review): rp.Shards is not written here even though
// retentionPolicyJSON has a Shards field and UnmarshalJSON reads it.
// Shards appear to be serialized at the database level instead; confirm
// that dropping them from the policy round-trip is intentional.
func (rp *RetentionPolicy) MarshalJSON() ([]byte, error) {
	return json.Marshal(&retentionPolicyJSON{
		Name:     rp.Name,
		Duration: rp.Duration,
		ReplicaN: rp.ReplicaN,
		SplitN:   rp.SplitN,
	})
}
// UnmarshalJSON decodes a JSON-encoded byte slice to a retention policy.
func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error {
	// Decode into intermediate type.
	var o retentionPolicyJSON
	if err := json.Unmarshal(data, &o); err != nil {
		return err
	}
	// Copy over properties from intermediate type.
	rp.Name = o.Name
	rp.ReplicaN = o.ReplicaN
	rp.SplitN = o.SplitN
	rp.Duration = o.Duration
	// Shards is copied when present; note that MarshalJSON above does not
	// emit it, so this field is only populated by external producers.
	rp.Shards = o.Shards
	return nil
}
// retentionPolicyJSON represents an intermediate struct for JSON marshaling.
type retentionPolicyJSON struct {
	Name     string        `json:"name"`
	ReplicaN uint32        `json:"replicaN,omitempty"`
	SplitN   uint32        `json:"splitN,omitempty"`
	Duration time.Duration `json:"duration,omitempty"`
	// Shards is read on unmarshal; the marshal path does not populate it.
	Shards []*Shard `json:"shards,omitempty"`
}

// RetentionPolicies represents a list of shard policies.
type RetentionPolicies []*RetentionPolicy
// Shards returns a list of all shards for all policies.
// The result is nil when no policy owns any shards.
func (rps RetentionPolicies) Shards() []*Shard {
	var shards []*Shard
	for _, rp := range rps {
		shards = append(shards, rp.Shards...)
	}
	return shards
}
// BcryptCost is the cost associated with generating password with Bcrypt.
// This setting is lowered during testing to improve test suite performance.
var BcryptCost = 10

View File

@ -1,6 +1,7 @@
package influxdb_test
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/url"
@ -488,29 +489,42 @@ func TestServer_Measurements(t *testing.T) {
t.Fatal(err)
}
r := s.Measurements("foo")
m := []*influxdb.Measurement{
&influxdb.Measurement{
Name: "cpu_load",
Series: []*influxdb.Series{
&influxdb.Series{
ID: uint32(1),
Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"}}}}}
if !measurementsEqual(r, m) {
t.Fatalf("Mesurements not the same:\n%s\n%s", r, m)
expectedMeasurementNames := []string{"cpu_load"}
expectedSeriesIDs := influxdb.SeriesIDs([]uint32{uint32(1)})
names := s.MeasurementNames("foo")
if !reflect.DeepEqual(names, expectedMeasurementNames) {
t.Fatalf("Mesurements not the same:\n exp: %s\n got: %s", expectedMeasurementNames, names)
}
ids := s.MeasurementSeriesIDs("foo", "foo")
if !ids.Equals(expectedSeriesIDs) {
t.Fatalf("Series IDs not the same:\n exp: %s\n got: %s", expectedSeriesIDs, ids)
}
s.Restart()
names = s.MeasurementNames("foo")
if !reflect.DeepEqual(names, expectedMeasurementNames) {
t.Fatalf("Mesurements not the same:\n exp: %s\n got: %s", expectedMeasurementNames, names)
}
ids = s.MeasurementSeriesIDs("foo", "foo")
if !ids.Equals(expectedSeriesIDs) {
t.Fatalf("Series IDs not the same:\n exp: %s\n got: %s", expectedSeriesIDs, ids)
}
}
// mustMarshalJSON encodes v to its JSON string form, panicking on any
// marshaling error. Intended for tests where a marshal failure is fatal.
func mustMarshalJSON(v interface{}) string {
	encoded, err := json.Marshal(v)
	if err == nil {
		return string(encoded)
	}
	panic("marshal: " + err.Error())
}
func measurementsEqual(l influxdb.Measurements, r influxdb.Measurements) bool {
if len(l) != len(r) {
return false
if mustMarshalJSON(l) == mustMarshalJSON(r) {
return true
}
for i, ll := range l {
if !reflect.DeepEqual(ll, r[i]) {
return false
}
}
return true
return false
}
func TestServer_SeriesByTagNames(t *testing.T) { t.Skip("pending") }