TagSet writer & reader.

pull/7913/head
Ben Johnson 2016-09-20 10:27:33 -06:00
parent 4eeb81ef38
commit e25d61e4bd
No known key found for this signature in database
GPG Key ID: 81741CD251883081
9 changed files with 976 additions and 162 deletions

View File

@ -6,33 +6,28 @@ import (
"github.com/influxdata/influxdb/pkg/murmur3"
)
// DefaultCapacity is the initial capacity of a HashMap.
const DefaultCapacity = 256
// LoadFactor is the percent fill before the map is grown.
const LoadFactor = 90
// HashMap represents a hash map that implements Robin Hood Hashing.
// https://cs.uwaterloo.ca/research/tr/1986/CS-86-14.pdf
type HashMap struct {
hashes []uint32
elems []hashElem
n int
capacity int
threshold int
mask uint32
n int
capacity int
threshold int
loadFactor int
}
func NewHashMap() *HashMap {
func NewHashMap(opt Options) *HashMap {
m := &HashMap{
capacity: DefaultCapacity,
capacity: opt.Capacity,
loadFactor: opt.LoadFactor,
}
m.alloc()
return m
}
func (m *HashMap) Get(key []byte) []byte {
func (m *HashMap) Get(key []byte) interface{} {
i := m.index(key)
if i == -1 {
return nil
@ -40,10 +35,10 @@ func (m *HashMap) Get(key []byte) []byte {
return m.elems[i].value
}
func (m *HashMap) Put(key, val []byte) {
func (m *HashMap) Put(key []byte, val interface{}) {
// Grow the map if we've run out of slots.
m.n++
if m.n >= m.threshold {
if m.n > m.threshold {
m.grow()
}
@ -54,8 +49,8 @@ func (m *HashMap) Put(key, val []byte) {
}
}
func (m *HashMap) insert(hash uint32, key, val []byte) (overwritten bool) {
pos := int(hash & m.mask)
func (m *HashMap) insert(hash uint32, key []byte, val interface{}) (overwritten bool) {
pos := int(hash) % m.capacity
dist := 0
// Continue searching until we find an empty slot or lower probe distance.
@ -86,7 +81,7 @@ func (m *HashMap) insert(hash uint32, key, val []byte) (overwritten bool) {
}
// Increment position, wrap around on overflow.
pos = int(uint32(pos+1) & m.mask)
pos = (pos + 1) % m.capacity
dist++
}
}
@ -95,8 +90,7 @@ func (m *HashMap) insert(hash uint32, key, val []byte) (overwritten bool) {
func (m *HashMap) alloc() {
m.elems = make([]hashElem, m.capacity)
m.hashes = make([]uint32, m.capacity)
m.threshold = (m.capacity * LoadFactor) / 100
m.mask = uint32(m.capacity - 1)
m.threshold = (m.capacity * m.loadFactor) / 100
}
// grow doubles the capacity and reinserts all existing hashes & elements.
@ -122,7 +116,7 @@ func (m *HashMap) grow() {
// index returns the position of key in the hash map.
func (m *HashMap) index(key []byte) int {
hash := m.hashKey(key)
pos := int(hash & m.mask)
pos := int(hash) % m.capacity
dist := 0
for {
@ -134,7 +128,7 @@ func (m *HashMap) index(key []byte) int {
return pos
}
pos = int(uint32(pos+1) & m.mask)
pos = (pos + 1) % m.capacity
dist++
}
}
@ -149,7 +143,7 @@ func (m *HashMap) hashKey(key []byte) uint32 {
}
// Elem returns the i-th key/value pair of the hash map.
func (m *HashMap) Elem(i int) (key, value []byte) {
func (m *HashMap) Elem(i int) (key []byte, value interface{}) {
if i >= len(m.elems) {
return nil, nil
}
@ -179,11 +173,23 @@ func (m *HashMap) AverageProbeCount() float64 {
// dist returns the probe distance for a hash in a slot index.
func (m *HashMap) dist(hash uint32, i int) int {
return int(uint32(i+m.capacity-int(hash&m.mask)) & m.mask)
return (i + m.capacity - (int(hash) % m.capacity)) % m.capacity
}
type hashElem struct {
key []byte
value []byte
value interface{}
hash uint32
}
// Options represents initialization options that are passed to NewHashMap().
type Options struct {
Capacity int
LoadFactor int
}
// DefaultOptions represents a default set of options to pass to NewHashMap().
var DefaultOptions = Options{
Capacity: 256,
LoadFactor: 90,
}

View File

@ -12,7 +12,7 @@ import (
// Ensure hash map can perform basic get/put operations.
func TestHashMap(t *testing.T) {
m := rhh.NewHashMap()
m := rhh.NewHashMap(rhh.DefaultOptions)
m.Put([]byte("foo"), []byte("bar"))
m.Put([]byte("baz"), []byte("bat"))
@ -38,7 +38,7 @@ func TestHashMap_Quick(t *testing.T) {
}
if err := quick.Check(func(keys, values [][]byte) bool {
m := rhh.NewHashMap()
m := rhh.NewHashMap(rhh.Options{Capacity: 1000, LoadFactor: 100})
h := make(map[string][]byte)
// Insert all key/values into both maps.

View File

@ -1,5 +1,7 @@
/*
Series:
Series List
Term List
@ -54,5 +56,89 @@
Tag Sets:
Tag Set
Tag Values Block
...
Tag Keys Block
Trailer
Tag Values Block
Value List
Value
Flag <uint8>
len(Value) <varint>
Value <byte...>
len(Series) <varint>
SeriesIDs <uint32...>
...
Hash Index
len(Values) <uint32>
Value Offset <uint64>
...
Tag Key Block
Key List
Key
Flag <uint8>
Value Offset <uint64>
len(Key) <varint>
Key <byte...>
...
Hash Index
len(Keys) <uint32>
Key Offset <uint64>
...
Trailer
Hash Index Offset <uint64>
Tag Set Size <uint64>
Tag Set Version <uint16>
*/
package tsi1

View File

@ -6,7 +6,6 @@ import (
"errors"
"io"
"math"
"sort"
"github.com/influxdata/influxdb/models"
)
@ -20,6 +19,7 @@ const (
TermCountSize = 4
SeriesCountSize = 4
SeriesIDSize = 4
)
// Series flag constants.
@ -30,14 +30,14 @@ const (
// SeriesList represents the section of the index which holds the term
// dictionary and a sorted list of series keys.
type SeriesList struct {
termList []byte
termData []byte
seriesData []byte
}
// SeriesOffset returns offset of the encoded series key.
// Returns 0 if the key does not exist in the series list.
func (l *SeriesList) SeriesOffset(key []byte) (offset uint32, deleted bool) {
offset = uint32(len(l.termList) + SeriesCountSize)
offset = uint32(len(l.termData) + SeriesCountSize)
data := l.seriesData[SeriesCountSize:]
for i, n := uint32(0), l.SeriesCount(); i < n; i++ {
@ -123,7 +123,7 @@ func (l *SeriesList) DecodeSeries(v []byte) (name string, tags models.Tags) {
// DecodeTerm returns the term at the given offset.
func (l *SeriesList) DecodeTerm(offset uint32) []byte {
buf := l.termList[offset:]
buf := l.termData[offset:]
// Read length at offset.
i, n := binary.Uvarint(buf)
@ -136,7 +136,7 @@ func (l *SeriesList) DecodeTerm(offset uint32) []byte {
// EncodeTerm returns the offset of v within data.
func (l *SeriesList) EncodeTerm(v []byte) uint32 {
offset := uint32(TermCountSize)
data := l.termList[offset:]
data := l.termData[offset:]
for i, n := uint32(0), l.TermCount(); i < n; i++ {
// Read term length.
@ -158,7 +158,7 @@ func (l *SeriesList) EncodeTerm(v []byte) uint32 {
// TermCount returns the number of terms within the dictionary.
func (l *SeriesList) TermCount() uint32 {
return binary.BigEndian.Uint32(l.termList[:TermCountSize])
return binary.BigEndian.Uint32(l.termData[:TermCountSize])
}
// SeriesCount returns the number of series.
@ -177,13 +177,13 @@ func (l *SeriesList) UnmarshalBinary(data []byte) error {
}
// Read trailer offsets.
termListOffset := binary.BigEndian.Uint32(data[len(data)-8:])
termDataOffset := binary.BigEndian.Uint32(data[len(data)-8:])
seriesDataOffset := binary.BigEndian.Uint32(data[len(data)-4:])
// Save reference to term list data.
termListSize := seriesDataOffset - termListOffset
l.termList = data[termListOffset:]
l.termList = l.termList[:termListSize]
termDataSize := seriesDataOffset - termDataOffset
l.termData = data[termDataOffset:]
l.termData = l.termData[:termDataSize]
// Save reference to series data.
seriesDataSize := uint32(len(data)) - seriesDataOffset - SeriesListTrailerSize
@ -238,11 +238,11 @@ func (sw *SeriesListWriter) append(name string, tags models.Tags, deleted bool)
// WriteTo computes the dictionary encoding of the series and writes to w.
func (sw *SeriesListWriter) WriteTo(w io.Writer) (n int64, err error) {
terms := newTermList(sw.terms)
terms := NewTermList(sw.terms)
// Write term dictionary.
termListOffset := n
nn, err := sw.writeDictionaryTo(w, terms)
termDataOffset := n
nn, err := sw.writeTermListTo(w, terms)
n += nn
if err != nil {
return n, err
@ -257,7 +257,7 @@ func (sw *SeriesListWriter) WriteTo(w io.Writer) (n int64, err error) {
}
// Write trailer.
nn, err = sw.writeTrailerTo(w, uint32(termListOffset), uint32(seriesDataOffset))
nn, err = sw.writeTrailerTo(w, uint32(termDataOffset), uint32(seriesDataOffset))
n += nn
if err != nil {
return n, err
@ -266,12 +266,12 @@ func (sw *SeriesListWriter) WriteTo(w io.Writer) (n int64, err error) {
return n, nil
}
// writeDictionaryTo writes the terms to w.
func (sw *SeriesListWriter) writeDictionaryTo(w io.Writer, terms *termList) (n int64, err error) {
// writeTermListTo writes the terms to w.
func (sw *SeriesListWriter) writeTermListTo(w io.Writer, terms *TermList) (n int64, err error) {
buf := make([]byte, binary.MaxVarintLen32)
// Write term count.
binary.BigEndian.PutUint32(buf[:4], uint32(terms.len()))
binary.BigEndian.PutUint32(buf[:4], uint32(terms.Len()))
nn, err := w.Write(buf[:4])
n += int64(nn)
if err != nil {
@ -306,7 +306,7 @@ func (sw *SeriesListWriter) writeDictionaryTo(w io.Writer, terms *termList) (n i
}
// writeSeriesTo writes dictionary-encoded series to w in sorted order.
func (sw *SeriesListWriter) writeSeriesTo(w io.Writer, terms *termList, offset uint32) (n int64, err error) {
func (sw *SeriesListWriter) writeSeriesTo(w io.Writer, terms *TermList, offset uint32) (n int64, err error) {
buf := make([]byte, binary.MaxVarintLen32+1)
// Write series count.
@ -331,7 +331,7 @@ func (sw *SeriesListWriter) writeSeriesTo(w io.Writer, terms *termList, offset u
s.offset = uint32(offset + uint32(n))
// Write encoded series to a separate buffer.
seriesBuf = terms.appendEncodedSeries(seriesBuf[:0], s.name, s.tags)
seriesBuf = terms.AppendEncodedSeries(seriesBuf[:0], s.name, s.tags)
// Join flag, varint(length), & dictionary-encoded series in buffer.
buf[0] = 0 // TODO(benbjohnson): series tombstone
@ -350,9 +350,9 @@ func (sw *SeriesListWriter) writeSeriesTo(w io.Writer, terms *termList, offset u
}
// writeTrailerTo writes offsets to the end of the series list.
func (sw *SeriesListWriter) writeTrailerTo(w io.Writer, termListOffset, seriesDataOffset uint32) (n int64, err error) {
func (sw *SeriesListWriter) writeTrailerTo(w io.Writer, termDataOffset, seriesDataOffset uint32) (n int64, err error) {
// Write offset of term list.
if err := binary.Write(w, binary.BigEndian, termListOffset); err != nil {
if err := binary.Write(w, binary.BigEndian, termDataOffset); err != nil {
return n, err
}
n += 4
@ -383,112 +383,3 @@ func (a series) Less(i, j int) bool {
}
panic("TODO: CompareTags(a[i].tags, a[j].tags)")
}
// termList represents a list of terms sorted by frequency.
type termList struct {
m map[string]int // terms by index
a []termListElem // sorted terms
}
// newTermList computes a term list based on a map of term frequency.
func newTermList(m map[string]int) *termList {
if len(m) == 0 {
return &termList{}
}
l := &termList{
a: make([]termListElem, 0, len(m)),
m: make(map[string]int, len(m)),
}
// Insert elements into slice.
for term, freq := range m {
l.a = append(l.a, termListElem{term: term, freq: freq})
}
sort.Sort(termListElems(l.a))
// Create lookup of terms to indices.
for i, e := range l.a {
l.m[e.term] = i
}
return l
}
// len returns the length of the list.
func (l *termList) len() int { return len(l.a) }
// offset returns the offset for a given term. Returns zero if term doesn't exist.
func (l *termList) offset(v []byte) uint32 {
i, ok := l.m[string(v)]
if !ok {
return 0
}
return l.a[i].offset
}
// offsetString returns the offset for a given term. Returns zero if term doesn't exist.
func (l *termList) offsetString(v string) uint32 {
i, ok := l.m[v]
if !ok {
return 0
}
return l.a[i].offset
}
// appendEncodedSeries dictionary encodes a series and appends it to the buffer.
func (l *termList) appendEncodedSeries(dst []byte, name string, tags models.Tags) []byte {
var buf [binary.MaxVarintLen32]byte
// Encode name.
offset := l.offsetString(name)
if offset == 0 {
panic("name not in term list: " + name)
}
n := binary.PutUvarint(buf[:], uint64(offset))
dst = append(dst, buf[:n]...)
// Encode tag count.
n = binary.PutUvarint(buf[:], uint64(len(tags)))
dst = append(dst, buf[:n]...)
// Encode tags.
for _, t := range tags {
// Encode tag key.
offset := l.offset(t.Key)
if offset == 0 {
panic("tag key not in term list: " + string(t.Key))
}
n := binary.PutUvarint(buf[:], uint64(offset))
dst = append(dst, buf[:n]...)
// Encode tag value.
offset = l.offset(t.Value)
if offset == 0 {
panic("tag value not in term list: " + string(t.Value))
}
n = binary.PutUvarint(buf[:], uint64(offset))
dst = append(dst, buf[:n]...)
}
return dst
}
// termListElem represents an element in a term list.
type termListElem struct {
term string // term value
freq int // term frequency
offset uint32 // position in file
}
// termListElems represents a list of elements sorted by descending frequency.
type termListElems []termListElem
func (a termListElems) Len() int { return len(a) }
func (a termListElems) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a termListElems) Less(i, j int) bool {
if a[i].freq != a[j].freq {
return a[i].freq > a[i].freq
}
return a[i].term < a[j].term
}

View File

@ -1,5 +0,0 @@
package tsi1
type TagSet struct{}
type TagValueBlock struct{}
type TagKeyBlock struct{}

539
tsdb/engine/tsi1/tag_set.go Normal file
View File

@ -0,0 +1,539 @@
package tsi1
import (
"bytes"
"encoding/binary"
"errors"
"io"
"github.com/influxdata/influxdb/pkg/murmur3"
"github.com/influxdata/influxdb/pkg/rhh"
)
// TagSetVersion is the version of the tag set block.
const TagSetVersion = 1
// Tag key flag constants.
const (
TagKeyTombstoneFlag = 0x01
)
// Tag value flag constants.
const (
TagValueTombstoneFlag = 0x01
)
// TagSet variable size constants.
const (
// TagSet trailer fields
TagSetVersionSize = 2
TagSetSize = 8
TagSetHashOffsetSize = 8
TagSetTrailerSize = TagSetVersionSize + TagSetSize + TagSetHashOffsetSize
// TagSet key block fields.
TagKeyNSize = 4
TagKeyOffsetSize = 8
// TagSet value block fields.
TagValueNSize = 4
TagValueOffsetSize = 8
)
// TagSet errors.
var (
ErrUnsupportedTagSetVersion = errors.New("unsupported tag set version")
ErrTagSetSizeMismatch = errors.New("tag set size mismatch")
)
// TagSet represents tag key/value data for a single measurement.
// The byte slices reference the buffer passed to UnmarshalBinary and must
// not be mutated while the TagSet is in use.
type TagSet struct {
	data     []byte // tag value & key blocks (offsets index into this)
	hashData []byte // tag key hash index block
	hoff     uint64 // hash index offset; NOTE(review): never assigned by UnmarshalBinary — confirm it is needed
	version  int    // tag set version
}

// Version returns the encoding version parsed from the data.
// Only valid after UnmarshalBinary() has been successfully invoked.
func (ts *TagSet) Version() int { return ts.version }
// TagValueSeriesN returns the number of series ids associated with a tag value.
// Returns zero when the key/value pair is not present in the set.
func (ts *TagSet) TagValueSeriesN(key, value []byte) int {
	if e := ts.tagValueElem(key, value); e.value != nil {
		return int(e.seriesN)
	}
	return 0
}
// TagValueSeriesIDs returns the series IDs associated with a tag value.
// Returns nil when the key/value pair is not present in the set.
func (ts *TagSet) TagValueSeriesIDs(key, value []byte) []uint32 {
	// Look up the value element; bail out if the pair doesn't exist.
	velem := ts.tagValueElem(key, value)
	if velem.value == nil {
		return nil
	}

	// Materialize each fixed-width id into a fresh slice.
	ids := make([]uint32, 0, velem.seriesN)
	for i := 0; i < int(velem.seriesN); i++ {
		ids = append(ids, velem.seriesID(i))
	}
	return ids
}
// tagKeyElem returns an element for a tag key.
// Returns an element with a nil key if not found.
//
// Lookup uses the on-disk hash index: start at hash(key) mod keyN and probe
// linearly, giving up once our probe distance exceeds that of the element in
// the current slot (Robin Hood early-exit).
// NOTE(review): assumes keyN > 0; an empty index would panic on the modulo —
// confirm the writer never produces an empty key block.
func (ts *TagSet) tagKeyElem(key []byte) tagKeyElem {
	keyN := binary.BigEndian.Uint32(ts.hashData[:TagKeyNSize])
	hash := hashKey(key)
	pos := int(hash) % int(keyN)

	// Track current distance
	var d int

	for {
		// Find offset of tag key.
		offset := binary.BigEndian.Uint64(ts.hashData[TagKeyNSize+(pos*TagKeyOffsetSize):])

		// Parse into element.
		var e tagKeyElem
		e.UnmarshalBinary(ts.data[offset:])

		// Return if keys match.
		if bytes.Equal(e.key, key) {
			return e
		}

		// Check if we've exceeded the probe distance.
		if d > dist(hashKey(e.key), pos, int(keyN)) {
			return tagKeyElem{}
		}

		// Move position forward.
		pos = (pos + 1) % int(keyN)
		d++
	}
}
// tagValueElem returns an element for a tag value.
// Returns an element with a nil value if not found.
//
// First resolves the key's element to locate its value block, then performs
// the same Robin Hood probe as tagKeyElem over that block's hash index.
// NOTE(review): assumes valueN > 0 for an existing key; confirm the writer
// never produces a key with an empty value block.
func (ts *TagSet) tagValueElem(key, value []byte) tagValueElem {
	// Find key element, exit if not found.
	kelem := ts.tagKeyElem(key)
	if kelem.key == nil {
		return tagValueElem{}
	}

	hashData := ts.data[kelem.valueOffset:]
	valueN := binary.BigEndian.Uint32(hashData[:TagValueNSize])
	hash := hashKey(value)
	pos := int(hash) % int(valueN)

	// Track current distance
	var d int

	for {
		// Find offset of tag value.
		offset := binary.BigEndian.Uint64(hashData[TagValueNSize+(pos*TagValueOffsetSize):])

		// Parse into element.
		var e tagValueElem
		e.UnmarshalBinary(ts.data[offset:])

		// Return if values match.
		if bytes.Equal(e.value, value) {
			return e
		}

		// Check if we've exceeded the probe distance.
		if d > dist(hashKey(e.value), pos, int(valueN)) {
			return tagValueElem{}
		}

		// Move position forward.
		pos = (pos + 1) % int(valueN)
		d++
	}
}
// UnmarshalBinary unpacks data into the tag set. Tag set is not copied so data
// should be retained and unchanged after being passed into this function.
//
// Returns io.ErrShortBuffer when the buffer cannot hold a trailer,
// ErrUnsupportedTagSetVersion on a version mismatch, and
// ErrTagSetSizeMismatch when the encoded size disagrees with len(data).
func (ts *TagSet) UnmarshalBinary(data []byte) error {
	// The full trailer (hash offset + size + version) must be present before
	// any field can be parsed.
	// BUG FIX: previously compared against TagSetVersion (the constant 1)
	// instead of the trailer size, so short buffers could panic below.
	if len(data) < TagSetTrailerSize {
		return io.ErrShortBuffer
	}

	// Parse version.
	versionOffset := len(data) - TagSetVersionSize
	ts.version = int(binary.BigEndian.Uint16(data[versionOffset:]))

	// Ensure version matches.
	if ts.version != TagSetVersion {
		return ErrUnsupportedTagSetVersion
	}

	// Parse size & validate.
	szOffset := versionOffset - TagSetSize
	sz := binary.BigEndian.Uint64(data[szOffset:])
	if uint64(len(data)) != sz+TagSetTrailerSize {
		return ErrTagSetSizeMismatch
	}

	// Parse hash index offset and retain it for later inspection.
	hoffOffset := szOffset - TagSetHashOffsetSize
	hoff := binary.BigEndian.Uint64(data[hoffOffset:])
	ts.hoff = hoff

	// Save data block & hash block.
	ts.data = data[:hoff]
	ts.hashData = data[hoff:hoffOffset]

	return nil
}
// tagKeyElem represents an internal tag key element.
type tagKeyElem struct {
	flag        byte   // tombstone flag
	key         []byte // tag key bytes; references the decoded buffer
	valueOffset uint64 // offset of this key's value block
}

// UnmarshalBinary unmarshals data into e.
// Layout: flag<uint8>, value offset<uint64 BE>, len(key)<uvarint>, key bytes.
// The key slice references data, so data must stay valid and unchanged.
func (e *tagKeyElem) UnmarshalBinary(data []byte) {
	// Parse flag data.
	e.flag, data = data[0], data[1:]

	// Parse value offset.
	e.valueOffset, data = binary.BigEndian.Uint64(data), data[8:]

	// Parse key.
	sz, n := binary.Uvarint(data)
	data = data[n:]
	e.key = data[:sz]
}
// tagValueElem represents an internal tag value element.
type tagValueElem struct {
	flag       byte   // tombstone flag
	value      []byte // tag value bytes; references the decoded buffer
	seriesN    uint64 // number of series ids in seriesData
	seriesData []byte // raw big-endian uint32 series ids
}

// seriesID returns series ID at an index.
func (e *tagValueElem) seriesID(i int) uint32 {
	return binary.BigEndian.Uint32(e.seriesData[i*SeriesIDSize:])
}

// UnmarshalBinary unmarshals data into e.
// Layout: flag<uint8>, len(value)<uvarint>, value bytes, seriesN<uvarint>,
// then seriesN big-endian uint32 ids.
// The slices reference data, so data must stay valid and unchanged.
func (e *tagValueElem) UnmarshalBinary(data []byte) {
	// Parse flag data.
	e.flag, data = data[0], data[1:]

	// Parse value.
	sz, n := binary.Uvarint(data)
	e.value, data = data[n:n+int(sz)], data[n+int(sz):]

	// Parse series count.
	e.seriesN, n = binary.Uvarint(data)
	data = data[n:]

	// Save reference to series data.
	e.seriesData = data[:e.seriesN*SeriesIDSize]
}
// TagSetWriter writes a TagSet section.
type TagSetWriter struct {
	sets map[string]tagSet

	// Starting offset of the writer.
	Offset int64
}

// NewTagSetWriter returns a new TagSetWriter.
func NewTagSetWriter() *TagSetWriter {
	w := &TagSetWriter{}
	w.sets = make(map[string]tagSet)
	return w
}
// AddTagValueSeries associates series id with a tag/value pair.
func (tsw *TagSetWriter) AddTagValueSeries(key, value []byte, seriesID uint32) {
	set := tsw.createTagSetIfNotExists(key)
	// tagValue is stored by value, so mutate a copy and write it back.
	val := set.values[string(value)]
	val.seriesIDs = append(val.seriesIDs, seriesID)
	set.values[string(value)] = val
}
// AddSeries associates series id with a map of key/value pairs.
// This is not optimized and is only provided for ease of use.
func (tsw *TagSetWriter) AddSeries(m map[string]string, seriesID uint32) {
	for key, value := range m {
		tsw.AddTagValueSeries([]byte(key), []byte(value), seriesID)
	}
}
// DeleteTag marks a tag key as tombstoned.
// The tag must not be used after deleting it.
func (tsw *TagSetWriter) DeleteTag(key []byte) {
	// tagSet is stored by value, so flip the flag on a copy and write it back.
	set := tsw.sets[string(key)]
	set.deleted = true
	tsw.sets[string(key)] = set
}
// DeleteTagValue marks a tag value as tombstoned.
func (tsw *TagSetWriter) DeleteTagValue(key, value []byte) {
	set := tsw.createTagSetIfNotExists(key)
	// tagValue is stored by value, so flip the flag on a copy and write it back.
	val := set.values[string(value)]
	val.deleted = true
	set.values[string(value)] = val
}
// createTagSetIfNotExists returns the tag set for a key,
// creating and registering an empty one on first use.
func (tsw *TagSetWriter) createTagSetIfNotExists(key []byte) tagSet {
	if ts, ok := tsw.sets[string(key)]; ok {
		return ts
	}

	ts := tagSet{values: make(map[string]tagValue)}
	tsw.sets[string(key)] = ts
	return ts
}
// WriteTo encodes the tag values & tag key blocks.
// Value blocks are written first, then one key block pointing at them, then
// the trailer. Offsets recorded in the blocks are relative to the start of
// this write.
// NOTE(review): tsw.Offset is never applied here — confirm whether offsets
// were meant to be adjusted by it.
func (tsw *TagSetWriter) WriteTo(w io.Writer) (n int64, err error) {
	// Build key hash map with an exact capacity.
	// NOTE(review): LoadFactor 100 fills the table completely; assumes the
	// rhh map tolerates an exactly-full table without growing — confirm.
	m := rhh.NewHashMap(rhh.Options{
		Capacity:   len(tsw.sets),
		LoadFactor: 100,
	})
	for key := range tsw.sets {
		// Store a pointer so the offset assigned below is visible when the
		// key block is written.
		ts := tsw.sets[key]
		m.Put([]byte(key), &ts)
	}

	// Write value blocks in key map order.
	for i := 0; i < m.Len(); i++ {
		k, v := m.Elem(i)
		if k == nil {
			panic("rhh nil key")
		}
		ts := v.(*tagSet)

		// Write value block.
		hoff, err := tsw.writeTagValueBlockTo(w, ts.values, &n)
		if err != nil {
			return n, err
		}

		// Save offset of hash index so we can use it in the key block.
		ts.offset = uint64(hoff)
	}

	// Write key block to point to value blocks.
	hoff, err := tsw.writeTagKeyBlockTo(w, m, &n)
	if err != nil {
		return n, err
	}

	// Write trailer.
	err = tsw.writeTrailerTo(w, hoff, &n)
	if err != nil {
		return n, err
	}

	return n, nil
}
// writeTagValueBlockTo encodes values from a tag set into w.
// Returns the offset of the hash index (hoff).
// Each value entry is written first; the hash index (count + one absolute
// offset per slot) follows so readers can probe it directly.
func (tsw *TagSetWriter) writeTagValueBlockTo(w io.Writer, values map[string]tagValue, n *int64) (hoff int64, err error) {
	// Build RHH map from tag values.
	// NOTE(review): LoadFactor 100 with exact capacity fills the table
	// completely; assumes the rhh map tolerates that — confirm.
	m := rhh.NewHashMap(rhh.Options{
		Capacity:   len(values),
		LoadFactor: 100,
	})
	for value, tv := range values {
		m.Put([]byte(value), tv)
	}

	// Encode value list.
	offsets := make([]int64, m.Len())
	for i := 0; i < m.Len(); i++ {
		k, v := m.Elem(i)
		if k == nil {
			panic("rhh nil key")
		}
		tv := v.(tagValue)

		// Save current offset so we can use it in the hash index.
		offsets[i] = *n

		// Write value block.
		if err := tsw.writeTagValueTo(w, k, tv, n); err != nil {
			return hoff, err
		}
	}

	// Save starting offset of hash index.
	hoff = *n

	// Encode hash map length.
	if err := writeUint32To(w, uint32(m.Len()), n); err != nil {
		return hoff, err
	}

	// Encode hash map offset entries.
	for i := range offsets {
		if err := writeUint64To(w, uint64(offsets[i]), n); err != nil {
			return hoff, err
		}
	}

	return hoff, nil
}
// writeTagValueTo encodes a single tag value entry into w.
// Layout: flag, uvarint(len(value)), value bytes, uvarint(series count),
// then one big-endian uint32 per series id.
func (tsw *TagSetWriter) writeTagValueTo(w io.Writer, v []byte, tv tagValue, n *int64) error {
	// Flag byte.
	if err := writeUint8To(w, tv.flag(), n); err != nil {
		return err
	}

	// Length-prefixed value bytes.
	if err := writeUvarintTo(w, uint64(len(v)), n); err != nil {
		return err
	}
	if err := writeTo(w, v, n); err != nil {
		return err
	}

	// Series count followed by each fixed-width series id.
	if err := writeUvarintTo(w, uint64(len(tv.seriesIDs)), n); err != nil {
		return err
	}
	for _, id := range tv.seriesIDs {
		if err := writeUint32To(w, id, n); err != nil {
			return err
		}
	}

	return nil
}
// writeTagKeyBlockTo encodes keys from a tag set into w.
// Returns the offset of the hash index (hoff).
// Each key entry is written first; the hash index (count + one absolute
// offset per slot) follows so readers can probe it directly.
func (tsw *TagSetWriter) writeTagKeyBlockTo(w io.Writer, m *rhh.HashMap, n *int64) (hoff int64, err error) {
	// Encode key list.
	offsets := make([]int64, m.Len())
	for i := 0; i < m.Len(); i++ {
		k, v := m.Elem(i)
		if k == nil {
			panic("rhh nil key")
		}
		ts := v.(*tagSet)

		// Save current offset so we can use it in the hash index.
		offsets[i] = *n

		// Write key entry.
		if err := tsw.writeTagKeyTo(w, k, ts, n); err != nil {
			return hoff, err
		}
	}

	// Save starting offset of hash index.
	hoff = *n

	// Encode hash map length.
	if err := writeUint32To(w, uint32(m.Len()), n); err != nil {
		return hoff, err
	}

	// Encode hash map offset entries.
	for i := range offsets {
		if err := writeUint64To(w, uint64(offsets[i]), n); err != nil {
			return hoff, err
		}
	}

	return hoff, nil
}
// writeTagKeyTo encodes a single tag key entry into w.
// Layout: flag, value-block hash index offset, uvarint(len(key)), key bytes.
func (tsw *TagSetWriter) writeTagKeyTo(w io.Writer, k []byte, ts *tagSet, n *int64) error {
	err := writeUint8To(w, ts.flag(), n)
	if err == nil {
		err = writeUint64To(w, ts.offset, n)
	}
	if err == nil {
		err = writeUvarintTo(w, uint64(len(k)), n)
	}
	if err == nil {
		err = writeTo(w, k, n)
	}
	return err
}
// writeTrailerTo encodes the trailer containing sizes and offsets to w.
// The trailer is read back to front by TagSet.UnmarshalBinary.
func (tsw *TagSetWriter) writeTrailerTo(w io.Writer, hoff int64, n *int64) error {
	// Save current size of the write (the trailer itself is excluded).
	sz := *n

	// Write hash index offset, total size, and version.
	if err := writeUint64To(w, uint64(hoff), n); err != nil {
		return err
	}
	if err := writeUint64To(w, uint64(sz), n); err != nil {
		return err
	}
	if err := writeUint16To(w, TagSetVersion, n); err != nil {
		return err
	}

	return nil
}
// tagSet is the in-memory state for a tag key being written: its values,
// tombstone state, and the offset of its value block's hash index.
type tagSet struct {
	values  map[string]tagValue
	deleted bool
	offset  uint64
}

// flag returns the encoded flag byte for the tag key.
func (ts tagSet) flag() byte {
	if ts.deleted {
		return TagKeyTombstoneFlag
	}
	return 0
}
// tagValue is the in-memory state for a tag value being written: the series
// ids associated with it and its tombstone state.
type tagValue struct {
	seriesIDs []uint32
	deleted   bool
}

// flag returns the encoded flag byte for the tag value.
func (tv tagValue) flag() byte {
	if tv.deleted {
		return TagValueTombstoneFlag
	}
	return 0
}
// hashKey hashes a key using murmur3. A hash of zero is remapped to one —
// presumably so zero can serve as a sentinel; confirm against the index format.
func hashKey(key []byte) uint32 {
	if h := murmur3.Sum32(key); h != 0 {
		return h
	}
	return 1
}
// dist returns the probe distance for a hash in a slot index: how far slot i
// sits (with wraparound) from the hash's home slot in a table of the given
// capacity.
func dist(hash uint32, i, capacity int) int {
	home := int(hash) % capacity
	return ((i - home) + capacity) % capacity
}

View File

@ -0,0 +1,106 @@
package tsi1_test
import (
"bytes"
"reflect"
"strconv"
"testing"
"github.com/influxdata/influxdb/tsdb/engine/tsi1"
)
// Ensure tag sets can be written and opened.
func TestTagSetWriter(t *testing.T) {
// Write 3 series to writer.
tsw := tsi1.NewTagSetWriter()
tsw.AddSeries(map[string]string{"region": "us-east", "host": "server0"}, 1)
tsw.AddSeries(map[string]string{"region": "us-east", "host": "server1"}, 2)
tsw.AddSeries(map[string]string{"region": "us-west", "host": "server2"}, 3)
// Encode into buffer.
var buf bytes.Buffer
if n, err := tsw.WriteTo(&buf); err != nil {
t.Fatal(err)
} else if n == 0 {
t.Fatal("expected bytes written")
}
// Unmarshal into a TagSet.
var ts tsi1.TagSet
if err := ts.UnmarshalBinary(buf.Bytes()); err != nil {
t.Fatal(err)
}
// Verify data in TagSet.
if a := ts.TagValueSeriesIDs([]byte("region"), []byte("us-east")); !reflect.DeepEqual(a, []uint32{1, 2}) {
t.Fatalf("unexpected series ids: %#v", a)
} else if a := ts.TagValueSeriesIDs([]byte("region"), []byte("us-west")); !reflect.DeepEqual(a, []uint32{3}) {
t.Fatalf("unexpected series ids: %#v", a)
}
if a := ts.TagValueSeriesIDs([]byte("host"), []byte("server0")); !reflect.DeepEqual(a, []uint32{1}) {
t.Fatalf("unexpected series ids: %#v", a)
} else if a := ts.TagValueSeriesIDs([]byte("host"), []byte("server1")); !reflect.DeepEqual(a, []uint32{2}) {
t.Fatalf("unexpected series ids: %#v", a)
} else if a := ts.TagValueSeriesIDs([]byte("host"), []byte("server2")); !reflect.DeepEqual(a, []uint32{3}) {
t.Fatalf("unexpected series ids: %#v", a)
}
}
// Cached benchmark fixtures, one per (tagN x valueN) shape, so the expensive
// tag set is built once and reused across benchmark invocations.
var benchmarkTagSet10x1000 *tsi1.TagSet
var benchmarkTagSet100x1000 *tsi1.TagSet
var benchmarkTagSet1000x1000 *tsi1.TagSet

func BenchmarkTagSet_SeriesN_10_1000(b *testing.B) {
	benchmarkTagSet_SeriesN(b, 10, 1000, &benchmarkTagSet10x1000)
}

func BenchmarkTagSet_SeriesN_100_1000(b *testing.B) {
	benchmarkTagSet_SeriesN(b, 100, 1000, &benchmarkTagSet100x1000)
}

func BenchmarkTagSet_SeriesN_1000_1000(b *testing.B) {
	benchmarkTagSet_SeriesN(b, 1000, 1000, &benchmarkTagSet1000x1000)
}
// benchmarkTagSet_SeriesN builds (once, caching via ts) a tag set with tagN
// keys and valueN values per key, then benchmarks TagValueSeriesN lookups on
// the key/value pair "0"/"0".
func benchmarkTagSet_SeriesN(b *testing.B, tagN, valueN int, ts **tsi1.TagSet) {
	if (*ts) == nil {
		tsw := tsi1.NewTagSetWriter()

		// Write tagset block.
		// Each (i, j) pair gets a unique series id, so every tag value maps
		// to exactly one series — matching the n != 1 check below.
		var seriesID uint32
		var kbuf, vbuf [20]byte
		for i := 0; i < tagN; i++ {
			for j := 0; j < valueN; j++ {
				k := strconv.AppendInt(kbuf[:0], int64(i), 10)
				v := strconv.AppendInt(vbuf[:0], int64(j), 10)
				tsw.AddTagValueSeries(k, v, seriesID)
				seriesID++
			}
		}

		// NOTE(review): these extra series look copied from TestTagSetWriter
		// and slightly skew the fixture — confirm they are intended here.
		tsw.AddSeries(map[string]string{"region": "us-east", "host": "server0"}, 1)
		tsw.AddSeries(map[string]string{"region": "us-east", "host": "server1"}, 2)
		tsw.AddSeries(map[string]string{"region": "us-west", "host": "server2"}, 3)

		// Encode into buffer.
		var buf bytes.Buffer
		if _, err := tsw.WriteTo(&buf); err != nil {
			b.Fatal(err)
		}
		b.Log("size", buf.Len())

		// Unmarshal into a TagSet.
		*ts = &tsi1.TagSet{}
		if err := (*ts).UnmarshalBinary(buf.Bytes()); err != nil {
			b.Fatal(err)
		}
	}

	// Benchmark lookups.
	b.ReportAllocs()
	b.ResetTimer()

	key, value := []byte("0"), []byte("0")
	for i := 0; i < b.N; i++ {
		if n := (*ts).TagValueSeriesN(key, value); n != 1 {
			b.Fatalf("unexpected series count: %d", n)
		}
	}
}

View File

@ -0,0 +1,117 @@
package tsi1
import (
"encoding/binary"
"sort"
"github.com/influxdata/influxdb/models"
)
// TermList represents a list of terms sorted by frequency.
type TermList struct {
	m map[string]int // term -> index into a
	a []termListElem // terms, ordered by descending frequency
}

// NewTermList computes a term list based on a map of term frequency.
func NewTermList(m map[string]int) *TermList {
	if len(m) == 0 {
		return &TermList{}
	}

	l := &TermList{
		m: make(map[string]int, len(m)),
		a: make([]termListElem, 0, len(m)),
	}

	// Copy terms into the slice and order them by frequency.
	for term, freq := range m {
		l.a = append(l.a, termListElem{term: term, freq: freq})
	}
	sort.Sort(termListElems(l.a))

	// Record each term's final position for fast lookup.
	for i, elem := range l.a {
		l.m[elem.term] = i
	}

	return l
}
// Len returns the length of the list.
func (l *TermList) Len() int { return len(l.a) }

// Offset returns the offset for a given term. Returns zero if term doesn't exist.
func (l *TermList) Offset(v []byte) uint32 {
	return l.OffsetString(string(v))
}

// OffsetString returns the offset for a given term. Returns zero if term doesn't exist.
func (l *TermList) OffsetString(v string) uint32 {
	if i, ok := l.m[v]; ok {
		return l.a[i].offset
	}
	return 0
}
// AppendEncodedSeries dictionary encodes a series and appends it to the buffer.
// Encoding: uvarint(name offset), uvarint(tag count), then for each tag a
// uvarint(key offset) followed by a uvarint(value offset).
// Panics if the name or any tag key/value is absent from the term list,
// since offset zero is reserved for "not found".
func (l *TermList) AppendEncodedSeries(dst []byte, name string, tags models.Tags) []byte {
	var buf [binary.MaxVarintLen32]byte

	// Encode name.
	offset := l.OffsetString(name)
	if offset == 0 {
		panic("name not in term list: " + name)
	}
	n := binary.PutUvarint(buf[:], uint64(offset))
	dst = append(dst, buf[:n]...)

	// Encode tag count.
	n = binary.PutUvarint(buf[:], uint64(len(tags)))
	dst = append(dst, buf[:n]...)

	// Encode tags.
	for _, t := range tags {
		// Encode tag key.
		offset := l.Offset(t.Key)
		if offset == 0 {
			panic("tag key not in term list: " + string(t.Key))
		}
		n := binary.PutUvarint(buf[:], uint64(offset))
		dst = append(dst, buf[:n]...)

		// Encode tag value.
		offset = l.Offset(t.Value)
		if offset == 0 {
			panic("tag value not in term list: " + string(t.Value))
		}
		n = binary.PutUvarint(buf[:], uint64(offset))
		dst = append(dst, buf[:n]...)
	}

	return dst
}
// termListElem represents an element in a term list.
type termListElem struct {
	term   string // term value
	freq   int    // term frequency
	offset uint32 // position in file
}

// termListElems represents a list of elements sorted by descending frequency,
// with ties broken by ascending term so the order is deterministic.
type termListElems []termListElem

func (a termListElems) Len() int      { return len(a) }
func (a termListElems) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a termListElems) Less(i, j int) bool {
	if a[i].freq != a[j].freq {
		// BUG FIX: previously compared a[i].freq > a[i].freq (always false),
		// which made the frequency ordering a no-op.
		return a[i].freq > a[j].freq
	}
	return a[i].term < a[j].term
}

74
tsdb/engine/tsi1/tsi1.go Normal file
View File

@ -0,0 +1,74 @@
package tsi1
import (
"encoding/binary"
"fmt"
"io"
"os"
)
// writeTo writes write v into w. Updates n.
func writeTo(w io.Writer, v []byte, n *int64) error {
nn, err := w.Write(v)
*n += int64(nn)
return err
}
// writeUint8To writes write v into w. Updates n.
func writeUint8To(w io.Writer, v uint8, n *int64) error {
nn, err := w.Write([]byte{v})
*n += int64(nn)
return err
}
// writeUint16To writes write v into w using big endian encoding. Updates n.
func writeUint16To(w io.Writer, v uint16, n *int64) error {
var buf [2]byte
binary.BigEndian.PutUint16(buf[:], v)
nn, err := w.Write(buf[:])
*n += int64(nn)
return err
}
// writeUint32To writes write v into w using big endian encoding. Updates n.
func writeUint32To(w io.Writer, v uint32, n *int64) error {
var buf [4]byte
binary.BigEndian.PutUint32(buf[:], v)
nn, err := w.Write(buf[:])
*n += int64(nn)
return err
}
// writeUint64To writes write v into w using big endian encoding. Updates n.
func writeUint64To(w io.Writer, v uint64, n *int64) error {
var buf [8]byte
binary.BigEndian.PutUint64(buf[:], v)
nn, err := w.Write(buf[:])
*n += int64(nn)
return err
}
// writeUvarintTo writes write v into w using variable length encoding. Updates n.
func writeUvarintTo(w io.Writer, v uint64, n *int64) error {
var buf [binary.MaxVarintLen64]byte
i := binary.PutUvarint(buf[:], v)
nn, err := w.Write(buf[:i])
*n += int64(nn)
return err
}
// Hexdump writes a hex dump of data to stderr, 16 bytes per line, each line
// prefixed with a 7-digit hex address, followed by a trailing blank line.
func Hexdump(data []byte) {
	for addr := 0; len(data) > 0; {
		row := data
		if len(row) > 16 {
			row = row[:16]
		}
		fmt.Fprintf(os.Stderr, "%07x % x\n", addr, row)
		addr += len(row)
		data = data[len(row):]
	}
	fmt.Fprintln(os.Stderr, "")
}