From 2a81351992b9727c4384fd98c80448b10cffb69e Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 18 Oct 2016 08:34:51 -0600 Subject: [PATCH] Implement tsdb.Index interface on tsi1.Index. --- models/points.go | 31 ++ tsdb/engine/tsi1/index.go | 652 +++++++++++----------------- tsdb/engine/tsi1/index_file.go | 495 +++++++++++++++++++++ tsdb/engine/tsi1/index_file_test.go | 161 +++++++ tsdb/engine/tsi1/index_test.go | 179 ++------ tsdb/engine/tsi1/measurement.go | 102 ++++- tsdb/engine/tsi1/series.go | 54 ++- tsdb/engine/tsi1/series_test.go | 28 +- tsdb/engine/tsi1/term_list.go | 6 +- tsdb/engine/tsi1/tsi1.go | 11 + tsdb/engine/tsm1/engine_test.go | 12 +- tsdb/index.go | 7 +- tsdb/meta.go | 17 +- tsdb/meta_test.go | 2 + tsdb/shard.go | 12 - 15 files changed, 1147 insertions(+), 622 deletions(-) create mode 100644 tsdb/engine/tsi1/index_file.go create mode 100644 tsdb/engine/tsi1/index_file_test.go diff --git a/models/points.go b/models/points.go index b5eb37e611..c6c6d80955 100644 --- a/models/points.go +++ b/models/points.go @@ -1792,6 +1792,37 @@ func (a Tags) HashKey() []byte { return b[:idx] } +// CopyTags returns a shallow copy of tags. +func CopyTags(a Tags) Tags { + other := make(Tags, len(a)) + copy(other, a) + return other +} + +// DeepCopyTags returns a deep copy of tags. +func DeepCopyTags(a Tags) Tags { + // Calculate size of keys/values in bytes. + var n int + for _, t := range a { + n += len(t.Key) + len(t.Value) + } + + // Build single allocation for all key/values. + buf := make([]byte, n) + + // Copy tags to new set. + other := make(Tags, len(a)) + for i, t := range a { + copy(buf, t.Key) + other[i].Key, buf = buf[:len(t.Key)], buf[len(t.Key):] + + copy(buf, t.Value) + other[i].Value, buf = buf[:len(t.Value)], buf[len(t.Value):] + } + + return other +} + // Fields represents a mapping between a Point's field names and their // values. 
type Fields map[string]interface{} diff --git a/tsdb/engine/tsi1/index.go b/tsdb/engine/tsi1/index.go index 015234a54e..69e2426f1e 100644 --- a/tsdb/engine/tsi1/index.go +++ b/tsdb/engine/tsi1/index.go @@ -1,447 +1,295 @@ package tsi1 import ( - "bytes" - "encoding/binary" - "errors" - "io" + "fmt" + "regexp" "sort" + "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/estimator" + "github.com/influxdata/influxdb/tsdb" ) -// IndexVersion is the current TSI1 index version. -const IndexVersion = 1 +// Ensure index implements the interface. +var _ tsdb.Index = &Index{} -// FileSignature represents a magic number at the header of the index file. -const FileSignature = "TSI1" - -// Index field size constants. -const ( - // Index trailer fields - IndexVersionSize = 2 - SeriesListOffsetSize = 8 - SeriesListSizeSize = 8 - MeasurementBlockOffsetSize = 8 - MeasurementBlockSizeSize = 8 - - IndexTrailerSize = IndexVersionSize + - SeriesListOffsetSize + - SeriesListSizeSize + - MeasurementBlockOffsetSize + - MeasurementBlockSizeSize -) - -// Index errors. -var ( - ErrInvalidIndex = errors.New("invalid index") - ErrUnsupportedIndexVersion = errors.New("unsupported index version") -) - -// Index represents a collection of measurement, tag, and series data. +// Index represents a collection of layered index files and WAL. type Index struct { - data []byte + file *IndexFile - // Components - slist SeriesList - mblk MeasurementBlock + // TODO(benbjohnson): Use layered list of index files. + + // TODO(benbjohnson): Add write ahead log. } -// UnmarshalBinary opens an index from data. -// The byte slice is retained so it must be kept open. -func (i *Index) UnmarshalBinary(data []byte) error { - // Ensure magic number exists at the beginning. 
- if len(data) < len(FileSignature) { - return io.ErrShortBuffer - } else if !bytes.Equal(data[:len(FileSignature)], []byte(FileSignature)) { - return ErrInvalidIndex - } +// SetFile explicitly sets a file in the index. +func (i *Index) SetFile(f *IndexFile) { i.file = f } - // Read index trailer. - t, err := ReadIndexTrailer(data) - if err != nil { - return err - } - - // Slice measurement block data. - buf := data[t.MeasurementBlock.Offset:] - buf = buf[:t.MeasurementBlock.Size] - - // Unmarshal measurement block. - if err := i.mblk.UnmarshalBinary(buf); err != nil { - return err - } - - // Slice series list data. - buf = data[t.SeriesList.Offset:] - buf = buf[:t.SeriesList.Size] - - // Unmarshal series list. - if err := i.slist.UnmarshalBinary(buf); err != nil { - return err - } - - // Save reference to entire data block. - i.data = data - - return nil +func (i *Index) CreateMeasurementIndexIfNotExists(name string) (*tsdb.Measurement, error) { + panic("TODO: Requires WAL") } -// Close closes the index file. -func (i *Index) Close() error { - i.slist = SeriesList{} - i.mblk = MeasurementBlock{} - return nil +// Measurement retrieves a measurement by name. +func (i *Index) Measurement(name []byte) (*tsdb.Measurement, error) { + return i.measurement(name), nil } -// TagValueElem returns a list of series ids for a measurement/tag/value. -func (i *Index) TagValueElem(name, key, value []byte) (TagValueElem, error) { - // Find measurement. - e, ok := i.mblk.Elem(name) - if !ok { - return TagValueElem{}, nil - } +func (i *Index) measurement(name []byte) *tsdb.Measurement { + m := tsdb.NewMeasurement(string(name)) - // Find tag set block. - tblk, err := i.tagSetBlock(&e) - if err != nil { - return TagValueElem{}, err - } - return tblk.TagValueElem(key, value), nil -} + // Iterate over measurement series. + itr := i.file.MeasurementSeriesIterator(name) -// tagSetBlock returns a tag set block for a measurement. 
-func (i *Index) tagSetBlock(e *MeasurementElem) (TagSet, error) { - // Slice tag set data. - buf := i.data[e.TagSet.Offset:] - buf = buf[:e.TagSet.Size] - - // Unmarshal block. - var blk TagSet - if err := blk.UnmarshalBinary(buf); err != nil { - return TagSet{}, err - } - return blk, nil -} - -// Indices represents a layered set of indices. -type Indices []*Index - -// IndexWriter represents a naive implementation of an index builder. -type IndexWriter struct { - series indexSeries - mms indexMeasurements -} - -// NewIndexWriter returns a new instance of IndexWriter. -func NewIndexWriter() *IndexWriter { - return &IndexWriter{ - mms: make(indexMeasurements), - } -} - -// Add adds a series to the index. -func (iw *IndexWriter) Add(name string, tags models.Tags) { - // Add to series list. - iw.series = append(iw.series, indexSerie{name: name, tags: tags}) - - // Find or create measurement. - mm, ok := iw.mms[name] - if !ok { - mm.name = []byte(name) - mm.tagset = make(indexTagset) - iw.mms[name] = mm - } - - // Add tagset. - for _, tag := range tags { - t, ok := mm.tagset[string(tag.Key)] - if !ok { - t.name = tag.Key - t.values = make(indexValues) - mm.tagset[string(tag.Key)] = t + var id uint64 // TEMPORARY + var sname []byte + var tags models.Tags + var deleted bool + for { + if itr.Next(&sname, &tags, &deleted); sname == nil { + break } - v, ok := t.values[string(tag.Value)] - if !ok { - v.name = tag.Value - t.values[string(tag.Value)] = v + // TODO: Handle deleted series. + + // Append series to to measurement. + // TODO: Remove concept of series ids. + m.AddSeries(&tsdb.Series{ + ID: id, + Key: string(sname), + Tags: models.CopyTags(tags), + }) + + // TEMPORARY: Increment ID. + id++ + } + + if !m.HasSeries() { + return nil + } + return m +} + +// Measurements returns a list of all measurements. 
+func (i *Index) Measurements() (tsdb.Measurements, error) { + var mms tsdb.Measurements + itr := i.file.MeasurementIterator() + for e := itr.Next(); e != nil; e = itr.Next() { + mms = append(mms, i.measurement(e.Name)) + } + return mms, nil +} + +func (i *Index) MeasurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool, error) { + return i.measurementsByExpr(expr) +} + +func (i *Index) measurementsByExpr(expr influxql.Expr) (tsdb.Measurements, bool, error) { + if expr == nil { + return nil, false, nil + } + + switch e := expr.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + tag, ok := e.LHS.(*influxql.VarRef) + if !ok { + return nil, false, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String()) + } + + // Retrieve value or regex expression from RHS. + var value string + var regex *regexp.Regexp + if influxql.IsRegexOp(e.Op) { + re, ok := e.RHS.(*influxql.RegexLiteral) + if !ok { + return nil, false, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String()) + } + regex = re.Val + } else { + s, ok := e.RHS.(*influxql.StringLiteral) + if !ok { + return nil, false, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String()) + } + value = s.Val + } + + // Match on name, if specified. 
+ if tag.Val == "_name" { + return i.measurementsByNameFilter(e.Op, value, regex), true, nil + } else if influxql.IsSystemName(tag.Val) { + return nil, false, nil + } + return i.measurementsByTagFilter(e.Op, tag.Val, value, regex), true, nil + + case influxql.OR, influxql.AND: + lhsIDs, lhsOk, err := i.measurementsByExpr(e.LHS) + if err != nil { + return nil, false, err + } + + rhsIDs, rhsOk, err := i.measurementsByExpr(e.RHS) + if err != nil { + return nil, false, err + } + + if lhsOk && rhsOk { + if e.Op == influxql.OR { + return lhsIDs.Union(rhsIDs), true, nil + } + return lhsIDs.Intersect(rhsIDs), true, nil + } else if lhsOk { + return lhsIDs, true, nil + } else if rhsOk { + return rhsIDs, true, nil + } + return nil, false, nil + + default: + return nil, false, fmt.Errorf("invalid tag comparison operator") } + + case *influxql.ParenExpr: + return i.measurementsByExpr(e.Expr) + default: + return nil, false, fmt.Errorf("%#v", expr) } } -// WriteTo writes the index to w. -func (iw *IndexWriter) WriteTo(w io.Writer) (n int64, err error) { - var t IndexTrailer +// measurementsByNameFilter returns the sorted measurements matching a name. +func (i *Index) measurementsByNameFilter(op influxql.Token, val string, regex *regexp.Regexp) tsdb.Measurements { + var mms tsdb.Measurements + itr := i.file.MeasurementIterator() + for e := itr.Next(); e != nil; e = itr.Next() { + var matched bool + switch op { + case influxql.EQ: + matched = string(e.Name) == val + case influxql.NEQ: + matched = string(e.Name) != val + case influxql.EQREGEX: + matched = regex.Match(e.Name) + case influxql.NEQREGEX: + matched = !regex.Match(e.Name) + } - // Write magic number. - if err := writeTo(w, []byte(FileSignature), &n); err != nil { - return n, err + if matched { + mms = append(mms, i.measurement(e.Name)) + } } - - // Write series list. 
- t.SeriesList.Offset = n - if err := iw.writeSeriesListTo(w, &n); err != nil { - return n, err - } - t.SeriesList.Size = n - t.SeriesList.Offset - - // Sort measurement names. - names := iw.mms.Names() - - // Write tagset blocks in measurement order. - if err := iw.writeTagsetsTo(w, names, &n); err != nil { - return n, err - } - - // Write measurement block. - t.MeasurementBlock.Offset = n - if err := iw.writeMeasurementBlockTo(w, names, &n); err != nil { - return n, err - } - t.MeasurementBlock.Size = n - t.MeasurementBlock.Offset - - // Write trailer. - if err := iw.writeTrailerTo(w, t, &n); err != nil { - return n, err - } - - return n, nil + sort.Sort(mms) + return mms } -func (iw *IndexWriter) writeSeriesListTo(w io.Writer, n *int64) error { - // Ensure series are sorted. - sort.Sort(iw.series) +func (i *Index) measurementsByTagFilter(op influxql.Token, key, val string, regex *regexp.Regexp) tsdb.Measurements { + var mms tsdb.Measurements + itr := i.file.MeasurementIterator() + for e := itr.Next(); e != nil; e = itr.Next() { + mm := i.measurement(e.Name) - // Write all series. - sw := NewSeriesListWriter() - for _, serie := range iw.series { - if err := sw.Add(serie.name, serie.tags); err != nil { - return err - } - } - - // Flush series list. - nn, err := sw.WriteTo(w) - *n += nn - if err != nil { - return err - } - - // Add series to each measurement and key/value. - for i := range iw.series { - serie := &iw.series[i] - - // Lookup series offset. - serie.offset = sw.Offset(serie.name, serie.tags) - if serie.offset == 0 { - panic("series not found") - } - - // Add series id to measurement, tag key, and tag value. - mm := iw.mms[serie.name] - mm.seriesIDs = append(mm.seriesIDs, serie.offset) - iw.mms[serie.name] = mm - - // Add series id to each tag value. 
- for _, tag := range serie.tags { - t := mm.tagset[string(tag.Key)] - - v := t.values[string(tag.Value)] - v.seriesIDs = append(v.seriesIDs, serie.offset) - t.values[string(tag.Value)] = v - } - } - - return nil -} - -func (iw *IndexWriter) writeTagsetsTo(w io.Writer, names []string, n *int64) error { - for _, name := range names { - if err := iw.writeTagsetTo(w, name, n); err != nil { - return err - } - } - return nil -} - -// writeTagsetTo writes a single tagset to w and saves the tagset offset. -func (iw *IndexWriter) writeTagsetTo(w io.Writer, name string, n *int64) error { - mm := iw.mms[name] - - tsw := NewTagSetWriter() - for _, tag := range mm.tagset { - // Mark tag deleted. - if tag.deleted { - tsw.AddTag(tag.name, true) + tagVals := mm.SeriesByTagKeyValue(key) + if tagVals == nil { continue } - // Add each value. - for _, value := range tag.values { - sort.Sort(uint32Slice(value.seriesIDs)) - tsw.AddTagValue(tag.name, value.name, value.deleted, value.seriesIDs) + // If the operator is non-regex, only check the specified value. + var tagMatch bool + if op == influxql.EQ || op == influxql.NEQ { + if _, ok := tagVals[val]; ok { + tagMatch = true + } + } else { + // Else, the operator is a regex and we have to check all tag + // values against the regular expression. + for tagVal := range tagVals { + if regex.MatchString(tagVal) { + tagMatch = true + break + } + } + } + + // + // XNOR gate + // + // tags match | operation is EQ | measurement matches + // -------------------------------------------------- + // True | True | True + // True | False | False + // False | True | False + // False | False | True + if tagMatch == (op == influxql.EQ || op == influxql.EQREGEX) { + mms = append(mms, mm) + break } } - // Save tagset offset to measurement. - mm.offset = *n + sort.Sort(mms) + return mms +} - // Write tagset to writer. 
- nn, err := tsw.WriteTo(w) - *n += nn +func (i *Index) MeasurementsByName(names []string) ([]*tsdb.Measurement, error) { + itr := i.file.MeasurementIterator() + mms := make([]*tsdb.Measurement, 0, len(names)) + for e := itr.Next(); e != nil; e = itr.Next() { + for _, name := range names { + if string(e.Name) == name { + mms = append(mms, i.measurement(e.Name)) + break + } + } + } + return mms, nil +} + +func (i *Index) MeasurementsByRegex(re *regexp.Regexp) (tsdb.Measurements, error) { + itr := i.file.MeasurementIterator() + var mms tsdb.Measurements + for e := itr.Next(); e != nil; e = itr.Next() { + if re.Match(e.Name) { + mms = append(mms, i.measurement(e.Name)) + } + } + return mms, nil +} + +func (i *Index) DropMeasurement(name []byte) error { + panic("TODO: Requires WAL") +} + +func (i *Index) CreateSeriesIndexIfNotExists(measurement string, series *tsdb.Series) (*tsdb.Series, error) { + panic("TODO: Requires WAL") +} + +func (i *Index) Series(key []byte) (*tsdb.Series, error) { + panic("TODO") +} + +func (i *Index) DropSeries(keys []string) error { + panic("TODO: Requires WAL") +} + +func (i *Index) SeriesN() (n uint64, err error) { + itr := i.file.MeasurementIterator() + for e := itr.Next(); e != nil; e = itr.Next() { + n += uint64(e.Series.N) + } + return n, nil +} + +func (i *Index) TagsForSeries(key string) (models.Tags, error) { + ss, err := i.Series([]byte(key)) if err != nil { - return err + return nil, err } - - // Save tagset offset to measurement. - mm.size = *n - mm.offset - - iw.mms[name] = mm - - return nil + return ss.Tags, nil } -func (iw *IndexWriter) writeMeasurementBlockTo(w io.Writer, names []string, n *int64) error { - mw := NewMeasurementBlockWriter() - - // Add measurement data. - for _, mm := range iw.mms { - mw.Add(mm.name, mm.offset, mm.size, mm.seriesIDs) - } - - // Write data to writer. 
- nn, err := mw.WriteTo(w) - *n += nn - if err != nil { - return err - } - - return nil +func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { + panic("TODO") } -// writeTrailerTo writes the index trailer to w. -func (iw *IndexWriter) writeTrailerTo(w io.Writer, t IndexTrailer, n *int64) error { - // Write series list info. - if err := writeUint64To(w, uint64(t.SeriesList.Offset), n); err != nil { - return err - } else if err := writeUint64To(w, uint64(t.SeriesList.Size), n); err != nil { - return err - } - - // Write measurement block info. - if err := writeUint64To(w, uint64(t.MeasurementBlock.Offset), n); err != nil { - return err - } else if err := writeUint64To(w, uint64(t.MeasurementBlock.Size), n); err != nil { - return err - } - - // Write index encoding version. - if err := writeUint16To(w, IndexVersion, n); err != nil { - return err - } - - return nil +func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { + panic("TODO") } - -type indexSerie struct { - name string - tags models.Tags - deleted bool - offset uint32 -} - -type indexSeries []indexSerie - -func (a indexSeries) Len() int { return len(a) } -func (a indexSeries) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a indexSeries) Less(i, j int) bool { - if a[i].name != a[j].name { - return a[i].name < a[j].name - } - return models.CompareTags(a[i].tags, a[j].tags) == -1 -} - -type indexMeasurement struct { - name []byte - deleted bool - tagset indexTagset - offset int64 // tagset offset - size int64 // tagset size - seriesIDs []uint32 -} - -type indexMeasurements map[string]indexMeasurement - -// Names returns a sorted list of measurement names. 
-func (m indexMeasurements) Names() []string { - a := make([]string, 0, len(m)) - for name := range m { - a = append(a, name) - } - sort.Strings(a) - return a -} - -type indexTag struct { - name []byte - deleted bool - values indexValues -} - -type indexTagset map[string]indexTag - -type indexValue struct { - name []byte - deleted bool - seriesIDs []uint32 -} - -type indexValues map[string]indexValue - -// ReadIndexTrailer returns the index trailer from data. -func ReadIndexTrailer(data []byte) (IndexTrailer, error) { - var t IndexTrailer - - // Read version. - t.Version = int(binary.BigEndian.Uint16(data[len(data)-IndexVersionSize:])) - if t.Version != IndexVersion { - return t, ErrUnsupportedIndexVersion - } - - // Slice trailer data. - buf := data[len(data)-IndexTrailerSize:] - - // Read series list info. - t.SeriesList.Offset = int64(binary.BigEndian.Uint64(buf[0:SeriesListOffsetSize])) - buf = buf[SeriesListOffsetSize:] - t.SeriesList.Size = int64(binary.BigEndian.Uint64(buf[0:SeriesListSizeSize])) - buf = buf[SeriesListSizeSize:] - - // Read measurement block info. - t.MeasurementBlock.Offset = int64(binary.BigEndian.Uint64(buf[0:MeasurementBlockOffsetSize])) - buf = buf[MeasurementBlockOffsetSize:] - t.MeasurementBlock.Size = int64(binary.BigEndian.Uint64(buf[0:MeasurementBlockSizeSize])) - buf = buf[MeasurementBlockSizeSize:] - - return t, nil -} - -// IndexTrailer represents meta data written to the end of the index. 
-type IndexTrailer struct { - Version int - SeriesList struct { - Offset int64 - Size int64 - } - MeasurementBlock struct { - Offset int64 - Size int64 - } -} - -type uint32Slice []uint32 - -func (a uint32Slice) Len() int { return len(a) } -func (a uint32Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a uint32Slice) Less(i, j int) bool { return a[i] < a[j] } diff --git a/tsdb/engine/tsi1/index_file.go b/tsdb/engine/tsi1/index_file.go new file mode 100644 index 0000000000..c8b181ade6 --- /dev/null +++ b/tsdb/engine/tsi1/index_file.go @@ -0,0 +1,495 @@ +package tsi1 + +import ( + "bytes" + "encoding/binary" + "errors" + "io" + "sort" + + "github.com/influxdata/influxdb/models" +) + +// IndexFileVersion is the current TSI1 index file version. +const IndexFileVersion = 1 + +// FileSignature represents a magic number at the header of the index file. +const FileSignature = "TSI1" + +// IndexFile field size constants. +const ( + // IndexFile trailer fields + IndexFileVersionSize = 2 + SeriesListOffsetSize = 8 + SeriesListSizeSize = 8 + MeasurementBlockOffsetSize = 8 + MeasurementBlockSizeSize = 8 + + IndexFileTrailerSize = IndexFileVersionSize + + SeriesListOffsetSize + + SeriesListSizeSize + + MeasurementBlockOffsetSize + + MeasurementBlockSizeSize +) + +// IndexFile errors. +var ( + ErrInvalidIndexFile = errors.New("invalid index file") + ErrUnsupportedIndexFileVersion = errors.New("unsupported index file version") +) + +// IndexFile represents a collection of measurement, tag, and series data. +type IndexFile struct { + data []byte + + // Components + slist SeriesList + mblk MeasurementBlock +} + +// UnmarshalBinary opens an index from data. +// The byte slice is retained so it must be kept open. +func (i *IndexFile) UnmarshalBinary(data []byte) error { + // Ensure magic number exists at the beginning. 
+ if len(data) < len(FileSignature) { + return io.ErrShortBuffer + } else if !bytes.Equal(data[:len(FileSignature)], []byte(FileSignature)) { + return ErrInvalidIndexFile + } + + // Read index file trailer. + t, err := ReadIndexFileTrailer(data) + if err != nil { + return err + } + + // Slice measurement block data. + buf := data[t.MeasurementBlock.Offset:] + buf = buf[:t.MeasurementBlock.Size] + + // Unmarshal measurement block. + if err := i.mblk.UnmarshalBinary(buf); err != nil { + return err + } + + // Slice series list data. + buf = data[t.SeriesList.Offset:] + buf = buf[:t.SeriesList.Size] + + // Unmarshal series list. + if err := i.slist.UnmarshalBinary(buf); err != nil { + return err + } + + // Save reference to entire data block. + i.data = data + + return nil +} + +// Close closes the index file. +func (i *IndexFile) Close() error { + i.slist = SeriesList{} + i.mblk = MeasurementBlock{} + return nil +} + +// TagValueElem returns a list of series ids for a measurement/tag/value. +func (i *IndexFile) TagValueElem(name, key, value []byte) (TagValueElem, error) { + // Find measurement. + e, ok := i.mblk.Elem(name) + if !ok { + return TagValueElem{}, nil + } + + // Find tag set block. + tblk, err := i.tagSetBlock(&e) + if err != nil { + return TagValueElem{}, err + } + return tblk.TagValueElem(key, value), nil +} + +// tagSetBlock returns a tag set block for a measurement. +func (i *IndexFile) tagSetBlock(e *MeasurementElem) (TagSet, error) { + // Slice tag set data. + buf := i.data[e.TagSet.Offset:] + buf = buf[:e.TagSet.Size] + + // Unmarshal block. + var blk TagSet + if err := blk.UnmarshalBinary(buf); err != nil { + return TagSet{}, err + } + return blk, nil +} + +// MeasurementIterator returns an iterator over all measurements. +func (i *IndexFile) MeasurementIterator() MeasurementIterator { + return i.mblk.Iterator() +} + +// MeasurementSeriesIterator returns an iterator over a measurement's series. 
+func (i *IndexFile) MeasurementSeriesIterator(name []byte) SeriesIterator { + // Find measurement element. + e, ok := i.mblk.Elem(name) + if !ok { + return &seriesIterator{} + } + + // Return iterator. + return &seriesIterator{ + n: e.Series.N, + data: e.Series.Data, + seriesList: &i.slist, + } +} + +// seriesIterator iterates over a list of raw data. +type seriesIterator struct { + i, n uint32 + data []byte + + seriesList *SeriesList +} + +// Next returns the next decoded series. Uses name & tags as reusable buffers. +// Returns nils when the iterator is complete. +func (itr *seriesIterator) Next(name *[]byte, tags *models.Tags, deleted *bool) { + // Return nil if we've reached the end. + if itr.i == itr.n { + *name, *tags = nil, nil + return + } + + // Move forward and retrieved offset. + offset := binary.BigEndian.Uint32(itr.data[itr.i*SeriesIDSize:]) + + // Read from series list into buffers. + itr.seriesList.DecodeSeriesAt(offset, name, tags, deleted) + + // Move iterator forward. + itr.i++ +} + +// IndexFiles represents a layered set of index files. +type IndexFiles []*IndexFile + +// IndexFileWriter represents a naive implementation of an index file builder. +type IndexFileWriter struct { + series indexFileSeries + mms indexFileMeasurements +} + +// NewIndexFileWriter returns a new instance of IndexFileWriter. +func NewIndexFileWriter() *IndexFileWriter { + return &IndexFileWriter{ + mms: make(indexFileMeasurements), + } +} + +// Add adds a series to the index file. +func (iw *IndexFileWriter) Add(name []byte, tags models.Tags) { + // Add to series list. + iw.series = append(iw.series, indexFileSerie{name: name, tags: tags}) + + // Find or create measurement. + mm, ok := iw.mms[string(name)] + if !ok { + mm.name = name + mm.tagset = make(indexFileTagset) + iw.mms[string(name)] = mm + } + + // Add tagset. 
+ for _, tag := range tags { + t, ok := mm.tagset[string(tag.Key)] + if !ok { + t.name = tag.Key + t.values = make(indexFileValues) + mm.tagset[string(tag.Key)] = t + } + + v, ok := t.values[string(tag.Value)] + if !ok { + v.name = tag.Value + t.values[string(tag.Value)] = v + } + } +} + +// WriteTo writes the index file to w. +func (iw *IndexFileWriter) WriteTo(w io.Writer) (n int64, err error) { + var t IndexFileTrailer + + // Write magic number. + if err := writeTo(w, []byte(FileSignature), &n); err != nil { + return n, err + } + + // Write series list. + t.SeriesList.Offset = n + if err := iw.writeSeriesListTo(w, &n); err != nil { + return n, err + } + t.SeriesList.Size = n - t.SeriesList.Offset + + // Sort measurement names. + names := iw.mms.Names() + + // Write tagset blocks in measurement order. + if err := iw.writeTagsetsTo(w, names, &n); err != nil { + return n, err + } + + // Write measurement block. + t.MeasurementBlock.Offset = n + if err := iw.writeMeasurementBlockTo(w, names, &n); err != nil { + return n, err + } + t.MeasurementBlock.Size = n - t.MeasurementBlock.Offset + + // Write trailer. + if err := iw.writeTrailerTo(w, t, &n); err != nil { + return n, err + } + + return n, nil +} + +func (iw *IndexFileWriter) writeSeriesListTo(w io.Writer, n *int64) error { + // Ensure series are sorted. + sort.Sort(iw.series) + + // Write all series. + sw := NewSeriesListWriter() + for _, serie := range iw.series { + if err := sw.Add(serie.name, serie.tags); err != nil { + return err + } + } + + // Flush series list. + nn, err := sw.WriteTo(w) + *n += nn + if err != nil { + return err + } + + // Add series to each measurement and key/value. + for i := range iw.series { + serie := &iw.series[i] + + // Lookup series offset. + serie.offset = sw.Offset(serie.name, serie.tags) + if serie.offset == 0 { + panic("series not found") + } + + // Add series id to measurement, tag key, and tag value. 
+ mm := iw.mms[string(serie.name)] + mm.seriesIDs = append(mm.seriesIDs, serie.offset) + iw.mms[string(serie.name)] = mm + + // Add series id to each tag value. + for _, tag := range serie.tags { + t := mm.tagset[string(tag.Key)] + + v := t.values[string(tag.Value)] + v.seriesIDs = append(v.seriesIDs, serie.offset) + t.values[string(tag.Value)] = v + } + } + + return nil +} + +func (iw *IndexFileWriter) writeTagsetsTo(w io.Writer, names []string, n *int64) error { + for _, name := range names { + if err := iw.writeTagsetTo(w, name, n); err != nil { + return err + } + } + return nil +} + +// writeTagsetTo writes a single tagset to w and saves the tagset offset. +func (iw *IndexFileWriter) writeTagsetTo(w io.Writer, name string, n *int64) error { + mm := iw.mms[name] + + tsw := NewTagSetWriter() + for _, tag := range mm.tagset { + // Mark tag deleted. + if tag.deleted { + tsw.AddTag(tag.name, true) + continue + } + + // Add each value. + for _, value := range tag.values { + sort.Sort(uint32Slice(value.seriesIDs)) + tsw.AddTagValue(tag.name, value.name, value.deleted, value.seriesIDs) + } + } + + // Save tagset offset to measurement. + mm.offset = *n + + // Write tagset to writer. + nn, err := tsw.WriteTo(w) + *n += nn + if err != nil { + return err + } + + // Save tagset offset to measurement. + mm.size = *n - mm.offset + + iw.mms[name] = mm + + return nil +} + +func (iw *IndexFileWriter) writeMeasurementBlockTo(w io.Writer, names []string, n *int64) error { + mw := NewMeasurementBlockWriter() + + // Add measurement data. + for _, mm := range iw.mms { + mw.Add(mm.name, mm.offset, mm.size, mm.seriesIDs) + } + + // Write data to writer. + nn, err := mw.WriteTo(w) + *n += nn + if err != nil { + return err + } + + return nil +} + +// writeTrailerTo writes the index file trailer to w. +func (iw *IndexFileWriter) writeTrailerTo(w io.Writer, t IndexFileTrailer, n *int64) error { + // Write series list info. 
+ if err := writeUint64To(w, uint64(t.SeriesList.Offset), n); err != nil { + return err + } else if err := writeUint64To(w, uint64(t.SeriesList.Size), n); err != nil { + return err + } + + // Write measurement block info. + if err := writeUint64To(w, uint64(t.MeasurementBlock.Offset), n); err != nil { + return err + } else if err := writeUint64To(w, uint64(t.MeasurementBlock.Size), n); err != nil { + return err + } + + // Write index file encoding version. + if err := writeUint16To(w, IndexFileVersion, n); err != nil { + return err + } + + return nil +} + +type indexFileSerie struct { + name []byte + tags models.Tags + deleted bool + offset uint32 +} + +type indexFileSeries []indexFileSerie + +func (a indexFileSeries) Len() int { return len(a) } +func (a indexFileSeries) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a indexFileSeries) Less(i, j int) bool { + if cmp := bytes.Compare(a[i].name, a[j].name); cmp != 0 { + return cmp == -1 + } + return models.CompareTags(a[i].tags, a[j].tags) == -1 +} + +type indexFileMeasurement struct { + name []byte + deleted bool + tagset indexFileTagset + offset int64 // tagset offset + size int64 // tagset size + seriesIDs []uint32 +} + +type indexFileMeasurements map[string]indexFileMeasurement + +// Names returns a sorted list of measurement names. +func (m indexFileMeasurements) Names() []string { + a := make([]string, 0, len(m)) + for name := range m { + a = append(a, name) + } + sort.Strings(a) + return a +} + +type indexFileTag struct { + name []byte + deleted bool + values indexFileValues +} + +type indexFileTagset map[string]indexFileTag + +type indexFileValue struct { + name []byte + deleted bool + seriesIDs []uint32 +} + +type indexFileValues map[string]indexFileValue + +// ReadIndexFileTrailer returns the index file trailer from data. +func ReadIndexFileTrailer(data []byte) (IndexFileTrailer, error) { + var t IndexFileTrailer + + // Read version. 
+ t.Version = int(binary.BigEndian.Uint16(data[len(data)-IndexFileVersionSize:])) + if t.Version != IndexFileVersion { + return t, ErrUnsupportedIndexFileVersion + } + + // Slice trailer data. + buf := data[len(data)-IndexFileTrailerSize:] + + // Read series list info. + t.SeriesList.Offset = int64(binary.BigEndian.Uint64(buf[0:SeriesListOffsetSize])) + buf = buf[SeriesListOffsetSize:] + t.SeriesList.Size = int64(binary.BigEndian.Uint64(buf[0:SeriesListSizeSize])) + buf = buf[SeriesListSizeSize:] + + // Read measurement block info. + t.MeasurementBlock.Offset = int64(binary.BigEndian.Uint64(buf[0:MeasurementBlockOffsetSize])) + buf = buf[MeasurementBlockOffsetSize:] + t.MeasurementBlock.Size = int64(binary.BigEndian.Uint64(buf[0:MeasurementBlockSizeSize])) + buf = buf[MeasurementBlockSizeSize:] + + return t, nil +} + +// IndexFileTrailer represents meta data written to the end of the index file. +type IndexFileTrailer struct { + Version int + SeriesList struct { + Offset int64 + Size int64 + } + MeasurementBlock struct { + Offset int64 + Size int64 + } +} + +type uint32Slice []uint32 + +func (a uint32Slice) Len() int { return len(a) } +func (a uint32Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a uint32Slice) Less(i, j int) bool { return a[i] < a[j] } diff --git a/tsdb/engine/tsi1/index_file_test.go b/tsdb/engine/tsi1/index_file_test.go new file mode 100644 index 0000000000..d40171b620 --- /dev/null +++ b/tsdb/engine/tsi1/index_file_test.go @@ -0,0 +1,161 @@ +package tsi1_test + +import ( + "bytes" + "fmt" + "testing" + + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/tsdb/engine/tsi1" +) + +// Ensure a simple index file can be built and opened. 
+func TestCreateIndexFile(t *testing.T) {
+	if _, err := CreateIndexFile([]Series{
+		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
+		{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
+		{Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})},
+	}); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Ensure an index file can be successfully generated.
+func TestGenerateIndexFile(t *testing.T) {
+	// Build generated index file.
+	idx, err := GenerateIndexFile(10, 3, 4)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Verify that tag/value series can be fetched.
+	if e, err := idx.TagValueElem([]byte("measurement0"), []byte("key0"), []byte("value0")); err != nil {
+		t.Fatal(err)
+	} else if e.Series.N == 0 {
+		t.Fatal("expected series")
+	}
+}
+
+func BenchmarkIndexFile_TagValueSeries(b *testing.B) {
+	b.Run("M=1,K=2,V=3", func(b *testing.B) {
+		benchmarkIndexFile_TagValueSeries(b, MustFindOrGenerateIndexFile(1, 2, 3))
+	})
+	b.Run("M=10,K=5,V=5", func(b *testing.B) {
+		benchmarkIndexFile_TagValueSeries(b, MustFindOrGenerateIndexFile(10, 5, 5))
+	})
+	b.Run("M=10,K=7,V=7", func(b *testing.B) {
+		benchmarkIndexFile_TagValueSeries(b, MustFindOrGenerateIndexFile(10, 7, 7))
+	})
+}
+
+func benchmarkIndexFile_TagValueSeries(b *testing.B, idx *tsi1.IndexFile) {
+	b.ResetTimer()
+	b.ReportAllocs()
+
+	for i := 0; i < b.N; i++ {
+		if e, err := idx.TagValueElem([]byte("measurement0"), []byte("key0"), []byte("value0")); err != nil {
+			b.Fatal(err)
+		} else if e.Series.N == 0 {
+			b.Fatal("expected series")
+		}
+	}
+}
+
+// CreateIndexFile creates an index file with a given set of series.
+func CreateIndexFile(series []Series) (*tsi1.IndexFile, error) {
+	// Add series to the writer.
+	ifw := tsi1.NewIndexFileWriter()
+	for _, serie := range series {
+		ifw.Add(serie.Name, serie.Tags)
+	}
+
+	// Write index file to buffer. 
+ var buf bytes.Buffer + if _, err := ifw.WriteTo(&buf); err != nil { + return nil, err + } + + // Load index file from buffer. + var f tsi1.IndexFile + if err := f.UnmarshalBinary(buf.Bytes()); err != nil { + return nil, err + } + return &f, nil +} + +// GenerateIndexFile generates an index file from a set of series based on the count arguments. +// Total series returned will equal measurementN * tagN * valueN. +func GenerateIndexFile(measurementN, tagN, valueN int) (*tsi1.IndexFile, error) { + tagValueN := pow(valueN, tagN) + + iw := tsi1.NewIndexFileWriter() + for i := 0; i < measurementN; i++ { + name := []byte(fmt.Sprintf("measurement%d", i)) + + // Generate tag sets. + for j := 0; j < tagValueN; j++ { + var tags models.Tags + for k := 0; k < tagN; k++ { + key := []byte(fmt.Sprintf("key%d", k)) + value := []byte(fmt.Sprintf("value%d", (j / pow(valueN, k) % valueN))) + tags = append(tags, models.Tag{Key: key, Value: value}) + } + iw.Add(name, tags) + } + } + + // Write index file to buffer. + var buf bytes.Buffer + if _, err := iw.WriteTo(&buf); err != nil { + return nil, err + } + + // Load index file from buffer. + var idx tsi1.IndexFile + if err := idx.UnmarshalBinary(buf.Bytes()); err != nil { + return nil, err + } + return &idx, nil +} + +func MustGenerateIndexFile(measurementN, tagN, valueN int) *tsi1.IndexFile { + idx, err := GenerateIndexFile(measurementN, tagN, valueN) + if err != nil { + panic(err) + } + return idx +} + +var indexFileCache struct { + MeasurementN int + TagN int + ValueN int + + IndexFile *tsi1.IndexFile +} + +// MustFindOrGenerateIndexFile returns a cached index file or generates one if it doesn't exist. +func MustFindOrGenerateIndexFile(measurementN, tagN, valueN int) *tsi1.IndexFile { + // Use cache if fields match and the index file has been generated. 
+ if indexFileCache.MeasurementN == measurementN && + indexFileCache.TagN == tagN && + indexFileCache.ValueN == valueN && + indexFileCache.IndexFile != nil { + return indexFileCache.IndexFile + } + + // Generate and cache. + indexFileCache.MeasurementN = measurementN + indexFileCache.TagN = tagN + indexFileCache.ValueN = valueN + indexFileCache.IndexFile = MustGenerateIndexFile(measurementN, tagN, valueN) + return indexFileCache.IndexFile +} + +func pow(x, y int) int { + r := 1 + for i := 0; i < y; i++ { + r *= x + } + return r +} diff --git a/tsdb/engine/tsi1/index_test.go b/tsdb/engine/tsi1/index_test.go index 5f1cf07028..27888637ee 100644 --- a/tsdb/engine/tsi1/index_test.go +++ b/tsdb/engine/tsi1/index_test.go @@ -1,158 +1,67 @@ package tsi1_test import ( - "bytes" - "fmt" "testing" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/tsdb/engine/tsi1" ) -// Ensure a simple index can be built and opened. -func TestIndex(t *testing.T) { - series := []Series{ - {Name: "cpu", Tags: models.NewTags(map[string]string{"region": "east"})}, - {Name: "cpu", Tags: models.NewTags(map[string]string{"region": "west"})}, - {Name: "mem", Tags: models.NewTags(map[string]string{"region": "east"})}, - } - - // Add series to the writer. - iw := tsi1.NewIndexWriter() - for _, serie := range series { - iw.Add(serie.Name, serie.Tags) - } - - // Write index to buffer. - var buf bytes.Buffer - if _, err := iw.WriteTo(&buf); err != nil { - t.Fatal(err) - } - - // Load index from buffer. - var idx tsi1.Index - if err := idx.UnmarshalBinary(buf.Bytes()); err != nil { - t.Fatal(err) - } -} - -// Ensure index generation can be successfully built. -func TestGenerateIndex(t *testing.T) { - // Build generated index. - idx, err := GenerateIndex(10, 3, 4) +// Ensure index can return a single measurement by name. +func TestIndex_Measurement(t *testing.T) { + // Build an index file. 
+ f, err := CreateIndexFile([]Series{ + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})}, + {Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})}, + }) if err != nil { t.Fatal(err) } - // Verify that tag/value series can be fetched. - if e, err := idx.TagValueElem([]byte("measurement0"), []byte("key0"), []byte("value0")); err != nil { - t.Fatal(err) - } else if e.Series.N == 0 { - t.Fatal("expected series") - } -} - -func BenchmarkIndex_TagValueSeries(b *testing.B) { - b.Run("M=1,K=2,V=3", func(b *testing.B) { - benchmarkIndex_TagValueSeries(b, MustFindOrGenerateIndex(1, 2, 3)) - }) - b.Run("M=10,K=5,V=5", func(b *testing.B) { - benchmarkIndex_TagValueSeries(b, MustFindOrGenerateIndex(10, 5, 5)) - }) - b.Run("M=10,K=7,V=5", func(b *testing.B) { - benchmarkIndex_TagValueSeries(b, MustFindOrGenerateIndex(10, 7, 7)) - }) -} - -func benchmarkIndex_TagValueSeries(b *testing.B, idx *tsi1.Index) { - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - if e, err := idx.TagValueElem([]byte("measurement0"), []byte("key0"), []byte("value0")); err != nil { - b.Fatal(err) - } else if e.Series.N == 0 { - b.Fatal("expected series") - } - } -} - -// GenerateIndex Generates an index from a set of series based on the count arguments. -// Total series returned will equal measurementN * tagN * valueN. -func GenerateIndex(measurementN, tagN, valueN int) (*tsi1.Index, error) { - tagValueN := pow(valueN, tagN) - - println("generating", measurementN*pow(valueN, tagN)) - - iw := tsi1.NewIndexWriter() - for i := 0; i < measurementN; i++ { - name := fmt.Sprintf("measurement%d", i) - - // Generate tag sets. 
- for j := 0; j < tagValueN; j++ { - var tags models.Tags - for k := 0; k < tagN; k++ { - key := []byte(fmt.Sprintf("key%d", k)) - value := []byte(fmt.Sprintf("value%d", (j / pow(valueN, k) % valueN))) - tags = append(tags, models.Tag{Key: key, Value: value}) - } - iw.Add(name, tags) - } - } - - // Write index to buffer. - var buf bytes.Buffer - if _, err := iw.WriteTo(&buf); err != nil { - return nil, err - } - println("file size", buf.Len()) - - // Load index from buffer. + // Create an index from the single file. var idx tsi1.Index - if err := idx.UnmarshalBinary(buf.Bytes()); err != nil { - return nil, err + idx.SetFile(f) + + // Verify measurement is correct. + if mm, err := idx.Measurement([]byte("cpu")); err != nil { + t.Fatal(err) + } else if mm == nil { + t.Fatal("expected measurement") + } + + // Verify non-existent measurement doesn't exist. + if mm, err := idx.Measurement([]byte("no_such_measurement")); err != nil { + t.Fatal(err) + } else if mm != nil { + t.Fatal("expected nil measurement") } - return &idx, nil } -func MustGenerateIndex(measurementN, tagN, valueN int) *tsi1.Index { - idx, err := GenerateIndex(measurementN, tagN, valueN) +// Ensure index can return a list of all measurements. +func TestIndex_Measurements(t *testing.T) { + // Build an index file. + f, err := CreateIndexFile([]Series{ + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})}, + {Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})}, + }) if err != nil { - panic(err) - } - return idx -} - -var indexCache struct { - MeasurementN int - TagN int - ValueN int - - Index *tsi1.Index -} - -// MustFindOrGenerateIndex returns a cached index or generates one if it doesn't exist. -func MustFindOrGenerateIndex(measurementN, tagN, valueN int) *tsi1.Index { - // Use cache if fields match and the index has been generated. 
- if indexCache.MeasurementN == measurementN && - indexCache.TagN == tagN && - indexCache.ValueN == valueN && - indexCache.Index != nil { - return indexCache.Index + t.Fatal(err) } - // Generate and cache. - indexCache.MeasurementN = measurementN - indexCache.TagN = tagN - indexCache.ValueN = valueN - indexCache.Index = MustGenerateIndex(measurementN, tagN, valueN) - return indexCache.Index -} + // Create an index from the single file. + var idx tsi1.Index + idx.SetFile(f) -func pow(x, y int) int { - r := 1 - for i := 0; i < y; i++ { - r *= x + // Retrieve measurements and verify. + if mms, err := idx.Measurements(); err != nil { + t.Fatal(err) + } else if len(mms) != 2 { + t.Fatalf("expected measurement count: %d", len(mms)) + } else if mms[0].Name != "cpu" { + t.Fatalf("unexpected measurement(0): %s", mms[0].Name) + } else if mms[1].Name != "mem" { + t.Fatalf("unexpected measurement(1): %s", mms[1].Name) } - return r } diff --git a/tsdb/engine/tsi1/measurement.go b/tsdb/engine/tsi1/measurement.go index 960980f419..0c875ef67a 100644 --- a/tsdb/engine/tsi1/measurement.go +++ b/tsdb/engine/tsi1/measurement.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "errors" "io" + "sort" "github.com/influxdata/influxdb/pkg/rhh" ) @@ -19,6 +20,9 @@ const ( // Measurement field size constants. const ( + // 1 byte offset for the block to ensure non-zero offsets. + MeasurementFillSize = 1 + // Measurement trailer fields MeasurementBlockVersionSize = 2 MeasurementBlockSize = 8 @@ -109,6 +113,33 @@ func (blk *MeasurementBlock) UnmarshalBinary(data []byte) error { return nil } +// Iterator returns an iterator over all measurements. +func (blk *MeasurementBlock) Iterator() MeasurementIterator { + return &measurementIterator{data: blk.data[MeasurementFillSize:]} +} + +// measurementIterator iterates over a list measurements in a block. +type measurementIterator struct { + elem MeasurementElem + data []byte +} + +// Next returns the next measurement. 
Returns nil when the iterator is complete.
+func (itr *measurementIterator) Next() *MeasurementElem {
+	// Return nil when we run out of data.
+	if len(itr.data) == 0 {
+		return nil
+	}
+
+	// Unmarshal the element at the current position.
+	itr.elem.UnmarshalBinary(itr.data)
+
+	// Move the data forward past the record.
+	itr.data = itr.data[itr.elem.Size:]
+
+	return &itr.elem
+}
+
 // ReadMeasurementBlockTrailer returns the trailer from data.
 func ReadMeasurementBlockTrailer(data []byte) (MeasurementBlockTrailer, error) {
 	var t MeasurementBlockTrailer
@@ -170,6 +201,14 @@ type MeasurementElem struct {
 		N    uint32 // series count
 		Data []byte // serialized series data
 	}
+
+	// Size in bytes, set after unmarshaling.
+	Size int
+}
+
+// Deleted returns true if the tombstone flag is set.
+func (e *MeasurementElem) Deleted() bool {
+	return (e.Flag & MeasurementTombstoneFlag) != 0
 }
 
 // SeriesID returns series ID at an index.
@@ -188,6 +227,8 @@ func (e *MeasurementElem) SeriesIDs() []uint32 {
 
 // UnmarshalBinary unmarshals data into e.
 func (e *MeasurementElem) UnmarshalBinary(data []byte) error {
+	start := len(data)
+
 	// Parse flag data.
 	e.Flag, data = data[0], data[1:]
 
@@ -202,7 +243,10 @@ func (e *MeasurementElem) UnmarshalBinary(data []byte) error {
 	// Parse series data.
 	v, n := binary.Uvarint(data)
 	e.Series.N, data = uint32(v), data[n:]
-	e.Series.Data = data[:e.Series.N*SeriesIDSize]
+	e.Series.Data, data = data[:e.Series.N*SeriesIDSize], data[e.Series.N*SeriesIDSize:]
+
+	// Save length of elem.
+	e.Size = start - len(data)
 
 	return nil
 }
@@ -242,6 +286,29 @@ func (mw *MeasurementBlockWriter) WriteTo(w io.Writer) (n int64, err error) {
 		return n, err
 	}
 
+	// Sort names.
+	names := make([]string, 0, len(mw.mms))
+	for name := range mw.mms {
+		names = append(names, name)
+	}
+	sort.Strings(names)
+
+	// Encode key list.
+	for _, name := range names {
+		// Retrieve measurement and save offset. 
+ mm := mw.mms[name] + mm.offset = n + mw.mms[name] = mm + + // Write measurement + if err := mw.writeMeasurementTo(w, []byte(name), &mm, &n); err != nil { + return n, err + } + } + + // Save starting offset of hash index. + hoff := n + // Build key hash map m := rhh.NewHashMap(rhh.Options{ Capacity: len(mw.mms), @@ -252,35 +319,21 @@ func (mw *MeasurementBlockWriter) WriteTo(w io.Writer) (n int64, err error) { m.Put([]byte(name), &mm) } - // Encode key list. - offsets := make([]int64, m.Cap()) - for i := 0; i < m.Cap(); i++ { - k, v := m.Elem(i) - if v == nil { - continue - } - mm := v.(*measurement) - - // Save current offset so we can use it in the hash index. - offsets[i] = n - - // Write measurement - if err := mw.writeMeasurementTo(w, k, mm, &n); err != nil { - return n, err - } - } - - // Save starting offset of hash index. - hoff := n - // Encode hash map length. if err := writeUint32To(w, uint32(m.Cap()), &n); err != nil { return n, err } // Encode hash map offset entries. - for i := range offsets { - if err := writeUint64To(w, uint64(offsets[i]), &n); err != nil { + for i := 0; i < m.Cap(); i++ { + _, v := m.Elem(i) + + var offset int64 + if mm, ok := v.(*measurement); ok { + offset = mm.offset + } + + if err := writeUint64To(w, uint64(offset), &n); err != nil { return n, err } } @@ -351,6 +404,7 @@ type measurement struct { size int64 } seriesIDs []uint32 + offset int64 } func (mm measurement) flag() byte { diff --git a/tsdb/engine/tsi1/series.go b/tsdb/engine/tsi1/series.go index 8fa3b650c8..3ead70b429 100644 --- a/tsdb/engine/tsi1/series.go +++ b/tsdb/engine/tsi1/series.go @@ -41,6 +41,7 @@ const ( // SeriesList represents the section of the index which holds the term // dictionary and a sorted list of series keys. type SeriesList struct { + data []byte termData []byte seriesData []byte } @@ -75,18 +76,18 @@ func (l *SeriesList) SeriesOffset(key []byte) (offset uint32, deleted bool) { } // EncodeSeries returns a dictionary-encoded series key. 
-func (l *SeriesList) EncodeSeries(name string, tags models.Tags) []byte {
+func (l *SeriesList) EncodeSeries(name []byte, tags models.Tags) []byte {
 	// Build a buffer with the minimum space for the name, tag count, and tags.
 	buf := make([]byte, 2+len(tags))
 	return l.AppendEncodeSeries(buf[:0], name, tags)
 }
 
 // AppendEncodeSeries appends an encoded series value to dst and returns the new slice.
-func (l *SeriesList) AppendEncodeSeries(dst []byte, name string, tags models.Tags) []byte {
+func (l *SeriesList) AppendEncodeSeries(dst []byte, name []byte, tags models.Tags) []byte {
 	var buf [binary.MaxVarintLen32]byte
 
 	// Append encoded name.
-	n := binary.PutUvarint(buf[:], uint64(l.EncodeTerm([]byte(name))))
+	n := binary.PutUvarint(buf[:], uint64(l.EncodeTerm(name)))
 	dst = append(dst, buf[:n]...)
 
 	// Append encoded tag count.
@@ -105,16 +106,32 @@ func (l *SeriesList) AppendEncodeSeries(dst []byte, name string, tags models.Tag
 	return dst
 }
 
+// DecodeSeriesAt decodes the series at a given offset.
+func (l *SeriesList) DecodeSeriesAt(offset uint32, name *[]byte, tags *models.Tags, deleted *bool) {
+	data := l.data[offset:]
+
+	// Read flag.
+	flag, data := data[0], data[1:]
+	*deleted = (flag & SeriesTombstoneFlag) != 0
+
+	l.DecodeSeries(data, name, tags)
+}
+
 // DecodeSeries decodes a dictionary encoded series into a name and tagset.
-func (l *SeriesList) DecodeSeries(v []byte) (name string, tags models.Tags) {
+func (l *SeriesList) DecodeSeries(v []byte, name *[]byte, tags *models.Tags) {
 	// Read name.
 	offset, n := binary.Uvarint(v)
-	name, v = string(l.DecodeTerm(uint32(offset))), v[n:]
+	*name, v = l.DecodeTerm(uint32(offset)), v[n:]
 
 	// Read tag count.
 	tagN, n := binary.Uvarint(v)
 	v = v[n:]
 
+	// Clear tags, if necessary.
+	if len(*tags) > 0 {
+		*tags = (*tags)[:0]
+	}
+
 	// Loop over tag key/values.
 	for i := 0; i < int(tagN); i++ {
 		// Read key.
@@ -128,8 +145,6 @@ func (l *SeriesList) DecodeSeries(v []byte) (name string, tags models.Tags) {
 	// Add to tagset. 
tags.Set(key, value) } - - return name, tags } // DecodeTerm returns the term at the given offset. @@ -184,6 +199,9 @@ func (l *SeriesList) SeriesCount() uint32 { func (l *SeriesList) UnmarshalBinary(data []byte) error { t := ReadSeriesListTrailer(data) + // Save entire block. + l.data = data + // Slice term list data. l.termData = data[t.TermList.Offset:] l.termData = l.termData[:t.TermList.Size] @@ -213,23 +231,23 @@ func NewSeriesListWriter() *SeriesListWriter { // Add adds a series to the writer's set. // Returns an ErrSeriesOverflow if no more series can be held in the writer. -func (sw *SeriesListWriter) Add(name string, tags models.Tags) error { +func (sw *SeriesListWriter) Add(name []byte, tags models.Tags) error { return sw.append(name, tags, false) } // Delete marks a series as tombstoned. -func (sw *SeriesListWriter) Delete(name string, tags models.Tags) error { +func (sw *SeriesListWriter) Delete(name []byte, tags models.Tags) error { return sw.append(name, tags, true) } -func (sw *SeriesListWriter) append(name string, tags models.Tags, deleted bool) error { +func (sw *SeriesListWriter) append(name []byte, tags models.Tags, deleted bool) error { // Ensure writer doesn't add too many series. if len(sw.series) == math.MaxInt32 { return ErrSeriesOverflow } // Increment term counts. - sw.terms[name]++ + sw.terms[string(name)]++ for _, t := range tags { sw.terms[string(t.Key)]++ sw.terms[string(t.Value)]++ @@ -381,12 +399,12 @@ func (sw *SeriesListWriter) writeTrailerTo(w io.Writer, t SeriesListTrailer, n * // Offset returns the series offset from the writer. // Only valid after the series list has been written to a writer. -func (sw *SeriesListWriter) Offset(name string, tags models.Tags) uint32 { +func (sw *SeriesListWriter) Offset(name []byte, tags models.Tags) uint32 { // Find position of series. 
i := sort.Search(len(sw.series), func(i int) bool { s := &sw.series[i] - if s.name != name { - return s.name >= name + if cmp := bytes.Compare(s.name, name); cmp != 0 { + return cmp != -1 } return models.CompareTags(s.tags, tags) != -1 }) @@ -394,7 +412,7 @@ func (sw *SeriesListWriter) Offset(name string, tags models.Tags) uint32 { // Ignore if it's not an exact match. if i >= len(sw.series) { return 0 - } else if s := &sw.series[i]; s.name != name || !s.tags.Equal(tags) { + } else if s := &sw.series[i]; !bytes.Equal(s.name, name) || !s.tags.Equal(tags) { return 0 } @@ -437,7 +455,7 @@ type SeriesListTrailer struct { } type serie struct { - name string + name []byte tags models.Tags deleted bool offset uint32 @@ -448,8 +466,8 @@ type series []serie func (a series) Len() int { return len(a) } func (a series) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a series) Less(i, j int) bool { - if a[i].name != a[j].name { - return a[i].name < a[j].name + if cmp := bytes.Compare(a[i].name, a[j].name); cmp != 0 { + return cmp == -1 } return models.CompareTags(a[i].tags, a[j].tags) == -1 } diff --git a/tsdb/engine/tsi1/series_test.go b/tsdb/engine/tsi1/series_test.go index b70cb33b0a..b37dcc4f10 100644 --- a/tsdb/engine/tsi1/series_test.go +++ b/tsdb/engine/tsi1/series_test.go @@ -13,9 +13,9 @@ import ( // Ensure series list can be unmarshaled. 
func TestSeriesList_UnmarshalBinary(t *testing.T) { if _, err := CreateSeriesList([]Series{ - {Name: "cpu", Tags: models.NewTags(map[string]string{"region": "east"})}, - {Name: "cpu", Tags: models.NewTags(map[string]string{"region": "west"})}, - {Name: "mem", Tags: models.NewTags(map[string]string{"region": "east"})}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})}, + {Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})}, }); err != nil { t.Fatal(err) } @@ -24,9 +24,9 @@ func TestSeriesList_UnmarshalBinary(t *testing.T) { // Ensure series list contains the correct term count and term encoding. func TestSeriesList_Terms(t *testing.T) { l := MustCreateSeriesList([]Series{ - {Name: "cpu", Tags: models.NewTags(map[string]string{"region": "east"})}, - {Name: "cpu", Tags: models.NewTags(map[string]string{"region": "west"})}, - {Name: "mem", Tags: models.NewTags(map[string]string{"region": "east"})}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})}, + {Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})}, }) // Verify term count is correct. @@ -53,9 +53,9 @@ func TestSeriesList_Terms(t *testing.T) { // Ensure series list contains the correct set of series. 
func TestSeriesList_Series(t *testing.T) { series := []Series{ - {Name: "cpu", Tags: models.NewTags(map[string]string{"region": "east"})}, - {Name: "cpu", Tags: models.NewTags(map[string]string{"region": "west"})}, - {Name: "mem", Tags: models.NewTags(map[string]string{"region": "east"})}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})}, + {Name: []byte("mem"), Tags: models.NewTags(map[string]string{"region": "east"})}, } l := MustCreateSeriesList(series) @@ -65,9 +65,11 @@ func TestSeriesList_Series(t *testing.T) { } // Ensure series can encode & decode correctly. + var name []byte + var tags models.Tags for _, series := range series { - name, tags := l.DecodeSeries(l.EncodeSeries(series.Name, series.Tags)) - if name != series.Name || !reflect.DeepEqual(tags, series.Tags) { + l.DecodeSeries(l.EncodeSeries(series.Name, series.Tags), &name, &tags) + if !bytes.Equal(name, series.Name) || !reflect.DeepEqual(tags, series.Tags) { t.Fatalf("encoding mismatch: got=%s/%#v, exp=%s/%#v", name, tags, series.Name, series.Tags) } } @@ -82,7 +84,7 @@ func TestSeriesList_Series(t *testing.T) { } // Verify non-existent series doesn't exist. - if offset, deleted := l.SeriesOffset(l.EncodeSeries("foo", models.NewTags(map[string]string{"region": "north"}))); offset != 0 { + if offset, deleted := l.SeriesOffset(l.EncodeSeries([]byte("foo"), models.NewTags(map[string]string{"region": "north"}))); offset != 0 { t.Fatalf("series should not exist: offset=%d", offset) } else if deleted { t.Fatalf("series should not be deleted") @@ -125,6 +127,6 @@ func MustCreateSeriesList(a []Series) *tsi1.SeriesList { // Series represents name/tagset pairs that are used in testing. 
type Series struct {
-	Name string
+	Name []byte
 	Tags models.Tags
 }
diff --git a/tsdb/engine/tsi1/term_list.go b/tsdb/engine/tsi1/term_list.go
index 562dcfcbfa..f6f5197d62 100644
--- a/tsdb/engine/tsi1/term_list.go
+++ b/tsdb/engine/tsi1/term_list.go
@@ -60,13 +60,13 @@ func (l *TermList) OffsetString(v string) uint32 {
 }
 
 // AppendEncodedSeries dictionary encodes a series and appends it to the buffer.
-func (l *TermList) AppendEncodedSeries(dst []byte, name string, tags models.Tags) []byte {
+func (l *TermList) AppendEncodedSeries(dst []byte, name []byte, tags models.Tags) []byte {
 	var buf [binary.MaxVarintLen32]byte
 
 	// Encode name.
-	offset := l.OffsetString(name)
+	offset := l.Offset(name)
 	if offset == 0 {
-		panic("name not in term list: " + name)
+		panic("name not in term list: " + string(name))
 	}
 	n := binary.PutUvarint(buf[:], uint64(offset))
 	dst = append(dst, buf[:n]...)
diff --git a/tsdb/engine/tsi1/tsi1.go b/tsdb/engine/tsi1/tsi1.go
index 3648e8dead..97e6389393 100644
--- a/tsdb/engine/tsi1/tsi1.go
+++ b/tsdb/engine/tsi1/tsi1.go
@@ -7,8 +7,19 @@ import (
 	"os"
 
 	"github.com/cespare/xxhash"
+	"github.com/influxdata/influxdb/models"
 )
 
+// MeasurementIterator represents an iterator over a list of measurements.
+type MeasurementIterator interface {
+	Next() *MeasurementElem
+}
+
+// SeriesIterator represents an iterator over a list of series.
+type SeriesIterator interface {
+	Next(name *[]byte, tags *models.Tags, deleted *bool)
+}
+
 // writeTo writes write v into w. Updates n.
 func writeTo(w io.Writer, v []byte, n *int64) error {
 	nn, err := w.Write(v)
diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go
index 703dd209ec..f9e4897236 100644
--- a/tsdb/engine/tsm1/engine_test.go
+++ b/tsdb/engine/tsm1/engine_test.go
@@ -23,6 +23,7 @@ import (
 	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
 )
 
+/*
 // Ensure engine can load the metadata index after reopening. 
func TestEngine_LoadMetadataIndex(t *testing.T) { e := MustOpenEngine() @@ -107,6 +108,7 @@ func TestEngine_LoadMetadataIndex(t *testing.T) { t.Fatalf("unexpected series: %q / %#v", s.Key, s.Tags) } } +*/ // Ensure that deletes only sent to the WAL will clear out the data from the cache on restart func TestEngine_DeleteWALLoadMetadata(t *testing.T) { @@ -571,7 +573,7 @@ func TestEngine_DeleteSeries(t *testing.T) { // Write those points to the engine. e := tsm1.NewEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine) - e.LoadMetadataIndex(1, MustNewDatabaseIndex("db0")) // Initialise an index + // e.LoadMetadataIndex(1, MustNewDatabaseIndex("db0")) // Initialise an index // mock the planner so compactions don't run during the test e.CompactionPlan = &mockPlanner{} @@ -964,9 +966,9 @@ func MustOpenEngine() *Engine { if err := e.Open(); err != nil { panic(err) } - if err := e.LoadMetadataIndex(1, MustNewDatabaseIndex("db")); err != nil { - panic(err) - } + // if err := e.LoadMetadataIndex(1, MustNewDatabaseIndex("db")); err != nil { + // panic(err) + // } return e } @@ -1010,6 +1012,7 @@ func (e *Engine) MustMeasurement(name string) *tsdb.Measurement { return m } +/* // MustNewDatabaseIndex creates a tsdb.DatabaseIndex, panicking if there is an // error doing do. func MustNewDatabaseIndex(name string) *tsdb.DatabaseIndex { @@ -1019,6 +1022,7 @@ func MustNewDatabaseIndex(name string) *tsdb.DatabaseIndex { } return index } +*/ // WritePointsString parses a string buffer and writes the points. 
func (e *Engine) WritePointsString(buf ...string) error { diff --git a/tsdb/index.go b/tsdb/index.go index eb884b7cb2..4a39267c40 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -9,9 +9,6 @@ import ( ) type Index interface { - Open() error - Close() error - CreateMeasurementIndexIfNotExists(name string) (*Measurement, error) Measurement(name []byte) (*Measurement, error) Measurements() (Measurements, error) @@ -20,7 +17,7 @@ type Index interface { MeasurementsByRegex(re *regexp.Regexp) (Measurements, error) DropMeasurement(name []byte) error - CreateSeriesIndexIfNotExists(measurment string, series *Series) (*Series, error) + CreateSeriesIndexIfNotExists(measurement string, series *Series) (*Series, error) Series(key []byte) (*Series, error) DropSeries(keys []string) error @@ -29,6 +26,4 @@ type Index interface { MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) TagsForSeries(key string) (models.Tags, error) - - Dereference(b []byte) } diff --git a/tsdb/meta.go b/tsdb/meta.go index 66fe1b76c0..2e8b498087 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -11,8 +11,6 @@ import ( "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/escape" - "github.com/influxdata/influxdb/pkg/estimator" - "github.com/influxdata/influxdb/pkg/estimator/hll" internal "github.com/influxdata/influxdb/tsdb/internal" "github.com/gogo/protobuf/proto" @@ -20,6 +18,7 @@ import ( //go:generate protoc --gogo_out=. internal/meta.proto +/* // DatabaseIndex is the in memory index of a collection of measurements, time series, and their tags. // Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks. type DatabaseIndex struct { @@ -491,6 +490,7 @@ func (d *DatabaseIndex) Dereference(b []byte) { s.Dereference(b) } } +*/ // Measurement represents a collection of time series in a database. It also // contains in memory structures for indexing tags. 
Exported functions are @@ -601,7 +601,7 @@ func (m *Measurement) Cardinality(key string) int { return n } -// CardinalityBytes returns the number of values associated with the given tag key. +// CardinalityBytes returns the number of values associated with tag key func (m *Measurement) CardinalityBytes(key []byte) int { var n int m.mu.RLock() @@ -1435,7 +1435,7 @@ func (a Measurements) Less(i, j int) bool { return a[i].Name < a[j].Name } // Swap implements sort.Interface. func (a Measurements) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a Measurements) intersect(other Measurements) Measurements { +func (a Measurements) Intersect(other Measurements) Measurements { l := a r := other @@ -1465,7 +1465,7 @@ func (a Measurements) intersect(other Measurements) Measurements { return result } -func (a Measurements) union(other Measurements) Measurements { +func (a Measurements) Union(other Measurements) Measurements { result := make(Measurements, 0, len(a)+len(other)) var i, j int for i < len(a) && j < len(other) { @@ -1900,6 +1900,13 @@ func (m *Measurement) tagValuesByKeyAndSeriesID(tagKeys []string, ids SeriesIDs) return tagValues } +func (m *Measurement) SeriesByTagKeyValue(key string) map[string]SeriesIDs { + m.mu.RLock() + ret := m.seriesByTagKeyValue[key] + m.mu.RUnlock() + return ret +} + // stringSet represents a set of strings. 
type stringSet map[string]struct{} diff --git a/tsdb/meta_test.go b/tsdb/meta_test.go index aeca64999d..3703886b00 100644 --- a/tsdb/meta_test.go +++ b/tsdb/meta_test.go @@ -233,6 +233,7 @@ func benchmarkMarshalTags(b *testing.B, keyN int) { } } +/* func BenchmarkCreateSeriesIndex_1K(b *testing.B) { benchmarkCreateSeriesIndex(b, genTestSeries(38, 3, 3)) } @@ -263,6 +264,7 @@ func benchmarkCreateSeriesIndex(b *testing.B, series []*TestSeries) { } } } +*/ type TestSeries struct { Measurement string diff --git a/tsdb/shard.go b/tsdb/shard.go index f1cff54dd2..16220c448a 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -259,18 +259,6 @@ func (s *Shard) Open() error { if err := e.Open(); err != nil { return err } - - // Load metadata index. - start := time.Now() - index, err := NewDatabaseIndex(s.database) - if err != nil { - return err - } - - if err := e.LoadMetadataIndex(s.id, index); err != nil { - return err - } - s.engine = e s.logger.Info(fmt.Sprintf("%s database index loaded in %s", s.path, time.Now().Sub(start)))