Merge pull request #9164 from influxdata/sgc-inmem-startup

inmem startup improvments
pull/9270/head
Stuart Carnie 2017-12-29 13:33:49 -07:00 committed by GitHub
commit 80f1120c3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 321 additions and 324 deletions

View File

@ -281,8 +281,8 @@ func ParseKeyBytes(buf []byte) ([]byte, Tags) {
return buf[:i], tags
}
func ParseTags(buf []byte) (Tags, error) {
return parseTags(buf), nil
func ParseTags(buf []byte) Tags {
return parseTags(buf)
}
func ParseName(buf []byte) ([]byte, error) {
@ -1528,9 +1528,12 @@ func parseTags(buf []byte) Tags {
return nil
}
tags := make(Tags, 0, bytes.Count(buf, []byte(",")))
tags := make(Tags, bytes.Count(buf, []byte(",")))
p := 0
walkTags(buf, func(key, value []byte) bool {
tags = append(tags, NewTag(key, value))
tags[p].Key = key
tags[p].Value = value
p++
return true
})
return tags

View File

@ -2381,6 +2381,13 @@ func BenchmarkEscapeString_QuotesAndBackslashes(b *testing.B) {
}
}
func BenchmarkParseTags(b *testing.B) {
tags := []byte("cpu,tag0=value0,tag1=value1,tag2=value2,tag3=value3,tag4=value4,tag5=value5")
for i := 0; i < b.N; i++ {
models.ParseTags(tags)
}
}
func init() {
// Force uint support to be enabled for testing.
models.EnableUintSupport()

View File

@ -157,7 +157,7 @@ RETRY:
keyb := []byte(key)
mm, _ := models.ParseName(keyb)
c.row.measurement = string(mm)
c.tags, _ = models.ParseTags(keyb)
c.tags = models.ParseTags(keyb)
c.filterset = mapValuer{"_name": c.row.measurement}
for _, tag := range c.tags {

View File

@ -690,9 +690,9 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error {
}
if err := e.FileStore.WalkKeys(nil, func(key []byte, typ byte) error {
fieldType, err := tsmFieldTypeToInfluxQLDataType(typ)
if err != nil {
return err
fieldType := BlockTypeToInfluxQLDataType(typ)
if fieldType == influxql.Unknown {
return fmt.Errorf("unknown block type: %v", typ)
}
if err := e.addToIndexFromKey(key, fieldType); err != nil {
@ -1050,9 +1050,9 @@ func (e *Engine) overlay(r io.Reader, basePath string, asNew bool) error {
// lock contention on the index.
merged := merge(readers...)
for v := range merged {
fieldType, err := tsmFieldTypeToInfluxQLDataType(v.typ)
if err != nil {
return err
fieldType := BlockTypeToInfluxQLDataType(v.typ)
if fieldType == influxql.Unknown {
return fmt.Errorf("unknown block type: %v", v.typ)
}
if err := e.addToIndexFromKey(v.key, fieldType); err != nil {
@ -1122,8 +1122,7 @@ func (e *Engine) addToIndexFromKey(key []byte, fieldType influxql.DataType) erro
// Build in-memory index, if necessary.
if e.index.Type() == inmem.IndexName {
tags, _ := models.ParseTags(seriesKey)
if err := e.index.InitializeSeries(seriesKey, name, tags); err != nil {
if err := e.index.InitializeSeries(seriesKey, name, models.ParseTags(seriesKey)); err != nil {
return err
}
}
@ -2703,21 +2702,22 @@ func SeriesFieldKeyBytes(seriesKey, field string) []byte {
return b
}
func tsmFieldTypeToInfluxQLDataType(typ byte) (influxql.DataType, error) {
switch typ {
case BlockFloat64:
return influxql.Float, nil
case BlockInteger:
return influxql.Integer, nil
case BlockUnsigned:
return influxql.Unsigned, nil
case BlockBoolean:
return influxql.Boolean, nil
case BlockString:
return influxql.String, nil
default:
return influxql.Unknown, fmt.Errorf("unknown block type: %v", typ)
var (
blockToFieldType = []influxql.DataType{
BlockFloat64: influxql.Float,
BlockInteger: influxql.Integer,
BlockBoolean: influxql.Boolean,
BlockString: influxql.String,
BlockUnsigned: influxql.Unsigned,
}
)
func BlockTypeToInfluxQLDataType(typ byte) influxql.DataType {
if int(typ) < len(blockToFieldType) {
return blockToFieldType[typ]
}
return influxql.Unknown
}
// SeriesAndFieldFromCompositeKey returns the series key and the field key extracted from the composite key.

View File

@ -1391,6 +1391,27 @@ func TestEngine_CreateCursor_Descending(t *testing.T) {
}
}
func makeBlockTypeSlice(n int) []byte {
r := make([]byte, n)
b := tsm1.BlockFloat64
m := tsm1.BlockUnsigned + 1
for i := 0; i < len(r); i++ {
r[i] = b % m
}
return r
}
var blockType = influxql.Unknown
func BenchmarkBlockTypeToInfluxQLDataType(b *testing.B) {
t := makeBlockTypeSlice(100)
for i := 0; i < b.N; i++ {
for j := 0; j < len(t); j++ {
blockType = tsm1.BlockTypeToInfluxQLDataType(t[j])
}
}
}
// This test ensures that "sync: WaitGroup is reused before previous Wait has returned" is
// is not raised.
func TestEngine_DisableEnableCompactions_Concurrent(t *testing.T) {

View File

@ -53,8 +53,8 @@ type Index struct {
fieldset *tsdb.MeasurementFieldSet
// In-memory metadata index, built on load and updated when new series come in
measurements map[string]*Measurement // measurement name to object and index
series map[string]*Series // map series key to the Series object
measurements map[string]*measurement // measurement name to object and index
series map[string]*series // map series key to the Series object
seriesSketch, seriesTSSketch *hll.Plus
measurementsSketch, measurementsTSSketch *hll.Plus
@ -68,8 +68,8 @@ func NewIndex(database string, sfile *tsdb.SeriesFile) *Index {
index := &Index{
database: database,
sfile: sfile,
measurements: make(map[string]*Measurement),
series: make(map[string]*Series),
measurements: make(map[string]*measurement),
series: make(map[string]*series),
}
index.seriesSketch = hll.NewDefaultPlus()
@ -92,7 +92,7 @@ func (i *Index) Database() string {
}
// Series returns a series by key.
func (i *Index) Series(key []byte) (*Series, error) {
func (i *Index) Series(key []byte) (*series, error) {
i.mu.RLock()
s := i.series[string(key)]
i.mu.RUnlock()
@ -117,7 +117,7 @@ func (i *Index) SeriesN() int64 {
}
// Measurement returns the measurement object from the index by the name
func (i *Index) Measurement(name []byte) (*Measurement, error) {
func (i *Index) Measurement(name []byte) (*measurement, error) {
i.mu.RLock()
defer i.mu.RUnlock()
return i.measurements[string(name)], nil
@ -138,11 +138,11 @@ func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, erro
}
// MeasurementsByName returns a list of measurements.
func (i *Index) MeasurementsByName(names [][]byte) ([]*Measurement, error) {
func (i *Index) MeasurementsByName(names [][]byte) ([]*measurement, error) {
i.mu.RLock()
defer i.mu.RUnlock()
a := make([]*Measurement, 0, len(names))
a := make([]*measurement, 0, len(names))
for _, name := range names {
if m := i.measurements[string(name)]; m != nil {
a = append(a, m)
@ -202,14 +202,12 @@ func (i *Index) CreateSeriesIfNotExists(shardID uint64, key, name []byte, tags m
// set the in memory ID for query processing on this shard
// The series key and tags are clone to prevent a memory leak
series := NewSeries([]byte(string(key)), tags.Clone())
series.ID = seriesID
skey := string(key)
ss = newSeries(seriesID, m, skey, tags.Clone())
i.series[skey] = ss
series.SetMeasurement(m)
i.series[string(key)] = series
m.AddSeries(series)
series.AssignShard(shardID, time.Now().UnixNano())
m.AddSeries(ss)
ss.AssignShard(shardID, time.Now().UnixNano())
// Add the series to the series sketch.
i.seriesSketch.Add(key)
@ -219,7 +217,7 @@ func (i *Index) CreateSeriesIfNotExists(shardID uint64, key, name []byte, tags m
// CreateMeasurementIndexIfNotExists creates or retrieves an in memory index
// object for the measurement
func (i *Index) CreateMeasurementIndexIfNotExists(name []byte) *Measurement {
func (i *Index) CreateMeasurementIndexIfNotExists(name []byte) *measurement {
name = escape.Unescape(name)
// See if the measurement exists using a read-lock
@ -239,7 +237,7 @@ func (i *Index) CreateMeasurementIndexIfNotExists(name []byte) *Measurement {
// and acquire the write lock
m = i.measurements[string(name)]
if m == nil {
m = NewMeasurement(i.database, string(name))
m = newMeasurement(i.database, string(name))
i.measurements[string(name)] = m
// Add the measurement to the measurements sketch.
@ -313,19 +311,19 @@ func (i *Index) TagKeyHasAuthorizedSeries(auth query.Authorizer, name []byte, ke
// possible to get the set of unique series IDs for a given measurement name
// and tag key.
var authorized bool
mm.SeriesByTagKeyValue(key).Range(func(_ string, seriesIDs SeriesIDs) bool {
mm.SeriesByTagKeyValue(key).Range(func(_ string, sIDs seriesIDs) bool {
if auth == nil || auth == query.OpenAuthorizer {
authorized = true
return false
}
for _, id := range seriesIDs {
for _, id := range sIDs {
s := mm.SeriesByID(id)
if s == nil {
continue
}
if auth.AuthorizeSeriesRead(i.database, mm.name, s.Tags()) {
if auth.AuthorizeSeriesRead(i.database, mm.NameBytes, s.Tags) {
authorized = true
return false
}
@ -388,13 +386,13 @@ func (i *Index) MeasurementTagKeyValuesByExpr(auth query.Authorizer, name []byte
if s == nil {
continue
}
if auth != nil && !auth.AuthorizeSeriesRead(i.database, s.Measurement().name, s.Tags()) {
if auth != nil && !auth.AuthorizeSeriesRead(i.database, s.Measurement.NameBytes, s.Tags) {
continue
}
// Iterate the tag keys we're interested in and collect values
// from this series, if they exist.
for _, t := range s.Tags() {
for _, t := range s.Tags {
if idx, ok := keyIdxs[string(t.Key)]; ok {
resultSet[idx].add(string(t.Value))
} else if string(t.Key) > keys[len(keys)-1] {
@ -452,7 +450,7 @@ func (i *Index) TagsForSeries(key string) (models.Tags, error) {
if ss == nil {
return nil, nil
}
return ss.Tags(), nil
return ss.Tags, nil
}
// MeasurementNamesByExpr takes an expression containing only tags and returns a
@ -469,7 +467,7 @@ func (i *Index) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.Expr
a := make([][]byte, 0, len(i.measurements))
for _, m := range i.measurements {
if m.Authorized(auth) {
a = append(a, m.name)
a = append(a, m.NameBytes)
}
}
bytesutil.Sort(a)
@ -561,7 +559,7 @@ func (i *Index) measurementNamesByNameFilter(auth query.Authorizer, op influxql.
}
if matched && m.Authorized(auth) {
names = append(names, m.name)
names = append(names, m.NameBytes)
}
}
bytesutil.Sort(names)
@ -593,7 +591,7 @@ func (i *Index) measurementNamesByTagFilters(auth query.Authorizer, filter *TagF
// Check the tag values belonging to the tag key for equivalence to the
// tag value being filtered on.
tagVals.Range(func(tv string, seriesIDs SeriesIDs) bool {
tagVals.Range(func(tv string, seriesIDs seriesIDs) bool {
if !valEqual(tv) {
return true // No match. Keep checking.
}
@ -613,7 +611,7 @@ func (i *Index) measurementNamesByTagFilters(auth query.Authorizer, filter *TagF
continue
}
if s != nil && auth.AuthorizeSeriesRead(i.database, m.name, s.Tags()) {
if s != nil && auth.AuthorizeSeriesRead(i.database, m.NameBytes, s.Tags) {
// The Range call can return early as a matching
// tag value with an authorized series has been found.
authorized = true
@ -640,7 +638,7 @@ func (i *Index) measurementNamesByTagFilters(auth query.Authorizer, filter *TagF
// False | True | False
// False | False | True
if tagMatch == (filter.Op == influxql.EQ || filter.Op == influxql.EQREGEX) && authorized {
names = append(names, []byte(m.Name))
names = append(names, m.NameBytes)
}
}
@ -656,7 +654,7 @@ func (i *Index) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
var matches [][]byte
for _, m := range i.measurements {
if re.MatchString(m.Name) {
matches = append(matches, []byte(m.Name))
matches = append(matches, m.NameBytes)
}
}
return matches, nil
@ -714,13 +712,13 @@ func (i *Index) DropSeries(key []byte, ts int64) error {
delete(i.series, k)
// Remove the measurement's reference.
series.Measurement().DropSeries(series)
series.Measurement.DropSeries(series)
// Mark the series as deleted.
series.Delete(ts)
// If the measurement no longer has any series, remove it as well.
if !series.Measurement().HasSeries() {
i.dropMeasurement(series.Measurement().Name)
if !series.Measurement.HasSeries() {
i.dropMeasurement(series.Measurement.Name)
}
return nil
@ -778,7 +776,7 @@ func (i *Index) SetFieldName(measurement []byte, name string) {
// ForEachMeasurementName iterates over each measurement name.
func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error {
i.mu.RLock()
mms := make(Measurements, 0, len(i.measurements))
mms := make(measurements, 0, len(i.measurements))
for _, m := range i.measurements {
mms = append(mms, m)
}
@ -786,7 +784,7 @@ func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error {
i.mu.RUnlock()
for _, m := range mms {
if err := fn([]byte(m.Name)); err != nil {
if err := fn(m.NameBytes); err != nil {
return err
}
}
@ -931,7 +929,7 @@ func (i *Index) SeriesIDIterator(opt query.IteratorOptions) (tsdb.SeriesIDIterat
defer i.mu.RUnlock()
// Read and sort all measurements.
mms := make(Measurements, 0, len(i.measurements))
mms := make(measurements, 0, len(i.measurements))
for _, mm := range i.measurements {
mms = append(mms, mm)
}
@ -1148,9 +1146,9 @@ func NewShardIndex(id uint64, database, path string, sfile *tsdb.SeriesFile, opt
// seriesIDIterator emits series ids.
type seriesIDIterator struct {
database string
mms Measurements
mms measurements
keys struct {
buf []*Series
buf []*series
i int
}
opt query.IteratorOptions
@ -1179,7 +1177,7 @@ func (itr *seriesIDIterator) Next() (tsdb.SeriesIDElem, error) {
series := itr.keys.buf[itr.keys.i]
itr.keys.i++
if !itr.opt.Authorizer.AuthorizeSeriesRead(itr.database, series.measurement.name, series.tags) {
if !itr.opt.Authorizer.AuthorizeSeriesRead(itr.database, series.Measurement.NameBytes, series.Tags) {
continue
}
@ -1224,20 +1222,20 @@ var errMaxSeriesPerDatabaseExceeded = errors.New("max series per database exceed
type seriesIterator struct {
keys [][]byte
elem series
elem seriesElement
}
type series struct {
type seriesElement struct {
tsdb.SeriesElem
name []byte
tags models.Tags
deleted bool
}
func (s series) Name() []byte { return s.name }
func (s series) Tags() models.Tags { return s.tags }
func (s series) Deleted() bool { return s.deleted }
func (s series) Expr() influxql.Expr { return nil }
func (s seriesElement) Name() []byte { return s.name }
func (s seriesElement) Tags() models.Tags { return s.tags }
func (s seriesElement) Deleted() bool { return s.deleted }
func (s seriesElement) Expr() influxql.Expr { return nil }
func (itr *seriesIterator) Next() tsdb.SeriesElem {
if len(itr.keys) == 0 {

View File

@ -20,43 +20,43 @@ import (
// contains in memory structures for indexing tags. Exported functions are
// goroutine safe while un-exported functions assume the caller will use the
// appropriate locks.
type Measurement struct {
database string
Name string `json:"name,omitempty"`
name []byte // cached version as []byte
type measurement struct {
Database string
Name string `json:"name,omitempty"`
NameBytes []byte // cached version as []byte
mu sync.RWMutex
fieldNames map[string]struct{}
// in-memory index fields
seriesByID map[uint64]*Series // lookup table for series by their id
seriesByTagKeyValue map[string]*TagKeyValue // map from tag key to value to sorted set of series ids
seriesByID map[uint64]*series // lookup table for series by their id
seriesByTagKeyValue map[string]*tagKeyValue // map from tag key to value to sorted set of series ids
// lazyily created sorted series IDs
sortedSeriesIDs SeriesIDs // sorted list of series IDs in this measurement
sortedSeriesIDs seriesIDs // sorted list of series IDs in this measurement
// Indicates whether the seriesByTagKeyValueMap needs to be rebuilt as it contains deleted series
// that waste memory.
dirty bool
}
// NewMeasurement allocates and initializes a new Measurement.
func NewMeasurement(database, name string) *Measurement {
return &Measurement{
database: database,
Name: name,
name: []byte(name),
fieldNames: make(map[string]struct{}),
// newMeasurement allocates and initializes a new Measurement.
func newMeasurement(database, name string) *measurement {
return &measurement{
Database: database,
Name: name,
NameBytes: []byte(name),
seriesByID: make(map[uint64]*Series),
seriesByTagKeyValue: make(map[string]*TagKeyValue),
fieldNames: make(map[string]struct{}),
seriesByID: make(map[uint64]*series),
seriesByTagKeyValue: make(map[string]*tagKeyValue),
}
}
// Authorized determines if this Measurement is authorized to be read, according
// to the provided Authorizer. A measurement is authorized to be read if at
// least one undeleted series from the measurement is authorized to be read.
func (m *Measurement) Authorized(auth query.Authorizer) bool {
func (m *measurement) Authorized(auth query.Authorizer) bool {
// Note(edd): the cost of this check scales linearly with the number of series
// belonging to a measurement, which means it may become expensive when there
// are large numbers of series on a measurement.
@ -68,14 +68,14 @@ func (m *Measurement) Authorized(auth query.Authorizer) bool {
continue
}
if auth == nil || auth.AuthorizeSeriesRead(m.database, m.name, s.tags) {
if auth == nil || auth.AuthorizeSeriesRead(m.Database, m.NameBytes, s.Tags) {
return true
}
}
return false
}
func (m *Measurement) HasField(name string) bool {
func (m *measurement) HasField(name string) bool {
m.mu.RLock()
_, hasField := m.fieldNames[name]
m.mu.RUnlock()
@ -83,24 +83,24 @@ func (m *Measurement) HasField(name string) bool {
}
// SeriesByID returns a series by identifier.
func (m *Measurement) SeriesByID(id uint64) *Series {
func (m *measurement) SeriesByID(id uint64) *series {
m.mu.RLock()
defer m.mu.RUnlock()
return m.seriesByID[id]
}
// SeriesByIDMap returns the internal seriesByID map.
func (m *Measurement) SeriesByIDMap() map[uint64]*Series {
func (m *measurement) SeriesByIDMap() map[uint64]*series {
m.mu.RLock()
defer m.mu.RUnlock()
return m.seriesByID
}
// SeriesByIDSlice returns a list of series by identifiers.
func (m *Measurement) SeriesByIDSlice(ids []uint64) []*Series {
func (m *measurement) SeriesByIDSlice(ids []uint64) []*series {
m.mu.RLock()
defer m.mu.RUnlock()
a := make([]*Series, len(ids))
a := make([]*series, len(ids))
for i, id := range ids {
a[i] = m.seriesByID[id]
}
@ -108,7 +108,7 @@ func (m *Measurement) SeriesByIDSlice(ids []uint64) []*Series {
}
// AppendSeriesKeysByID appends keys for a list of series ids to a buffer.
func (m *Measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string {
func (m *measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string {
m.mu.RLock()
defer m.mu.RUnlock()
for _, id := range ids {
@ -120,7 +120,7 @@ func (m *Measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string
}
// SeriesKeysByID returns the a list of keys for a set of ids.
func (m *Measurement) SeriesKeysByID(ids SeriesIDs) [][]byte {
func (m *measurement) SeriesKeysByID(ids seriesIDs) [][]byte {
m.mu.RLock()
defer m.mu.RUnlock()
keys := make([][]byte, 0, len(ids))
@ -140,7 +140,7 @@ func (m *Measurement) SeriesKeysByID(ids SeriesIDs) [][]byte {
}
// SeriesKeys returns the keys of every series in this measurement
func (m *Measurement) SeriesKeys() [][]byte {
func (m *measurement) SeriesKeys() [][]byte {
m.mu.RLock()
defer m.mu.RUnlock()
keys := make([][]byte, 0, len(m.seriesByID))
@ -158,7 +158,7 @@ func (m *Measurement) SeriesKeys() [][]byte {
return keys
}
func (m *Measurement) SeriesIDs() SeriesIDs {
func (m *measurement) SeriesIDs() seriesIDs {
m.mu.RLock()
if len(m.sortedSeriesIDs) == len(m.seriesByID) {
s := m.sortedSeriesIDs
@ -176,7 +176,7 @@ func (m *Measurement) SeriesIDs() SeriesIDs {
m.sortedSeriesIDs = m.sortedSeriesIDs[:0]
if cap(m.sortedSeriesIDs) < len(m.seriesByID) {
m.sortedSeriesIDs = make(SeriesIDs, 0, len(m.seriesByID))
m.sortedSeriesIDs = make(seriesIDs, 0, len(m.seriesByID))
}
for k, v := range m.seriesByID {
@ -192,28 +192,28 @@ func (m *Measurement) SeriesIDs() SeriesIDs {
}
// HasTagKey returns true if at least one series in this measurement has written a value for the passed in tag key
func (m *Measurement) HasTagKey(k string) bool {
func (m *measurement) HasTagKey(k string) bool {
m.mu.RLock()
defer m.mu.RUnlock()
_, hasTag := m.seriesByTagKeyValue[k]
return hasTag
}
func (m *Measurement) HasTagKeyValue(k, v []byte) bool {
func (m *measurement) HasTagKeyValue(k, v []byte) bool {
m.mu.RLock()
defer m.mu.RUnlock()
return m.seriesByTagKeyValue[string(k)].Contains(string(v))
}
// HasSeries returns true if there is at least 1 series under this measurement.
func (m *Measurement) HasSeries() bool {
func (m *measurement) HasSeries() bool {
m.mu.RLock()
defer m.mu.RUnlock()
return len(m.seriesByID) > 0
}
// Cardinality returns the number of values associated with the given tag key.
func (m *Measurement) Cardinality(key string) int {
func (m *measurement) Cardinality(key string) int {
var n int
m.mu.RLock()
n = m.cardinality(key)
@ -221,12 +221,12 @@ func (m *Measurement) Cardinality(key string) int {
return n
}
func (m *Measurement) cardinality(key string) int {
func (m *measurement) cardinality(key string) int {
return m.seriesByTagKeyValue[key].Cardinality()
}
// CardinalityBytes returns the number of values associated with the given tag key.
func (m *Measurement) CardinalityBytes(key []byte) int {
func (m *measurement) CardinalityBytes(key []byte) int {
m.mu.RLock()
defer m.mu.RUnlock()
return m.seriesByTagKeyValue[string(key)].Cardinality()
@ -234,7 +234,7 @@ func (m *Measurement) CardinalityBytes(key []byte) int {
// AddSeries adds a series to the measurement's index.
// It returns true if the series was added successfully or false if the series was already present.
func (m *Measurement) AddSeries(s *Series) bool {
func (m *measurement) AddSeries(s *series) bool {
if s == nil {
return false
}
@ -260,10 +260,10 @@ func (m *Measurement) AddSeries(s *Series) bool {
}
// add this series id to the tag index on the measurement
s.ForEachTag(func(t models.Tag) {
for _, t := range s.Tags {
valueMap := m.seriesByTagKeyValue[string(t.Key)]
if valueMap == nil {
valueMap = NewTagKeyValue()
valueMap = newTagKeyValue()
m.seriesByTagKeyValue[string(t.Key)] = valueMap
}
ids := valueMap.LoadByte(t.Value)
@ -274,14 +274,14 @@ func (m *Measurement) AddSeries(s *Series) bool {
if len(ids) > 1 && ids[len(ids)-1] < ids[len(ids)-2] {
sort.Sort(ids)
}
valueMap.Store(string(t.Value), ids)
})
valueMap.StoreByte(t.Value, ids)
}
return true
}
// DropSeries removes a series from the measurement's index.
func (m *Measurement) DropSeries(series *Series) {
func (m *measurement) DropSeries(series *series) {
seriesID := series.ID
m.mu.Lock()
defer m.mu.Unlock()
@ -299,7 +299,7 @@ func (m *Measurement) DropSeries(series *Series) {
m.dirty = true
}
func (m *Measurement) Rebuild() *Measurement {
func (m *measurement) Rebuild() *measurement {
m.mu.RLock()
// Nothing needs to be rebuilt.
@ -309,7 +309,7 @@ func (m *Measurement) Rebuild() *Measurement {
}
// Create a new measurement from the state of the existing measurement
nm := NewMeasurement(m.database, string(m.name))
nm := newMeasurement(m.Database, string(m.NameBytes))
nm.fieldNames = m.fieldNames
m.mu.RUnlock()
@ -339,7 +339,7 @@ func (m *Measurement) Rebuild() *Measurement {
// filters walks the where clause of a select statement and returns a map with all series ids
// matching the where clause and any filter expression that should be applied to each
func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]influxql.Expr, error) {
func (m *measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]influxql.Expr, error) {
if condition == nil {
return m.SeriesIDs(), nil, nil
}
@ -347,7 +347,7 @@ func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]inf
}
// ForEachSeriesByExpr iterates over all series filtered by condition.
func (m *Measurement) ForEachSeriesByExpr(condition influxql.Expr, fn func(tags models.Tags) error) error {
func (m *measurement) ForEachSeriesByExpr(condition influxql.Expr, fn func(tags models.Tags) error) error {
// Retrieve matching series ids.
ids, _, err := m.filters(condition)
if err != nil {
@ -357,7 +357,7 @@ func (m *Measurement) ForEachSeriesByExpr(condition influxql.Expr, fn func(tags
// Iterate over each series.
for _, id := range ids {
s := m.SeriesByID(id)
if err := fn(s.Tags()); err != nil {
if err := fn(s.Tags); err != nil {
return err
}
}
@ -373,7 +373,7 @@ func (m *Measurement) ForEachSeriesByExpr(condition influxql.Expr, fn func(tags
// This will also populate the TagSet objects with the series IDs that match each tagset and any
// influx filter expression that goes with the series
// TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.
func (m *Measurement) TagSets(shardID uint64, opt query.IteratorOptions) ([]*query.TagSet, error) {
func (m *measurement) TagSets(shardID uint64, opt query.IteratorOptions) ([]*query.TagSet, error) {
// get the unique set of series ids and the filters that should be applied to each
ids, filters, err := m.filters(opt.Condition)
if err != nil {
@ -412,13 +412,13 @@ func (m *Measurement) TagSets(shardID uint64, opt query.IteratorOptions) ([]*que
continue
}
if opt.Authorizer != nil && !opt.Authorizer.AuthorizeSeriesRead(m.database, m.name, s.Tags()) {
if opt.Authorizer != nil && !opt.Authorizer.AuthorizeSeriesRead(m.Database, m.NameBytes, s.Tags) {
continue
}
var tagsAsKey []byte
if len(dims) > 0 {
tagsAsKey = tsdb.MakeTagsKey(dims, s.Tags())
tagsAsKey = tsdb.MakeTagsKey(dims, s.Tags)
}
tagSet := tagSets[string(tagsAsKey)]
@ -461,7 +461,7 @@ func (m *Measurement) TagSets(shardID uint64, opt query.IteratorOptions) ([]*que
}
// intersectSeriesFilters performs an intersection for two sets of ids and filter expressions.
func intersectSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) {
func intersectSeriesFilters(lids, rids seriesIDs, lfilters, rfilters FilterExprs) (seriesIDs, FilterExprs) {
// We only want to allocate a slice and map of the smaller size.
var ids []uint64
if len(lids) > len(rids) {
@ -515,7 +515,7 @@ func intersectSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs
}
// unionSeriesFilters performs a union for two sets of ids and filter expressions.
func unionSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) {
func unionSeriesFilters(lids, rids seriesIDs, lfilters, rfilters FilterExprs) (seriesIDs, FilterExprs) {
ids := make([]uint64, 0, len(lids)+len(rids))
// Setup the filters with the smallest size since we will discard filters
@ -594,14 +594,14 @@ func unionSeriesFilters(lids, rids SeriesIDs, lfilters, rfilters FilterExprs) (S
}
// SeriesIDsByTagKey returns a list of all series for a tag key.
func (m *Measurement) SeriesIDsByTagKey(key []byte) SeriesIDs {
func (m *measurement) SeriesIDsByTagKey(key []byte) seriesIDs {
tagVals := m.seriesByTagKeyValue[string(key)]
if tagVals == nil {
return nil
}
var ids SeriesIDs
tagVals.RangeAll(func(_ string, a SeriesIDs) {
var ids seriesIDs
tagVals.RangeAll(func(_ string, a seriesIDs) {
ids = append(ids, a...)
})
sort.Sort(ids)
@ -609,7 +609,7 @@ func (m *Measurement) SeriesIDsByTagKey(key []byte) SeriesIDs {
}
// SeriesIDsByTagValue returns a list of all series for a tag value.
func (m *Measurement) SeriesIDsByTagValue(key, value []byte) SeriesIDs {
func (m *measurement) SeriesIDsByTagValue(key, value []byte) seriesIDs {
tagVals := m.seriesByTagKeyValue[string(key)]
if tagVals == nil {
return nil
@ -618,14 +618,14 @@ func (m *Measurement) SeriesIDsByTagValue(key, value []byte) SeriesIDs {
}
// IDsForExpr returns the series IDs that are candidates to match the given expression.
func (m *Measurement) IDsForExpr(n *influxql.BinaryExpr) SeriesIDs {
func (m *measurement) IDsForExpr(n *influxql.BinaryExpr) seriesIDs {
ids, _, _ := m.idsForExpr(n)
return ids
}
// idsForExpr returns a collection of series ids and a filter expression that should
// be used to filter points from those series.
func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Expr, error) {
func (m *measurement) idsForExpr(n *influxql.BinaryExpr) (seriesIDs, influxql.Expr, error) {
// If this binary expression has another binary expression, then this
// is some expression math and we should just pass it to the underlying query.
if _, ok := n.LHS.(*influxql.BinaryExpr); ok {
@ -661,7 +661,7 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex
// if we're looking for series with a specific tag value
if str, ok := value.(*influxql.StringLiteral); ok {
var ids SeriesIDs
var ids seriesIDs
// Special handling for "_name" to match measurement name.
if name.Val == "_name" {
@ -677,22 +677,22 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex
ids = tagVals.Load(str.Val)
} else {
// Make a copy of all series ids and mark the ones we need to evict.
seriesIDs := newEvictSeriesIDs(m.SeriesIDs())
sIDs := newEvictSeriesIDs(m.SeriesIDs())
// Go through each slice and mark the values we find as zero so
// they can be removed later.
tagVals.RangeAll(func(_ string, a SeriesIDs) {
seriesIDs.mark(a)
tagVals.RangeAll(func(_ string, a seriesIDs) {
sIDs.mark(a)
})
// Make a new slice with only the remaining ids.
ids = seriesIDs.evict()
ids = sIDs.evict()
}
} else if n.Op == influxql.NEQ {
if str.Val != "" {
ids = m.SeriesIDs().Reject(tagVals.Load(str.Val))
} else {
tagVals.RangeAll(func(_ string, a SeriesIDs) {
tagVals.RangeAll(func(_ string, a seriesIDs) {
ids = append(ids, a...)
})
sort.Sort(ids)
@ -703,7 +703,7 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex
// if we're looking for series with a tag value that matches a regex
if re, ok := value.(*influxql.RegexLiteral); ok {
var ids SeriesIDs
var ids seriesIDs
// Special handling for "_name" to match measurement name.
if name.Val == "_name" {
@ -723,24 +723,24 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex
// If we should not include the empty string, include series that match our condition.
if empty && n.Op == influxql.EQREGEX {
// See comments above for EQ with a StringLiteral.
seriesIDs := newEvictSeriesIDs(m.SeriesIDs())
tagVals.RangeAll(func(k string, a SeriesIDs) {
sIDs := newEvictSeriesIDs(m.SeriesIDs())
tagVals.RangeAll(func(k string, a seriesIDs) {
if !re.Val.MatchString(k) {
seriesIDs.mark(a)
sIDs.mark(a)
}
})
ids = seriesIDs.evict()
ids = sIDs.evict()
} else if empty && n.Op == influxql.NEQREGEX {
ids = make(SeriesIDs, 0, len(m.SeriesIDs()))
tagVals.RangeAll(func(k string, a SeriesIDs) {
ids = make(seriesIDs, 0, len(m.SeriesIDs()))
tagVals.RangeAll(func(k string, a seriesIDs) {
if !re.Val.MatchString(k) {
ids = append(ids, a...)
}
})
sort.Sort(ids)
} else if !empty && n.Op == influxql.EQREGEX {
ids = make(SeriesIDs, 0, len(m.SeriesIDs()))
tagVals.RangeAll(func(k string, a SeriesIDs) {
ids = make(seriesIDs, 0, len(m.SeriesIDs()))
tagVals.RangeAll(func(k string, a seriesIDs) {
if re.Val.MatchString(k) {
ids = append(ids, a...)
}
@ -748,27 +748,27 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex
sort.Sort(ids)
} else if !empty && n.Op == influxql.NEQREGEX {
// See comments above for EQ with a StringLiteral.
seriesIDs := newEvictSeriesIDs(m.SeriesIDs())
tagVals.RangeAll(func(k string, a SeriesIDs) {
sIDs := newEvictSeriesIDs(m.SeriesIDs())
tagVals.RangeAll(func(k string, a seriesIDs) {
if re.Val.MatchString(k) {
seriesIDs.mark(a)
sIDs.mark(a)
}
})
ids = seriesIDs.evict()
ids = sIDs.evict()
}
return ids, nil, nil
}
// compare tag values
if ref, ok := value.(*influxql.VarRef); ok {
var ids SeriesIDs
var ids seriesIDs
if n.Op == influxql.NEQ {
ids = m.SeriesIDs()
}
rhsTagVals := m.seriesByTagKeyValue[ref.Val]
tagVals.RangeAll(func(k string, a SeriesIDs) {
tagVals.RangeAll(func(k string, a seriesIDs) {
tags := a.Intersect(rhsTagVals.Load(k))
if n.Op == influxql.EQ {
ids = ids.Union(tags)
@ -808,7 +808,7 @@ func (fe FilterExprs) Len() int {
// WalkWhereForSeriesIds recursively walks the WHERE clause and returns an ordered set of series IDs and
// a map from those series IDs to filter expressions that should be used to limit points returned in
// the final query result.
func (m *Measurement) WalkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, FilterExprs, error) {
func (m *measurement) WalkWhereForSeriesIds(expr influxql.Expr) (seriesIDs, FilterExprs, error) {
switch n := expr.(type) {
case *influxql.BinaryExpr:
switch n.Op {
@ -872,7 +872,7 @@ func (m *Measurement) WalkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, Filt
// expandExpr returns a list of expressions expanded by all possible tag
// combinations.
func (m *Measurement) expandExpr(expr influxql.Expr) []tagSetExpr {
func (m *measurement) expandExpr(expr influxql.Expr) []tagSetExpr {
// Retrieve list of unique values for each tag.
valuesByTagKey := m.uniqueTagValues(expr)
@ -930,7 +930,7 @@ func expandExprWithValues(expr influxql.Expr, keys []string, tagExprs []tagExpr,
// SeriesIDsAllOrByExpr walks an expressions for matching series IDs
// or, if no expressions is given, returns all series IDs for the measurement.
func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error) {
func (m *measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (seriesIDs, error) {
// If no expression given or the measurement has no series,
// we can take just return the ids or nil accordingly.
if expr == nil {
@ -954,7 +954,7 @@ func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error
}
// tagKeysByExpr extracts the tag keys wanted by the expression.
func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (map[string]struct{}, error) {
func (m *measurement) TagKeysByExpr(expr influxql.Expr) (map[string]struct{}, error) {
if expr == nil {
set := make(map[string]struct{})
for _, key := range m.TagKeys() {
@ -1022,7 +1022,7 @@ func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (map[string]struct{}, er
}
// tagKeysByFilter will filter the tag keys for the measurement.
func (m *Measurement) tagKeysByFilter(op influxql.Token, val string, regex *regexp.Regexp) stringSet {
func (m *measurement) tagKeysByFilter(op influxql.Token, val string, regex *regexp.Regexp) stringSet {
ss := newStringSet()
for _, key := range m.TagKeys() {
var matched bool
@ -1081,7 +1081,7 @@ func copyTagExprs(a []tagExpr) []tagExpr {
}
// uniqueTagValues returns a list of unique tag values used in an expression.
func (m *Measurement) uniqueTagValues(expr influxql.Expr) map[string][]string {
func (m *measurement) uniqueTagValues(expr influxql.Expr) map[string][]string {
// Track unique value per tag.
tags := make(map[string]map[string]struct{})
@ -1131,18 +1131,18 @@ func (m *Measurement) uniqueTagValues(expr influxql.Expr) map[string][]string {
}
// Measurements represents a list of *Measurement.
type Measurements []*Measurement
type measurements []*measurement
// Len implements sort.Interface.
func (a Measurements) Len() int { return len(a) }
func (a measurements) Len() int { return len(a) }
// Less implements sort.Interface.
func (a Measurements) Less(i, j int) bool { return a[i].Name < a[j].Name }
func (a measurements) Less(i, j int) bool { return a[i].Name < a[j].Name }
// Swap implements sort.Interface.
func (a Measurements) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a measurements) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a Measurements) Intersect(other Measurements) Measurements {
func (a measurements) Intersect(other measurements) measurements {
l := a
r := other
@ -1156,7 +1156,7 @@ func (a Measurements) Intersect(other Measurements) Measurements {
// That is, don't run comparisons against lower values that we've already passed
var i, j int
result := make(Measurements, 0, len(l))
result := make(measurements, 0, len(l))
for i < len(l) && j < len(r) {
if l[i].Name == r[j].Name {
result = append(result, l[i])
@ -1172,8 +1172,8 @@ func (a Measurements) Intersect(other Measurements) Measurements {
return result
}
func (a Measurements) Union(other Measurements) Measurements {
result := make(Measurements, 0, len(a)+len(other))
func (a measurements) Union(other measurements) measurements {
result := make(measurements, 0, len(a)+len(other))
var i, j int
for i < len(a) && j < len(other) {
if a[i].Name == other[j].Name {
@ -1199,33 +1199,38 @@ func (a Measurements) Union(other Measurements) Measurements {
return result
}
// Series belong to a Measurement and represent unique time series in a database.
type Series struct {
mu sync.RWMutex
Key string
tags models.Tags
ID uint64
measurement *Measurement
shardIDs map[uint64]struct{} // shards that have this series defined
deleted bool
// series belong to a Measurement and represent unique time series in a database.
type series struct {
// lastModified tracks the last time the series was created. If the series
// already exists and a request to create is received (a no-op), lastModified
// is increased to track that it is still in use.
lastModified int64
// immutable
ID uint64
Measurement *measurement
Key string
Tags models.Tags
mu sync.RWMutex
shardIDs map[uint64]struct{} // shards that have this series defined
deleted bool
}
// NewSeries returns an initialized series struct
func NewSeries(key []byte, tags models.Tags) *Series {
return &Series{
Key: string(key),
tags: tags,
// newSeries returns an initialized series struct
func newSeries(id uint64, m *measurement, key string, tags models.Tags) *series {
return &series{
ID: id,
Measurement: m,
Key: key,
Tags: tags,
shardIDs: make(map[uint64]struct{}),
lastModified: time.Now().UTC().UnixNano(),
}
}
func (s *Series) AssignShard(shardID uint64, ts int64) {
func (s *series) AssignShard(shardID uint64, ts int64) {
atomic.StoreInt64(&s.lastModified, ts)
if s.Assigned(shardID) {
return
@ -1239,7 +1244,7 @@ func (s *Series) AssignShard(shardID uint64, ts int64) {
s.mu.Unlock()
}
func (s *Series) UnassignShard(shardID uint64, ts int64) {
func (s *series) UnassignShard(shardID uint64, ts int64) {
s.mu.Lock()
if s.LastModified() < ts {
delete(s.shardIDs, shardID)
@ -1247,66 +1252,26 @@ func (s *Series) UnassignShard(shardID uint64, ts int64) {
s.mu.Unlock()
}
func (s *Series) Assigned(shardID uint64) bool {
func (s *series) Assigned(shardID uint64) bool {
s.mu.RLock()
_, ok := s.shardIDs[shardID]
s.mu.RUnlock()
return ok
}
func (s *Series) LastModified() int64 {
func (s *series) LastModified() int64 {
return atomic.LoadInt64(&s.lastModified)
}
func (s *Series) ShardN() int {
func (s *series) ShardN() int {
s.mu.RLock()
n := len(s.shardIDs)
s.mu.RUnlock()
return n
}
// Measurement returns the measurement on the series.
func (s *Series) Measurement() *Measurement {
return s.measurement
}
// SetMeasurement sets the measurement on the series.
func (s *Series) SetMeasurement(m *Measurement) {
s.measurement = m
}
// ForEachTag executes fn for every tag. Iteration occurs under lock.
func (s *Series) ForEachTag(fn func(models.Tag)) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, t := range s.tags {
fn(t)
}
}
// Tags returns a copy of the tags under lock.
func (s *Series) Tags() models.Tags {
s.mu.RLock()
defer s.mu.RUnlock()
return s.tags
}
// CopyTags clones the tags on the series in-place,
func (s *Series) CopyTags() {
s.mu.Lock()
defer s.mu.Unlock()
s.tags = s.tags.Clone()
}
// GetTagString returns a tag value under lock.
func (s *Series) GetTagString(key string) string {
s.mu.RLock()
defer s.mu.RUnlock()
return s.tags.GetString(key)
}
// Delete marks this series as deleted. A deleted series should not be returned for queries.
func (s *Series) Delete(ts int64) {
func (s *series) Delete(ts int64) {
s.mu.Lock()
if s.LastModified() < ts {
s.deleted = true
@ -1315,7 +1280,7 @@ func (s *Series) Delete(ts int64) {
}
// Deleted indicates if this was previously deleted.
func (s *Series) Deleted() bool {
func (s *series) Deleted() bool {
s.mu.RLock()
v := s.deleted
s.mu.RUnlock()
@ -1326,18 +1291,18 @@ func (s *Series) Deleted() bool {
// ids mapping to a set of tag values.
//
// TODO(edd): This could possibly be replaced by a sync.Map once we use Go 1.9.
type TagKeyValue struct {
type tagKeyValue struct {
mu sync.RWMutex
valueIDs map[string]SeriesIDs
valueIDs map[string]seriesIDs
}
// NewTagKeyValue initialises a new TagKeyValue.
func NewTagKeyValue() *TagKeyValue {
return &TagKeyValue{valueIDs: make(map[string]SeriesIDs)}
func newTagKeyValue() *tagKeyValue {
return &tagKeyValue{valueIDs: make(map[string]seriesIDs)}
}
// Cardinality returns the number of values in the TagKeyValue.
func (t *TagKeyValue) Cardinality() int {
func (t *tagKeyValue) Cardinality() int {
if t == nil {
return 0
}
@ -1348,7 +1313,7 @@ func (t *TagKeyValue) Cardinality() int {
}
// Contains returns true if the TagKeyValue contains value.
func (t *TagKeyValue) Contains(value string) bool {
func (t *tagKeyValue) Contains(value string) bool {
if t == nil {
return false
}
@ -1360,33 +1325,35 @@ func (t *TagKeyValue) Contains(value string) bool {
}
// Load returns the SeriesIDs for the provided tag value.
func (t *TagKeyValue) Load(value string) SeriesIDs {
func (t *tagKeyValue) Load(value string) seriesIDs {
if t == nil {
return nil
}
t.mu.RLock()
defer t.mu.RUnlock()
return t.valueIDs[value]
sIDs := t.valueIDs[value]
t.mu.RUnlock()
return sIDs
}
// LoadByte returns the SeriesIDs for the provided tag value. It makes use of
// Go's compiler optimisation for avoiding a copy when accessing maps with a []byte.
func (t *TagKeyValue) LoadByte(value []byte) SeriesIDs {
func (t *tagKeyValue) LoadByte(value []byte) seriesIDs {
if t == nil {
return nil
}
t.mu.RLock()
defer t.mu.RUnlock()
return t.valueIDs[string(value)]
sIDs := t.valueIDs[string(value)]
t.mu.RUnlock()
return sIDs
}
// Range calls f sequentially on each key and value. A call to Range on a nil
// TagKeyValue is a no-op.
//
// If f returns false then iteration over any remaining keys or values will cease.
func (t *TagKeyValue) Range(f func(tagValue string, a SeriesIDs) bool) {
func (t *tagKeyValue) Range(f func(tagValue string, a seriesIDs) bool) {
if t == nil {
return
}
@ -1402,35 +1369,42 @@ func (t *TagKeyValue) Range(f func(tagValue string, a SeriesIDs) bool) {
// RangeAll calls f sequentially on each key and value. A call to RangeAll on a
// nil TagKeyValue is a no-op.
func (t *TagKeyValue) RangeAll(f func(k string, a SeriesIDs)) {
t.Range(func(k string, a SeriesIDs) bool {
func (t *tagKeyValue) RangeAll(f func(k string, a seriesIDs)) {
t.Range(func(k string, a seriesIDs) bool {
f(k, a)
return true
})
}
// Store stores ids under the value key.
func (t *TagKeyValue) Store(value string, ids SeriesIDs) {
func (t *tagKeyValue) Store(value string, ids seriesIDs) {
t.mu.Lock()
defer t.mu.Unlock()
t.valueIDs[value] = ids
t.mu.Unlock()
}
// StoreByte stores ids under the value key.
func (t *tagKeyValue) StoreByte(value []byte, ids seriesIDs) {
t.mu.Lock()
t.valueIDs[string(value)] = ids
t.mu.Unlock()
}
// SeriesIDs is a convenience type for sorting, checking equality, and doing
// union and intersection of collections of series ids.
type SeriesIDs []uint64
type seriesIDs []uint64
// Len implements sort.Interface.
func (a SeriesIDs) Len() int { return len(a) }
func (a seriesIDs) Len() int { return len(a) }
// Less implements sort.Interface.
func (a SeriesIDs) Less(i, j int) bool { return a[i] < a[j] }
func (a seriesIDs) Less(i, j int) bool { return a[i] < a[j] }
// Swap implements sort.Interface.
func (a SeriesIDs) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a seriesIDs) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// Equals assumes that both are sorted.
func (a SeriesIDs) Equals(other SeriesIDs) bool {
func (a seriesIDs) Equals(other seriesIDs) bool {
if len(a) != len(other) {
return false
}
@ -1444,7 +1418,7 @@ func (a SeriesIDs) Equals(other SeriesIDs) bool {
// Intersect returns a new collection of series ids in sorted order that is the intersection of the two.
// The two collections must already be sorted.
func (a SeriesIDs) Intersect(other SeriesIDs) SeriesIDs {
func (a seriesIDs) Intersect(other seriesIDs) seriesIDs {
l := a
r := other
@ -1471,12 +1445,12 @@ func (a SeriesIDs) Intersect(other SeriesIDs) SeriesIDs {
}
}
return SeriesIDs(ids)
return seriesIDs(ids)
}
// Union returns a new collection of series ids in sorted order that is the union of the two.
// The two collections must already be sorted.
func (a SeriesIDs) Union(other SeriesIDs) SeriesIDs {
func (a seriesIDs) Union(other seriesIDs) seriesIDs {
l := a
r := other
ids := make([]uint64, 0, len(l)+len(r))
@ -1507,7 +1481,7 @@ func (a SeriesIDs) Union(other SeriesIDs) SeriesIDs {
// Reject returns a new collection of series ids in sorted order with the passed in set removed from the original.
// This is useful for the NOT operator. The two collections must already be sorted.
func (a SeriesIDs) Reject(other SeriesIDs) SeriesIDs {
func (a seriesIDs) Reject(other seriesIDs) seriesIDs {
l := a
r := other
var i, j int
@ -1530,7 +1504,7 @@ func (a SeriesIDs) Reject(other SeriesIDs) SeriesIDs {
ids = append(ids, l[i:]...)
}
return SeriesIDs(ids)
return seriesIDs(ids)
}
// seriesID is a series id that may or may not have been evicted from the
@ -1563,9 +1537,9 @@ func newEvictSeriesIDs(ids []uint64) evictSeriesIDs {
// mark marks all of the ids in the sorted slice to be evicted from the list of
// series ids. If an id to be evicted does not exist, it just gets ignored.
func (a *evictSeriesIDs) mark(ids []uint64) {
seriesIDs := a.ids
sIDs := a.ids
for _, id := range ids {
if len(seriesIDs) == 0 {
if len(sIDs) == 0 {
break
}
@ -1573,29 +1547,29 @@ func (a *evictSeriesIDs) mark(ids []uint64) {
// the first element does not match the value we're
// looking for.
i := 0
if seriesIDs[0].val < id {
i = sort.Search(len(seriesIDs), func(i int) bool {
return seriesIDs[i].val >= id
if sIDs[0].val < id {
i = sort.Search(len(sIDs), func(i int) bool {
return sIDs[i].val >= id
})
}
if i >= len(seriesIDs) {
if i >= len(sIDs) {
break
} else if seriesIDs[i].val == id {
if !seriesIDs[i].evict {
seriesIDs[i].evict = true
} else if sIDs[i].val == id {
if !sIDs[i].evict {
sIDs[i].evict = true
a.sz--
}
// Skip over this series since it has been evicted and won't be
// encountered again.
i++
}
seriesIDs = seriesIDs[i:]
sIDs = sIDs[i:]
}
}
// evict creates a new slice with only the series that have not been evicted.
func (a *evictSeriesIDs) evict() (ids SeriesIDs) {
func (a *evictSeriesIDs) evict() (ids seriesIDs) {
if a.sz == 0 {
return ids
}
@ -1621,7 +1595,7 @@ type TagFilter struct {
// WalkTagKeys calls fn for each tag key associated with m. The order of the
// keys is undefined.
func (m *Measurement) WalkTagKeys(fn func(k string)) {
func (m *measurement) WalkTagKeys(fn func(k string)) {
m.mu.RLock()
defer m.mu.RUnlock()
@ -1631,7 +1605,7 @@ func (m *Measurement) WalkTagKeys(fn func(k string)) {
}
// TagKeys returns a list of the measurement's tag names, in sorted order.
func (m *Measurement) TagKeys() []string {
func (m *measurement) TagKeys() []string {
m.mu.RLock()
keys := make([]string, 0, len(m.seriesByTagKeyValue))
for k := range m.seriesByTagKeyValue {
@ -1643,12 +1617,12 @@ func (m *Measurement) TagKeys() []string {
}
// TagValues returns all the values for the given tag key, in an arbitrary order.
func (m *Measurement) TagValues(auth query.Authorizer, key string) []string {
func (m *measurement) TagValues(auth query.Authorizer, key string) []string {
m.mu.RLock()
defer m.mu.RUnlock()
values := make([]string, 0, m.seriesByTagKeyValue[key].Cardinality())
m.seriesByTagKeyValue[key].RangeAll(func(k string, a SeriesIDs) {
m.seriesByTagKeyValue[key].RangeAll(func(k string, a seriesIDs) {
if auth == nil {
values = append(values, k)
} else {
@ -1657,7 +1631,7 @@ func (m *Measurement) TagValues(auth query.Authorizer, key string) []string {
if s == nil {
continue
}
if auth.AuthorizeSeriesRead(m.database, m.name, s.Tags()) {
if auth.AuthorizeSeriesRead(m.Database, m.NameBytes, s.Tags) {
values = append(values, k)
return
}
@ -1668,7 +1642,7 @@ func (m *Measurement) TagValues(auth query.Authorizer, key string) []string {
}
// SetFieldName adds the field name to the measurement.
func (m *Measurement) SetFieldName(name string) {
func (m *measurement) SetFieldName(name string) {
m.mu.RLock()
_, ok := m.fieldNames[name]
m.mu.RUnlock()
@ -1683,7 +1657,7 @@ func (m *Measurement) SetFieldName(name string) {
}
// FieldNames returns a list of the measurement's field names, in an arbitrary order.
func (m *Measurement) FieldNames() []string {
func (m *measurement) FieldNames() []string {
m.mu.RLock()
defer m.mu.RUnlock()
@ -1695,7 +1669,7 @@ func (m *Measurement) FieldNames() []string {
}
// SeriesByTagKeyValue returns the TagKeyValue for the provided tag key.
func (m *Measurement) SeriesByTagKeyValue(key string) *TagKeyValue {
func (m *measurement) SeriesByTagKeyValue(key string) *tagKeyValue {
m.mu.RLock()
defer m.mu.RUnlock()
return m.seriesByTagKeyValue[key]

View File

@ -1,4 +1,4 @@
package inmem_test
package inmem
import (
"fmt"
@ -8,15 +8,14 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/tsdb/index/inmem"
"github.com/influxdata/influxql"
)
// Test comparing SeriesIDs for equality.
func TestSeriesIDs_Equals(t *testing.T) {
ids1 := inmem.SeriesIDs([]uint64{1, 2, 3})
ids2 := inmem.SeriesIDs([]uint64{1, 2, 3})
ids3 := inmem.SeriesIDs([]uint64{4, 5, 6})
ids1 := seriesIDs([]uint64{1, 2, 3})
ids2 := seriesIDs([]uint64{1, 2, 3})
ids3 := seriesIDs([]uint64{4, 5, 6})
if !ids1.Equals(ids2) {
t.Fatal("expected ids1 == ids2")
@ -27,10 +26,10 @@ func TestSeriesIDs_Equals(t *testing.T) {
// Test intersecting sets of SeriesIDs.
func TestSeriesIDs_Intersect(t *testing.T) {
// Test swaping l & r, all branches of if-else, and exit loop when 'j < len(r)'
ids1 := inmem.SeriesIDs([]uint64{1, 3, 4, 5, 6})
ids2 := inmem.SeriesIDs([]uint64{1, 2, 3, 7})
exp := inmem.SeriesIDs([]uint64{1, 3})
// Test swapping l & r, all branches of if-else, and exit loop when 'j < len(r)'
ids1 := seriesIDs([]uint64{1, 3, 4, 5, 6})
ids2 := seriesIDs([]uint64{1, 2, 3, 7})
exp := seriesIDs([]uint64{1, 3})
got := ids1.Intersect(ids2)
if !exp.Equals(got) {
@ -38,9 +37,9 @@ func TestSeriesIDs_Intersect(t *testing.T) {
}
// Test exit for loop when 'i < len(l)'
ids1 = inmem.SeriesIDs([]uint64{1})
ids2 = inmem.SeriesIDs([]uint64{1, 2})
exp = inmem.SeriesIDs([]uint64{1})
ids1 = seriesIDs([]uint64{1})
ids2 = seriesIDs([]uint64{1, 2})
exp = seriesIDs([]uint64{1})
got = ids1.Intersect(ids2)
if !exp.Equals(got) {
@ -51,9 +50,9 @@ func TestSeriesIDs_Intersect(t *testing.T) {
// Test union sets of SeriesIDs.
func TestSeriesIDs_Union(t *testing.T) {
// Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left.
ids1 := inmem.SeriesIDs([]uint64{1, 2, 3, 7})
ids2 := inmem.SeriesIDs([]uint64{1, 3, 4, 5, 6})
exp := inmem.SeriesIDs([]uint64{1, 2, 3, 4, 5, 6, 7})
ids1 := seriesIDs([]uint64{1, 2, 3, 7})
ids2 := seriesIDs([]uint64{1, 3, 4, 5, 6})
exp := seriesIDs([]uint64{1, 2, 3, 4, 5, 6, 7})
got := ids1.Union(ids2)
if !exp.Equals(got) {
@ -61,9 +60,9 @@ func TestSeriesIDs_Union(t *testing.T) {
}
// Test exit because of 'i < len(l)' and append remainder from right.
ids1 = inmem.SeriesIDs([]uint64{1})
ids2 = inmem.SeriesIDs([]uint64{1, 2})
exp = inmem.SeriesIDs([]uint64{1, 2})
ids1 = seriesIDs([]uint64{1})
ids2 = seriesIDs([]uint64{1, 2})
exp = seriesIDs([]uint64{1, 2})
got = ids1.Union(ids2)
if !exp.Equals(got) {
@ -74,9 +73,9 @@ func TestSeriesIDs_Union(t *testing.T) {
// Test removing one set of SeriesIDs from another.
func TestSeriesIDs_Reject(t *testing.T) {
// Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left.
ids1 := inmem.SeriesIDs([]uint64{1, 2, 3, 7})
ids2 := inmem.SeriesIDs([]uint64{1, 3, 4, 5, 6})
exp := inmem.SeriesIDs([]uint64{2, 7})
ids1 := seriesIDs([]uint64{1, 2, 3, 7})
ids2 := seriesIDs([]uint64{1, 3, 4, 5, 6})
exp := seriesIDs([]uint64{2, 7})
got := ids1.Reject(ids2)
if !exp.Equals(got) {
@ -84,9 +83,9 @@ func TestSeriesIDs_Reject(t *testing.T) {
}
// Test exit because of 'i < len(l)'.
ids1 = inmem.SeriesIDs([]uint64{1})
ids2 = inmem.SeriesIDs([]uint64{1, 2})
exp = inmem.SeriesIDs{}
ids1 = seriesIDs([]uint64{1})
ids2 = seriesIDs([]uint64{1, 2})
exp = seriesIDs{}
got = ids1.Reject(ids2)
if !exp.Equals(got) {
@ -95,14 +94,14 @@ func TestSeriesIDs_Reject(t *testing.T) {
}
func TestMeasurement_AddSeries_Nil(t *testing.T) {
m := inmem.NewMeasurement("foo", "cpu")
m := newMeasurement("foo", "cpu")
if m.AddSeries(nil) {
t.Fatalf("AddSeries mismatch: exp false, got true")
}
}
func TestMeasurement_AppendSeriesKeysByID_Missing(t *testing.T) {
m := inmem.NewMeasurement("foo", "cpu")
m := newMeasurement("foo", "cpu")
var dst []string
dst = m.AppendSeriesKeysByID(dst, []uint64{1})
if exp, got := 0, len(dst); exp != got {
@ -111,9 +110,8 @@ func TestMeasurement_AppendSeriesKeysByID_Missing(t *testing.T) {
}
func TestMeasurement_AppendSeriesKeysByID_Exists(t *testing.T) {
m := inmem.NewMeasurement("foo", "cpu")
s := inmem.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))})
s.ID = 1
m := newMeasurement("foo", "cpu")
s := newSeries(1, m, "cpu,host=foo", models.Tags{models.NewTag([]byte("host"), []byte("foo"))})
m.AddSeries(s)
var dst []string
@ -128,13 +126,11 @@ func TestMeasurement_AppendSeriesKeysByID_Exists(t *testing.T) {
}
func TestMeasurement_TagsSet_Deadlock(t *testing.T) {
m := inmem.NewMeasurement("foo", "cpu")
s1 := inmem.NewSeries([]byte("cpu,host=foo"), models.Tags{models.NewTag([]byte("host"), []byte("foo"))})
s1.ID = 1
m := newMeasurement("foo", "cpu")
s1 := newSeries(1, m, "cpu,host=foo", models.Tags{models.NewTag([]byte("host"), []byte("foo"))})
m.AddSeries(s1)
s2 := inmem.NewSeries([]byte("cpu,host=bar"), models.Tags{models.NewTag([]byte("host"), []byte("bar"))})
s2.ID = 2
s2 := newSeries(2, m, "cpu,host=bar", models.Tags{models.NewTag([]byte("host"), []byte("bar"))})
m.AddSeries(s2)
m.DropSeries(s1)
@ -147,12 +143,11 @@ func TestMeasurement_TagsSet_Deadlock(t *testing.T) {
}
func BenchmarkMeasurement_SeriesIDForExp_EQRegex(b *testing.B) {
m := inmem.NewMeasurement("foo", "cpu")
m := newMeasurement("foo", "cpu")
for i := 0; i < 100000; i++ {
s := inmem.NewSeries([]byte("cpu"), models.Tags{models.NewTag(
s := newSeries(uint64(i), m, "cpu", models.Tags{models.NewTag(
[]byte("host"),
[]byte(fmt.Sprintf("host%d", i)))})
s.ID = uint64(i)
m.AddSeries(s)
}
@ -178,12 +173,11 @@ func BenchmarkMeasurement_SeriesIDForExp_EQRegex(b *testing.B) {
}
func BenchmarkMeasurement_SeriesIDForExp_NERegex(b *testing.B) {
m := inmem.NewMeasurement("foo", "cpu")
m := newMeasurement("foo", "cpu")
for i := 0; i < 100000; i++ {
s := inmem.NewSeries([]byte("cpu"), models.Tags{models.Tag{
s := newSeries(uint64(i), m, "cpu", models.Tags{models.Tag{
Key: []byte("host"),
Value: []byte(fmt.Sprintf("host%d", i))}})
s.ID = uint64(i)
m.AddSeries(s)
}
@ -210,11 +204,10 @@ func BenchmarkMeasurement_SeriesIDForExp_NERegex(b *testing.B) {
}
func benchmarkTagSets(b *testing.B, n int, opt query.IteratorOptions) {
m := inmem.NewMeasurement("foo", "m")
m := newMeasurement("foo", "m")
for i := 0; i < n; i++ {
tags := map[string]string{"tag1": "value1", "tag2": "value2"}
s := inmem.NewSeries([]byte(fmt.Sprintf("m,tag1=value1,tag2=value2")), models.NewTags(tags))
s.ID = uint64(i)
s := newSeries(uint64(i), m, fmt.Sprintf("m,tag1=value1,tag2=value2"), models.NewTags(tags))
s.AssignShard(0, time.Now().UnixNano())
m.AddSeries(s)
}

View File

@ -7,7 +7,6 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/index/inmem"
)
// Ensure tags can be marshaled into a byte slice.
@ -142,18 +141,20 @@ func benchmarkMakeTagsKey(b *testing.B, keyN int) {
type TestSeries struct {
Measurement string
Series *inmem.Series
Key string
Tags models.Tags
}
func genTestSeries(mCnt, tCnt, vCnt int) []*TestSeries {
measurements := genStrList("measurement", mCnt)
tagSets := NewTagSetGenerator(tCnt, vCnt).AllSets()
series := []*TestSeries{}
var series []*TestSeries
for _, m := range measurements {
for _, ts := range tagSets {
series = append(series, &TestSeries{
Measurement: m,
Series: inmem.NewSeries([]byte(fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts)))), models.NewTags(ts)),
Key: fmt.Sprintf("%s:%s", m, string(tsdb.MarshalTags(ts))),
Tags: models.NewTags(ts),
})
}
}

View File

@ -1732,7 +1732,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) {
points := []models.Point{}
for _, s := range series {
for val := 0.0; val < float64(pntCnt); val++ {
p := models.MustNewPoint(s.Measurement, s.Series.Tags(), map[string]interface{}{"value": val}, time.Now())
p := models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": val}, time.Now())
points = append(points, p)
}
}
@ -1774,7 +1774,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt
points := []models.Point{}
for _, s := range series {
for val := 0.0; val < float64(pntCnt); val++ {
p := models.MustNewPoint(s.Measurement, s.Series.Tags(), map[string]interface{}{"value": val}, time.Now())
p := models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": val}, time.Now())
points = append(points, p)
}
}

View File

@ -540,7 +540,7 @@ func testStoreCardinalityTombstoning(t *testing.T, store *Store) {
points := make([]models.Point, 0, len(series))
for _, s := range series {
points = append(points, models.MustNewPoint(s.Measurement, s.Series.Tags(), map[string]interface{}{"value": 1.0}, time.Now()))
points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
}
// Create requested number of shards in the store & write points across
@ -623,7 +623,7 @@ func testStoreCardinalityUnique(t *testing.T, store *Store) {
points := make([]models.Point, 0, len(series))
for _, s := range series {
points = append(points, models.MustNewPoint(s.Measurement, s.Series.Tags(), map[string]interface{}{"value": 1.0}, time.Now()))
points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
}
// Create requested number of shards in the store & write points across
@ -694,7 +694,7 @@ func testStoreCardinalityDuplicates(t *testing.T, store *Store) {
points := make([]models.Point, 0, len(series))
for _, s := range series {
points = append(points, models.MustNewPoint(s.Measurement, s.Series.Tags(), map[string]interface{}{"value": 1.0}, time.Now()))
points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
}
// Create requested number of shards in the store & write points.
@ -778,7 +778,7 @@ func testStoreCardinalityCompactions(store *Store) error {
points := make([]models.Point, 0, len(series))
for _, s := range series {
points = append(points, models.MustNewPoint(s.Measurement, s.Series.Tags(), map[string]interface{}{"value": 1.0}, time.Now()))
points = append(points, models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": 1.0}, time.Now()))
}
// Create requested number of shards in the store & write points across
@ -1331,7 +1331,7 @@ func benchmarkStoreOpen(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt, shardCnt int)
points := []models.Point{}
for _, s := range series {
for val := 0.0; val < float64(pntCnt); val++ {
p := models.MustNewPoint(s.Measurement, s.Series.Tags(), map[string]interface{}{"value": val}, time.Now())
p := models.MustNewPoint(s.Measurement, s.Tags, map[string]interface{}{"value": val}, time.Now())
points = append(points, p)
}
}