Series filtering.
parent
62269c3cea
commit
62d2b3ebe9
|
@ -1990,11 +1990,6 @@ func (p *point) Reset() {
|
|||
p.it.end = 0
|
||||
}
|
||||
|
||||
// ParseSeriesTags parses tags from a series key.
|
||||
func ParseSeriesTags(key []byte) Tags {
|
||||
panic("TODO")
|
||||
}
|
||||
|
||||
// MarshalBinary encodes all the fields to their proper type and returns the binary
|
||||
// represenation
|
||||
// NOTE: uint64 is specifically not supported due to potential overflow when we decode
|
||||
|
|
|
@ -53,7 +53,7 @@ type Engine interface {
|
|||
Measurement(name []byte) (*Measurement, error)
|
||||
Measurements() (Measurements, error)
|
||||
MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error)
|
||||
MeasurementsByRegex(re *regexp.Regexp) (Measurements, error)
|
||||
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
|
||||
MeasurementFields(measurement string) *MeasurementFields
|
||||
|
||||
// Statistics will return statistics relevant to this engine.
|
||||
|
|
|
@ -135,14 +135,6 @@ func (i *Index) files() []File {
|
|||
return a
|
||||
}
|
||||
|
||||
func (i *Index) CreateMeasurementIndexIfNotExists(name []byte) (*tsdb.Measurement, error) {
|
||||
// FIXME(benbjohnson): Read lock log file during lookup.
|
||||
if mm := i.measurement(name); mm == nil {
|
||||
return mm, nil
|
||||
}
|
||||
return i.logFiles[0].CreateMeasurementIndexIfNotExists(name)
|
||||
}
|
||||
|
||||
// Measurement retrieves a measurement by name.
|
||||
func (i *Index) Measurement(name []byte) (*tsdb.Measurement, error) {
|
||||
return i.measurement(name), nil
|
||||
|
@ -373,15 +365,15 @@ func (i *Index) MeasurementsByName(names [][]byte) ([]*tsdb.Measurement, error)
|
|||
return mms, nil
|
||||
}
|
||||
|
||||
func (i *Index) MeasurementsByRegex(re *regexp.Regexp) (tsdb.Measurements, error) {
|
||||
func (i *Index) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
|
||||
itr := i.MeasurementIterator()
|
||||
var mms tsdb.Measurements
|
||||
var a [][]byte
|
||||
for e := itr.Next(); e != nil; e = itr.Next() {
|
||||
if re.Match(e.Name()) {
|
||||
mms = append(mms, i.measurement(e.Name()))
|
||||
a = append(a, e.Name())
|
||||
}
|
||||
}
|
||||
return mms, nil
|
||||
return a, nil
|
||||
}
|
||||
|
||||
// DropMeasurement deletes a measurement from the index.
|
||||
|
@ -408,20 +400,32 @@ func (i *Index) Series(name []byte, tags models.Tags) SeriesElem {
|
|||
}
|
||||
|
||||
func (i *Index) DropSeries(keys [][]byte) error {
|
||||
panic("TODO: Requires WAL")
|
||||
for _, key := range keys {
|
||||
name, tags, err := models.ParseKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := i.logFiles[0].DeleteSeries([]byte(name), tags); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Index) SeriesN() (n uint64, err error) {
|
||||
// TODO(edd): Use sketches.
|
||||
return 0, nil
|
||||
|
||||
// HACK(benbjohnson): Use first log file until edd adds sketches.
|
||||
return i.logFiles[0].SeriesN(), nil
|
||||
}
|
||||
|
||||
func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
|
||||
panic("TODO")
|
||||
panic("TODO(edd)")
|
||||
}
|
||||
|
||||
func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
|
||||
panic("TODO")
|
||||
panic("TODO(edd)")
|
||||
}
|
||||
|
||||
// Dereference is a nop.
|
||||
|
@ -500,10 +504,59 @@ func (i *Index) MatchTagValueSeriesIterator(name, key []byte, value *regexp.Rege
|
|||
// TagSets returns an ordered list of tag sets for a measurement by dimension
|
||||
// and filtered by an optional conditional expression.
|
||||
func (i *Index) TagSets(name []byte, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
|
||||
var tagSets []*influxql.TagSet
|
||||
// TODO(benbjohnson): Iterate over filtered series and build tag sets.
|
||||
panic("TODO")
|
||||
return tagSets, nil
|
||||
itr, err := i.MeasurementSeriesByExprIterator(name, condition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if itr == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// For every series, get the tag values for the requested tag keys i.e.
|
||||
// dimensions. This is the TagSet for that series. Series with the same
|
||||
// TagSet are then grouped together, because for the purpose of GROUP BY
|
||||
// they are part of the same composite series.
|
||||
tagSets := make(map[string]*influxql.TagSet, 64)
|
||||
|
||||
for e := itr.Next(); e != nil; e = itr.Next() {
|
||||
tags := make(map[string]string, len(dimensions))
|
||||
|
||||
// Build the TagSet for this series.
|
||||
for _, dim := range dimensions {
|
||||
tags[dim] = e.Tags().GetString(dim)
|
||||
}
|
||||
|
||||
// Convert the TagSet to a string, so it can be added to a map
|
||||
// allowing TagSets to be handled as a set.
|
||||
tagsAsKey := tsdb.MarshalTags(tags)
|
||||
tagSet, ok := tagSets[string(tagsAsKey)]
|
||||
if !ok {
|
||||
// This TagSet is new, create a new entry for it.
|
||||
tagSet = &influxql.TagSet{
|
||||
Tags: tags,
|
||||
Key: tagsAsKey,
|
||||
}
|
||||
}
|
||||
// Associate the series and filter with the Tagset.
|
||||
tagSet.AddFilter(string(SeriesElemKey(e)), e.Expr())
|
||||
|
||||
// Ensure it's back in the map.
|
||||
tagSets[string(tagsAsKey)] = tagSet
|
||||
}
|
||||
|
||||
// Sort the series in each tag set.
|
||||
for _, t := range tagSets {
|
||||
sort.Sort(t)
|
||||
}
|
||||
|
||||
// The TagSets have been created, as a map of TagSets. Just send
|
||||
// the values back as a slice, sorting for consistency.
|
||||
sortedTagsSets := make([]*influxql.TagSet, 0, len(tagSets))
|
||||
for _, v := range tagSets {
|
||||
sortedTagsSets = append(sortedTagsSets, v)
|
||||
}
|
||||
sort.Sort(byTagKey(sortedTagsSets))
|
||||
|
||||
return sortedTagsSets, nil
|
||||
}
|
||||
|
||||
// MeasurementSeriesByExprIterator returns a series iterator for a measurement
|
||||
|
@ -580,6 +633,7 @@ func (i *Index) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExpr)
|
|||
return newSeriesExprIterator(i.MeasurementSeriesIterator(name), &influxql.BooleanLiteral{Val: true}), nil
|
||||
}
|
||||
|
||||
// FIXME(benbjohnson): Require measurement field info.
|
||||
/*
|
||||
// For fields, return all series from this measurement.
|
||||
if key.Val != "_name" && ((key.Type == influxql.Unknown && i.hasField(key.Val)) || key.Type == influxql.AnyField || (key.Type != influxql.Tag && key.Type != influxql.Unknown)) {
|
||||
|
|
|
@ -3,16 +3,20 @@ package tsi1
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sort"
|
||||
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/pkg/mmap"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
)
|
||||
|
||||
// Log errors.
|
||||
var (
|
||||
ErrLogEntryChecksumMismatch = errors.New("log entry checksum mismatch")
|
||||
)
|
||||
|
||||
// Log entry flag constants.
|
||||
|
@ -31,8 +35,7 @@ type LogFile struct {
|
|||
entries []LogEntry // parsed entries
|
||||
|
||||
// In-memory index.
|
||||
series logSeries
|
||||
mms logMeasurements
|
||||
mms logMeasurements
|
||||
|
||||
// Filepath to the log file.
|
||||
Path string
|
||||
|
@ -69,10 +72,6 @@ func (f *LogFile) open() error {
|
|||
} else if fi.Size() == 0 {
|
||||
return nil
|
||||
}
|
||||
println("FI", fi.Size())
|
||||
|
||||
buf, _ := ioutil.ReadFile(f.Path)
|
||||
Hexdump(buf)
|
||||
|
||||
// Open a read-only memory map of the existing data.
|
||||
data, err := mmap.Map(f.Path)
|
||||
|
@ -91,6 +90,9 @@ func (f *LogFile) open() error {
|
|||
}
|
||||
f.entries = append(f.entries, e)
|
||||
|
||||
// Execute entry against in-memory index.
|
||||
f.execEntry(&e)
|
||||
|
||||
// Move buffer forward.
|
||||
buf = buf[e.Size:]
|
||||
}
|
||||
|
@ -107,37 +109,78 @@ func (f *LogFile) Close() error {
|
|||
if f.data != nil {
|
||||
mmap.Unmap(f.data)
|
||||
}
|
||||
|
||||
f.entries = nil
|
||||
f.mms = make(logMeasurements)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *LogFile) CreateMeasurementIndexIfNotExists(name []byte) (*tsdb.Measurement, error) {
|
||||
panic("TODO")
|
||||
// MeasurementNames returns an ordered list of measurement names.
|
||||
func (f *LogFile) MeasurementNames() []string {
|
||||
a := make([]string, 0, len(f.mms))
|
||||
for name := range f.mms {
|
||||
a = append(a, name)
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
// DeleteMeasurement adds a tombstone for a measurement to the log file.
|
||||
func (f *LogFile) DeleteMeasurement(name []byte) error {
|
||||
// Append log entry.
|
||||
if err := f.append(LogEntry{Flag: LogEntryMeasurementTombstoneFlag, Name: name}); err != nil {
|
||||
e := LogEntry{Flag: LogEntryMeasurementTombstoneFlag, Name: name}
|
||||
if err := f.appendEntry(&e); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete measurement from index.
|
||||
mm := f.measurement(name)
|
||||
mm.deleted = true
|
||||
f.mms[string(name)] = mm
|
||||
|
||||
f.execEntry(&e)
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteTagKey adds a tombstone for a tag key to the log file.
|
||||
func (f *LogFile) DeleteTagKey(name, key []byte) error {
|
||||
return f.append(LogEntry{Flag: LogEntryTagKeyTombstoneFlag, Name: name, Tags: models.Tags{{Key: key}}})
|
||||
e := LogEntry{Flag: LogEntryTagKeyTombstoneFlag, Name: name, Tags: models.Tags{{Key: key}}}
|
||||
if err := f.appendEntry(&e); err != nil {
|
||||
return err
|
||||
}
|
||||
f.execEntry(&e)
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteTagValue adds a tombstone for a tag value to the log file.
|
||||
func (f *LogFile) DeleteTagValue(name, key, value []byte) error {
|
||||
return f.append(LogEntry{Flag: LogEntryTagValueTombstoneFlag, Name: name, Tags: models.Tags{{Key: key, Value: value}}})
|
||||
e := LogEntry{Flag: LogEntryTagValueTombstoneFlag, Name: name, Tags: models.Tags{{Key: key, Value: value}}}
|
||||
if err := f.appendEntry(&e); err != nil {
|
||||
return err
|
||||
}
|
||||
f.execEntry(&e)
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddSeries adds a series to the log file.
|
||||
func (f *LogFile) AddSeries(name []byte, tags models.Tags) error {
|
||||
e := LogEntry{Name: name, Tags: tags}
|
||||
if err := f.appendEntry(&e); err != nil {
|
||||
return err
|
||||
}
|
||||
f.execEntry(&e)
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteSeries adds a tombstone for a series to the log file.
|
||||
func (f *LogFile) DeleteSeries(name []byte, tags models.Tags) error {
|
||||
e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, Name: name, Tags: tags}
|
||||
if err := f.appendEntry(&e); err != nil {
|
||||
return err
|
||||
}
|
||||
f.execEntry(&e)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SeriesN returns the total number of series in the file.
|
||||
func (f *LogFile) SeriesN() (n uint64) {
|
||||
for _, mm := range f.mms {
|
||||
n += uint64(len(mm.series))
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// Series returns a series reference.
|
||||
|
@ -160,71 +203,10 @@ func (f *LogFile) Series(name []byte, tags models.Tags) SeriesElem {
|
|||
return nil
|
||||
}
|
||||
|
||||
// AddSeries adds a series to the log file.
|
||||
func (f *LogFile) AddSeries(name []byte, tags models.Tags) error {
|
||||
return f.insertSeries(LogEntry{Name: name, Tags: tags})
|
||||
}
|
||||
|
||||
// DeleteSeries adds a tombstone for a series to the log file.
|
||||
func (f *LogFile) DeleteSeries(name []byte, tags models.Tags) error {
|
||||
return f.insertSeries(LogEntry{Flag: LogEntrySeriesTombstoneFlag, Name: name, Tags: tags})
|
||||
}
|
||||
|
||||
// insertSeries inserts a series entry.
|
||||
func (f *LogFile) insertSeries(e LogEntry) error {
|
||||
// Append log entry.
|
||||
if err := f.append(e); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if series is deleted.
|
||||
deleted := (e.Flag & LogEntrySeriesTombstoneFlag) != 0
|
||||
|
||||
// Insert series to list.
|
||||
f.series.insert(e.Name, e.Tags, deleted)
|
||||
|
||||
// Fetch measurement.
|
||||
mm := f.measurement(e.Name)
|
||||
|
||||
// Save tags.
|
||||
for _, t := range e.Tags {
|
||||
// Fetch key.
|
||||
ts, ok := mm.tagSet[string(t.Key)]
|
||||
if !ok {
|
||||
ts = logTagSet{name: t.Key, tagValues: make(map[string]logTagValue)}
|
||||
}
|
||||
|
||||
// Fetch value.
|
||||
tv, ok := ts.tagValues[string(t.Value)]
|
||||
if !ok {
|
||||
tv.name = t.Value
|
||||
}
|
||||
tv.insertEntry(e)
|
||||
ts.tagValues[string(t.Value)] = tv
|
||||
|
||||
// Save key.
|
||||
mm.tagSet[string(t.Key)] = ts
|
||||
}
|
||||
|
||||
// Insert series to list.
|
||||
// TODO: Remove global series list.
|
||||
mm.series.insert(e.Name, e.Tags, deleted)
|
||||
|
||||
// Save measurement.
|
||||
f.mms[string(e.Name)] = mm
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// append adds a generic entry to the end of the file.
|
||||
func (f *LogFile) append(e LogEntry) error {
|
||||
// appendEntry adds a log entry to the end of the file.
|
||||
func (f *LogFile) appendEntry(e *LogEntry) error {
|
||||
// Marshal entry to the local buffer.
|
||||
f.buf = appendLogEntry(f.buf[0:], &e)
|
||||
|
||||
// Append checksum.
|
||||
var buf [4]byte
|
||||
binary.BigEndian.PutUint32(buf[:], e.Checksum)
|
||||
f.buf = append(f.buf, buf[:]...)
|
||||
f.buf = appendLogEntry(f.buf[0:], e)
|
||||
|
||||
// Save the size of the record.
|
||||
e.Size = len(f.buf)
|
||||
|
@ -242,11 +224,86 @@ func (f *LogFile) append(e LogEntry) error {
|
|||
}
|
||||
|
||||
// Save entry to in-memory list.
|
||||
f.entries = append(f.entries, e)
|
||||
f.entries = append(f.entries, *e)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// execEntry executes a log entry against the in-memory index.
|
||||
// This is done after appending and on replay of the log.
|
||||
func (f *LogFile) execEntry(e *LogEntry) {
|
||||
switch e.Flag {
|
||||
case LogEntryMeasurementTombstoneFlag:
|
||||
f.execDeleteMeasurementEntry(e)
|
||||
case LogEntryTagKeyTombstoneFlag:
|
||||
f.execDeleteTagKeyEntry(e)
|
||||
case LogEntryTagValueTombstoneFlag:
|
||||
f.execDeleteTagValueEntry(e)
|
||||
default:
|
||||
f.execSeriesEntry(e)
|
||||
}
|
||||
}
|
||||
|
||||
func (f *LogFile) execDeleteMeasurementEntry(e *LogEntry) {
|
||||
mm := f.measurement(e.Name)
|
||||
mm.deleted = true
|
||||
mm.tagSet = make(map[string]logTagSet)
|
||||
mm.series = nil
|
||||
f.mms[string(e.Name)] = mm
|
||||
}
|
||||
|
||||
func (f *LogFile) execDeleteTagKeyEntry(e *LogEntry) {
|
||||
key := e.Tags[0].Key
|
||||
|
||||
mm := f.measurement(e.Name)
|
||||
ts := mm.createTagSetIfNotExists(key)
|
||||
|
||||
ts.deleted = true
|
||||
|
||||
mm.tagSet[string(key)] = ts
|
||||
f.mms[string(e.Name)] = mm
|
||||
}
|
||||
|
||||
func (f *LogFile) execDeleteTagValueEntry(e *LogEntry) {
|
||||
key, value := e.Tags[0].Key, e.Tags[0].Value
|
||||
|
||||
mm := f.measurement(e.Name)
|
||||
ts := mm.createTagSetIfNotExists(key)
|
||||
tv := ts.createTagValueIfNotExists(value)
|
||||
|
||||
tv.deleted = true
|
||||
|
||||
ts.tagValues[string(value)] = tv
|
||||
mm.tagSet[string(key)] = ts
|
||||
f.mms[string(e.Name)] = mm
|
||||
}
|
||||
|
||||
func (f *LogFile) execSeriesEntry(e *LogEntry) {
|
||||
// Check if series is deleted.
|
||||
deleted := (e.Flag & LogEntrySeriesTombstoneFlag) != 0
|
||||
|
||||
// Fetch measurement.
|
||||
mm := f.measurement(e.Name)
|
||||
|
||||
// Save tags.
|
||||
for _, t := range e.Tags {
|
||||
ts := mm.createTagSetIfNotExists(t.Key)
|
||||
tv := ts.createTagValueIfNotExists(t.Value)
|
||||
|
||||
tv.insertEntry(e)
|
||||
|
||||
ts.tagValues[string(t.Value)] = tv
|
||||
mm.tagSet[string(t.Key)] = ts
|
||||
}
|
||||
|
||||
// Insert series to list.
|
||||
// TODO: Remove global series list.
|
||||
mm.series.insert(e.Name, e.Tags, deleted)
|
||||
|
||||
// Save measurement.
|
||||
f.mms[string(e.Name)] = mm
|
||||
}
|
||||
|
||||
// measurement returns a measurement by name.
|
||||
func (f *LogFile) measurement(name []byte) logMeasurement {
|
||||
mm, ok := f.mms[string(name)]
|
||||
|
@ -319,14 +376,23 @@ func (f *LogFile) CompactTo(w io.Writer) (n int64, err error) {
|
|||
}
|
||||
|
||||
func (f *LogFile) writeSeriesBlockTo(w io.Writer, n *int64) error {
|
||||
// Ensure series are sorted.
|
||||
sort.Sort(f.series)
|
||||
|
||||
// Write all series.
|
||||
sw := NewSeriesBlockWriter()
|
||||
for _, serie := range f.series {
|
||||
if err := sw.Add(serie.name, serie.tags); err != nil {
|
||||
return err
|
||||
|
||||
// Retreve measurement names in order.
|
||||
names := f.MeasurementNames()
|
||||
|
||||
// Add series from measurements in order.
|
||||
for _, name := range names {
|
||||
mm := f.mms[name]
|
||||
|
||||
// Ensure series are sorted.
|
||||
sort.Sort(mm.series)
|
||||
|
||||
for _, serie := range mm.series {
|
||||
if err := sw.Add(serie.name, serie.tags); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -338,28 +404,32 @@ func (f *LogFile) writeSeriesBlockTo(w io.Writer, n *int64) error {
|
|||
}
|
||||
|
||||
// Add series to each measurement and key/value.
|
||||
for i := range f.series {
|
||||
serie := &f.series[i]
|
||||
for _, name := range names {
|
||||
mm := f.mms[name]
|
||||
|
||||
// Lookup series offset.
|
||||
serie.offset = sw.Offset(serie.name, serie.tags)
|
||||
if serie.offset == 0 {
|
||||
panic("series not found")
|
||||
for i := range mm.series {
|
||||
serie := &mm.series[i]
|
||||
|
||||
// Lookup series offset.
|
||||
serie.offset = sw.Offset(serie.name, serie.tags)
|
||||
if serie.offset == 0 {
|
||||
panic("series not found")
|
||||
}
|
||||
|
||||
// Add series id to measurement, tag key, and tag value.
|
||||
mm.seriesIDs = append(mm.seriesIDs, serie.offset)
|
||||
|
||||
// Add series id to each tag value.
|
||||
for _, tag := range serie.tags {
|
||||
t := mm.tagSet[string(tag.Key)]
|
||||
|
||||
v := t.tagValues[string(tag.Value)]
|
||||
v.seriesIDs = append(v.seriesIDs, serie.offset)
|
||||
t.tagValues[string(tag.Value)] = v
|
||||
}
|
||||
}
|
||||
|
||||
// Add series id to measurement, tag key, and tag value.
|
||||
mm := f.mms[string(serie.name)]
|
||||
mm.seriesIDs = append(mm.seriesIDs, serie.offset)
|
||||
f.mms[string(serie.name)] = mm
|
||||
|
||||
// Add series id to each tag value.
|
||||
for _, tag := range serie.tags {
|
||||
t := mm.tagSet[string(tag.Key)]
|
||||
|
||||
v := t.tagValues[string(tag.Value)]
|
||||
v.seriesIDs = append(v.seriesIDs, serie.offset)
|
||||
t.tagValues[string(tag.Value)] = v
|
||||
}
|
||||
f.mms[string(name)] = mm
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -431,13 +501,11 @@ func (f *LogFile) writeMeasurementBlockTo(w io.Writer, names []string, n *int64)
|
|||
|
||||
// reset clears all the compaction fields on the in-memory index.
|
||||
func (f *LogFile) reset() {
|
||||
// Clear series compaction fields.
|
||||
for i := range f.series {
|
||||
f.series[i].offset = 0
|
||||
}
|
||||
|
||||
// Clear measurement compaction fields.
|
||||
for name, mm := range f.mms {
|
||||
for i := range mm.series {
|
||||
mm.series[i].offset = 0
|
||||
}
|
||||
|
||||
mm.offset, mm.size, mm.seriesIDs = 0, 0, nil
|
||||
for key, tagSet := range mm.tagSet {
|
||||
for value, tagValue := range tagSet.tagValues {
|
||||
|
@ -461,6 +529,7 @@ type LogEntry struct {
|
|||
|
||||
// UnmarshalBinary unmarshals data into e.
|
||||
func (e *LogEntry) UnmarshalBinary(data []byte) error {
|
||||
orig := data
|
||||
start := len(data)
|
||||
|
||||
// Parse flag data.
|
||||
|
@ -488,9 +557,17 @@ func (e *LogEntry) UnmarshalBinary(data []byte) error {
|
|||
tag.Value, data = data[n:n+int(sz)], data[n+int(sz):]
|
||||
}
|
||||
|
||||
// Compute checksum.
|
||||
chk := crc32.ChecksumIEEE(orig[:start-len(data)])
|
||||
|
||||
// Parse checksum.
|
||||
e.Checksum, data = binary.BigEndian.Uint32(data[:4]), data[4:]
|
||||
|
||||
// Verify checksum.
|
||||
if chk != e.Checksum {
|
||||
return ErrLogEntryChecksumMismatch
|
||||
}
|
||||
|
||||
// Save length of elem.
|
||||
e.Size = start - len(data)
|
||||
|
||||
|
@ -613,6 +690,14 @@ func (m *logMeasurement) Name() []byte { return m.name }
|
|||
func (m *logMeasurement) Deleted() bool { return m.deleted }
|
||||
func (m *logMeasurement) TagKeyIterator() TagKeyIterator { panic("TODO") }
|
||||
|
||||
func (m *logMeasurement) createTagSetIfNotExists(key []byte) logTagSet {
|
||||
ts, ok := m.tagSet[string(key)]
|
||||
if !ok {
|
||||
ts = logTagSet{name: key, tagValues: make(map[string]logTagValue)}
|
||||
}
|
||||
return ts
|
||||
}
|
||||
|
||||
// logMeasurementSlice is a sortable list of log measurements.
|
||||
type logMeasurementSlice []logMeasurement
|
||||
|
||||
|
@ -636,8 +721,16 @@ func (itr *logMeasurementIterator) Next() (e MeasurementElem) {
|
|||
|
||||
type logTagSet struct {
|
||||
name []byte
|
||||
tagValues map[string]logTagValue
|
||||
deleted bool
|
||||
tagValues map[string]logTagValue
|
||||
}
|
||||
|
||||
func (ts *logTagSet) createTagValueIfNotExists(value []byte) logTagValue {
|
||||
tv, ok := ts.tagValues[string(value)]
|
||||
if !ok {
|
||||
tv = logTagValue{name: value}
|
||||
}
|
||||
return tv
|
||||
}
|
||||
|
||||
type logTagValue struct {
|
||||
|
@ -651,7 +744,7 @@ type logTagValue struct {
|
|||
|
||||
// insertEntry inserts an entry into the tag value in sorted order.
|
||||
// If another entry matches the name/tags then it is overrwritten.
|
||||
func (tv *logTagValue) insertEntry(e LogEntry) {
|
||||
func (tv *logTagValue) insertEntry(e *LogEntry) {
|
||||
i := sort.Search(len(tv.entries), func(i int) bool {
|
||||
if cmp := bytes.Compare(tv.entries[i].Name, e.Name); cmp != 0 {
|
||||
return cmp != -1
|
||||
|
@ -661,14 +754,14 @@ func (tv *logTagValue) insertEntry(e LogEntry) {
|
|||
|
||||
// Update entry if it already exists.
|
||||
if i < len(tv.entries) && bytes.Equal(tv.entries[i].Name, e.Name) && tv.entries[i].Tags.Equal(e.Tags) {
|
||||
tv.entries[i] = e
|
||||
tv.entries[i] = *e
|
||||
return
|
||||
}
|
||||
|
||||
// Otherwise insert new entry.
|
||||
tv.entries = append(tv.entries, LogEntry{})
|
||||
copy(tv.entries[i+1:], tv.entries[i:])
|
||||
tv.entries[i] = e
|
||||
tv.entries[i] = *e
|
||||
}
|
||||
|
||||
// logSeriesIterator represents an iterator over a slice of series.
|
||||
|
|
|
@ -33,6 +33,21 @@ func TestLogFile_AddSeries(t *testing.T) {
|
|||
} else if e := itr.Next(); e != nil {
|
||||
t.Fatalf("expected eof, got: %#v", e)
|
||||
}
|
||||
|
||||
// Reopen file and re-verify.
|
||||
if err := f.Reopen(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Verify data.
|
||||
itr = f.MeasurementIterator()
|
||||
if e := itr.Next(); e == nil || string(e.Name()) != "cpu" {
|
||||
t.Fatalf("unexpected measurement: %#v", e)
|
||||
} else if e := itr.Next(); e == nil || string(e.Name()) != "mem" {
|
||||
t.Fatalf("unexpected measurement: %#v", e)
|
||||
} else if e := itr.Next(); e != nil {
|
||||
t.Fatalf("expected eof, got: %#v", e)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure log file can delete an existing measurement.
|
||||
|
@ -99,6 +114,17 @@ func (f *LogFile) Close() error {
|
|||
return f.LogFile.Close()
|
||||
}
|
||||
|
||||
// Reopen closes and reopens the file.
|
||||
func (f *LogFile) Reopen() error {
|
||||
if err := f.LogFile.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := f.LogFile.Open(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateLogFile creates a new temporary log file and adds a list of series.
|
||||
func CreateLogFile(series []Series) (*LogFile, error) {
|
||||
f := MustOpenLogFile()
|
||||
|
|
|
@ -354,6 +354,24 @@ type SeriesElem interface {
|
|||
Expr() influxql.Expr
|
||||
}
|
||||
|
||||
// SeriesElemKey encodes e as a series key.
|
||||
func SeriesElemKey(e SeriesElem) []byte {
|
||||
name, tags := e.Name(), e.Tags()
|
||||
|
||||
// TODO: Precompute allocation size.
|
||||
// FIXME: Handle escaping.
|
||||
|
||||
var buf []byte
|
||||
buf = append(buf, name...)
|
||||
for _, t := range tags {
|
||||
buf = append(buf, ',')
|
||||
buf = append(buf, t.Key...)
|
||||
buf = append(buf, '=')
|
||||
buf = append(buf, t.Value...)
|
||||
}
|
||||
return buf
|
||||
}
|
||||
|
||||
// CompareSeriesElem returns -1 if a < b, 1 if a > b, and 0 if equal.
|
||||
func CompareSeriesElem(a, b SeriesElem) int {
|
||||
if cmp := bytes.Compare(a.Name(), b.Name()); cmp != 0 {
|
||||
|
@ -789,3 +807,9 @@ func assert(condition bool, msg string, v ...interface{}) {
|
|||
panic(fmt.Sprintf("assert failed: "+msg, v...))
|
||||
}
|
||||
}
|
||||
|
||||
type byTagKey []*influxql.TagSet
|
||||
|
||||
func (t byTagKey) Len() int { return len(t) }
|
||||
func (t byTagKey) Less(i, j int) bool { return bytes.Compare(t[i].Key, t[j].Key) < 0 }
|
||||
func (t byTagKey) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
|
||||
|
|
|
@ -294,8 +294,8 @@ func (e *Engine) MeasurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool
|
|||
return e.index.MeasurementsByExpr(expr)
|
||||
}
|
||||
|
||||
func (e *Engine) MeasurementsByRegex(re *regexp.Regexp) (tsdb.Measurements, error) {
|
||||
return e.index.MeasurementsByRegex(re)
|
||||
func (e *Engine) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
|
||||
return e.index.MeasurementNamesByRegex(re)
|
||||
}
|
||||
|
||||
// MeasurementFields returns the measurement fields for a measurement.
|
||||
|
@ -1466,7 +1466,7 @@ func (e *Engine) createTagSetGroupIterators(ref *influxql.VarRef, name string, s
|
|||
|
||||
// createVarRefSeriesIterator creates an iterator for a variable reference for a series.
|
||||
func (e *Engine) createVarRefSeriesIterator(ref *influxql.VarRef, name string, seriesKey string, t *influxql.TagSet, filter influxql.Expr, conditionFields []influxql.VarRef, opt influxql.IteratorOptions) (influxql.Iterator, error) {
|
||||
tfs := models.ParseSeriesTags([]byte(seriesKey))
|
||||
_, tfs, _ := models.ParseKey([]byte(seriesKey))
|
||||
tags := influxql.NewTags(tfs.Map())
|
||||
|
||||
// Create options specific for this series.
|
||||
|
|
|
@ -12,12 +12,11 @@ type Index interface {
|
|||
Open() error
|
||||
Close() error
|
||||
|
||||
CreateMeasurementIndexIfNotExists(name []byte) (*Measurement, error)
|
||||
Measurement(name []byte) (*Measurement, error)
|
||||
Measurements() (Measurements, error)
|
||||
MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error)
|
||||
MeasurementsByName(names [][]byte) ([]*Measurement, error)
|
||||
MeasurementsByRegex(re *regexp.Regexp) (Measurements, error)
|
||||
MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
|
||||
DropMeasurement(name []byte) error
|
||||
|
||||
CreateSeriesIfNotExists(name []byte, tags models.Tags) error
|
||||
|
|
15
tsdb/meta.go
15
tsdb/meta.go
|
@ -1,7 +1,6 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"sort"
|
||||
|
@ -392,15 +391,15 @@ func (d *DatabaseIndex) measurementsByTagFilters(filters []*TagFilter) Measureme
|
|||
return measurements
|
||||
}
|
||||
|
||||
// MeasurementsByRegex returns the measurements that match the regex.
|
||||
func (d *DatabaseIndex) MeasurementsByRegex(re *regexp.Regexp) (Measurements, error) {
|
||||
// MeasurementNamesByRegex returns the measurements that match the regex.
|
||||
func (d *DatabaseIndex) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
|
||||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
|
||||
var matches Measurements
|
||||
var matches [][]byte
|
||||
for _, m := range d.measurements {
|
||||
if re.MatchString(m.Name) {
|
||||
matches = append(matches, m)
|
||||
matches = append(matches,[]byte(m.Name))
|
||||
}
|
||||
}
|
||||
return matches, nil
|
||||
|
@ -1696,9 +1695,3 @@ func MeasurementFromSeriesKey(key string) string {
|
|||
k, _, _ := models.ParseKey([]byte(key))
|
||||
return escape.UnescapeString(k)
|
||||
}
|
||||
|
||||
type byTagKey []*influxql.TagSet
|
||||
|
||||
func (t byTagKey) Len() int { return len(t) }
|
||||
func (t byTagKey) Less(i, j int) bool { return bytes.Compare(t[i].Key, t[j].Key) < 0 }
|
||||
func (t byTagKey) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
|
||||
|
|
|
@ -481,14 +481,15 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
|
|||
dropped, n int
|
||||
reason string
|
||||
)
|
||||
|
||||
if s.options.Config.MaxValuesPerTag > 0 {
|
||||
// Validate that all the new points would not exceed any limits, if so, we drop them
|
||||
// and record why/increment counters
|
||||
for i, p := range points {
|
||||
tags := p.Tags()
|
||||
|
||||
m := s.Measurement([]byte(p.Name()))
|
||||
// Measurement doesn't exist yet, can't check the limit
|
||||
m := s.Measurement([]byte(p.Name()))
|
||||
if m != nil {
|
||||
var dropPoint bool
|
||||
for _, tag := range tags {
|
||||
|
@ -789,16 +790,16 @@ func (s *Shard) ExpandSources(sources influxql.Sources) (influxql.Sources, error
|
|||
}
|
||||
|
||||
// Loop over matching measurements.
|
||||
measurements, err := s.engine.MeasurementsByRegex(src.Regex.Val)
|
||||
names, err := s.engine.MeasurementNamesByRegex(src.Regex.Val)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, m := range measurements {
|
||||
for _, name := range names {
|
||||
other := &influxql.Measurement{
|
||||
Database: src.Database,
|
||||
RetentionPolicy: src.RetentionPolicy,
|
||||
Name: m.Name,
|
||||
Name: string(name),
|
||||
}
|
||||
set[other.String()] = other
|
||||
}
|
||||
|
|
|
@ -498,8 +498,8 @@ cpu,host=serverB,region=uswest value=25 0
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer itr.Close()
|
||||
fitr := itr.(influxql.FloatIterator)
|
||||
defer itr.Close()
|
||||
|
||||
// Read values from iterator.
|
||||
if p, err := fitr.Next(); err != nil {
|
||||
|
|
Loading…
Reference in New Issue