Implement tsdb.Index interface on tsi1.Index.

pull/7913/head
Ben Johnson 2016-10-18 08:34:51 -06:00
parent e2c3b52ca4
commit 2a81351992
No known key found for this signature in database
GPG Key ID: 81741CD251883081
15 changed files with 1147 additions and 622 deletions

View File

@ -1792,6 +1792,37 @@ func (a Tags) HashKey() []byte {
return b[:idx]
}
// CopyTags returns a shallow copy of tags.
// The returned slice is new, but Key/Value byte slices are shared
// with the input.
func CopyTags(a Tags) Tags {
	dup := make(Tags, len(a))
	for i, t := range a {
		dup[i] = t
	}
	return dup
}
// DeepCopyTags returns a deep copy of tags.
// All keys and values are copied into a single shared backing
// allocation to minimize the number of heap objects.
func DeepCopyTags(a Tags) Tags {
	// Compute the total byte length of all keys and values.
	var total int
	for _, t := range a {
		total += len(t.Key) + len(t.Value)
	}

	// One buffer holds every copied key and value.
	buf := make([]byte, total)

	// Copy each tag into the shared buffer, consuming it as we go.
	dup := make(Tags, len(a))
	for i, t := range a {
		n := copy(buf, t.Key)
		dup[i].Key, buf = buf[:n], buf[n:]
		n = copy(buf, t.Value)
		dup[i].Value, buf = buf[:n], buf[n:]
	}
	return dup
}
// Fields represents a mapping between a Point's field names and their
// values.
type Fields map[string]interface{}

View File

@ -1,447 +1,295 @@
package tsi1
import (
"bytes"
"encoding/binary"
"errors"
"io"
"fmt"
"regexp"
"sort"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/tsdb"
)
// IndexVersion is the current TSI1 index version.
const IndexVersion = 1
// Ensure index implements the interface.
var _ tsdb.Index = &Index{}
// FileSignature represents a magic number at the header of the index file.
const FileSignature = "TSI1"
// Index field size constants.
const (
// Index trailer fields
IndexVersionSize = 2
SeriesListOffsetSize = 8
SeriesListSizeSize = 8
MeasurementBlockOffsetSize = 8
MeasurementBlockSizeSize = 8
IndexTrailerSize = IndexVersionSize +
SeriesListOffsetSize +
SeriesListSizeSize +
MeasurementBlockOffsetSize +
MeasurementBlockSizeSize
)
// Index errors.
var (
ErrInvalidIndex = errors.New("invalid index")
ErrUnsupportedIndexVersion = errors.New("unsupported index version")
)
// Index represents a collection of measurement, tag, and series data.
// Index represents a collection of layered index files and WAL.
type Index struct {
data []byte
file *IndexFile
// Components
slist SeriesList
mblk MeasurementBlock
// TODO(benbjohnson): Use layered list of index files.
// TODO(benbjohnson): Add write ahead log.
}
// UnmarshalBinary opens an index from data.
// The byte slice is retained so it must be kept open.
func (i *Index) UnmarshalBinary(data []byte) error {
// Ensure magic number exists at the beginning.
if len(data) < len(FileSignature) {
return io.ErrShortBuffer
} else if !bytes.Equal(data[:len(FileSignature)], []byte(FileSignature)) {
return ErrInvalidIndex
}
// SetFile explicitly sets a file in the index.
func (i *Index) SetFile(f *IndexFile) { i.file = f }
// Read index trailer.
t, err := ReadIndexTrailer(data)
if err != nil {
return err
}
// Slice measurement block data.
buf := data[t.MeasurementBlock.Offset:]
buf = buf[:t.MeasurementBlock.Size]
// Unmarshal measurement block.
if err := i.mblk.UnmarshalBinary(buf); err != nil {
return err
}
// Slice series list data.
buf = data[t.SeriesList.Offset:]
buf = buf[:t.SeriesList.Size]
// Unmarshal series list.
if err := i.slist.UnmarshalBinary(buf); err != nil {
return err
}
// Save reference to entire data block.
i.data = data
return nil
// CreateMeasurementIndexIfNotExists returns the measurement for name,
// creating it if necessary. Unimplemented: creation must be recorded in
// the write-ahead log, which does not exist yet.
func (i *Index) CreateMeasurementIndexIfNotExists(name string) (*tsdb.Measurement, error) {
panic("TODO: Requires WAL")
}
// Close closes the index file.
func (i *Index) Close() error {
i.slist = SeriesList{}
i.mblk = MeasurementBlock{}
return nil
// Measurement retrieves a measurement by name.
// Returns nil (with a nil error) when the measurement has no series;
// see the internal measurement() helper's HasSeries check.
func (i *Index) Measurement(name []byte) (*tsdb.Measurement, error) {
return i.measurement(name), nil
}
// TagValueElem returns a list of series ids for a measurement/tag/value.
func (i *Index) TagValueElem(name, key, value []byte) (TagValueElem, error) {
// Find measurement.
e, ok := i.mblk.Elem(name)
if !ok {
return TagValueElem{}, nil
}
func (i *Index) measurement(name []byte) *tsdb.Measurement {
m := tsdb.NewMeasurement(string(name))
// Find tag set block.
tblk, err := i.tagSetBlock(&e)
if err != nil {
return TagValueElem{}, err
}
return tblk.TagValueElem(key, value), nil
}
// Iterate over measurement series.
itr := i.file.MeasurementSeriesIterator(name)
// tagSetBlock returns a tag set block for a measurement.
func (i *Index) tagSetBlock(e *MeasurementElem) (TagSet, error) {
// Slice tag set data.
buf := i.data[e.TagSet.Offset:]
buf = buf[:e.TagSet.Size]
// Unmarshal block.
var blk TagSet
if err := blk.UnmarshalBinary(buf); err != nil {
return TagSet{}, err
}
return blk, nil
}
// Indices represents a layered set of indices.
type Indices []*Index
// IndexWriter represents a naive implementation of an index builder.
type IndexWriter struct {
series indexSeries
mms indexMeasurements
}
// NewIndexWriter returns a new instance of IndexWriter.
func NewIndexWriter() *IndexWriter {
return &IndexWriter{
mms: make(indexMeasurements),
}
}
// Add adds a series to the index.
func (iw *IndexWriter) Add(name string, tags models.Tags) {
// Add to series list.
iw.series = append(iw.series, indexSerie{name: name, tags: tags})
// Find or create measurement.
mm, ok := iw.mms[name]
if !ok {
mm.name = []byte(name)
mm.tagset = make(indexTagset)
iw.mms[name] = mm
}
// Add tagset.
for _, tag := range tags {
t, ok := mm.tagset[string(tag.Key)]
if !ok {
t.name = tag.Key
t.values = make(indexValues)
mm.tagset[string(tag.Key)] = t
var id uint64 // TEMPORARY
var sname []byte
var tags models.Tags
var deleted bool
for {
if itr.Next(&sname, &tags, &deleted); sname == nil {
break
}
v, ok := t.values[string(tag.Value)]
if !ok {
v.name = tag.Value
t.values[string(tag.Value)] = v
// TODO: Handle deleted series.
// Append series to to measurement.
// TODO: Remove concept of series ids.
m.AddSeries(&tsdb.Series{
ID: id,
Key: string(sname),
Tags: models.CopyTags(tags),
})
// TEMPORARY: Increment ID.
id++
}
if !m.HasSeries() {
return nil
}
return m
}
// Measurements returns a list of all measurements.
//
// Materializes a tsdb.Measurement for every element produced by the
// index file's measurement iterator. NOTE(review): i.measurement can
// return nil for series-less measurements, so the result may contain
// nil entries — confirm callers tolerate that.
func (i *Index) Measurements() (tsdb.Measurements, error) {
var mms tsdb.Measurements
itr := i.file.MeasurementIterator()
for e := itr.Next(); e != nil; e = itr.Next() {
mms = append(mms, i.measurement(e.Name))
}
return mms, nil
}
// MeasurementsByExpr returns measurements matching the influxql
// expression. The boolean reports whether the expression was usable as
// a measurement filter at all (false means "no filter applied").
func (i *Index) MeasurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool, error) {
return i.measurementsByExpr(expr)
}
// measurementsByExpr recursively evaluates expr against the index.
// The boolean result reports whether the expression constrained the
// measurement set; false means the subtree applied no filter.
func (i *Index) measurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool, error) {
// A nil expression applies no filter.
if expr == nil {
return nil, false, nil
}
switch e := expr.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
// Comparison: the left-hand side must name a tag key.
tag, ok := e.LHS.(*influxql.VarRef)
if !ok {
return nil, false, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String())
}
// Retrieve value or regex expression from RHS.
var value string
var regex *regexp.Regexp
if influxql.IsRegexOp(e.Op) {
re, ok := e.RHS.(*influxql.RegexLiteral)
if !ok {
return nil, false, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String())
}
regex = re.Val
} else {
s, ok := e.RHS.(*influxql.StringLiteral)
if !ok {
return nil, false, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String())
}
value = s.Val
}
// Match on name, if specified.
if tag.Val == "_name" {
return i.measurementsByNameFilter(e.Op, value, regex), true, nil
} else if influxql.IsSystemName(tag.Val) {
// System tags never filter measurements.
return nil, false, nil
}
return i.measurementsByTagFilter(e.Op, tag.Val, value, regex), true, nil
case influxql.OR, influxql.AND:
// Evaluate both sides, then combine whichever produced a filter.
lhsIDs, lhsOk, err := i.measurementsByExpr(e.LHS)
if err != nil {
return nil, false, err
}
rhsIDs, rhsOk, err := i.measurementsByExpr(e.RHS)
if err != nil {
return nil, false, err
}
if lhsOk && rhsOk {
if e.Op == influxql.OR {
// OR: union of both sides.
return lhsIDs.Union(rhsIDs), true, nil
}
// AND: intersection of both sides.
return lhsIDs.Intersect(rhsIDs), true, nil
} else if lhsOk {
return lhsIDs, true, nil
} else if rhsOk {
return rhsIDs, true, nil
}
return nil, false, nil
default:
return nil, false, fmt.Errorf("invalid tag comparison operator")
}
case *influxql.ParenExpr:
// Parentheses only group; evaluate the inner expression.
return i.measurementsByExpr(e.Expr)
default:
return nil, false, fmt.Errorf("%#v", expr)
}
}
// WriteTo writes the index to w.
func (iw *IndexWriter) WriteTo(w io.Writer) (n int64, err error) {
var t IndexTrailer
// measurementsByNameFilter returns the sorted measurements matching a name.
func (i *Index) measurementsByNameFilter(op influxql.Token, val string, regex *regexp.Regexp) tsdb.Measurements {
var mms tsdb.Measurements
itr := i.file.MeasurementIterator()
for e := itr.Next(); e != nil; e = itr.Next() {
var matched bool
switch op {
case influxql.EQ:
matched = string(e.Name) == val
case influxql.NEQ:
matched = string(e.Name) != val
case influxql.EQREGEX:
matched = regex.Match(e.Name)
case influxql.NEQREGEX:
matched = !regex.Match(e.Name)
}
// Write magic number.
if err := writeTo(w, []byte(FileSignature), &n); err != nil {
return n, err
if matched {
mms = append(mms, i.measurement(e.Name))
}
}
// Write series list.
t.SeriesList.Offset = n
if err := iw.writeSeriesListTo(w, &n); err != nil {
return n, err
}
t.SeriesList.Size = n - t.SeriesList.Offset
// Sort measurement names.
names := iw.mms.Names()
// Write tagset blocks in measurement order.
if err := iw.writeTagsetsTo(w, names, &n); err != nil {
return n, err
}
// Write measurement block.
t.MeasurementBlock.Offset = n
if err := iw.writeMeasurementBlockTo(w, names, &n); err != nil {
return n, err
}
t.MeasurementBlock.Size = n - t.MeasurementBlock.Offset
// Write trailer.
if err := iw.writeTrailerTo(w, t, &n); err != nil {
return n, err
}
return n, nil
sort.Sort(mms)
return mms
}
func (iw *IndexWriter) writeSeriesListTo(w io.Writer, n *int64) error {
// Ensure series are sorted.
sort.Sort(iw.series)
func (i *Index) measurementsByTagFilter(op influxql.Token, key, val string, regex *regexp.Regexp) tsdb.Measurements {
var mms tsdb.Measurements
itr := i.file.MeasurementIterator()
for e := itr.Next(); e != nil; e = itr.Next() {
mm := i.measurement(e.Name)
// Write all series.
sw := NewSeriesListWriter()
for _, serie := range iw.series {
if err := sw.Add(serie.name, serie.tags); err != nil {
return err
}
}
// Flush series list.
nn, err := sw.WriteTo(w)
*n += nn
if err != nil {
return err
}
// Add series to each measurement and key/value.
for i := range iw.series {
serie := &iw.series[i]
// Lookup series offset.
serie.offset = sw.Offset(serie.name, serie.tags)
if serie.offset == 0 {
panic("series not found")
}
// Add series id to measurement, tag key, and tag value.
mm := iw.mms[serie.name]
mm.seriesIDs = append(mm.seriesIDs, serie.offset)
iw.mms[serie.name] = mm
// Add series id to each tag value.
for _, tag := range serie.tags {
t := mm.tagset[string(tag.Key)]
v := t.values[string(tag.Value)]
v.seriesIDs = append(v.seriesIDs, serie.offset)
t.values[string(tag.Value)] = v
}
}
return nil
}
func (iw *IndexWriter) writeTagsetsTo(w io.Writer, names []string, n *int64) error {
for _, name := range names {
if err := iw.writeTagsetTo(w, name, n); err != nil {
return err
}
}
return nil
}
// writeTagsetTo writes a single tagset to w and saves the tagset offset.
func (iw *IndexWriter) writeTagsetTo(w io.Writer, name string, n *int64) error {
mm := iw.mms[name]
tsw := NewTagSetWriter()
for _, tag := range mm.tagset {
// Mark tag deleted.
if tag.deleted {
tsw.AddTag(tag.name, true)
tagVals := mm.SeriesByTagKeyValue(key)
if tagVals == nil {
continue
}
// Add each value.
for _, value := range tag.values {
sort.Sort(uint32Slice(value.seriesIDs))
tsw.AddTagValue(tag.name, value.name, value.deleted, value.seriesIDs)
// If the operator is non-regex, only check the specified value.
var tagMatch bool
if op == influxql.EQ || op == influxql.NEQ {
if _, ok := tagVals[val]; ok {
tagMatch = true
}
} else {
// Else, the operator is a regex and we have to check all tag
// values against the regular expression.
for tagVal := range tagVals {
if regex.MatchString(tagVal) {
tagMatch = true
break
}
}
}
//
// XNOR gate
//
// tags match | operation is EQ | measurement matches
// --------------------------------------------------
// True | True | True
// True | False | False
// False | True | False
// False | False | True
if tagMatch == (op == influxql.EQ || op == influxql.EQREGEX) {
mms = append(mms, mm)
break
}
}
// Save tagset offset to measurement.
mm.offset = *n
sort.Sort(mms)
return mms
}
// Write tagset to writer.
nn, err := tsw.WriteTo(w)
*n += nn
// MeasurementsByName returns the measurements whose names appear in names.
//
// Results follow index iteration order (not the order of names), and
// names absent from the index are silently omitted.
func (i *Index) MeasurementsByName(names []string) ([]*tsdb.Measurement, error) {
	// Build a set for O(1) membership checks instead of re-scanning the
	// names slice once per measurement element (was O(M*N)).
	nameSet := make(map[string]struct{}, len(names))
	for _, name := range names {
		nameSet[name] = struct{}{}
	}

	itr := i.file.MeasurementIterator()
	mms := make([]*tsdb.Measurement, 0, len(names))
	for e := itr.Next(); e != nil; e = itr.Next() {
		// string(e.Name) used as a map index does not allocate.
		if _, ok := nameSet[string(e.Name)]; ok {
			mms = append(mms, i.measurement(e.Name))
		}
	}
	return mms, nil
}
// MeasurementsByRegex returns every measurement whose name matches re.
func (i *Index) MeasurementsByRegex(re *regexp.Regexp) (tsdb.Measurements, error) {
	var matched tsdb.Measurements
	itr := i.file.MeasurementIterator()
	for elem := itr.Next(); elem != nil; elem = itr.Next() {
		if !re.Match(elem.Name) {
			continue
		}
		matched = append(matched, i.measurement(elem.Name))
	}
	return matched, nil
}
// DropMeasurement removes the measurement from the index.
// Unimplemented: deletion must be recorded in the write-ahead log.
func (i *Index) DropMeasurement(name []byte) error {
panic("TODO: Requires WAL")
}
// CreateSeriesIndexIfNotExists adds the series to the measurement's
// index, creating it if necessary. Unimplemented: requires the
// write-ahead log.
func (i *Index) CreateSeriesIndexIfNotExists(measurement string, series *tsdb.Series) (*tsdb.Series, error) {
panic("TODO: Requires WAL")
}
// Series returns the series with the given key. Unimplemented.
func (i *Index) Series(key []byte) (*tsdb.Series, error) {
panic("TODO")
}
// DropSeries removes the series identified by keys from the index.
// Unimplemented: requires the write-ahead log.
func (i *Index) DropSeries(keys []string) error {
panic("TODO: Requires WAL")
}
// SeriesN returns the total number of series, computed by summing the
// per-measurement series count (e.Series.N) over all measurement
// elements. NOTE(review): whether Series.N includes deleted series
// depends on the measurement block encoding — confirm.
func (i *Index) SeriesN() (n uint64, err error) {
itr := i.file.MeasurementIterator()
for e := itr.Next(); e != nil; e = itr.Next() {
n += uint64(e.Series.N)
}
return n, nil
}
func (i *Index) TagsForSeries(key string) (models.Tags, error) {
ss, err := i.Series([]byte(key))
if err != nil {
return err
return nil, err
}
// Save tagset offset to measurement.
mm.size = *n - mm.offset
iw.mms[name] = mm
return nil
return ss.Tags, nil
}
func (iw *IndexWriter) writeMeasurementBlockTo(w io.Writer, names []string, n *int64) error {
mw := NewMeasurementBlockWriter()
// Add measurement data.
for _, mm := range iw.mms {
mw.Add(mm.name, mm.offset, mm.size, mm.seriesIDs)
}
// Write data to writer.
nn, err := mw.WriteTo(w)
*n += nn
if err != nil {
return err
}
return nil
// SeriesSketches returns sketches for estimating series cardinality.
// Unimplemented.
func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) {
panic("TODO")
}
// writeTrailerTo writes the index trailer to w.
func (iw *IndexWriter) writeTrailerTo(w io.Writer, t IndexTrailer, n *int64) error {
// Write series list info.
if err := writeUint64To(w, uint64(t.SeriesList.Offset), n); err != nil {
return err
} else if err := writeUint64To(w, uint64(t.SeriesList.Size), n); err != nil {
return err
}
// Write measurement block info.
if err := writeUint64To(w, uint64(t.MeasurementBlock.Offset), n); err != nil {
return err
} else if err := writeUint64To(w, uint64(t.MeasurementBlock.Size), n); err != nil {
return err
}
// Write index encoding version.
if err := writeUint16To(w, IndexVersion, n); err != nil {
return err
}
return nil
// MeasurementsSketches returns sketches for estimating measurement
// cardinality. Unimplemented.
func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
panic("TODO")
}
type indexSerie struct {
name string
tags models.Tags
deleted bool
offset uint32
}
type indexSeries []indexSerie
func (a indexSeries) Len() int { return len(a) }
func (a indexSeries) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a indexSeries) Less(i, j int) bool {
if a[i].name != a[j].name {
return a[i].name < a[j].name
}
return models.CompareTags(a[i].tags, a[j].tags) == -1
}
type indexMeasurement struct {
name []byte
deleted bool
tagset indexTagset
offset int64 // tagset offset
size int64 // tagset size
seriesIDs []uint32
}
type indexMeasurements map[string]indexMeasurement
// Names returns a sorted list of measurement names.
func (m indexMeasurements) Names() []string {
a := make([]string, 0, len(m))
for name := range m {
a = append(a, name)
}
sort.Strings(a)
return a
}
type indexTag struct {
name []byte
deleted bool
values indexValues
}
type indexTagset map[string]indexTag
type indexValue struct {
name []byte
deleted bool
seriesIDs []uint32
}
type indexValues map[string]indexValue
// ReadIndexTrailer returns the index trailer from data.
//
// The trailer occupies the final IndexTrailerSize bytes of data.
// Returns ErrInvalidIndex when data is too short to hold a trailer and
// ErrUnsupportedIndexVersion for an unknown version.
func ReadIndexTrailer(data []byte) (IndexTrailer, error) {
	var t IndexTrailer

	// Guard against a short buffer so the slicing below cannot panic.
	if len(data) < IndexTrailerSize {
		return t, ErrInvalidIndex
	}

	// Read and validate the version stored in the final bytes.
	t.Version = int(binary.BigEndian.Uint16(data[len(data)-IndexVersionSize:]))
	if t.Version != IndexVersion {
		return t, ErrUnsupportedIndexVersion
	}

	// Slice trailer data.
	buf := data[len(data)-IndexTrailerSize:]

	// Read series list info.
	t.SeriesList.Offset = int64(binary.BigEndian.Uint64(buf[0:SeriesListOffsetSize]))
	buf = buf[SeriesListOffsetSize:]
	t.SeriesList.Size = int64(binary.BigEndian.Uint64(buf[0:SeriesListSizeSize]))
	buf = buf[SeriesListSizeSize:]

	// Read measurement block info.
	t.MeasurementBlock.Offset = int64(binary.BigEndian.Uint64(buf[0:MeasurementBlockOffsetSize]))
	buf = buf[MeasurementBlockOffsetSize:]
	t.MeasurementBlock.Size = int64(binary.BigEndian.Uint64(buf[0:MeasurementBlockSizeSize]))
	buf = buf[MeasurementBlockSizeSize:]

	return t, nil
}
// IndexTrailer represents meta data written to the end of the index.
type IndexTrailer struct {
Version int
SeriesList struct {
Offset int64
Size int64
}
MeasurementBlock struct {
Offset int64
Size int64
}
}
type uint32Slice []uint32
func (a uint32Slice) Len() int { return len(a) }
func (a uint32Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a uint32Slice) Less(i, j int) bool { return a[i] < a[j] }

View File

@ -0,0 +1,495 @@
package tsi1
import (
"bytes"
"encoding/binary"
"errors"
"io"
"sort"
"github.com/influxdata/influxdb/models"
)
// IndexFileVersion is the current TSI1 index file version.
const IndexFileVersion = 1
// FileSignature represents a magic number at the header of the index file.
const FileSignature = "TSI1"
// IndexFile field size constants.
const (
// IndexFile trailer fields
IndexFileVersionSize = 2
SeriesListOffsetSize = 8
SeriesListSizeSize = 8
MeasurementBlockOffsetSize = 8
MeasurementBlockSizeSize = 8
IndexFileTrailerSize = IndexFileVersionSize +
SeriesListOffsetSize +
SeriesListSizeSize +
MeasurementBlockOffsetSize +
MeasurementBlockSizeSize
)
// IndexFile errors.
var (
ErrInvalidIndexFile = errors.New("invalid index file")
ErrUnsupportedIndexFileVersion = errors.New("unsupported index file version")
)
// IndexFile represents a collection of measurement, tag, and series data.
// It is opened from an encoded byte slice via UnmarshalBinary.
type IndexFile struct {
data []byte // entire raw file contents; retained by UnmarshalBinary
// Components
slist SeriesList
mblk MeasurementBlock
}
// UnmarshalBinary opens an index from data.
// The byte slice is retained so it must be kept open.
//
// Validation proceeds in stages: magic signature, trailer, then the
// measurement block and series list the trailer points at.
func (i *IndexFile) UnmarshalBinary(data []byte) error {
// Ensure magic number exists at the beginning.
if len(data) < len(FileSignature) {
return io.ErrShortBuffer
} else if !bytes.Equal(data[:len(FileSignature)], []byte(FileSignature)) {
return ErrInvalidIndexFile
}
// Read index file trailer.
t, err := ReadIndexFileTrailer(data)
if err != nil {
return err
}
// Slice measurement block data.
buf := data[t.MeasurementBlock.Offset:]
buf = buf[:t.MeasurementBlock.Size]
// Unmarshal measurement block.
if err := i.mblk.UnmarshalBinary(buf); err != nil {
return err
}
// Slice series list data.
buf = data[t.SeriesList.Offset:]
buf = buf[:t.SeriesList.Size]
// Unmarshal series list.
if err := i.slist.UnmarshalBinary(buf); err != nil {
return err
}
// Save reference to entire data block.
i.data = data
return nil
}
// Close closes the index file.
// It resets the decoded components; i.data itself is not cleared here,
// so the caller remains responsible for the underlying byte slice.
func (i *IndexFile) Close() error {
i.slist = SeriesList{}
i.mblk = MeasurementBlock{}
return nil
}
// TagValueElem returns a list of series ids for a measurement/tag/value.
// A zero TagValueElem (not an error) is returned when the measurement
// does not exist.
func (i *IndexFile) TagValueElem(name, key, value []byte) (TagValueElem, error) {
// Find measurement.
e, ok := i.mblk.Elem(name)
if !ok {
return TagValueElem{}, nil
}
// Find tag set block.
tblk, err := i.tagSetBlock(&e)
if err != nil {
return TagValueElem{}, err
}
return tblk.TagValueElem(key, value), nil
}
// tagSetBlock returns a tag set block for a measurement.
// The block is decoded from the file's raw data at the offset/size
// recorded in the measurement element.
func (i *IndexFile) tagSetBlock(e *MeasurementElem) (TagSet, error) {
// Slice tag set data.
buf := i.data[e.TagSet.Offset:]
buf = buf[:e.TagSet.Size]
// Unmarshal block.
var blk TagSet
if err := blk.UnmarshalBinary(buf); err != nil {
return TagSet{}, err
}
return blk, nil
}
// MeasurementIterator returns an iterator over all measurements.
// The iterator is backed by the file's measurement block.
func (i *IndexFile) MeasurementIterator() MeasurementIterator {
return i.mblk.Iterator()
}
// MeasurementSeriesIterator returns an iterator over a measurement's series.
// An empty (immediately exhausted) iterator is returned when the
// measurement does not exist.
func (i *IndexFile) MeasurementSeriesIterator(name []byte) SeriesIterator {
// Find measurement element.
e, ok := i.mblk.Elem(name)
if !ok {
return &seriesIterator{}
}
// Return iterator.
return &seriesIterator{
n: e.Series.N,
data: e.Series.Data,
seriesList: &i.slist,
}
}
// seriesIterator iterates over a list of raw data.
type seriesIterator struct {
i, n uint32 // current position and total series count
data []byte // raw block of big-endian uint32 series offsets
seriesList *SeriesList // list used to decode each series at its offset
}
// Next returns the next decoded series. Uses name & tags as reusable buffers.
// Returns nils when the iterator is complete.
//
// NOTE(review): *deleted is left untouched at exhaustion, so it keeps
// the value from the previous call — callers must test *name for nil
// before reading *deleted.
func (itr *seriesIterator) Next(name *[]byte, tags *models.Tags, deleted *bool) {
// Return nil if we've reached the end.
if itr.i == itr.n {
*name, *tags = nil, nil
return
}
// Decode the offset of the current series from the raw data block.
offset := binary.BigEndian.Uint32(itr.data[itr.i*SeriesIDSize:])
// Read from series list into buffers.
itr.seriesList.DecodeSeriesAt(offset, name, tags, deleted)
// Move iterator forward.
itr.i++
}
// IndexFiles represents a layered set of index files.
type IndexFiles []*IndexFile
// IndexFileWriter represents a naive implementation of an index file builder.
// All series are buffered in memory and the whole file is emitted at
// once by WriteTo.
type IndexFileWriter struct {
series indexFileSeries // every series added, in insertion order
mms indexFileMeasurements // per-measurement metadata keyed by name
}
// NewIndexFileWriter returns a new instance of IndexFileWriter.
func NewIndexFileWriter() *IndexFileWriter {
return &IndexFileWriter{
mms: make(indexFileMeasurements),
}
}
// Add adds a series to the index file.
// The name and tag byte slices are retained, not copied.
func (iw *IndexFileWriter) Add(name []byte, tags models.Tags) {
// Add to series list.
iw.series = append(iw.series, indexFileSerie{name: name, tags: tags})
// Find or create measurement.
mm, ok := iw.mms[string(name)]
if !ok {
mm.name = name
mm.tagset = make(indexFileTagset)
iw.mms[string(name)] = mm
}
// Add tagset.
for _, tag := range tags {
// Find or create the entry for this tag key.
t, ok := mm.tagset[string(tag.Key)]
if !ok {
t.name = tag.Key
t.values = make(indexFileValues)
mm.tagset[string(tag.Key)] = t
}
// Find or create the entry for this tag value.
v, ok := t.values[string(tag.Value)]
if !ok {
v.name = tag.Value
t.values[string(tag.Value)] = v
}
}
}
// WriteTo writes the index file to w.
//
// File layout: signature, series list, one tag set block per
// measurement (in sorted name order), measurement block, trailer.
// Returns the total number of bytes written.
func (iw *IndexFileWriter) WriteTo(w io.Writer) (n int64, err error) {
var t IndexFileTrailer
// Write magic number.
if err := writeTo(w, []byte(FileSignature), &n); err != nil {
return n, err
}
// Write series list.
t.SeriesList.Offset = n
if err := iw.writeSeriesListTo(w, &n); err != nil {
return n, err
}
t.SeriesList.Size = n - t.SeriesList.Offset
// Sort measurement names.
names := iw.mms.Names()
// Write tagset blocks in measurement order.
if err := iw.writeTagsetsTo(w, names, &n); err != nil {
return n, err
}
// Write measurement block.
t.MeasurementBlock.Offset = n
if err := iw.writeMeasurementBlockTo(w, names, &n); err != nil {
return n, err
}
t.MeasurementBlock.Size = n - t.MeasurementBlock.Offset
// Write trailer.
if err := iw.writeTrailerTo(w, t, &n); err != nil {
return n, err
}
return n, nil
}
// writeSeriesListTo writes the sorted series list to w, then back-fills
// each series' offset and the seriesID lists on every measurement and
// tag value.
func (iw *IndexFileWriter) writeSeriesListTo(w io.Writer, n *int64) error {
// Ensure series are sorted.
sort.Sort(iw.series)
// Write all series.
sw := NewSeriesListWriter()
for _, serie := range iw.series {
if err := sw.Add(serie.name, serie.tags); err != nil {
return err
}
}
// Flush series list.
nn, err := sw.WriteTo(w)
*n += nn
if err != nil {
return err
}
// Add series to each measurement and key/value.
for i := range iw.series {
serie := &iw.series[i]
// Lookup series offset.
serie.offset = sw.Offset(serie.name, serie.tags)
if serie.offset == 0 {
// A zero offset means the series writer never recorded this
// series — an internal invariant violation, so fail loudly.
panic("series not found")
}
// Add series id to measurement, tag key, and tag value.
mm := iw.mms[string(serie.name)]
mm.seriesIDs = append(mm.seriesIDs, serie.offset)
iw.mms[string(serie.name)] = mm
// Add series id to each tag value.
for _, tag := range serie.tags {
t := mm.tagset[string(tag.Key)]
v := t.values[string(tag.Value)]
v.seriesIDs = append(v.seriesIDs, serie.offset)
t.values[string(tag.Value)] = v
}
}
return nil
}
// writeTagsetsTo writes one tagset block per measurement, in the order
// given by names, stopping at the first error.
func (iw *IndexFileWriter) writeTagsetsTo(w io.Writer, names []string, n *int64) error {
	for i := range names {
		if err := iw.writeTagsetTo(w, names[i], n); err != nil {
			return err
		}
	}
	return nil
}
// writeTagsetTo writes a single tagset to w and saves the tagset offset.
func (iw *IndexFileWriter) writeTagsetTo(w io.Writer, name string, n *int64) error {
mm := iw.mms[name]
tsw := NewTagSetWriter()
for _, tag := range mm.tagset {
// Mark tag deleted.
if tag.deleted {
tsw.AddTag(tag.name, true)
continue
}
// Add each value.
for _, value := range tag.values {
// Series IDs must be in ascending order before encoding.
sort.Sort(uint32Slice(value.seriesIDs))
tsw.AddTagValue(tag.name, value.name, value.deleted, value.seriesIDs)
}
}
// Save tagset offset to measurement.
mm.offset = *n
// Write tagset to writer.
nn, err := tsw.WriteTo(w)
*n += nn
if err != nil {
return err
}
// Save tagset size to measurement, then store the updated entry back
// (map values are copies, so the write-back is required).
mm.size = *n - mm.offset
iw.mms[name] = mm
return nil
}
// writeMeasurementBlockTo writes the measurement block to w.
// NOTE(review): iterates the mms map directly rather than the sorted
// names slice (names is unused here), so entries are added to the
// block writer in nondeterministic order — confirm
// MeasurementBlockWriter orders entries internally.
func (iw *IndexFileWriter) writeMeasurementBlockTo(w io.Writer, names []string, n *int64) error {
mw := NewMeasurementBlockWriter()
// Add measurement data.
for _, mm := range iw.mms {
mw.Add(mm.name, mm.offset, mm.size, mm.seriesIDs)
}
// Write data to writer.
nn, err := mw.WriteTo(w)
*n += nn
if err != nil {
return err
}
return nil
}
// writeTrailerTo writes the index file trailer to w.
// Field order must mirror ReadIndexFileTrailer: series list offset and
// size, measurement block offset and size, then the version.
func (iw *IndexFileWriter) writeTrailerTo(w io.Writer, t IndexFileTrailer, n *int64) error {
// Write series list info.
if err := writeUint64To(w, uint64(t.SeriesList.Offset), n); err != nil {
return err
} else if err := writeUint64To(w, uint64(t.SeriesList.Size), n); err != nil {
return err
}
// Write measurement block info.
if err := writeUint64To(w, uint64(t.MeasurementBlock.Offset), n); err != nil {
return err
} else if err := writeUint64To(w, uint64(t.MeasurementBlock.Size), n); err != nil {
return err
}
// Write index file encoding version.
if err := writeUint16To(w, IndexFileVersion, n); err != nil {
return err
}
return nil
}
// indexFileSerie is a single series (name + tags) buffered by the writer.
type indexFileSerie struct {
name []byte
tags models.Tags
deleted bool
offset uint32 // offset into the series list, filled in by writeSeriesListTo
}
// indexFileSeries sorts series by name, then by tags.
type indexFileSeries []indexFileSerie
func (a indexFileSeries) Len() int { return len(a) }
func (a indexFileSeries) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a indexFileSeries) Less(i, j int) bool {
if cmp := bytes.Compare(a[i].name, a[j].name); cmp != 0 {
return cmp == -1
}
return models.CompareTags(a[i].tags, a[j].tags) == -1
}
// indexFileMeasurement accumulates per-measurement data while the
// writer builds an index file.
type indexFileMeasurement struct {
name []byte
deleted bool
tagset indexFileTagset
offset int64 // tagset offset
size int64 // tagset size
seriesIDs []uint32
}
// indexFileMeasurements maps measurement name to its builder entry.
type indexFileMeasurements map[string]indexFileMeasurement
// Names returns a sorted list of measurement names.
func (m indexFileMeasurements) Names() []string {
a := make([]string, 0, len(m))
for name := range m {
a = append(a, name)
}
sort.Strings(a)
return a
}
// indexFileTag accumulates the values seen for one tag key.
type indexFileTag struct {
name []byte
deleted bool
values indexFileValues
}
// indexFileTagset maps tag key to its builder entry.
type indexFileTagset map[string]indexFileTag
// indexFileValue accumulates the series carrying one tag value.
type indexFileValue struct {
name []byte
deleted bool
seriesIDs []uint32
}
// indexFileValues maps tag value to its builder entry.
type indexFileValues map[string]indexFileValue
// ReadIndexFileTrailer returns the index file trailer from data.
//
// The trailer occupies the final IndexFileTrailerSize bytes. Returns
// ErrInvalidIndexFile when data is too short to hold a trailer and
// ErrUnsupportedIndexFileVersion for an unknown version.
func ReadIndexFileTrailer(data []byte) (IndexFileTrailer, error) {
	var t IndexFileTrailer

	// Guard against a short buffer so the slicing below cannot panic.
	if len(data) < IndexFileTrailerSize {
		return t, ErrInvalidIndexFile
	}

	// Read and validate the version stored in the final bytes.
	t.Version = int(binary.BigEndian.Uint16(data[len(data)-IndexFileVersionSize:]))
	if t.Version != IndexFileVersion {
		return t, ErrUnsupportedIndexFileVersion
	}

	// Slice trailer data.
	buf := data[len(data)-IndexFileTrailerSize:]

	// Read series list info.
	t.SeriesList.Offset = int64(binary.BigEndian.Uint64(buf[0:SeriesListOffsetSize]))
	buf = buf[SeriesListOffsetSize:]
	t.SeriesList.Size = int64(binary.BigEndian.Uint64(buf[0:SeriesListSizeSize]))
	buf = buf[SeriesListSizeSize:]

	// Read measurement block info.
	t.MeasurementBlock.Offset = int64(binary.BigEndian.Uint64(buf[0:MeasurementBlockOffsetSize]))
	buf = buf[MeasurementBlockOffsetSize:]
	t.MeasurementBlock.Size = int64(binary.BigEndian.Uint64(buf[0:MeasurementBlockSizeSize]))
	buf = buf[MeasurementBlockSizeSize:]

	return t, nil
}
// IndexFileTrailer represents meta data written to the end of the index file.
type IndexFileTrailer struct {
Version int // index file encoding version
SeriesList struct {
Offset int64
Size int64
}
MeasurementBlock struct {
Offset int64
Size int64
}
}
type uint32Slice []uint32
func (a uint32Slice) Len() int { return len(a) }
func (a uint32Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a uint32Slice) Less(i, j int) bool { return a[i] < a[j] }

View File

@ -0,0 +1,161 @@
package tsi1_test
import (
"bytes"
"fmt"
"testing"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb/engine/tsi1"
)
// Ensure a simple index file can be built and opened.
// Round-trips three series across two measurements through the writer.
func TestCreateIndexFile(t *testing.T) {
if _, err := CreateIndexFile([]Series{
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})},
}); err != nil {
t.Fatal(err)
}
}
// Ensure index file generation can be successfully built.
// Generates a 10-measurement file and verifies that a known
// measurement/key/value lookup returns at least one series.
func TestGenerateIndexFile(t *testing.T) {
// Build generated index file.
idx, err := GenerateIndexFile(10, 3, 4)
if err != nil {
t.Fatal(err)
}
// Verify that tag/value series can be fetched.
if e, err := idx.TagValueElem([]byte("measurement0"), []byte("key0"), []byte("value0")); err != nil {
t.Fatal(err)
} else if e.Series.N == 0 {
t.Fatal("expected series")
}
}
// BenchmarkIndexFile_TagValueSeries benchmarks tag/value series lookups
// against generated index files of increasing size (M measurements,
// K tag keys, V values per key).
func BenchmarkIndexFile_TagValueSeries(b *testing.B) {
	b.Run("M=1,K=2,V=3", func(b *testing.B) {
		benchmarkIndexFile_TagValueSeries(b, MustFindOrGenerateIndexFile(1, 2, 3))
	})
	b.Run("M=10,K=5,V=5", func(b *testing.B) {
		benchmarkIndexFile_TagValueSeries(b, MustFindOrGenerateIndexFile(10, 5, 5))
	})
	// Label corrected: it previously read "M=10,K=7,V=5" while the
	// arguments were (10, 7, 7).
	b.Run("M=10,K=7,V=7", func(b *testing.B) {
		benchmarkIndexFile_TagValueSeries(b, MustFindOrGenerateIndexFile(10, 7, 7))
	})
}
// benchmarkIndexFile_TagValueSeries measures repeated lookups of a
// known measurement/key/value combination against idx, failing if the
// lookup errors or returns no series.
func benchmarkIndexFile_TagValueSeries(b *testing.B, idx *tsi1.IndexFile) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
if e, err := idx.TagValueElem([]byte("measurement0"), []byte("key0"), []byte("value0")); err != nil {
b.Fatal(err)
} else if e.Series.N == 0 {
b.Fatal("expected series")
}
}
}
// CreateIndexFile creates an index file with a given set of series.
// The file is written to an in-memory buffer and reopened from it.
func CreateIndexFile(series []Series) (*tsi1.IndexFile, error) {
// Add series to the writer.
ifw := tsi1.NewIndexFileWriter()
for _, serie := range series {
ifw.Add(serie.Name, serie.Tags)
}
// Write index file to buffer.
var buf bytes.Buffer
if _, err := ifw.WriteTo(&buf); err != nil {
return nil, err
}
// Load index file from buffer.
var f tsi1.IndexFile
if err := f.UnmarshalBinary(buf.Bytes()); err != nil {
return nil, err
}
return &f, nil
}
// GenerateIndexFile generates an index file from a set of series based on the count arguments.
// Total series returned will equal measurementN * valueN^tagN: each
// measurement gets one series per combination of tag values (the loops
// below enumerate pow(valueN, tagN) combinations, not tagN*valueN).
func GenerateIndexFile(measurementN, tagN, valueN int) (*tsi1.IndexFile, error) {
// Number of distinct tag-value combinations per measurement.
tagValueN := pow(valueN, tagN)
iw := tsi1.NewIndexFileWriter()
for i := 0; i < measurementN; i++ {
name := []byte(fmt.Sprintf("measurement%d", i))
// Generate tag sets.
for j := 0; j < tagValueN; j++ {
var tags models.Tags
for k := 0; k < tagN; k++ {
key := []byte(fmt.Sprintf("key%d", k))
// Treat j as a base-valueN number; digit k picks this key's value.
value := []byte(fmt.Sprintf("value%d", (j / pow(valueN, k) % valueN)))
tags = append(tags, models.Tag{Key: key, Value: value})
}
iw.Add(name, tags)
}
}
// Write index file to buffer.
var buf bytes.Buffer
if _, err := iw.WriteTo(&buf); err != nil {
return nil, err
}
// Load index file from buffer.
var idx tsi1.IndexFile
if err := idx.UnmarshalBinary(buf.Bytes()); err != nil {
return nil, err
}
return &idx, nil
}
// MustGenerateIndexFile is like GenerateIndexFile but panics on error.
// Intended for test and benchmark setup only.
func MustGenerateIndexFile(measurementN, tagN, valueN int) *tsi1.IndexFile {
	f, err := GenerateIndexFile(measurementN, tagN, valueN)
	if err != nil {
		panic(err)
	}
	return f
}
// indexFileCache remembers the last generated index file together with the
// parameters used to build it, so repeated requests for the same counts can
// skip regeneration. Not safe for concurrent use; assumed to be accessed
// from a single test/benchmark goroutine only.
var indexFileCache struct {
	MeasurementN int
	TagN int
	ValueN int
	IndexFile *tsi1.IndexFile
}
// MustFindOrGenerateIndexFile returns a cached index file for the given
// generation parameters, generating and caching a new one on a miss.
// Panics if generation fails.
func MustFindOrGenerateIndexFile(measurementN, tagN, valueN int) *tsi1.IndexFile {
	hit := indexFileCache.IndexFile != nil &&
		indexFileCache.MeasurementN == measurementN &&
		indexFileCache.TagN == tagN &&
		indexFileCache.ValueN == valueN

	if !hit {
		// Miss: regenerate and remember the parameters it was built with.
		indexFileCache.MeasurementN = measurementN
		indexFileCache.TagN = tagN
		indexFileCache.ValueN = valueN
		indexFileCache.IndexFile = MustGenerateIndexFile(measurementN, tagN, valueN)
	}
	return indexFileCache.IndexFile
}
// pow returns x raised to the non-negative integer power y.
// By convention pow(x, 0) == 1 for any x.
func pow(x, y int) int {
	result := 1
	for ; y > 0; y-- {
		result *= x
	}
	return result
}

View File

@ -1,158 +1,67 @@
package tsi1_test
import (
"bytes"
"fmt"
"testing"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb/engine/tsi1"
)
// Ensure a simple index can be built and opened.
func TestIndex(t *testing.T) {
series := []Series{
{Name: "cpu", Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: "cpu", Tags: models.NewTags(map[string]string{"region": "west"})},
{Name: "mem", Tags: models.NewTags(map[string]string{"region": "east"})},
}
// Add series to the writer.
iw := tsi1.NewIndexWriter()
for _, serie := range series {
iw.Add(serie.Name, serie.Tags)
}
// Write index to buffer.
var buf bytes.Buffer
if _, err := iw.WriteTo(&buf); err != nil {
t.Fatal(err)
}
// Load index from buffer.
var idx tsi1.Index
if err := idx.UnmarshalBinary(buf.Bytes()); err != nil {
t.Fatal(err)
}
}
// Ensure index generation can be successfully built.
func TestGenerateIndex(t *testing.T) {
// Build generated index.
idx, err := GenerateIndex(10, 3, 4)
// Ensure index can return a single measurement by name.
func TestIndex_Measurement(t *testing.T) {
// Build an index file.
f, err := CreateIndexFile([]Series{
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})},
})
if err != nil {
t.Fatal(err)
}
// Verify that tag/value series can be fetched.
if e, err := idx.TagValueElem([]byte("measurement0"), []byte("key0"), []byte("value0")); err != nil {
t.Fatal(err)
} else if e.Series.N == 0 {
t.Fatal("expected series")
}
}
func BenchmarkIndex_TagValueSeries(b *testing.B) {
b.Run("M=1,K=2,V=3", func(b *testing.B) {
benchmarkIndex_TagValueSeries(b, MustFindOrGenerateIndex(1, 2, 3))
})
b.Run("M=10,K=5,V=5", func(b *testing.B) {
benchmarkIndex_TagValueSeries(b, MustFindOrGenerateIndex(10, 5, 5))
})
b.Run("M=10,K=7,V=5", func(b *testing.B) {
benchmarkIndex_TagValueSeries(b, MustFindOrGenerateIndex(10, 7, 7))
})
}
func benchmarkIndex_TagValueSeries(b *testing.B, idx *tsi1.Index) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
if e, err := idx.TagValueElem([]byte("measurement0"), []byte("key0"), []byte("value0")); err != nil {
b.Fatal(err)
} else if e.Series.N == 0 {
b.Fatal("expected series")
}
}
}
// GenerateIndex Generates an index from a set of series based on the count arguments.
// Total series returned will equal measurementN * tagN * valueN.
func GenerateIndex(measurementN, tagN, valueN int) (*tsi1.Index, error) {
tagValueN := pow(valueN, tagN)
println("generating", measurementN*pow(valueN, tagN))
iw := tsi1.NewIndexWriter()
for i := 0; i < measurementN; i++ {
name := fmt.Sprintf("measurement%d", i)
// Generate tag sets.
for j := 0; j < tagValueN; j++ {
var tags models.Tags
for k := 0; k < tagN; k++ {
key := []byte(fmt.Sprintf("key%d", k))
value := []byte(fmt.Sprintf("value%d", (j / pow(valueN, k) % valueN)))
tags = append(tags, models.Tag{Key: key, Value: value})
}
iw.Add(name, tags)
}
}
// Write index to buffer.
var buf bytes.Buffer
if _, err := iw.WriteTo(&buf); err != nil {
return nil, err
}
println("file size", buf.Len())
// Load index from buffer.
// Create an index from the single file.
var idx tsi1.Index
if err := idx.UnmarshalBinary(buf.Bytes()); err != nil {
return nil, err
idx.SetFile(f)
// Verify measurement is correct.
if mm, err := idx.Measurement([]byte("cpu")); err != nil {
t.Fatal(err)
} else if mm == nil {
t.Fatal("expected measurement")
}
// Verify non-existent measurement doesn't exist.
if mm, err := idx.Measurement([]byte("no_such_measurement")); err != nil {
t.Fatal(err)
} else if mm != nil {
t.Fatal("expected nil measurement")
}
return &idx, nil
}
func MustGenerateIndex(measurementN, tagN, valueN int) *tsi1.Index {
idx, err := GenerateIndex(measurementN, tagN, valueN)
// Ensure index can return a list of all measurements.
func TestIndex_Measurements(t *testing.T) {
// Build an index file.
f, err := CreateIndexFile([]Series{
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})},
})
if err != nil {
panic(err)
}
return idx
}
var indexCache struct {
MeasurementN int
TagN int
ValueN int
Index *tsi1.Index
}
// MustFindOrGenerateIndex returns a cached index or generates one if it doesn't exist.
func MustFindOrGenerateIndex(measurementN, tagN, valueN int) *tsi1.Index {
// Use cache if fields match and the index has been generated.
if indexCache.MeasurementN == measurementN &&
indexCache.TagN == tagN &&
indexCache.ValueN == valueN &&
indexCache.Index != nil {
return indexCache.Index
t.Fatal(err)
}
// Generate and cache.
indexCache.MeasurementN = measurementN
indexCache.TagN = tagN
indexCache.ValueN = valueN
indexCache.Index = MustGenerateIndex(measurementN, tagN, valueN)
return indexCache.Index
}
// Create an index from the single file.
var idx tsi1.Index
idx.SetFile(f)
func pow(x, y int) int {
r := 1
for i := 0; i < y; i++ {
r *= x
// Retrieve measurements and verify.
if mms, err := idx.Measurements(); err != nil {
t.Fatal(err)
} else if len(mms) != 2 {
t.Fatalf("expected measurement count: %d", len(mms))
} else if mms[0].Name != "cpu" {
t.Fatalf("unexpected measurement(0): %s", mms[0].Name)
} else if mms[1].Name != "mem" {
t.Fatalf("unexpected measurement(1): %s", mms[1].Name)
}
return r
}

View File

@ -5,6 +5,7 @@ import (
"encoding/binary"
"errors"
"io"
"sort"
"github.com/influxdata/influxdb/pkg/rhh"
)
@ -19,6 +20,9 @@ const (
// Measurement field size constants.
const (
// 1 byte offset for the block to ensure non-zero offsets.
MeasurementFillSize = 1
// Measurement trailer fields
MeasurementBlockVersionSize = 2
MeasurementBlockSize = 8
@ -109,6 +113,33 @@ func (blk *MeasurementBlock) UnmarshalBinary(data []byte) error {
return nil
}
// Iterator returns an iterator over all measurements.
func (blk *MeasurementBlock) Iterator() MeasurementIterator {
return &measurementIterator{data: blk.data[MeasurementFillSize:]}
}
// measurementIterator iterates over a list measurements in a block.
type measurementIterator struct {
elem MeasurementElem
data []byte
}
// Next returns the next measurement. Returns false when iterator is complete.
func (itr *measurementIterator) Next() *MeasurementElem {
// Return nil when we run out of data.
if len(itr.data) == 0 {
return nil
}
// Unmarshal the element at the current position.
itr.elem.UnmarshalBinary(itr.data)
// Move the data forward past the record.
itr.data = itr.data[itr.elem.Size:]
return &itr.elem
}
// ReadMeasurementBlockTrailer returns the trailer from data.
func ReadMeasurementBlockTrailer(data []byte) (MeasurementBlockTrailer, error) {
var t MeasurementBlockTrailer
@ -170,6 +201,14 @@ type MeasurementElem struct {
N uint32 // series count
Data []byte // serialized series data
}
// Size in bytes, set after unmarshaling.
Size int
}
// Deleted returns true if the tombstone flag is set.
func (e *MeasurementElem) Deleted() bool {
return (e.Flag & MeasurementTombstoneFlag) != 0
}
// SeriesID returns series ID at an index.
@ -188,6 +227,8 @@ func (e *MeasurementElem) SeriesIDs() []uint32 {
// UnmarshalBinary unmarshals data into e.
func (e *MeasurementElem) UnmarshalBinary(data []byte) error {
start := len(data)
// Parse flag data.
e.Flag, data = data[0], data[1:]
@ -202,7 +243,10 @@ func (e *MeasurementElem) UnmarshalBinary(data []byte) error {
// Parse series data.
v, n := binary.Uvarint(data)
e.Series.N, data = uint32(v), data[n:]
e.Series.Data = data[:e.Series.N*SeriesIDSize]
e.Series.Data, data = data[:e.Series.N*SeriesIDSize], data[e.Series.N*SeriesIDSize:]
// Save length of elem.
e.Size = start - len(data)
return nil
}
@ -242,6 +286,29 @@ func (mw *MeasurementBlockWriter) WriteTo(w io.Writer) (n int64, err error) {
return n, err
}
// Sort names.
names := make([]string, 0, len(mw.mms))
for name := range mw.mms {
names = append(names, name)
}
sort.Strings(names)
// Encode key list.
for _, name := range names {
// Retrieve measurement and save offset.
mm := mw.mms[name]
mm.offset = n
mw.mms[name] = mm
// Write measurement
if err := mw.writeMeasurementTo(w, []byte(name), &mm, &n); err != nil {
return n, err
}
}
// Save starting offset of hash index.
hoff := n
// Build key hash map
m := rhh.NewHashMap(rhh.Options{
Capacity: len(mw.mms),
@ -252,35 +319,21 @@ func (mw *MeasurementBlockWriter) WriteTo(w io.Writer) (n int64, err error) {
m.Put([]byte(name), &mm)
}
// Encode key list.
offsets := make([]int64, m.Cap())
for i := 0; i < m.Cap(); i++ {
k, v := m.Elem(i)
if v == nil {
continue
}
mm := v.(*measurement)
// Save current offset so we can use it in the hash index.
offsets[i] = n
// Write measurement
if err := mw.writeMeasurementTo(w, k, mm, &n); err != nil {
return n, err
}
}
// Save starting offset of hash index.
hoff := n
// Encode hash map length.
if err := writeUint32To(w, uint32(m.Cap()), &n); err != nil {
return n, err
}
// Encode hash map offset entries.
for i := range offsets {
if err := writeUint64To(w, uint64(offsets[i]), &n); err != nil {
for i := 0; i < m.Cap(); i++ {
_, v := m.Elem(i)
var offset int64
if mm, ok := v.(*measurement); ok {
offset = mm.offset
}
if err := writeUint64To(w, uint64(offset), &n); err != nil {
return n, err
}
}
@ -351,6 +404,7 @@ type measurement struct {
size int64
}
seriesIDs []uint32
offset int64
}
func (mm measurement) flag() byte {

View File

@ -41,6 +41,7 @@ const (
// SeriesList represents the section of the index which holds the term
// dictionary and a sorted list of series keys.
type SeriesList struct {
data []byte
termData []byte
seriesData []byte
}
@ -75,18 +76,18 @@ func (l *SeriesList) SeriesOffset(key []byte) (offset uint32, deleted bool) {
}
// EncodeSeries returns a dictionary-encoded series key.
func (l *SeriesList) EncodeSeries(name string, tags models.Tags) []byte {
func (l *SeriesList) EncodeSeries(name []byte, tags models.Tags) []byte {
// Build a buffer with the minimum space for the name, tag count, and tags.
buf := make([]byte, 2+len(tags))
return l.AppendEncodeSeries(buf[:0], name, tags)
}
// AppendEncodeSeries appends an encoded series value to dst and returns the new slice.
func (l *SeriesList) AppendEncodeSeries(dst []byte, name string, tags models.Tags) []byte {
func (l *SeriesList) AppendEncodeSeries(dst []byte, name []byte, tags models.Tags) []byte {
var buf [binary.MaxVarintLen32]byte
// Append encoded name.
n := binary.PutUvarint(buf[:], uint64(l.EncodeTerm([]byte(name))))
n := binary.PutUvarint(buf[:], uint64(l.EncodeTerm(name)))
dst = append(dst, buf[:n]...)
// Append encoded tag count.
@ -105,16 +106,32 @@ func (l *SeriesList) AppendEncodeSeries(dst []byte, name string, tags models.Tag
return dst
}
// DecodeSeriesAt decodes the series at a given offset.
func (l *SeriesList) DecodeSeriesAt(offset uint32, name *[]byte, tags *models.Tags, deleted *bool) {
data := l.data[offset:]
// Read flag.
flag, data := data[0], data[1:]
*deleted = (flag & SeriesTombstoneFlag) != 0
l.DecodeSeries(data, name, tags)
}
// DecodeSeries decodes a dictionary encoded series into a name and tagset.
func (l *SeriesList) DecodeSeries(v []byte) (name string, tags models.Tags) {
func (l *SeriesList) DecodeSeries(v []byte, name *[]byte, tags *models.Tags) {
// Read name.
offset, n := binary.Uvarint(v)
name, v = string(l.DecodeTerm(uint32(offset))), v[n:]
*name, v = l.DecodeTerm(uint32(offset)), v[n:]
// Read tag count.
tagN, n := binary.Uvarint(v)
v = v[n:]
// Clear tags, if necessary.
if len(*tags) > 0 {
*tags = (*tags)[0:]
}
// Loop over tag key/values.
for i := 0; i < int(tagN); i++ {
// Read key.
@ -128,8 +145,6 @@ func (l *SeriesList) DecodeSeries(v []byte) (name string, tags models.Tags) {
// Add to tagset.
tags.Set(key, value)
}
return name, tags
}
// DecodeTerm returns the term at the given offset.
@ -184,6 +199,9 @@ func (l *SeriesList) SeriesCount() uint32 {
func (l *SeriesList) UnmarshalBinary(data []byte) error {
t := ReadSeriesListTrailer(data)
// Save entire block.
l.data = data
// Slice term list data.
l.termData = data[t.TermList.Offset:]
l.termData = l.termData[:t.TermList.Size]
@ -213,23 +231,23 @@ func NewSeriesListWriter() *SeriesListWriter {
// Add adds a series to the writer's set.
// Returns an ErrSeriesOverflow if no more series can be held in the writer.
func (sw *SeriesListWriter) Add(name string, tags models.Tags) error {
func (sw *SeriesListWriter) Add(name []byte, tags models.Tags) error {
return sw.append(name, tags, false)
}
// Delete marks a series as tombstoned.
func (sw *SeriesListWriter) Delete(name string, tags models.Tags) error {
func (sw *SeriesListWriter) Delete(name []byte, tags models.Tags) error {
return sw.append(name, tags, true)
}
func (sw *SeriesListWriter) append(name string, tags models.Tags, deleted bool) error {
func (sw *SeriesListWriter) append(name []byte, tags models.Tags, deleted bool) error {
// Ensure writer doesn't add too many series.
if len(sw.series) == math.MaxInt32 {
return ErrSeriesOverflow
}
// Increment term counts.
sw.terms[name]++
sw.terms[string(name)]++
for _, t := range tags {
sw.terms[string(t.Key)]++
sw.terms[string(t.Value)]++
@ -381,12 +399,12 @@ func (sw *SeriesListWriter) writeTrailerTo(w io.Writer, t SeriesListTrailer, n *
// Offset returns the series offset from the writer.
// Only valid after the series list has been written to a writer.
func (sw *SeriesListWriter) Offset(name string, tags models.Tags) uint32 {
func (sw *SeriesListWriter) Offset(name []byte, tags models.Tags) uint32 {
// Find position of series.
i := sort.Search(len(sw.series), func(i int) bool {
s := &sw.series[i]
if s.name != name {
return s.name >= name
if cmp := bytes.Compare(s.name, name); cmp != 0 {
return cmp != -1
}
return models.CompareTags(s.tags, tags) != -1
})
@ -394,7 +412,7 @@ func (sw *SeriesListWriter) Offset(name string, tags models.Tags) uint32 {
// Ignore if it's not an exact match.
if i >= len(sw.series) {
return 0
} else if s := &sw.series[i]; s.name != name || !s.tags.Equal(tags) {
} else if s := &sw.series[i]; !bytes.Equal(s.name, name) || !s.tags.Equal(tags) {
return 0
}
@ -437,7 +455,7 @@ type SeriesListTrailer struct {
}
type serie struct {
name string
name []byte
tags models.Tags
deleted bool
offset uint32
@ -448,8 +466,8 @@ type series []serie
func (a series) Len() int { return len(a) }
func (a series) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a series) Less(i, j int) bool {
if a[i].name != a[j].name {
return a[i].name < a[j].name
if cmp := bytes.Compare(a[i].name, a[j].name); cmp != 0 {
return cmp == -1
}
return models.CompareTags(a[i].tags, a[j].tags) == -1
}

View File

@ -13,9 +13,9 @@ import (
// Ensure series list can be unmarshaled.
func TestSeriesList_UnmarshalBinary(t *testing.T) {
if _, err := CreateSeriesList([]Series{
{Name: "cpu", Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: "cpu", Tags: models.NewTags(map[string]string{"region": "west"})},
{Name: "mem", Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})},
}); err != nil {
t.Fatal(err)
}
@ -24,9 +24,9 @@ func TestSeriesList_UnmarshalBinary(t *testing.T) {
// Ensure series list contains the correct term count and term encoding.
func TestSeriesList_Terms(t *testing.T) {
l := MustCreateSeriesList([]Series{
{Name: "cpu", Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: "cpu", Tags: models.NewTags(map[string]string{"region": "west"})},
{Name: "mem", Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})},
})
// Verify term count is correct.
@ -53,9 +53,9 @@ func TestSeriesList_Terms(t *testing.T) {
// Ensure series list contains the correct set of series.
func TestSeriesList_Series(t *testing.T) {
series := []Series{
{Name: "cpu", Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: "cpu", Tags: models.NewTags(map[string]string{"region": "west"})},
{Name: "mem", Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})},
}
l := MustCreateSeriesList(series)
@ -65,9 +65,11 @@ func TestSeriesList_Series(t *testing.T) {
}
// Ensure series can encode & decode correctly.
var name []byte
var tags models.Tags
for _, series := range series {
name, tags := l.DecodeSeries(l.EncodeSeries(series.Name, series.Tags))
if name != series.Name || !reflect.DeepEqual(tags, series.Tags) {
l.DecodeSeries(l.EncodeSeries(series.Name, series.Tags), &name, &tags)
if !bytes.Equal(name, series.Name) || !reflect.DeepEqual(tags, series.Tags) {
t.Fatalf("encoding mismatch: got=%s/%#v, exp=%s/%#v", name, tags, series.Name, series.Tags)
}
}
@ -82,7 +84,7 @@ func TestSeriesList_Series(t *testing.T) {
}
// Verify non-existent series doesn't exist.
if offset, deleted := l.SeriesOffset(l.EncodeSeries("foo", models.NewTags(map[string]string{"region": "north"}))); offset != 0 {
if offset, deleted := l.SeriesOffset(l.EncodeSeries([]byte("foo"), models.NewTags(map[string]string{"region": "north"}))); offset != 0 {
t.Fatalf("series should not exist: offset=%d", offset)
} else if deleted {
t.Fatalf("series should not be deleted")
@ -125,6 +127,6 @@ func MustCreateSeriesList(a []Series) *tsi1.SeriesList {
// Series represents name/tagset pairs that are used in testing.
type Series struct {
Name string
Name []byte
Tags models.Tags
}

View File

@ -60,13 +60,13 @@ func (l *TermList) OffsetString(v string) uint32 {
}
// AppendEncodedSeries dictionary encodes a series and appends it to the buffer.
func (l *TermList) AppendEncodedSeries(dst []byte, name string, tags models.Tags) []byte {
func (l *TermList) AppendEncodedSeries(dst []byte, name []byte, tags models.Tags) []byte {
var buf [binary.MaxVarintLen32]byte
// Encode name.
offset := l.OffsetString(name)
offset := l.Offset(name)
if offset == 0 {
panic("name not in term list: " + name)
panic("name not in term list: " + string(name))
}
n := binary.PutUvarint(buf[:], uint64(offset))
dst = append(dst, buf[:n]...)

View File

@ -7,8 +7,19 @@ import (
"os"
"github.com/cespare/xxhash"
"github.com/influxdata/influxdb/models"
)
// MeasurementIterator represents a iterator over a list of measurements.
type MeasurementIterator interface {
Next() *MeasurementElem
}
// SeriesIterator represents a iterator over a list of series.
type SeriesIterator interface {
Next(name *[]byte, tags *models.Tags, deleted *bool)
}
// writeTo writes write v into w. Updates n.
func writeTo(w io.Writer, v []byte, n *int64) error {
nn, err := w.Write(v)

View File

@ -23,6 +23,7 @@ import (
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
/*
// Ensure engine can load the metadata index after reopening.
func TestEngine_LoadMetadataIndex(t *testing.T) {
e := MustOpenEngine()
@ -107,6 +108,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) {
t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags)
}
}
*/
// Ensure that deletes only sent to the WAL will clear out the data from the cache on restart
func TestEngine_DeleteWALLoadMetadata(t *testing.T) {
@ -571,7 +573,7 @@ func TestEngine_DeleteSeries(t *testing.T) {
// Write those points to the engine.
e := tsm1.NewEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)
e.LoadMetadataIndex(1, MustNewDatabaseIndex("db0")) // Initialise an index
// e.LoadMetadataIndex(1, MustNewDatabaseIndex("db0")) // Initialise an index
// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
@ -964,9 +966,9 @@ func MustOpenEngine() *Engine {
if err := e.Open(); err != nil {
panic(err)
}
if err := e.LoadMetadataIndex(1, MustNewDatabaseIndex("db")); err != nil {
panic(err)
}
// if err := e.LoadMetadataIndex(1, MustNewDatabaseIndex("db")); err != nil {
// panic(err)
// }
return e
}
@ -1010,6 +1012,7 @@ func (e *Engine) MustMeasurement(name string) *tsdb.Measurement {
return m
}
/*
// MustNewDatabaseIndex creates a tsdb.DatabaseIndex, panicking if there is an
// error doing do.
func MustNewDatabaseIndex(name string) *tsdb.DatabaseIndex {
@ -1019,6 +1022,7 @@ func MustNewDatabaseIndex(name string) *tsdb.DatabaseIndex {
}
return index
}
*/
// WritePointsString parses a string buffer and writes the points.
func (e *Engine) WritePointsString(buf ...string) error {

View File

@ -9,9 +9,6 @@ import (
)
type Index interface {
Open() error
Close() error
CreateMeasurementIndexIfNotExists(name string) (*Measurement, error)
Measurement(name []byte) (*Measurement, error)
Measurements() (Measurements, error)
@ -20,7 +17,7 @@ type Index interface {
MeasurementsByRegex(re *regexp.Regexp) (Measurements, error)
DropMeasurement(name []byte) error
CreateSeriesIndexIfNotExists(measurment string, series *Series) (*Series, error)
CreateSeriesIndexIfNotExists(measurement string, series *Series) (*Series, error)
Series(key []byte) (*Series, error)
DropSeries(keys []string) error
@ -29,6 +26,4 @@ type Index interface {
MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
TagsForSeries(key string) (models.Tags, error)
Dereference(b []byte)
}

View File

@ -11,8 +11,6 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/escape"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/pkg/estimator/hll"
internal "github.com/influxdata/influxdb/tsdb/internal"
"github.com/gogo/protobuf/proto"
@ -20,6 +18,7 @@ import (
//go:generate protoc --gogo_out=. internal/meta.proto
/*
// DatabaseIndex is the in memory index of a collection of measurements, time series, and their tags.
// Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks.
type DatabaseIndex struct {
@ -491,6 +490,7 @@ func (d *DatabaseIndex) Dereference(b []byte) {
s.Dereference(b)
}
}
*/
// Measurement represents a collection of time series in a database. It also
// contains in memory structures for indexing tags. Exported functions are
@ -601,7 +601,7 @@ func (m *Measurement) Cardinality(key string) int {
return n
}
// CardinalityBytes returns the number of values associated with the given tag key.
// CardinalityBytes returns the number of values associated with tag key
func (m *Measurement) CardinalityBytes(key []byte) int {
var n int
m.mu.RLock()
@ -1435,7 +1435,7 @@ func (a Measurements) Less(i, j int) bool { return a[i].Name < a[j].Name }
// Swap implements sort.Interface.
func (a Measurements) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a Measurements) intersect(other Measurements) Measurements {
func (a Measurements) Intersect(other Measurements) Measurements {
l := a
r := other
@ -1465,7 +1465,7 @@ func (a Measurements) intersect(other Measurements) Measurements {
return result
}
func (a Measurements) union(other Measurements) Measurements {
func (a Measurements) Union(other Measurements) Measurements {
result := make(Measurements, 0, len(a)+len(other))
var i, j int
for i < len(a) && j < len(other) {
@ -1900,6 +1900,13 @@ func (m *Measurement) tagValuesByKeyAndSeriesID(tagKeys []string, ids SeriesIDs)
return tagValues
}
func (m *Measurement) SeriesByTagKeyValue(key string) map[string]SeriesIDs {
m.mu.RLock()
ret := m.seriesByTagKeyValue[key]
m.mu.RUnlock()
return ret
}
// stringSet represents a set of strings.
type stringSet map[string]struct{}

View File

@ -233,6 +233,7 @@ func benchmarkMarshalTags(b *testing.B, keyN int) {
}
}
/*
func BenchmarkCreateSeriesIndex_1K(b *testing.B) {
benchmarkCreateSeriesIndex(b, genTestSeries(38, 3, 3))
}
@ -263,6 +264,7 @@ func benchmarkCreateSeriesIndex(b *testing.B, series []*TestSeries) {
}
}
}
*/
type TestSeries struct {
Measurement string

View File

@ -259,18 +259,6 @@ func (s *Shard) Open() error {
if err := e.Open(); err != nil {
return err
}
// Load metadata index.
start := time.Now()
index, err := NewDatabaseIndex(s.database)
if err != nil {
return err
}
if err := e.LoadMetadataIndex(s.id, index); err != nil {
return err
}
s.engine = e
s.logger.Info(fmt.Sprintf("%s database index loaded in %s", s.path, time.Now().Sub(start)))