Refactor Index into database
* Pull database, Measurement, Series, ReplicationPolicy, and Shard into database.go to keep things manageable and clean * Remove the locks from the index method. The server will handle thread safety for database objects * Move the Index tests into database_test.go and in the influxdb package because we're not exporting databasepull/1264/head
parent
b268ffecb8
commit
d5548aa136
|
@ -0,0 +1,723 @@
|
|||
package influxdb
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// database is a collection of retention policies and shards. It also has methods
|
||||
// for keeping an in memory index of all the measurements, series, and tags in the database.
|
||||
// Methods on this struct aren't goroutine safe. They assume that the server is handling
|
||||
// any locking to make things safe.
|
||||
type database struct {
|
||||
name string
|
||||
|
||||
policies map[string]*RetentionPolicy // retention policies by name
|
||||
shards map[uint64]*Shard // shards by id
|
||||
|
||||
defaultRetentionPolicy string
|
||||
|
||||
// in memory indexing structures
|
||||
measurements map[string]*Measurement // measurement name to object and index
|
||||
seriesToMeasurement map[uint32]*Measurement // map series id to its measurement
|
||||
series map[uint32]*Series // map series id to the Series object
|
||||
names []string // sorted list of the measurement names
|
||||
}
|
||||
|
||||
// newDatabase returns an instance of database.
|
||||
func newDatabase() *database {
|
||||
return &database{
|
||||
policies: make(map[string]*RetentionPolicy),
|
||||
shards: make(map[uint64]*Shard),
|
||||
measurements: make(map[string]*Measurement),
|
||||
seriesToMeasurement: make(map[uint32]*Measurement),
|
||||
series: make(map[uint32]*Series),
|
||||
names: make([]string, 0),
|
||||
}
|
||||
}
|
||||
|
||||
// shardByTimestamp returns a shard that owns a given timestamp.
|
||||
func (db *database) shardByTimestamp(policy string, id uint32, timestamp time.Time) (*Shard, error) {
|
||||
p := db.policies[policy]
|
||||
if p == nil {
|
||||
return nil, ErrRetentionPolicyNotFound
|
||||
}
|
||||
return p.shardByTimestamp(id, timestamp), nil
|
||||
}
|
||||
|
||||
// shardsByTimestamp returns all shards that own a given timestamp.
|
||||
func (db *database) shardsByTimestamp(policy string, timestamp time.Time) ([]*Shard, error) {
|
||||
p := db.policies[policy]
|
||||
if p == nil {
|
||||
return nil, ErrRetentionPolicyNotFound
|
||||
}
|
||||
return p.shardsByTimestamp(timestamp), nil
|
||||
}
|
||||
|
||||
// timeBetweenInclusive returns true if t is between min and max, inclusive.
|
||||
func timeBetweenInclusive(t, min, max time.Time) bool {
|
||||
return (t.Equal(min) || t.After(min)) && (t.Equal(max) || t.Before(max))
|
||||
}
|
||||
|
||||
// MarshalJSON encodes a database into a JSON-encoded byte slice.
|
||||
func (db *database) MarshalJSON() ([]byte, error) {
|
||||
// Copy over properties to intermediate type.
|
||||
var o databaseJSON
|
||||
o.Name = db.name
|
||||
o.DefaultRetentionPolicy = db.defaultRetentionPolicy
|
||||
for _, rp := range db.policies {
|
||||
o.Policies = append(o.Policies, rp)
|
||||
}
|
||||
for _, s := range db.shards {
|
||||
o.Shards = append(o.Shards, s)
|
||||
}
|
||||
return json.Marshal(&o)
|
||||
}
|
||||
|
||||
// UnmarshalJSON decodes a JSON-encoded byte slice to a database.
|
||||
func (db *database) UnmarshalJSON(data []byte) error {
|
||||
// Decode into intermediate type.
|
||||
var o databaseJSON
|
||||
if err := json.Unmarshal(data, &o); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Copy over properties from intermediate type.
|
||||
db.name = o.Name
|
||||
db.defaultRetentionPolicy = o.DefaultRetentionPolicy
|
||||
|
||||
// Copy shard policies.
|
||||
db.policies = make(map[string]*RetentionPolicy)
|
||||
for _, rp := range o.Policies {
|
||||
db.policies[rp.Name] = rp
|
||||
}
|
||||
|
||||
// Copy shards.
|
||||
db.shards = make(map[uint64]*Shard)
|
||||
for _, s := range o.Shards {
|
||||
db.shards[s.ID] = s
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// databaseJSON represents the JSON-serialization format for a database.
|
||||
type databaseJSON struct {
|
||||
Name string `json:"name,omitempty"`
|
||||
DefaultRetentionPolicy string `json:"defaultRetentionPolicy,omitempty"`
|
||||
Policies []*RetentionPolicy `json:"policies,omitempty"`
|
||||
Shards []*Shard `json:"shards,omitempty"`
|
||||
}
|
||||
|
||||
// Measurement represents a collection of time series in a database. It also contains in memory
|
||||
// structures for indexing tags. These structures are accessed through private methods on the Measurement
|
||||
// object. Generally these methods are only accessed from Index, which is responsible for ensuring
|
||||
// go routine safe access.
|
||||
type Measurement struct {
|
||||
Name string `json:"name,omitempty"`
|
||||
Fields []*Fields `json:"fields,omitempty"`
|
||||
|
||||
// in memory index fields
|
||||
series map[string]*Series // sorted tagset string to the series object
|
||||
seriesByID map[uint32]*Series // lookup table for series by their id
|
||||
measurement *Measurement
|
||||
seriesByTagKeyValue map[string]map[string]SeriesIDs // map from tag key to value to sorted set of series ids
|
||||
ids SeriesIDs // sorted list of series IDs in this measurement
|
||||
}
|
||||
|
||||
func NewMeasurement(name string) *Measurement {
|
||||
return &Measurement{
|
||||
Name: name,
|
||||
Fields: make([]*Fields, 0),
|
||||
|
||||
series: make(map[string]*Series),
|
||||
seriesByID: make(map[uint32]*Series),
|
||||
seriesByTagKeyValue: make(map[string]map[string]SeriesIDs),
|
||||
ids: SeriesIDs(make([]uint32, 0)),
|
||||
}
|
||||
}
|
||||
|
||||
// addSeries will add a series to the measurementIndex. Returns false if already present
|
||||
func (m *Measurement) addSeries(s *Series) bool {
|
||||
if _, ok := m.seriesByID[s.ID]; ok {
|
||||
return false
|
||||
}
|
||||
m.seriesByID[s.ID] = s
|
||||
tagset := string(marshalTags(s.Tags))
|
||||
m.series[tagset] = s
|
||||
m.ids = append(m.ids, s.ID)
|
||||
// the series ID should always be higher than all others because it's a new
|
||||
// series. So don't do the sort if we don't have to.
|
||||
if len(m.ids) > 1 && m.ids[len(m.ids)-1] < m.ids[len(m.ids)-2] {
|
||||
sort.Sort(m.ids)
|
||||
}
|
||||
|
||||
// add this series id to the tag index on the measurement
|
||||
for k, v := range s.Tags {
|
||||
valueMap := m.seriesByTagKeyValue[k]
|
||||
if valueMap == nil {
|
||||
valueMap = make(map[string]SeriesIDs)
|
||||
m.seriesByTagKeyValue[k] = valueMap
|
||||
}
|
||||
ids := valueMap[v]
|
||||
ids = append(ids, s.ID)
|
||||
|
||||
// most of the time the series ID will be higher than all others because it's a new
|
||||
// series. So don't do the sort if we don't have to.
|
||||
if len(ids) > 1 && ids[len(ids)-1] < ids[len(ids)-2] {
|
||||
sort.Sort(ids)
|
||||
}
|
||||
valueMap[v] = ids
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// seriesByTags returns the Series that matches the given tagset.
|
||||
func (m *Measurement) seriesByTags(tags map[string]string) *Series {
|
||||
return m.series[string(marshalTags(tags))]
|
||||
}
|
||||
|
||||
// sereisIDs returns the series ids for a given filter
|
||||
func (m *Measurement) seriesIDs(filter *Filter) (ids SeriesIDs) {
|
||||
values := m.seriesByTagKeyValue[filter.Key]
|
||||
if values == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// hanlde regex filters
|
||||
if filter.Regex != nil {
|
||||
for k, v := range values {
|
||||
if filter.Regex.MatchString(k) {
|
||||
if ids == nil {
|
||||
ids = v
|
||||
} else {
|
||||
ids = ids.Union(v)
|
||||
}
|
||||
}
|
||||
}
|
||||
if filter.Not {
|
||||
ids = m.ids.Reject(ids)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// this is for the value is not null query
|
||||
if filter.Not && filter.Value == "" {
|
||||
for _, v := range values {
|
||||
if ids == nil {
|
||||
ids = v
|
||||
} else {
|
||||
ids.Intersect(v)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// get the ids that have the given key/value tag pair
|
||||
ids = SeriesIDs(values[filter.Value])
|
||||
|
||||
// filter out these ids from the entire set if it's a not query
|
||||
if filter.Not {
|
||||
ids = m.ids.Reject(ids)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// tagValues returns an array of unique tag values for the given key
|
||||
func (m *Measurement) tagValues(key string) TagValues {
|
||||
tags := m.seriesByTagKeyValue[key]
|
||||
values := make(map[string]bool, len(tags))
|
||||
for k, _ := range tags {
|
||||
values[k] = true
|
||||
}
|
||||
return TagValues(values)
|
||||
}
|
||||
|
||||
type Measurements []*Measurement
|
||||
|
||||
// Field represents a series field.
|
||||
type Field struct {
|
||||
ID uint8 `json:"id,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Type FieldType `json:"field"`
|
||||
}
|
||||
|
||||
type FieldType int
|
||||
|
||||
const (
|
||||
Int64 FieldType = iota
|
||||
Float64
|
||||
String
|
||||
Boolean
|
||||
Binary
|
||||
)
|
||||
|
||||
// Fields represents a list of fields.
|
||||
type Fields []*Field
|
||||
|
||||
// Series belong to a Measurement and represent unique time series in a database
|
||||
type Series struct {
|
||||
ID uint32
|
||||
Tags map[string]string
|
||||
}
|
||||
|
||||
// RetentionPolicy represents a policy for creating new shards in a database and how long they're kept around for.
|
||||
type RetentionPolicy struct {
|
||||
// Unique name within database. Required.
|
||||
Name string
|
||||
|
||||
// Length of time to keep data around
|
||||
Duration time.Duration
|
||||
|
||||
ReplicaN uint32
|
||||
SplitN uint32
|
||||
|
||||
Shards []*Shard
|
||||
}
|
||||
|
||||
// NewRetentionPolicy returns a new instance of RetentionPolicy with defaults set.
|
||||
func NewRetentionPolicy(name string) *RetentionPolicy {
|
||||
return &RetentionPolicy{
|
||||
Name: name,
|
||||
ReplicaN: DefaultReplicaN,
|
||||
SplitN: DefaultSplitN,
|
||||
Duration: DefaultShardRetention,
|
||||
}
|
||||
}
|
||||
|
||||
// shardByTimestamp returns the shard in the space that owns a given timestamp for a given series id.
|
||||
// Returns nil if the shard does not exist.
|
||||
func (rp *RetentionPolicy) shardByTimestamp(id uint32, timestamp time.Time) *Shard {
|
||||
shards := rp.shardsByTimestamp(timestamp)
|
||||
if len(shards) > 0 {
|
||||
return shards[int(id)%len(shards)]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rp *RetentionPolicy) shardsByTimestamp(timestamp time.Time) []*Shard {
|
||||
shards := make([]*Shard, 0, rp.SplitN)
|
||||
for _, s := range rp.Shards {
|
||||
if timeBetweenInclusive(timestamp, s.StartTime, s.EndTime) {
|
||||
shards = append(shards, s)
|
||||
}
|
||||
}
|
||||
return shards
|
||||
}
|
||||
|
||||
// MarshalJSON encodes a retention policy to a JSON-encoded byte slice.
|
||||
func (rp *RetentionPolicy) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(&retentionPolicyJSON{
|
||||
Name: rp.Name,
|
||||
Duration: rp.Duration,
|
||||
ReplicaN: rp.ReplicaN,
|
||||
SplitN: rp.SplitN,
|
||||
})
|
||||
}
|
||||
|
||||
// UnmarshalJSON decodes a JSON-encoded byte slice to a retention policy.
|
||||
func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error {
|
||||
// Decode into intermediate type.
|
||||
var o retentionPolicyJSON
|
||||
if err := json.Unmarshal(data, &o); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Copy over properties from intermediate type.
|
||||
rp.Name = o.Name
|
||||
rp.ReplicaN = o.ReplicaN
|
||||
rp.SplitN = o.SplitN
|
||||
rp.Duration = o.Duration
|
||||
rp.Shards = o.Shards
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// retentionPolicyJSON represents an intermediate struct for JSON marshaling.
|
||||
type retentionPolicyJSON struct {
|
||||
Name string `json:"name"`
|
||||
ReplicaN uint32 `json:"replicaN,omitempty"`
|
||||
SplitN uint32 `json:"splitN,omitempty"`
|
||||
Duration time.Duration `json:"duration,omitempty"`
|
||||
Shards []*Shard `json:"shards,omitempty"`
|
||||
}
|
||||
|
||||
// RetentionPolicies represents a list of shard policies.
|
||||
type RetentionPolicies []*RetentionPolicy
|
||||
|
||||
// Shards returns a list of all shards for all policies.
|
||||
func (rps RetentionPolicies) Shards() []*Shard {
|
||||
var shards []*Shard
|
||||
for _, rp := range rps {
|
||||
shards = append(shards, rp.Shards...)
|
||||
}
|
||||
return shards
|
||||
}
|
||||
|
||||
// Filter represents a tag filter when looking up other tags or measurements.
|
||||
type Filter struct {
|
||||
Not bool
|
||||
Key string
|
||||
Value string
|
||||
Regex *regexp.Regexp
|
||||
}
|
||||
|
||||
type Filters []*Filter
|
||||
|
||||
// SeriesIDs is a convenience type for sorting, checking equality, and doing union and
|
||||
// intersection of collections of series ids.
|
||||
type SeriesIDs []uint32
|
||||
|
||||
func (p SeriesIDs) Len() int { return len(p) }
|
||||
func (p SeriesIDs) Less(i, j int) bool { return p[i] < p[j] }
|
||||
func (p SeriesIDs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
||||
|
||||
// Equals assumes that both are sorted. This is by design, no touchy!
|
||||
func (a SeriesIDs) Equals(seriesIDs SeriesIDs) bool {
|
||||
if len(a) != len(seriesIDs) {
|
||||
return false
|
||||
}
|
||||
for i, s := range seriesIDs {
|
||||
if a[i] != s {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Intersect returns a new collection of series ids in sorted order that is the intersection of the two.
|
||||
// The two collections must already be sorted.
|
||||
func (a SeriesIDs) Intersect(seriesIDs SeriesIDs) SeriesIDs {
|
||||
l := a
|
||||
r := seriesIDs
|
||||
|
||||
// we want to iterate through the shortest one and stop
|
||||
if len(seriesIDs) < len(a) {
|
||||
l = seriesIDs
|
||||
r = a
|
||||
}
|
||||
|
||||
// they're in sorted order so advance the counter as needed.
|
||||
// That is, don't run comparisons against lower values that we've already passed
|
||||
var i, j int
|
||||
|
||||
ids := make([]uint32, 0, len(l))
|
||||
for i < len(l) {
|
||||
if l[i] == r[j] {
|
||||
ids = append(ids, l[i])
|
||||
i += 1
|
||||
j += 1
|
||||
} else if l[i] < r[j] {
|
||||
i += 1
|
||||
} else {
|
||||
j += 1
|
||||
}
|
||||
}
|
||||
|
||||
return SeriesIDs(ids)
|
||||
}
|
||||
|
||||
// Union returns a new collection of series ids in sorted order that is the union of the two.
|
||||
// The two collections must already be sorted.
|
||||
func (l SeriesIDs) Union(r SeriesIDs) SeriesIDs {
|
||||
ids := make([]uint32, 0, len(l)+len(r))
|
||||
var i, j int
|
||||
for i < len(l) && j < len(r) {
|
||||
if l[i] == r[j] {
|
||||
ids = append(ids, l[i])
|
||||
i += 1
|
||||
j += 1
|
||||
} else if l[i] < r[j] {
|
||||
ids = append(ids, l[i])
|
||||
i += 1
|
||||
} else {
|
||||
ids = append(ids, r[j])
|
||||
j += 1
|
||||
}
|
||||
}
|
||||
|
||||
// now append the remainder
|
||||
if i < len(l) {
|
||||
ids = append(ids, l[i:]...)
|
||||
} else if j < len(r) {
|
||||
ids = append(ids, r[j:]...)
|
||||
}
|
||||
|
||||
return ids
|
||||
}
|
||||
|
||||
// Reject returns a new collection of series ids in sorted order with the passed in set removed from the original. This is useful for the NOT operator.
|
||||
// The two collections must already be sorted.
|
||||
func (l SeriesIDs) Reject(r SeriesIDs) SeriesIDs {
|
||||
var i, j int
|
||||
|
||||
ids := make([]uint32, 0, len(l))
|
||||
for i < len(l) && j < len(r) {
|
||||
if l[i] == r[j] {
|
||||
i += 1
|
||||
j += 1
|
||||
} else if l[i] < r[j] {
|
||||
ids = append(ids, l[i])
|
||||
i += 1
|
||||
} else {
|
||||
j += 1
|
||||
}
|
||||
}
|
||||
|
||||
// append the remainder
|
||||
if i < len(l) {
|
||||
ids = append(ids, l[i:]...)
|
||||
}
|
||||
|
||||
return SeriesIDs(ids)
|
||||
}
|
||||
|
||||
// AddSeries adds the series for the given measurement to the index. Returns false if already present
|
||||
func (t *database) AddSeries(name string, s *Series) bool {
|
||||
// if there is a measurement for this id, it's already been added
|
||||
if t.seriesToMeasurement[s.ID] != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// get or create the measurement index and index it globally and in the measurement
|
||||
idx := t.createMeasurementIfNotExists(name)
|
||||
|
||||
t.seriesToMeasurement[s.ID] = idx
|
||||
t.series[s.ID] = s
|
||||
|
||||
// TODO: add this series to the global tag index
|
||||
|
||||
return idx.addSeries(s)
|
||||
}
|
||||
|
||||
// createMeasurementIfNotExists will either add a measurement object to the index or return the existing one.
|
||||
func (t *database) createMeasurementIfNotExists(name string) *Measurement {
|
||||
idx := t.measurements[name]
|
||||
if idx == nil {
|
||||
idx = NewMeasurement(name)
|
||||
t.measurements[name] = idx
|
||||
t.names = append(t.names, name)
|
||||
sort.Strings(t.names)
|
||||
}
|
||||
return idx
|
||||
}
|
||||
|
||||
// AddField adds a field to the measurement name. Returns false if already present
|
||||
func (t *database) AddField(name string, f *Field) bool {
|
||||
panic("not implemented")
|
||||
return false
|
||||
}
|
||||
|
||||
// MeasurementsForSeriesIDs returns a collection of unique Measurements for the passed in SeriesIDs.
|
||||
func (t *database) MeasurementsForSeriesIDs(seriesIDs SeriesIDs) []*Measurement {
|
||||
measurements := make(map[*Measurement]bool)
|
||||
|
||||
for _, id := range seriesIDs {
|
||||
measurements[t.seriesToMeasurement[id]] = true
|
||||
}
|
||||
|
||||
values := make([]*Measurement, 0, len(measurements))
|
||||
for m, _ := range measurements {
|
||||
values = append(values, m)
|
||||
}
|
||||
|
||||
return values
|
||||
}
|
||||
|
||||
// SeriesIDs returns an array of series ids for the given measurements and filters to be applied to all.
|
||||
// Filters are equivalent to and AND operation. If you want to do an OR, get the series IDs for one set,
|
||||
// then get the series IDs for another set and use the SeriesIDs.Union to combine the two.
|
||||
func (t *database) SeriesIDs(names []string, filters Filters) SeriesIDs {
|
||||
// they want all ids if no filters are specified
|
||||
if len(filters) == 0 {
|
||||
ids := SeriesIDs(make([]uint32, 0))
|
||||
for _, idx := range t.measurements {
|
||||
ids = ids.Union(idx.ids)
|
||||
}
|
||||
return ids
|
||||
}
|
||||
|
||||
ids := SeriesIDs(make([]uint32, 0))
|
||||
for _, n := range names {
|
||||
ids = ids.Union(t.seriesIDsForName(n, filters))
|
||||
}
|
||||
|
||||
return ids
|
||||
}
|
||||
|
||||
// TagKeys returns a sorted array of unique tag keys for the given measurements.
|
||||
func (t *database) TagKeys(names []string) []string {
|
||||
if len(names) == 0 {
|
||||
names = t.names
|
||||
}
|
||||
|
||||
keys := make(map[string]bool)
|
||||
for _, n := range names {
|
||||
idx := t.measurements[n]
|
||||
if idx != nil {
|
||||
for k, _ := range idx.seriesByTagKeyValue {
|
||||
keys[k] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sortedKeys := make([]string, 0, len(keys))
|
||||
for k, _ := range keys {
|
||||
sortedKeys = append(sortedKeys, k)
|
||||
}
|
||||
sort.Strings(sortedKeys)
|
||||
|
||||
return sortedKeys
|
||||
}
|
||||
|
||||
// TagValues returns a map of unique tag values for the given measurements and key with the given filters applied.
|
||||
// Call .ToSlice() on the result to convert it into a sorted slice of strings.
|
||||
// Filters are equivalent to and AND operation. If you want to do an OR, get the tag values for one set,
|
||||
// then get the tag values for another set and do a union of the two.
|
||||
func (t *database) TagValues(names []string, key string, filters []*Filter) TagValues {
|
||||
values := TagValues(make(map[string]bool))
|
||||
|
||||
// see if they just want all the tag values for this key
|
||||
if len(filters) == 0 {
|
||||
for _, n := range names {
|
||||
idx := t.measurements[n]
|
||||
if idx != nil {
|
||||
values.Union(idx.tagValues(key))
|
||||
}
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
// they have filters so just get a set of series ids matching them and then get the tag values from those
|
||||
seriesIDs := t.SeriesIDs(names, filters)
|
||||
return t.tagValuesForSeries(key, seriesIDs)
|
||||
}
|
||||
|
||||
// tagValuesForSeries will return a TagValues map of all the unique tag values for a collection of series.
|
||||
func (t *database) tagValuesForSeries(key string, seriesIDs SeriesIDs) TagValues {
|
||||
values := make(map[string]bool)
|
||||
for _, id := range seriesIDs {
|
||||
s := t.series[id]
|
||||
if s == nil {
|
||||
continue
|
||||
}
|
||||
if v, ok := s.Tags[key]; ok {
|
||||
values[v] = true
|
||||
}
|
||||
}
|
||||
return TagValues(values)
|
||||
}
|
||||
|
||||
type TagValues map[string]bool
|
||||
|
||||
// ToSlice returns a sorted slice of the TagValues
|
||||
func (t TagValues) ToSlice() []string {
|
||||
a := make([]string, 0, len(t))
|
||||
for v, _ := range t {
|
||||
a = append(a, v)
|
||||
}
|
||||
sort.Strings(a)
|
||||
return a
|
||||
}
|
||||
|
||||
// Union will modify the receiver by merging in the passed in values.
|
||||
func (l TagValues) Union(r TagValues) {
|
||||
for v, _ := range r {
|
||||
l[v] = true
|
||||
}
|
||||
}
|
||||
|
||||
// Intersect will modify the receiver by keeping only the keys that exist in the passed in values
|
||||
func (l TagValues) Intersect(r TagValues) {
|
||||
for v, _ := range l {
|
||||
if _, ok := r[v]; !ok {
|
||||
delete(l, v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//seriesIDsForName is the same as SeriesIDs, but for a specific measurement.
|
||||
func (t *database) seriesIDsForName(name string, filters Filters) SeriesIDs {
|
||||
idx := t.measurements[name]
|
||||
if idx == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// process the filters one at a time to get the list of ids they return
|
||||
idsPerFilter := make([]SeriesIDs, len(filters), len(filters))
|
||||
for i, filter := range filters {
|
||||
idsPerFilter[i] = idx.seriesIDs(filter)
|
||||
}
|
||||
|
||||
// collapse the set of ids
|
||||
allIDs := idsPerFilter[0]
|
||||
for i := 1; i < len(filters); i++ {
|
||||
allIDs = allIDs.Intersect(idsPerFilter[i])
|
||||
}
|
||||
|
||||
return allIDs
|
||||
}
|
||||
|
||||
// MeasurementBySeriesID returns the Measurement that is the parent of the given series id.
|
||||
func (t *database) MeasurementBySeriesID(id uint32) *Measurement {
|
||||
return t.seriesToMeasurement[id]
|
||||
}
|
||||
|
||||
// MeasurementAndSeries returns the Measurement and the Series for a given measurement name and tag set.
|
||||
func (t *database) MeasurementAndSeries(name string, tags map[string]string) (*Measurement, *Series) {
|
||||
idx := t.measurements[name]
|
||||
if idx == nil {
|
||||
return nil, nil
|
||||
}
|
||||
return idx, idx.seriesByTags(tags)
|
||||
}
|
||||
|
||||
// SereiesByID returns the Series that has the given id.
|
||||
func (t *database) SeriesByID(id uint32) *Series {
|
||||
return t.series[id]
|
||||
}
|
||||
|
||||
// Measurements returns all measurements that match the given filters.
|
||||
func (t *database) Measurements(filters []*Filter) []*Measurement {
|
||||
measurements := make([]*Measurement, 0, len(t.measurements))
|
||||
for _, idx := range t.measurements {
|
||||
measurements = append(measurements, idx.measurement)
|
||||
}
|
||||
return measurements
|
||||
}
|
||||
|
||||
// Names returns all measuremet names in sorted order.
|
||||
func (t *database) Names() []string {
|
||||
return t.names
|
||||
}
|
||||
|
||||
// DropSeries will clear the index of all references to a series.
|
||||
func (t *database) DropSeries(id uint32) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
// DropMeasurement will clear the index of all references to a measurement and its child series.
|
||||
func (t *database) DropMeasurement(name string) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
// used to convert the tag set to bytes for use as a lookup key
|
||||
func marshalTags(tags map[string]string) []byte {
|
||||
s := make([]string, 0, len(tags))
|
||||
// pull out keys to sort
|
||||
for k := range tags {
|
||||
s = append(s, k)
|
||||
}
|
||||
sort.Strings(s)
|
||||
|
||||
// now append on the key values in key sorted order
|
||||
for _, k := range s {
|
||||
s = append(s, tags[k])
|
||||
}
|
||||
return []byte(strings.Join(s, "|"))
|
||||
}
|
|
@ -1,17 +1,15 @@
|
|||
package influxdb_test
|
||||
package influxdb
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"regexp"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdb/influxdb"
|
||||
)
|
||||
|
||||
// Ensure that the index will return a sorted array of measurement names.
|
||||
func TestIndex_Names(t *testing.T) {
|
||||
idx := indexWithFixtureData()
|
||||
func TestDatabase_Names(t *testing.T) {
|
||||
idx := databaseWithFixtureData()
|
||||
|
||||
r := idx.Names()
|
||||
exp := []string{"another_thing", "cpu_load", "key_count", "queue_depth"}
|
||||
|
@ -21,12 +19,12 @@ func TestIndex_Names(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure that we can get the measurement by the series ID.
|
||||
func TestIndex_MeasurementBySeriesID(t *testing.T) {
|
||||
idx := influxdb.NewIndex()
|
||||
m := &influxdb.Measurement{
|
||||
func TestDatabase_MeasurementBySeriesID(t *testing.T) {
|
||||
idx := newDatabase()
|
||||
m := &Measurement{
|
||||
Name: "cpu_load",
|
||||
}
|
||||
s := &influxdb.Series{
|
||||
s := &Series{
|
||||
ID: uint32(1),
|
||||
Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"},
|
||||
}
|
||||
|
@ -34,32 +32,32 @@ func TestIndex_MeasurementBySeriesID(t *testing.T) {
|
|||
// add it and see if we can look it up
|
||||
idx.AddSeries(m.Name, s)
|
||||
mm := idx.MeasurementBySeriesID(uint32(1))
|
||||
if mustMarshalJSON(m) != mustMarshalJSON(mm) {
|
||||
if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
|
||||
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
|
||||
}
|
||||
|
||||
// now test that we can add another
|
||||
s = &influxdb.Series{
|
||||
s = &Series{
|
||||
ID: uint32(2),
|
||||
Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}
|
||||
|
||||
idx.AddSeries(m.Name, s)
|
||||
mm = idx.MeasurementBySeriesID(uint32(2))
|
||||
if mustMarshalJSON(m) != mustMarshalJSON(mm) {
|
||||
if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
|
||||
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
|
||||
}
|
||||
|
||||
mm = idx.MeasurementBySeriesID(uint32(1))
|
||||
if mustMarshalJSON(m) != mustMarshalJSON(mm) {
|
||||
if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
|
||||
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that we can get an array of unique measurements by a collection of series IDs.
|
||||
func TestIndex_MeasurementsBySeriesIDs(t *testing.T) {
|
||||
idx := indexWithFixtureData()
|
||||
func TestDatabase_MeasurementsBySeriesIDs(t *testing.T) {
|
||||
idx := databaseWithFixtureData()
|
||||
|
||||
ids := influxdb.SeriesIDs([]uint32{uint32(1), uint32(4)})
|
||||
ids := SeriesIDs([]uint32{uint32(1), uint32(4)})
|
||||
names := make([]string, 0)
|
||||
for _, m := range idx.MeasurementsForSeriesIDs(ids) {
|
||||
names = append(names, m.Name)
|
||||
|
@ -72,28 +70,28 @@ func TestIndex_MeasurementsBySeriesIDs(t *testing.T) {
|
|||
}
|
||||
|
||||
// Ensure that we can get the series object by the series ID.
|
||||
func TestIndex_SeriesBySeriesID(t *testing.T) {
|
||||
idx := influxdb.NewIndex()
|
||||
func TestDatabase_SeriesBySeriesID(t *testing.T) {
|
||||
idx := newDatabase()
|
||||
|
||||
// now test that we can add another
|
||||
s := &influxdb.Series{
|
||||
s := &Series{
|
||||
ID: uint32(2),
|
||||
Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}
|
||||
|
||||
idx.AddSeries("foo", s)
|
||||
ss := idx.SeriesByID(uint32(2))
|
||||
if mustMarshalJSON(s) != mustMarshalJSON(ss) {
|
||||
if string(mustMarshalJSON(s)) != string(mustMarshalJSON(ss)) {
|
||||
t.Fatalf("series not equal:\n%s\n%s", s, ss)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that we can get the measurement and series objects out based on measurement and tags.
|
||||
func TestIndex_MeasurementAndSeries(t *testing.T) {
|
||||
idx := influxdb.NewIndex()
|
||||
m := &influxdb.Measurement{
|
||||
func TestDatabase_MeasurementAndSeries(t *testing.T) {
|
||||
idx := newDatabase()
|
||||
m := &Measurement{
|
||||
Name: "cpu_load",
|
||||
}
|
||||
s := &influxdb.Series{
|
||||
s := &Series{
|
||||
ID: uint32(1),
|
||||
Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"},
|
||||
}
|
||||
|
@ -101,30 +99,30 @@ func TestIndex_MeasurementAndSeries(t *testing.T) {
|
|||
// add it and see if we can look it up by name and tags
|
||||
idx.AddSeries(m.Name, s)
|
||||
mm, ss := idx.MeasurementAndSeries(m.Name, s.Tags)
|
||||
if mustMarshalJSON(m) != mustMarshalJSON(mm) {
|
||||
if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
|
||||
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
|
||||
} else if mustMarshalJSON(s) != mustMarshalJSON(ss) {
|
||||
} else if string(mustMarshalJSON(s)) != string(mustMarshalJSON(ss)) {
|
||||
t.Fatalf("series not equal:\n%s\n%s", s, ss)
|
||||
}
|
||||
|
||||
// now test that we can add another
|
||||
s = &influxdb.Series{
|
||||
s = &Series{
|
||||
ID: uint32(2),
|
||||
Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}
|
||||
|
||||
idx.AddSeries(m.Name, s)
|
||||
mm, ss = idx.MeasurementAndSeries(m.Name, s.Tags)
|
||||
if mustMarshalJSON(m) != mustMarshalJSON(mm) {
|
||||
if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
|
||||
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
|
||||
} else if mustMarshalJSON(s) != mustMarshalJSON(ss) {
|
||||
} else if string(mustMarshalJSON(s)) != string(mustMarshalJSON(ss)) {
|
||||
t.Fatalf("series not equal:\n%s\n%s", s, ss)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that we can get the series IDs for measurements without any filters.
|
||||
func TestIndex_SeriesIDs(t *testing.T) {
|
||||
idx := influxdb.NewIndex()
|
||||
s := &influxdb.Series{
|
||||
func TestDatabase_SeriesIDs(t *testing.T) {
|
||||
idx := newDatabase()
|
||||
s := &Series{
|
||||
ID: uint32(1),
|
||||
Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"}}
|
||||
|
||||
|
@ -141,7 +139,7 @@ func TestIndex_SeriesIDs(t *testing.T) {
|
|||
}
|
||||
|
||||
// now test that we can add another
|
||||
s = &influxdb.Series{
|
||||
s = &Series{
|
||||
ID: uint32(2),
|
||||
Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}
|
||||
added = idx.AddSeries("cpu_load", s)
|
||||
|
@ -156,7 +154,7 @@ func TestIndex_SeriesIDs(t *testing.T) {
|
|||
}
|
||||
|
||||
// now add another in a different measurement
|
||||
s = &influxdb.Series{
|
||||
s = &Series{
|
||||
ID: uint32(3),
|
||||
Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}
|
||||
added = idx.AddSeries("network_in", s)
|
||||
|
@ -171,12 +169,12 @@ func TestIndex_SeriesIDs(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestIndex_SeriesIDsWhereFilter(t *testing.T) {
|
||||
idx := indexWithFixtureData()
|
||||
func TestDatabase_SeriesIDsWhereFilter(t *testing.T) {
|
||||
idx := databaseWithFixtureData()
|
||||
|
||||
var tests = []struct {
|
||||
names []string
|
||||
filters []*influxdb.Filter
|
||||
filters []*Filter
|
||||
result []uint32
|
||||
}{
|
||||
// match against no tags
|
||||
|
@ -188,9 +186,9 @@ func TestIndex_SeriesIDsWhereFilter(t *testing.T) {
|
|||
// match against all tags
|
||||
{
|
||||
names: []string{"cpu_load"},
|
||||
filters: []*influxdb.Filter{
|
||||
&influxdb.Filter{Key: "host", Value: "servera.influx.com"},
|
||||
&influxdb.Filter{Key: "region", Value: "uswest"},
|
||||
filters: []*Filter{
|
||||
&Filter{Key: "host", Value: "servera.influx.com"},
|
||||
&Filter{Key: "region", Value: "uswest"},
|
||||
},
|
||||
result: []uint32{uint32(1)},
|
||||
},
|
||||
|
@ -198,8 +196,8 @@ func TestIndex_SeriesIDsWhereFilter(t *testing.T) {
|
|||
// match against one tag
|
||||
{
|
||||
names: []string{"cpu_load"},
|
||||
filters: []*influxdb.Filter{
|
||||
&influxdb.Filter{Key: "region", Value: "uswest"},
|
||||
filters: []*Filter{
|
||||
&Filter{Key: "region", Value: "uswest"},
|
||||
},
|
||||
result: []uint32{uint32(1), uint32(2)},
|
||||
},
|
||||
|
@ -207,8 +205,8 @@ func TestIndex_SeriesIDsWhereFilter(t *testing.T) {
|
|||
// match against one tag, single result
|
||||
{
|
||||
names: []string{"cpu_load"},
|
||||
filters: []*influxdb.Filter{
|
||||
&influxdb.Filter{Key: "host", Value: "servera.influx.com"},
|
||||
filters: []*Filter{
|
||||
&Filter{Key: "host", Value: "servera.influx.com"},
|
||||
},
|
||||
result: []uint32{uint32(1)},
|
||||
},
|
||||
|
@ -216,8 +214,8 @@ func TestIndex_SeriesIDsWhereFilter(t *testing.T) {
|
|||
// query against tag key that doesn't exist returns empty
|
||||
{
|
||||
names: []string{"cpu_load"},
|
||||
filters: []*influxdb.Filter{
|
||||
&influxdb.Filter{Key: "foo", Value: "bar"},
|
||||
filters: []*Filter{
|
||||
&Filter{Key: "foo", Value: "bar"},
|
||||
},
|
||||
result: []uint32{},
|
||||
},
|
||||
|
@ -225,8 +223,8 @@ func TestIndex_SeriesIDsWhereFilter(t *testing.T) {
|
|||
// query against tag value that doesn't exist returns empty
|
||||
{
|
||||
names: []string{"cpu_load"},
|
||||
filters: []*influxdb.Filter{
|
||||
&influxdb.Filter{Key: "host", Value: "foo"},
|
||||
filters: []*Filter{
|
||||
&Filter{Key: "host", Value: "foo"},
|
||||
},
|
||||
result: []uint32{},
|
||||
},
|
||||
|
@ -234,8 +232,8 @@ func TestIndex_SeriesIDsWhereFilter(t *testing.T) {
|
|||
// query against a tag NOT value
|
||||
{
|
||||
names: []string{"key_count"},
|
||||
filters: []*influxdb.Filter{
|
||||
&influxdb.Filter{Key: "region", Value: "useast", Not: true},
|
||||
filters: []*Filter{
|
||||
&Filter{Key: "region", Value: "useast", Not: true},
|
||||
},
|
||||
result: []uint32{uint32(3)},
|
||||
},
|
||||
|
@ -243,8 +241,8 @@ func TestIndex_SeriesIDsWhereFilter(t *testing.T) {
|
|||
// query against a tag NOT null
|
||||
{
|
||||
names: []string{"queue_depth"},
|
||||
filters: []*influxdb.Filter{
|
||||
&influxdb.Filter{Key: "app", Value: "", Not: true},
|
||||
filters: []*Filter{
|
||||
&Filter{Key: "app", Value: "", Not: true},
|
||||
},
|
||||
result: []uint32{uint32(6)},
|
||||
},
|
||||
|
@ -252,9 +250,9 @@ func TestIndex_SeriesIDsWhereFilter(t *testing.T) {
|
|||
// query against a tag value and another tag NOT value
|
||||
{
|
||||
names: []string{"queue_depth"},
|
||||
filters: []*influxdb.Filter{
|
||||
&influxdb.Filter{Key: "name", Value: "high priority"},
|
||||
&influxdb.Filter{Key: "app", Value: "paultown", Not: true},
|
||||
filters: []*Filter{
|
||||
&Filter{Key: "name", Value: "high priority"},
|
||||
&Filter{Key: "app", Value: "paultown", Not: true},
|
||||
},
|
||||
result: []uint32{uint32(5), uint32(7)},
|
||||
},
|
||||
|
@ -262,8 +260,8 @@ func TestIndex_SeriesIDsWhereFilter(t *testing.T) {
|
|||
// query against a tag value matching regex
|
||||
{
|
||||
names: []string{"queue_depth"},
|
||||
filters: []*influxdb.Filter{
|
||||
&influxdb.Filter{Key: "app", Regex: regexp.MustCompile("paul.*")},
|
||||
filters: []*Filter{
|
||||
&Filter{Key: "app", Regex: regexp.MustCompile("paul.*")},
|
||||
},
|
||||
result: []uint32{uint32(6), uint32(7)},
|
||||
},
|
||||
|
@ -271,9 +269,9 @@ func TestIndex_SeriesIDsWhereFilter(t *testing.T) {
|
|||
// query against a tag value matching regex and other tag value matching value
|
||||
{
|
||||
names: []string{"queue_depth"},
|
||||
filters: []*influxdb.Filter{
|
||||
&influxdb.Filter{Key: "name", Value: "high priority"},
|
||||
&influxdb.Filter{Key: "app", Regex: regexp.MustCompile("paul.*")},
|
||||
filters: []*Filter{
|
||||
&Filter{Key: "name", Value: "high priority"},
|
||||
&Filter{Key: "app", Regex: regexp.MustCompile("paul.*")},
|
||||
},
|
||||
result: []uint32{uint32(6), uint32(7)},
|
||||
},
|
||||
|
@ -281,8 +279,8 @@ func TestIndex_SeriesIDsWhereFilter(t *testing.T) {
|
|||
// query against a tag value NOT matching regex
|
||||
{
|
||||
names: []string{"queue_depth"},
|
||||
filters: []*influxdb.Filter{
|
||||
&influxdb.Filter{Key: "app", Regex: regexp.MustCompile("paul.*"), Not: true},
|
||||
filters: []*Filter{
|
||||
&Filter{Key: "app", Regex: regexp.MustCompile("paul.*"), Not: true},
|
||||
},
|
||||
result: []uint32{uint32(5)},
|
||||
},
|
||||
|
@ -290,9 +288,9 @@ func TestIndex_SeriesIDsWhereFilter(t *testing.T) {
|
|||
// query against a tag value NOT matching regex and other tag value matching value
|
||||
{
|
||||
names: []string{"queue_depth"},
|
||||
filters: []*influxdb.Filter{
|
||||
&influxdb.Filter{Key: "app", Regex: regexp.MustCompile("paul.*"), Not: true},
|
||||
&influxdb.Filter{Key: "name", Value: "high priority"},
|
||||
filters: []*Filter{
|
||||
&Filter{Key: "app", Regex: regexp.MustCompile("paul.*"), Not: true},
|
||||
&Filter{Key: "name", Value: "high priority"},
|
||||
},
|
||||
result: []uint32{uint32(5)},
|
||||
},
|
||||
|
@ -300,8 +298,8 @@ func TestIndex_SeriesIDsWhereFilter(t *testing.T) {
|
|||
// query against multiple measurements
|
||||
{
|
||||
names: []string{"cpu_load", "key_count"},
|
||||
filters: []*influxdb.Filter{
|
||||
&influxdb.Filter{Key: "region", Value: "uswest"},
|
||||
filters: []*Filter{
|
||||
&Filter{Key: "region", Value: "uswest"},
|
||||
},
|
||||
result: []uint32{uint32(1), uint32(2), uint32(3)},
|
||||
},
|
||||
|
@ -309,15 +307,15 @@ func TestIndex_SeriesIDsWhereFilter(t *testing.T) {
|
|||
|
||||
for i, tt := range tests {
|
||||
r := idx.SeriesIDs(tt.names, tt.filters)
|
||||
expectedIDs := influxdb.SeriesIDs(tt.result)
|
||||
expectedIDs := SeriesIDs(tt.result)
|
||||
if !r.Equals(expectedIDs) {
|
||||
t.Fatalf("%d: filters: %s: result mismatch:\n exp=%s\n got=%s", i, mustMarshalJSON(tt.filters), mustMarshalJSON(expectedIDs), mustMarshalJSON(r))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestIndex_TagKeys(t *testing.T) {
|
||||
idx := indexWithFixtureData()
|
||||
func TestDatabase_TagKeys(t *testing.T) {
|
||||
idx := databaseWithFixtureData()
|
||||
|
||||
var tests = []struct {
|
||||
names []string
|
||||
|
@ -345,13 +343,13 @@ func TestIndex_TagKeys(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestIndex_TagValuesWhereFilter(t *testing.T) {
|
||||
idx := indexWithFixtureData()
|
||||
func TestDatabase_TagValuesWhereFilter(t *testing.T) {
|
||||
idx := databaseWithFixtureData()
|
||||
|
||||
var tests = []struct {
|
||||
names []string
|
||||
key string
|
||||
filters []*influxdb.Filter
|
||||
filters []*Filter
|
||||
result []string
|
||||
}{
|
||||
// get the tag values across multiple measurements
|
||||
|
@ -367,8 +365,8 @@ func TestIndex_TagValuesWhereFilter(t *testing.T) {
|
|||
{
|
||||
names: []string{"key_count"},
|
||||
key: "region",
|
||||
filters: []*influxdb.Filter{
|
||||
&influxdb.Filter{Key: "host", Value: "serverc.influx.com"},
|
||||
filters: []*Filter{
|
||||
&Filter{Key: "host", Value: "serverc.influx.com"},
|
||||
},
|
||||
result: []string{"uswest"},
|
||||
},
|
||||
|
@ -377,8 +375,8 @@ func TestIndex_TagValuesWhereFilter(t *testing.T) {
|
|||
{
|
||||
names: []string{"key_count"},
|
||||
key: "region",
|
||||
filters: []*influxdb.Filter{
|
||||
&influxdb.Filter{Key: "host", Value: "serverc.influx.com", Not: true},
|
||||
filters: []*Filter{
|
||||
&Filter{Key: "host", Value: "serverc.influx.com", Not: true},
|
||||
},
|
||||
result: []string{"useast"},
|
||||
},
|
||||
|
@ -387,9 +385,9 @@ func TestIndex_TagValuesWhereFilter(t *testing.T) {
|
|||
{
|
||||
names: []string{"key_count"},
|
||||
key: "region",
|
||||
filters: []*influxdb.Filter{
|
||||
&influxdb.Filter{Key: "host", Value: "serverc.influx.com"},
|
||||
&influxdb.Filter{Key: "service", Value: "redis"},
|
||||
filters: []*Filter{
|
||||
&Filter{Key: "host", Value: "serverc.influx.com"},
|
||||
&Filter{Key: "service", Value: "redis"},
|
||||
},
|
||||
result: []string{"uswest"},
|
||||
},
|
||||
|
@ -398,8 +396,8 @@ func TestIndex_TagValuesWhereFilter(t *testing.T) {
|
|||
{
|
||||
names: []string{"queue_depth"},
|
||||
key: "name",
|
||||
filters: []*influxdb.Filter{
|
||||
&influxdb.Filter{Key: "app", Regex: regexp.MustCompile("paul.*")},
|
||||
filters: []*Filter{
|
||||
&Filter{Key: "app", Regex: regexp.MustCompile("paul.*")},
|
||||
},
|
||||
result: []string{"high priority"},
|
||||
},
|
||||
|
@ -408,8 +406,8 @@ func TestIndex_TagValuesWhereFilter(t *testing.T) {
|
|||
{
|
||||
names: []string{"key_count"},
|
||||
key: "region",
|
||||
filters: []*influxdb.Filter{
|
||||
&influxdb.Filter{Key: "host", Regex: regexp.MustCompile("serverd.*"), Not: true},
|
||||
filters: []*Filter{
|
||||
&Filter{Key: "host", Regex: regexp.MustCompile("serverd.*"), Not: true},
|
||||
},
|
||||
result: []string{"uswest"},
|
||||
},
|
||||
|
@ -423,22 +421,22 @@ func TestIndex_TagValuesWhereFilter(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestIndex_DropSeries(t *testing.T) {
|
||||
func TestDatabase_DropSeries(t *testing.T) {
|
||||
t.Skip("pending")
|
||||
}
|
||||
|
||||
func TestIndex_DropMeasurement(t *testing.T) {
|
||||
func TestDatabase_DropMeasurement(t *testing.T) {
|
||||
t.Skip("pending")
|
||||
}
|
||||
|
||||
func TestIndex_FieldKeys(t *testing.T) {
|
||||
func TestDatabase_FieldKeys(t *testing.T) {
|
||||
t.Skip("pending")
|
||||
}
|
||||
|
||||
// indexWithFixtureData returns a populated Index for use in many of the filtering tests
|
||||
func indexWithFixtureData() *influxdb.Index {
|
||||
idx := influxdb.NewIndex()
|
||||
s := &influxdb.Series{
|
||||
// databaseWithFixtureData returns a populated Index for use in many of the filtering tests
|
||||
func databaseWithFixtureData() *database {
|
||||
idx := newDatabase()
|
||||
s := &Series{
|
||||
ID: uint32(1),
|
||||
Tags: map[string]string{"host": "servera.influx.com", "region": "uswest"}}
|
||||
|
||||
|
@ -447,7 +445,7 @@ func indexWithFixtureData() *influxdb.Index {
|
|||
return nil
|
||||
}
|
||||
|
||||
s = &influxdb.Series{
|
||||
s = &Series{
|
||||
ID: uint32(2),
|
||||
Tags: map[string]string{"host": "serverb.influx.com", "region": "uswest"}}
|
||||
|
||||
|
@ -456,7 +454,7 @@ func indexWithFixtureData() *influxdb.Index {
|
|||
return nil
|
||||
}
|
||||
|
||||
s = &influxdb.Series{
|
||||
s = &Series{
|
||||
ID: uint32(3),
|
||||
Tags: map[string]string{"host": "serverc.influx.com", "region": "uswest", "service": "redis"}}
|
||||
|
||||
|
@ -465,7 +463,7 @@ func indexWithFixtureData() *influxdb.Index {
|
|||
return nil
|
||||
}
|
||||
|
||||
s = &influxdb.Series{
|
||||
s = &Series{
|
||||
ID: uint32(4),
|
||||
Tags: map[string]string{"host": "serverd.influx.com", "region": "useast", "service": "redis"}}
|
||||
|
||||
|
@ -474,7 +472,7 @@ func indexWithFixtureData() *influxdb.Index {
|
|||
return nil
|
||||
}
|
||||
|
||||
s = &influxdb.Series{
|
||||
s = &Series{
|
||||
ID: uint32(5),
|
||||
Tags: map[string]string{"name": "high priority"}}
|
||||
|
||||
|
@ -483,7 +481,7 @@ func indexWithFixtureData() *influxdb.Index {
|
|||
return nil
|
||||
}
|
||||
|
||||
s = &influxdb.Series{
|
||||
s = &Series{
|
||||
ID: uint32(6),
|
||||
Tags: map[string]string{"name": "high priority", "app": "paultown"}}
|
||||
|
||||
|
@ -492,7 +490,7 @@ func indexWithFixtureData() *influxdb.Index {
|
|||
return nil
|
||||
}
|
||||
|
||||
s = &influxdb.Series{
|
||||
s = &Series{
|
||||
ID: uint32(7),
|
||||
Tags: map[string]string{"name": "high priority", "app": "paulcountry"}}
|
||||
|
||||
|
@ -501,7 +499,7 @@ func indexWithFixtureData() *influxdb.Index {
|
|||
return nil
|
||||
}
|
||||
|
||||
s = &influxdb.Series{
|
||||
s = &Series{
|
||||
ID: uint32(8),
|
||||
Tags: map[string]string{"a": "b"}}
|
||||
|
||||
|
@ -513,7 +511,7 @@ func indexWithFixtureData() *influxdb.Index {
|
|||
return idx
|
||||
}
|
||||
|
||||
func TestIndex_SeriesIDsIntersect(t *testing.T) {
|
||||
func TestDatabase_SeriesIDsIntersect(t *testing.T) {
|
||||
var tests = []struct {
|
||||
expected []uint32
|
||||
left []uint32
|
||||
|
@ -563,14 +561,14 @@ func TestIndex_SeriesIDsIntersect(t *testing.T) {
|
|||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
a := influxdb.SeriesIDs(tt.left).Intersect(tt.right)
|
||||
a := SeriesIDs(tt.left).Intersect(tt.right)
|
||||
if !a.Equals(tt.expected) {
|
||||
t.Fatalf("%d: %s intersect %s: result mismatch:\n exp=%s\n got=%s", i, influxdb.SeriesIDs(tt.left), influxdb.SeriesIDs(tt.right), influxdb.SeriesIDs(tt.expected), influxdb.SeriesIDs(a))
|
||||
t.Fatalf("%d: %s intersect %s: result mismatch:\n exp=%s\n got=%s", i, SeriesIDs(tt.left), SeriesIDs(tt.right), SeriesIDs(tt.expected), SeriesIDs(a))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestIndex_SeriesIDsUnion(t *testing.T) {
|
||||
func TestDatabase_SeriesIDsUnion(t *testing.T) {
|
||||
var tests = []struct {
|
||||
expected []uint32
|
||||
left []uint32
|
||||
|
@ -620,14 +618,14 @@ func TestIndex_SeriesIDsUnion(t *testing.T) {
|
|||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
a := influxdb.SeriesIDs(tt.left).Union(tt.right)
|
||||
a := SeriesIDs(tt.left).Union(tt.right)
|
||||
if !a.Equals(tt.expected) {
|
||||
t.Fatalf("%d: %s union %s: result mismatch:\n exp=%s\n got=%s", i, influxdb.SeriesIDs(tt.left), influxdb.SeriesIDs(tt.right), influxdb.SeriesIDs(tt.expected), influxdb.SeriesIDs(a))
|
||||
t.Fatalf("%d: %s union %s: result mismatch:\n exp=%s\n got=%s", i, SeriesIDs(tt.left), SeriesIDs(tt.right), SeriesIDs(tt.expected), SeriesIDs(a))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestIndex_SeriesIDsReject(t *testing.T) {
|
||||
func TestDatabase_SeriesIDsReject(t *testing.T) {
|
||||
var tests = []struct {
|
||||
expected []uint32
|
||||
left []uint32
|
||||
|
@ -677,9 +675,9 @@ func TestIndex_SeriesIDsReject(t *testing.T) {
|
|||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
a := influxdb.SeriesIDs(tt.left).Reject(tt.right)
|
||||
a := SeriesIDs(tt.left).Reject(tt.right)
|
||||
if !a.Equals(tt.expected) {
|
||||
t.Fatalf("%d: %s reject %s: result mismatch:\n exp=%s\n got=%s", i, influxdb.SeriesIDs(tt.left), influxdb.SeriesIDs(tt.right), influxdb.SeriesIDs(tt.expected), influxdb.SeriesIDs(a))
|
||||
t.Fatalf("%d: %s reject %s: result mismatch:\n exp=%s\n got=%s", i, SeriesIDs(tt.left), SeriesIDs(tt.right), SeriesIDs(tt.expected), SeriesIDs(a))
|
||||
}
|
||||
}
|
||||
}
|
408
index.go
408
index.go
|
@ -1,408 +0,0 @@
|
|||
package influxdb
|
||||
|
||||
import (
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Index is the in memory index structure for answering queries about measurements, tags
|
||||
// and series within a database.
|
||||
type Index struct {
|
||||
mu sync.RWMutex
|
||||
measurements map[string]*Measurement // measurement name to object and index
|
||||
seriesToMeasurement map[uint32]*Measurement // map series id to its measurement
|
||||
series map[uint32]*Series // map series id to the Series object
|
||||
names []string // sorted list of the measurement names
|
||||
}
|
||||
|
||||
func NewIndex() *Index {
|
||||
return &Index{
|
||||
measurements: make(map[string]*Measurement),
|
||||
seriesToMeasurement: make(map[uint32]*Measurement),
|
||||
series: make(map[uint32]*Series),
|
||||
names: make([]string, 0),
|
||||
}
|
||||
}
|
||||
|
||||
// Filter represents a tag filter when looking up other tags or measurements.
|
||||
type Filter struct {
|
||||
Not bool
|
||||
Key string
|
||||
Value string
|
||||
Regex *regexp.Regexp
|
||||
}
|
||||
|
||||
type Filters []*Filter
|
||||
|
||||
// SeriesIDs is a convenience type for sorting, checking equality, and doing union and
|
||||
// intersection of collections of series ids.
|
||||
type SeriesIDs []uint32
|
||||
|
||||
func (p SeriesIDs) Len() int { return len(p) }
|
||||
func (p SeriesIDs) Less(i, j int) bool { return p[i] < p[j] }
|
||||
func (p SeriesIDs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
||||
|
||||
// Equals assumes that both are sorted. This is by design, no touchy!
|
||||
func (a SeriesIDs) Equals(seriesIDs SeriesIDs) bool {
|
||||
if len(a) != len(seriesIDs) {
|
||||
return false
|
||||
}
|
||||
for i, s := range seriesIDs {
|
||||
if a[i] != s {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Intersect returns a new collection of series ids in sorted order that is the intersection of the two.
|
||||
// The two collections must already be sorted.
|
||||
func (a SeriesIDs) Intersect(seriesIDs SeriesIDs) SeriesIDs {
|
||||
l := a
|
||||
r := seriesIDs
|
||||
|
||||
// we want to iterate through the shortest one and stop
|
||||
if len(seriesIDs) < len(a) {
|
||||
l = seriesIDs
|
||||
r = a
|
||||
}
|
||||
|
||||
// they're in sorted order so advance the counter as needed.
|
||||
// That is, don't run comparisons against lower values that we've already passed
|
||||
var i, j int
|
||||
|
||||
ids := make([]uint32, 0, len(l))
|
||||
for i < len(l) {
|
||||
if l[i] == r[j] {
|
||||
ids = append(ids, l[i])
|
||||
i += 1
|
||||
j += 1
|
||||
} else if l[i] < r[j] {
|
||||
i += 1
|
||||
} else {
|
||||
j += 1
|
||||
}
|
||||
}
|
||||
|
||||
return SeriesIDs(ids)
|
||||
}
|
||||
|
||||
// Union returns a new collection of series ids in sorted order that is the union of the two.
|
||||
// The two collections must already be sorted.
|
||||
func (l SeriesIDs) Union(r SeriesIDs) SeriesIDs {
|
||||
ids := make([]uint32, 0, len(l)+len(r))
|
||||
var i, j int
|
||||
for i < len(l) && j < len(r) {
|
||||
if l[i] == r[j] {
|
||||
ids = append(ids, l[i])
|
||||
i += 1
|
||||
j += 1
|
||||
} else if l[i] < r[j] {
|
||||
ids = append(ids, l[i])
|
||||
i += 1
|
||||
} else {
|
||||
ids = append(ids, r[j])
|
||||
j += 1
|
||||
}
|
||||
}
|
||||
|
||||
// now append the remainder
|
||||
if i < len(l) {
|
||||
ids = append(ids, l[i:]...)
|
||||
} else if j < len(r) {
|
||||
ids = append(ids, r[j:]...)
|
||||
}
|
||||
|
||||
return ids
|
||||
}
|
||||
|
||||
// Reject returns a new collection of series ids in sorted order with the passed in set removed from the original. This is useful for the NOT operator.
|
||||
// The two collections must already be sorted.
|
||||
func (l SeriesIDs) Reject(r SeriesIDs) SeriesIDs {
|
||||
var i, j int
|
||||
|
||||
ids := make([]uint32, 0, len(l))
|
||||
for i < len(l) && j < len(r) {
|
||||
if l[i] == r[j] {
|
||||
i += 1
|
||||
j += 1
|
||||
} else if l[i] < r[j] {
|
||||
ids = append(ids, l[i])
|
||||
i += 1
|
||||
} else {
|
||||
j += 1
|
||||
}
|
||||
}
|
||||
|
||||
// append the remainder
|
||||
if i < len(l) {
|
||||
ids = append(ids, l[i:]...)
|
||||
}
|
||||
|
||||
return SeriesIDs(ids)
|
||||
}
|
||||
|
||||
// AddSeries adds the series for the given measurement to the index. Returns false if already present
|
||||
func (t *Index) AddSeries(name string, s *Series) bool {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
// if there is a measurement for this id, it's already been added
|
||||
if t.seriesToMeasurement[s.ID] != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// get or create the measurement index and index it globally and in the measurement
|
||||
idx := t.createMeasurementIfNotExists(name)
|
||||
|
||||
t.seriesToMeasurement[s.ID] = idx
|
||||
t.series[s.ID] = s
|
||||
|
||||
// TODO: add this series to the global tag index
|
||||
|
||||
return idx.addSeries(s)
|
||||
}
|
||||
|
||||
// createMeasurementIfNotExists will either add a measurement object to the index or return the existing one.
|
||||
func (t *Index) createMeasurementIfNotExists(name string) *Measurement {
|
||||
idx := t.measurements[name]
|
||||
if idx == nil {
|
||||
idx = NewMeasurement(name)
|
||||
t.measurements[name] = idx
|
||||
t.names = append(t.names, name)
|
||||
sort.Strings(t.names)
|
||||
}
|
||||
return idx
|
||||
}
|
||||
|
||||
// AddField adds a field to the measurement name. Returns false if already present
|
||||
func (t *Index) AddField(name string, f *Field) bool {
|
||||
panic("not implemented")
|
||||
return false
|
||||
}
|
||||
|
||||
// MeasurementsForSeriesIDs returns a collection of unique Measurements for the passed in SeriesIDs.
|
||||
func (t *Index) MeasurementsForSeriesIDs(seriesIDs SeriesIDs) []*Measurement {
|
||||
measurements := make(map[*Measurement]bool)
|
||||
|
||||
for _, id := range seriesIDs {
|
||||
measurements[t.seriesToMeasurement[id]] = true
|
||||
}
|
||||
|
||||
values := make([]*Measurement, 0, len(measurements))
|
||||
for m, _ := range measurements {
|
||||
values = append(values, m)
|
||||
}
|
||||
|
||||
return values
|
||||
}
|
||||
|
||||
// SeriesIDs returns an array of series ids for the given measurements and filters to be applied to all.
|
||||
// Filters are equivalent to and AND operation. If you want to do an OR, get the series IDs for one set,
|
||||
// then get the series IDs for another set and use the SeriesIDs.Union to combine the two.
|
||||
func (t *Index) SeriesIDs(names []string, filters Filters) SeriesIDs {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
|
||||
// they want all ids if no filters are specified
|
||||
if len(filters) == 0 {
|
||||
ids := SeriesIDs(make([]uint32, 0))
|
||||
for _, idx := range t.measurements {
|
||||
ids = ids.Union(idx.ids)
|
||||
}
|
||||
return ids
|
||||
}
|
||||
|
||||
ids := SeriesIDs(make([]uint32, 0))
|
||||
for _, n := range names {
|
||||
ids = ids.Union(t.seriesIDsForName(n, filters))
|
||||
}
|
||||
|
||||
return ids
|
||||
}
|
||||
|
||||
// TagKeys returns a sorted array of unique tag keys for the given measurements.
|
||||
func (t *Index) TagKeys(names []string) []string {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
|
||||
if len(names) == 0 {
|
||||
names = t.names
|
||||
}
|
||||
|
||||
keys := make(map[string]bool)
|
||||
for _, n := range names {
|
||||
idx := t.measurements[n]
|
||||
if idx != nil {
|
||||
for k, _ := range idx.seriesByTagKeyValue {
|
||||
keys[k] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sortedKeys := make([]string, 0, len(keys))
|
||||
for k, _ := range keys {
|
||||
sortedKeys = append(sortedKeys, k)
|
||||
}
|
||||
sort.Strings(sortedKeys)
|
||||
|
||||
return sortedKeys
|
||||
}
|
||||
|
||||
// TagValues returns a map of unique tag values for the given measurements and key with the given filters applied.
|
||||
// Call .ToSlice() on the result to convert it into a sorted slice of strings.
|
||||
// Filters are equivalent to and AND operation. If you want to do an OR, get the tag values for one set,
|
||||
// then get the tag values for another set and do a union of the two.
|
||||
func (t *Index) TagValues(names []string, key string, filters []*Filter) TagValues {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
|
||||
values := TagValues(make(map[string]bool))
|
||||
|
||||
// see if they just want all the tag values for this key
|
||||
if len(filters) == 0 {
|
||||
for _, n := range names {
|
||||
idx := t.measurements[n]
|
||||
if idx != nil {
|
||||
values.Union(idx.tagValues(key))
|
||||
}
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
// they have filters so just get a set of series ids matching them and then get the tag values from those
|
||||
seriesIDs := t.SeriesIDs(names, filters)
|
||||
return t.tagValuesForSeries(key, seriesIDs)
|
||||
}
|
||||
|
||||
// tagValuesForSeries will return a TagValues map of all the unique tag values for a collection of series.
|
||||
func (t *Index) tagValuesForSeries(key string, seriesIDs SeriesIDs) TagValues {
|
||||
values := make(map[string]bool)
|
||||
for _, id := range seriesIDs {
|
||||
s := t.series[id]
|
||||
if s == nil {
|
||||
continue
|
||||
}
|
||||
if v, ok := s.Tags[key]; ok {
|
||||
values[v] = true
|
||||
}
|
||||
}
|
||||
return TagValues(values)
|
||||
}
|
||||
|
||||
type TagValues map[string]bool
|
||||
|
||||
// ToSlice returns a sorted slice of the TagValues
|
||||
func (t TagValues) ToSlice() []string {
|
||||
a := make([]string, 0, len(t))
|
||||
for v, _ := range t {
|
||||
a = append(a, v)
|
||||
}
|
||||
sort.Strings(a)
|
||||
return a
|
||||
}
|
||||
|
||||
// Union will modify the receiver by merging in the passed in values.
|
||||
func (l TagValues) Union(r TagValues) {
|
||||
for v, _ := range r {
|
||||
l[v] = true
|
||||
}
|
||||
}
|
||||
|
||||
// Intersect will modify the receiver by keeping only the keys that exist in the passed in values
|
||||
func (l TagValues) Intersect(r TagValues) {
|
||||
for v, _ := range l {
|
||||
if _, ok := r[v]; !ok {
|
||||
delete(l, v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// seriesIDsForName is the same as SeriesIDs, but for a specific measurement.
func (t *Index) seriesIDsForName(name string, filters Filters) SeriesIDs {
	idx := t.measurements[name]
	if idx == nil {
		return nil
	}

	// if no filters were given, return every series id in the measurement
	if len(filters) == 0 {
		return idx.ids
	}

	// process the filters one at a time to get the list of ids they return
	idsPerFilter := make([]SeriesIDs, len(filters), len(filters))
	for i, filter := range filters {
		idsPerFilter[i] = idx.seriesIDs(filter)
	}

	// collapse the set of ids by intersecting them (filters are ANDed together)
	allIDs := idsPerFilter[0]
	for i := 1; i < len(filters); i++ {
		allIDs = allIDs.Intersect(idsPerFilter[i])
	}

	return allIDs
}

// MeasurementBySeriesID returns the Measurement that is the parent of the given series id.
func (t *Index) MeasurementBySeriesID(id uint32) *Measurement {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.seriesToMeasurement[id]
}

// MeasurementAndSeries returns the Measurement and the Series for a given measurement name and tag set.
func (t *Index) MeasurementAndSeries(name string, tags map[string]string) (*Measurement, *Series) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	idx := t.measurements[name]
	if idx == nil {
		return nil, nil
	}
	return idx, idx.seriesByTags(tags)
}

// SeriesByID returns the Series that has the given id.
func (t *Index) SeriesByID(id uint32) *Series {
	return t.series[id]
}

// Measurements returns all measurements that match the given filters.
// Note: filters are not currently applied; every measurement in the index is returned.
func (t *Index) Measurements(filters []*Filter) []*Measurement {
	measurements := make([]*Measurement, 0, len(t.measurements))
	for _, idx := range t.measurements {
		measurements = append(measurements, idx.measurement)
	}
	return measurements
}

// Names returns all measurement names in sorted order.
func (t *Index) Names() []string {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.names
}

// DropSeries will clear the index of all references to a series.
func (t *Index) DropSeries(id uint32) {
	panic("not implemented")
}

// DropMeasurement will clear the index of all references to a measurement and its child series.
func (t *Index) DropMeasurement(name string) {
	panic("not implemented")
}

// marshalTags converts the tag set to bytes for use as a lookup key.
func marshalTags(tags map[string]string) []byte {
	s := make([]string, 0, len(tags))
	// pull out keys to sort
	for k := range tags {
		s = append(s, k)
	}
	sort.Strings(s)

	// now append the values in key-sorted order. Note that range evaluates the
	// slice once up front, so appending to s here does not extend the iteration.
	for _, k := range s {
		s = append(s, tags[k])
	}
	return []byte(strings.Join(s, "|"))
}
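For illustration, a hypothetical tag set marshals to a deterministic key: the sorted keys followed by their values in the same order, joined with "|":

// Sketch only: the lookup key produced by marshalTags for a sample tag set.
func exampleMarshalTags() string {
	tags := map[string]string{"region": "us-west", "host": "serverA"}
	return string(marshalTags(tags)) // "host|region|serverA|us-west"
}
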
metastore.go
@ -193,25 +193,20 @@ func (tx *metatx) createSeries(database, name string, tags map[string]string) (*
}

// loops through all the measurements and series in a database
func (tx *metatx) measurementIndex(database string) *Index {
func (tx *metatx) indexDatabase(db *database) {
	// get the bucket that holds series data for the database
	b := tx.Bucket([]byte("Databases")).Bucket([]byte(database)).Bucket([]byte("Series"))
	b := tx.Bucket([]byte("Databases")).Bucket([]byte(db.name)).Bucket([]byte("Series"))
	c := b.Cursor()

	// create the index and populate it from the series data
	idx := NewIndex()

	for k, _ := c.First(); k != nil; k, _ = c.Next() {
		mc := b.Bucket(k).Cursor()
		name := string(k)
		for id, v := mc.First(); id != nil; id, v = mc.Next() {
			var s *Series
			mustUnmarshalJSON(v, &s)
			idx.AddSeries(name, s)
			db.AddSeries(name, s)
		}
	}

	return idx
}

// user returns a user from the metastore by name.
server.go
@ -79,8 +79,7 @@ type Server struct {
	index  uint64           // highest broadcast index seen
	errors map[uint64]error // message errors

	meta        *metastore        // metadata store
	metaIndexes map[string]*Index // map databases to tag indexes
	meta *metastore // metadata store

	dataNodes map[uint64]*DataNode // data nodes by id

@ -96,7 +95,6 @@ func NewServer(client MessagingClient) *Server {
	return &Server{
		client:           client,
		meta:             &metastore{},
		metaIndexes:      make(map[string]*Index),
		dataNodes:        make(map[uint64]*DataNode),
		databases:        make(map[string]*database),
		databasesByShard: make(map[uint64]*database),
@ -196,15 +194,13 @@ func (s *Server) load() error {

		// load the index
		log.Printf("Loading metadata index for %s\n", db.name)
		var idx *Index
		err := s.meta.view(func(tx *metatx) error {
			idx = tx.measurementIndex(db.name)
			tx.indexDatabase(db)
			return nil
		})
		if err != nil {
			return err
		}
		s.metaIndexes[db.name] = idx
	}

	// Load users.
@ -417,7 +413,6 @@ func (s *Server) applyCreateDatabase(m *messaging.Message) (err error) {

	// Add to databases on server.
	s.databases[c.Name] = db
	s.metaIndexes[c.Name] = NewIndex()

	return
}
@ -448,7 +443,6 @@ func (s *Server) applyDeleteDatabase(m *messaging.Message) (err error) {

	// Delete the database entry.
	delete(s.databases, c.Name)
	delete(s.metaIndexes, c.Name)
	return
}

@ -970,9 +964,7 @@ func (s *Server) applyCreateSeriesIfNotExists(m *messaging.Message) error {
		return ErrDatabaseNotFound
	}

	// make sure another thread didn't add it first
	idx := s.metaIndexes[c.Database]
	if _, series := idx.MeasurementAndSeries(c.Name, c.Tags); series != nil {
	if _, series := db.MeasurementAndSeries(c.Name, c.Tags); series != nil {
		return nil
	}

@ -986,7 +978,7 @@ func (s *Server) applyCreateSeriesIfNotExists(m *messaging.Message) error {
	if err != nil {
		return err
	}
	idx.AddSeries(c.Name, series)
	db.AddSeries(c.Name, series)

	return nil
}
@ -1056,12 +1048,13 @@ func (s *Server) applyWriteSeries(m *messaging.Message) error {
func (s *Server) createSeriesIfNotExists(database, name string, tags map[string]string) (uint32, error) {
	// Try to find series locally first.
	s.mu.RLock()
	idx := s.metaIndexes[database]
	s.mu.RUnlock()

	idx := s.databases[database]
	if _, series := idx.MeasurementAndSeries(name, tags); series != nil {
		s.mu.RUnlock()
		return series.ID, nil
	}
	// release the read lock so the broadcast can actually go through and acquire the write lock
	s.mu.RUnlock()

	// If it doesn't exist then create a message and broadcast.
	c := &createSeriesIfNotExistsCommand{Database: database, Name: name, Tags: tags}
@ -1080,18 +1073,26 @@ func (s *Server) createSeriesIfNotExists(database, name string, tags map[string]

func (s *Server) MeasurementNames(database string) []string {
	s.mu.RLock()
	idx := s.metaIndexes[database]
	s.mu.RUnlock()
	defer s.mu.RUnlock()

	return idx.names
	db := s.databases[database]
	if db == nil {
		return nil
	}

	return db.names
}

func (s *Server) MeasurementSeriesIDs(database, measurement string) SeriesIDs {
	s.mu.RLock()
	idx := s.metaIndexes[database]
	s.mu.RUnlock()
	defer s.mu.RUnlock()

	return idx.SeriesIDs([]string{measurement}, nil)
	db := s.databases[database]
	if db == nil {
		return nil
	}

	return db.SeriesIDs([]string{measurement}, nil)
}

// processor runs in a separate goroutine and processes all incoming broker messages.
@ -1173,344 +1174,6 @@ func (p dataNodes) Len() int { return len(p) }
func (p dataNodes) Less(i, j int) bool { return p[i].ID < p[j].ID }
func (p dataNodes) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

// database represents a collection of retention policies.
type database struct {
	name string

	policies map[string]*RetentionPolicy // retention policies by name
	shards   map[uint64]*Shard           // shards by id

	defaultRetentionPolicy string
}

// newDatabase returns an instance of database.
func newDatabase() *database {
	return &database{
		policies: make(map[string]*RetentionPolicy),
		shards:   make(map[uint64]*Shard),
	}
}

// shardByTimestamp returns a shard that owns a given timestamp.
func (db *database) shardByTimestamp(policy string, id uint32, timestamp time.Time) (*Shard, error) {
	p := db.policies[policy]
	if p == nil {
		return nil, ErrRetentionPolicyNotFound
	}
	return p.shardByTimestamp(id, timestamp), nil
}

// shardsByTimestamp returns all shards that own a given timestamp.
func (db *database) shardsByTimestamp(policy string, timestamp time.Time) ([]*Shard, error) {
	p := db.policies[policy]
	if p == nil {
		return nil, ErrRetentionPolicyNotFound
	}
	return p.shardsByTimestamp(timestamp), nil
}

// timeBetweenInclusive returns true if t is between min and max, inclusive.
func timeBetweenInclusive(t, min, max time.Time) bool {
	return (t.Equal(min) || t.After(min)) && (t.Equal(max) || t.Before(max))
}

// MarshalJSON encodes a database into a JSON-encoded byte slice.
func (db *database) MarshalJSON() ([]byte, error) {
	// Copy over properties to intermediate type.
	var o databaseJSON
	o.Name = db.name
	o.DefaultRetentionPolicy = db.defaultRetentionPolicy
	for _, rp := range db.policies {
		o.Policies = append(o.Policies, rp)
	}
	for _, s := range db.shards {
		o.Shards = append(o.Shards, s)
	}
	return json.Marshal(&o)
}

// UnmarshalJSON decodes a JSON-encoded byte slice to a database.
func (db *database) UnmarshalJSON(data []byte) error {
	// Decode into intermediate type.
	var o databaseJSON
	if err := json.Unmarshal(data, &o); err != nil {
		return err
	}

	// Copy over properties from intermediate type.
	db.name = o.Name
	db.defaultRetentionPolicy = o.DefaultRetentionPolicy

	// Copy shard policies.
	db.policies = make(map[string]*RetentionPolicy)
	for _, rp := range o.Policies {
		db.policies[rp.Name] = rp
	}

	// Copy shards.
	db.shards = make(map[uint64]*Shard)
	for _, s := range o.Shards {
		db.shards[s.ID] = s
	}

	return nil
}

// databaseJSON represents the JSON-serialization format for a database.
type databaseJSON struct {
	Name                   string             `json:"name,omitempty"`
	DefaultRetentionPolicy string             `json:"defaultRetentionPolicy,omitempty"`
	Policies               []*RetentionPolicy `json:"policies,omitempty"`
	Shards                 []*Shard           `json:"shards,omitempty"`
}

// Measurement represents a collection of time series in a database. It also contains in-memory
// structures for indexing tags. These structures are accessed through private methods on the Measurement
// object. Generally these methods are only accessed from Index, which is responsible for ensuring
// goroutine-safe access.
type Measurement struct {
	Name   string    `json:"name,omitempty"`
	Fields []*Fields `json:"fields,omitempty"`

	// in-memory index fields
	series              map[string]*Series // sorted tagset string to the series object
	seriesByID          map[uint32]*Series // lookup table for series by their id
	measurement         *Measurement
	seriesByTagKeyValue map[string]map[string]SeriesIDs // map from tag key to value to sorted set of series ids
	ids                 SeriesIDs                       // sorted list of series IDs in this measurement
}

// NewMeasurement returns a new Measurement with its in-memory index structures initialized.
func NewMeasurement(name string) *Measurement {
	return &Measurement{
		Name:   name,
		Fields: make([]*Fields, 0),

		series:              make(map[string]*Series),
		seriesByID:          make(map[uint32]*Series),
		seriesByTagKeyValue: make(map[string]map[string]SeriesIDs),
		ids:                 SeriesIDs(make([]uint32, 0)),
	}
}

// addSeries will add a series to the measurementIndex. Returns false if already present
func (m *Measurement) addSeries(s *Series) bool {
	if _, ok := m.seriesByID[s.ID]; ok {
		return false
	}
	m.seriesByID[s.ID] = s
	tagset := string(marshalTags(s.Tags))
	m.series[tagset] = s
	m.ids = append(m.ids, s.ID)
	// the series ID should always be higher than all others because it's a new
	// series. So don't do the sort if we don't have to.
	if len(m.ids) > 1 && m.ids[len(m.ids)-1] < m.ids[len(m.ids)-2] {
		sort.Sort(m.ids)
	}

	// add this series id to the tag index on the measurement
	for k, v := range s.Tags {
		valueMap := m.seriesByTagKeyValue[k]
		if valueMap == nil {
			valueMap = make(map[string]SeriesIDs)
			m.seriesByTagKeyValue[k] = valueMap
		}
		ids := valueMap[v]
		ids = append(ids, s.ID)

		// most of the time the series ID will be higher than all others because it's a new
		// series. So don't do the sort if we don't have to.
		if len(ids) > 1 && ids[len(ids)-1] < ids[len(ids)-2] {
			sort.Sort(ids)
		}
		valueMap[v] = ids
	}

	return true
}

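For illustration, a hypothetical pair of series shows the shape of the tag index that addSeries maintains:

// Sketch only: the tag index after adding two series to a measurement.
func exampleTagIndexShape() *Measurement {
	m := NewMeasurement("cpu")
	m.addSeries(&Series{ID: 1, Tags: map[string]string{"host": "serverA", "region": "us-west"}})
	m.addSeries(&Series{ID: 2, Tags: map[string]string{"host": "serverB", "region": "us-west"}})

	// m.seriesByTagKeyValue now looks roughly like:
	//   "host"   -> {"serverA": [1], "serverB": [2]}
	//   "region" -> {"us-west": [1, 2]}
	return m
}
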
// seriesByTags returns the Series that matches the given tagset.
func (m *Measurement) seriesByTags(tags map[string]string) *Series {
	return m.series[string(marshalTags(tags))]
}

// seriesIDs returns the series ids for a given filter.
func (m *Measurement) seriesIDs(filter *Filter) (ids SeriesIDs) {
	values := m.seriesByTagKeyValue[filter.Key]
	if values == nil {
		return
	}

	// handle regex filters
	if filter.Regex != nil {
		for k, v := range values {
			if filter.Regex.MatchString(k) {
				if ids == nil {
					ids = v
				} else {
					ids = ids.Union(v)
				}
			}
		}
		if filter.Not {
			ids = m.ids.Reject(ids)
		}
		return
	}

	// this handles the "value is not null" style of query
	if filter.Not && filter.Value == "" {
		for _, v := range values {
			if ids == nil {
				ids = v
			} else {
				ids = ids.Intersect(v)
			}
		}
		return
	}

	// get the ids that have the given key/value tag pair
	ids = SeriesIDs(values[filter.Value])

	// filter out these ids from the entire set if it's a not query
	if filter.Not {
		ids = m.ids.Reject(ids)
	}

	return
}

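For illustration, the three filter forms handled above could be constructed as follows; the tag key and values are hypothetical, and the regexp package is assumed to be imported:

// Sketch only: equality, negation, and regex filters against the "host" tag.
func exampleFilters(m *Measurement) {
	eq := &Filter{Key: "host", Value: "serverA"}                     // host = serverA
	not := &Filter{Key: "host", Value: "serverA", Not: true}         // host != serverA
	re := &Filter{Key: "host", Regex: regexp.MustCompile("^server")} // host matches /^server/

	_ = m.seriesIDs(eq)  // ids of series tagged host=serverA
	_ = m.seriesIDs(not) // every other series id in the measurement
	_ = m.seriesIDs(re)  // ids whose host value matches the regex
}
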
// tagValues returns the set of unique tag values for the given key.
func (m *Measurement) tagValues(key string) TagValues {
	tags := m.seriesByTagKeyValue[key]
	values := make(map[string]bool, len(tags))
	for k := range tags {
		values[k] = true
	}
	return TagValues(values)
}

// Measurements represents a list of measurements.
type Measurements []*Measurement

// Field represents a series field.
type Field struct {
	ID   uint8     `json:"id,omitempty"`
	Name string    `json:"name,omitempty"`
	Type FieldType `json:"field"`
}

// FieldType identifies the data type of a field value.
type FieldType int

const (
	Int64 FieldType = iota
	Float64
	String
	Boolean
	Binary
)

// Fields represents a list of fields.
type Fields []*Field

// Series belongs to a Measurement and represents a unique time series in a database.
type Series struct {
	ID   uint32
	Tags map[string]string
}

// RetentionPolicy represents a policy for creating new shards in a database and how long they're kept around for.
type RetentionPolicy struct {
	// Unique name within database. Required.
	Name string

	// Length of time to keep data around.
	Duration time.Duration

	ReplicaN uint32
	SplitN   uint32

	Shards []*Shard
}

// NewRetentionPolicy returns a new instance of RetentionPolicy with defaults set.
func NewRetentionPolicy(name string) *RetentionPolicy {
	return &RetentionPolicy{
		Name:     name,
		ReplicaN: DefaultReplicaN,
		SplitN:   DefaultSplitN,
		Duration: DefaultShardRetention,
	}
}

// shardByTimestamp returns the shard in the space that owns a given timestamp for a given series id.
// Returns nil if the shard does not exist.
func (rp *RetentionPolicy) shardByTimestamp(id uint32, timestamp time.Time) *Shard {
	shards := rp.shardsByTimestamp(timestamp)
	if len(shards) > 0 {
		return shards[int(id)%len(shards)]
	}
	return nil
}

// shardsByTimestamp returns all shards in the policy whose time range contains the given timestamp.
func (rp *RetentionPolicy) shardsByTimestamp(timestamp time.Time) []*Shard {
	shards := make([]*Shard, 0, rp.SplitN)
	for _, s := range rp.Shards {
		if timeBetweenInclusive(timestamp, s.StartTime, s.EndTime) {
			shards = append(shards, s)
		}
	}
	return shards
}

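For illustration, a hypothetical policy split into two shards covering the same interval shows how a series id picks one shard by modulo; the Shard field names (ID, StartTime, EndTime) are assumed from their use elsewhere in this diff:

// Sketch only: shard selection for a two-way split over one day.
func exampleShardSelection() *Shard {
	rp := NewRetentionPolicy("default")
	rp.SplitN = 2
	start := time.Date(2015, time.January, 1, 0, 0, 0, 0, time.UTC)
	end := start.Add(24 * time.Hour)
	rp.Shards = []*Shard{
		{ID: 1, StartTime: start, EndTime: end},
		{ID: 2, StartTime: start, EndTime: end},
	}

	// Series id 7 maps to shards[7%2], i.e. the second shard, for any timestamp in range.
	return rp.shardByTimestamp(7, start.Add(time.Hour))
}
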
// MarshalJSON encodes a retention policy to a JSON-encoded byte slice.
func (rp *RetentionPolicy) MarshalJSON() ([]byte, error) {
	return json.Marshal(&retentionPolicyJSON{
		Name:     rp.Name,
		Duration: rp.Duration,
		ReplicaN: rp.ReplicaN,
		SplitN:   rp.SplitN,
	})
}

// UnmarshalJSON decodes a JSON-encoded byte slice to a retention policy.
func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error {
	// Decode into intermediate type.
	var o retentionPolicyJSON
	if err := json.Unmarshal(data, &o); err != nil {
		return err
	}

	// Copy over properties from intermediate type.
	rp.Name = o.Name
	rp.ReplicaN = o.ReplicaN
	rp.SplitN = o.SplitN
	rp.Duration = o.Duration
	rp.Shards = o.Shards

	return nil
}

// retentionPolicyJSON represents an intermediate struct for JSON marshaling.
type retentionPolicyJSON struct {
	Name     string        `json:"name"`
	ReplicaN uint32        `json:"replicaN,omitempty"`
	SplitN   uint32        `json:"splitN,omitempty"`
	Duration time.Duration `json:"duration,omitempty"`
	Shards   []*Shard      `json:"shards,omitempty"`
}

// RetentionPolicies represents a list of retention policies.
type RetentionPolicies []*RetentionPolicy

// Shards returns a list of all shards for all policies.
func (rps RetentionPolicies) Shards() []*Shard {
	var shards []*Shard
	for _, rp := range rps {
		shards = append(shards, rp.Shards...)
	}
	return shards
}

// BcryptCost is the cost associated with generating a password with bcrypt.
// This setting is lowered during testing to improve test suite performance.
var BcryptCost = 10