diff --git a/pkg/rhh/rhh.go b/pkg/rhh/rhh.go index 74de27d978..418d607b0d 100644 --- a/pkg/rhh/rhh.go +++ b/pkg/rhh/rhh.go @@ -6,33 +6,28 @@ import ( "github.com/influxdata/influxdb/pkg/murmur3" ) -// DefaultCapacity is the initial capacity of a HashMap. -const DefaultCapacity = 256 - -// LoadFactor is the percent fill before the map is grown. -const LoadFactor = 90 - // HashMap represents a hash map that implements Robin Hood Hashing. // https://cs.uwaterloo.ca/research/tr/1986/CS-86-14.pdf type HashMap struct { hashes []uint32 elems []hashElem - n int - capacity int - threshold int - mask uint32 + n int + capacity int + threshold int + loadFactor int } -func NewHashMap() *HashMap { +func NewHashMap(opt Options) *HashMap { m := &HashMap{ - capacity: DefaultCapacity, + capacity: opt.Capacity, + loadFactor: opt.LoadFactor, } m.alloc() return m } -func (m *HashMap) Get(key []byte) []byte { +func (m *HashMap) Get(key []byte) interface{} { i := m.index(key) if i == -1 { return nil @@ -40,10 +35,10 @@ func (m *HashMap) Get(key []byte) []byte { return m.elems[i].value } -func (m *HashMap) Put(key, val []byte) { +func (m *HashMap) Put(key []byte, val interface{}) { // Grow the map if we've run out of slots. m.n++ - if m.n >= m.threshold { + if m.n > m.threshold { m.grow() } @@ -54,8 +49,8 @@ func (m *HashMap) Put(key, val []byte) { } } -func (m *HashMap) insert(hash uint32, key, val []byte) (overwritten bool) { - pos := int(hash & m.mask) +func (m *HashMap) insert(hash uint32, key []byte, val interface{}) (overwritten bool) { + pos := int(hash) % m.capacity dist := 0 // Continue searching until we find an empty slot or lower probe distance. @@ -86,7 +81,7 @@ func (m *HashMap) insert(hash uint32, key, val []byte) (overwritten bool) { } // Increment position, wrap around on overflow. 
- pos = int(uint32(pos+1) & m.mask) + pos = (pos + 1) % m.capacity dist++ } } @@ -95,8 +90,7 @@ func (m *HashMap) insert(hash uint32, key, val []byte) (overwritten bool) { func (m *HashMap) alloc() { m.elems = make([]hashElem, m.capacity) m.hashes = make([]uint32, m.capacity) - m.threshold = (m.capacity * LoadFactor) / 100 - m.mask = uint32(m.capacity - 1) + m.threshold = (m.capacity * m.loadFactor) / 100 } // grow doubles the capacity and reinserts all existing hashes & elements. @@ -122,7 +116,7 @@ func (m *HashMap) grow() { // index returns the position of key in the hash map. func (m *HashMap) index(key []byte) int { hash := m.hashKey(key) - pos := int(hash & m.mask) + pos := int(hash) % m.capacity dist := 0 for { @@ -134,7 +128,7 @@ func (m *HashMap) index(key []byte) int { return pos } - pos = int(uint32(pos+1) & m.mask) + pos = (pos + 1) % m.capacity dist++ } } @@ -149,7 +143,7 @@ func (m *HashMap) hashKey(key []byte) uint32 { } // Elem returns the i-th key/value pair of the hash map. -func (m *HashMap) Elem(i int) (key, value []byte) { +func (m *HashMap) Elem(i int) (key []byte, value interface{}) { if i >= len(m.elems) { return nil, nil } @@ -179,11 +173,23 @@ func (m *HashMap) AverageProbeCount() float64 { // dist returns the probe distance for a hash in a slot index. func (m *HashMap) dist(hash uint32, i int) int { - return int(uint32(i+m.capacity-int(hash&m.mask)) & m.mask) + return (i + m.capacity - (int(hash) % m.capacity)) % m.capacity } type hashElem struct { key []byte - value []byte + value interface{} hash uint32 } + +// Options represents initialization options that are passed to NewHashMap(). +type Options struct { + Capacity int + LoadFactor int +} + +// DefaultOptions represents a default set of options to pass to NewHashMap(). 
+var DefaultOptions = Options{ + Capacity: 256, + LoadFactor: 90, +} diff --git a/pkg/rhh/rhh_test.go b/pkg/rhh/rhh_test.go index 7c754dd251..f9a82ad277 100644 --- a/pkg/rhh/rhh_test.go +++ b/pkg/rhh/rhh_test.go @@ -12,7 +12,7 @@ import ( // Ensure hash map can perform basic get/put operations. func TestHashMap(t *testing.T) { - m := rhh.NewHashMap() + m := rhh.NewHashMap(rhh.DefaultOptions) m.Put([]byte("foo"), []byte("bar")) m.Put([]byte("baz"), []byte("bat")) @@ -38,7 +38,7 @@ func TestHashMap_Quick(t *testing.T) { } if err := quick.Check(func(keys, values [][]byte) bool { - m := rhh.NewHashMap() + m := rhh.NewHashMap(rhh.Options{Capacity: 1000, LoadFactor: 100}) h := make(map[string][]byte) // Insert all key/values into both maps. diff --git a/tsdb/engine/tsi1/doc.go b/tsdb/engine/tsi1/doc.go index d0396c8601..56c99cb845 100644 --- a/tsdb/engine/tsi1/doc.go +++ b/tsdb/engine/tsi1/doc.go @@ -1,5 +1,7 @@ /* +Series: + ╔══════Series List═════╗ ║ ┌───────────────────┐║ ║ │ Term List │║ @@ -54,5 +56,89 @@ ║ └─────────────────────────────┘ ║ ╚═════════════════════════════════╝ + +Tag Sets: + + ╔════════Tag Set═════════╗ + ║┌──────────────────────┐║ + ║│ Tag Values Block │║ + ║├──────────────────────┤║ + ║│ ... │║ + ║├──────────────────────┤║ + ║│ Tag Keys Block │║ + ║├──────────────────────┤║ + ║│ Trailer │║ + ║└──────────────────────┘║ + ╚════════════════════════╝ + + ╔═══════Tag Values Block═══════╗ + ║ ║ + ║ ┏━━━━━━━━Value List━━━━━━━━┓ ║ + ║ ┃ ┃ ║ + ║ ┃┏━━━━━━━━━Value━━━━━━━━━━┓┃ ║ + ║ ┃┃┌──────────────────────┐┃┃ ║ + ║ ┃┃│ Flag │┃┃ ║ + ║ ┃┃├──────────────────────┤┃┃ ║ + ║ ┃┃│ len(Value) │┃┃ ║ + ║ ┃┃├──────────────────────┤┃┃ ║ + ║ ┃┃│ Value │┃┃ ║ + ║ ┃┃├──────────────────────┤┃┃ ║ + ║ ┃┃│ len(Series) │┃┃ ║ + ║ ┃┃├──────────────────────┤┃┃ ║ + ║ ┃┃│SeriesIDs │┃┃ ║ + ║ ┃┃└──────────────────────┘┃┃ ║ + ║ ┃┗━━━━━━━━━━━━━━━━━━━━━━━━┛┃ ║ + ║ ┃ ... 
┃ ║ + ║ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ║ + ║ ┏━━━━━━━━Hash Index━━━━━━━━┓ ║ + ║ ┃ ┌──────────────────────┐ ┃ ║ + ║ ┃ │ len(Values) │ ┃ ║ + ║ ┃ ├──────────────────────┤ ┃ ║ + ║ ┃ │Value Offset │ ┃ ║ + ║ ┃ ├──────────────────────┤ ┃ ║ + ║ ┃ │ ... │ ┃ ║ + ║ ┃ └──────────────────────┘ ┃ ║ + ║ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ║ + ╚══════════════════════════════╝ + + ╔════════Tag Key Block═════════╗ + ║ ║ + ║ ┏━━━━━━━━━Key List━━━━━━━━━┓ ║ + ║ ┃ ┃ ║ + ║ ┃┏━━━━━━━━━━Key━━━━━━━━━━━┓┃ ║ + ║ ┃┃┌──────────────────────┐┃┃ ║ + ║ ┃┃│ Flag │┃┃ ║ + ║ ┃┃├──────────────────────┤┃┃ ║ + ║ ┃┃│Value Offset │┃┃ ║ + ║ ┃┃├──────────────────────┤┃┃ ║ + ║ ┃┃│ len(Key) │┃┃ ║ + ║ ┃┃├──────────────────────┤┃┃ ║ + ║ ┃┃│ Key │┃┃ ║ + ║ ┃┃└──────────────────────┘┃┃ ║ + ║ ┃┗━━━━━━━━━━━━━━━━━━━━━━━━┛┃ ║ + ║ ┃ ... ┃ ║ + ║ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ║ + ║ ┏━━━━━━━━Hash Index━━━━━━━━┓ ║ + ║ ┃ ┌──────────────────────┐ ┃ ║ + ║ ┃ │ len(Keys) │ ┃ ║ + ║ ┃ ├──────────────────────┤ ┃ ║ + ║ ┃ │ Key Offset │ ┃ ║ + ║ ┃ ├──────────────────────┤ ┃ ║ + ║ ┃ │ ... │ ┃ ║ + ║ ┃ └──────────────────────┘ ┃ ║ + ║ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ║ + ╚══════════════════════════════╝ + + ╔════════════Trailer══════════════╗ + ║ ┌─────────────────────────────┐ ║ + ║ │ Hash Index Offset │ ║ + ║ ├─────────────────────────────┤ ║ + ║ │ Tag Set Size │ ║ + ║ ├─────────────────────────────┤ ║ + ║ │ Tag Set Version │ ║ + ║ └─────────────────────────────┘ ║ + ╚═════════════════════════════════╝ + + */ package tsi1 diff --git a/tsdb/engine/tsi1/series.go b/tsdb/engine/tsi1/series.go index 12048e78d7..392513deec 100644 --- a/tsdb/engine/tsi1/series.go +++ b/tsdb/engine/tsi1/series.go @@ -6,7 +6,6 @@ import ( "errors" "io" "math" - "sort" "github.com/influxdata/influxdb/models" ) @@ -20,6 +19,7 @@ const ( TermCountSize = 4 SeriesCountSize = 4 + SeriesIDSize = 4 ) // Series flag constants. @@ -30,14 +30,14 @@ const ( // SeriesList represents the section of the index which holds the term // dictionary and a sorted list of series keys. 
type SeriesList struct { - termList []byte + termData []byte seriesData []byte } // SeriesOffset returns offset of the encoded series key. // Returns 0 if the key does not exist in the series list. func (l *SeriesList) SeriesOffset(key []byte) (offset uint32, deleted bool) { - offset = uint32(len(l.termList) + SeriesCountSize) + offset = uint32(len(l.termData) + SeriesCountSize) data := l.seriesData[SeriesCountSize:] for i, n := uint32(0), l.SeriesCount(); i < n; i++ { @@ -123,7 +123,7 @@ func (l *SeriesList) DecodeSeries(v []byte) (name string, tags models.Tags) { // DecodeTerm returns the term at the given offset. func (l *SeriesList) DecodeTerm(offset uint32) []byte { - buf := l.termList[offset:] + buf := l.termData[offset:] // Read length at offset. i, n := binary.Uvarint(buf) @@ -136,7 +136,7 @@ func (l *SeriesList) DecodeTerm(offset uint32) []byte { // EncodeTerm returns the offset of v within data. func (l *SeriesList) EncodeTerm(v []byte) uint32 { offset := uint32(TermCountSize) - data := l.termList[offset:] + data := l.termData[offset:] for i, n := uint32(0), l.TermCount(); i < n; i++ { // Read term length. @@ -158,7 +158,7 @@ func (l *SeriesList) EncodeTerm(v []byte) uint32 { // TermCount returns the number of terms within the dictionary. func (l *SeriesList) TermCount() uint32 { - return binary.BigEndian.Uint32(l.termList[:TermCountSize]) + return binary.BigEndian.Uint32(l.termData[:TermCountSize]) } // SeriesCount returns the number of series. @@ -177,13 +177,13 @@ func (l *SeriesList) UnmarshalBinary(data []byte) error { } // Read trailer offsets. - termListOffset := binary.BigEndian.Uint32(data[len(data)-8:]) + termDataOffset := binary.BigEndian.Uint32(data[len(data)-8:]) seriesDataOffset := binary.BigEndian.Uint32(data[len(data)-4:]) // Save reference to term list data. 
- termListSize := seriesDataOffset - termListOffset - l.termList = data[termListOffset:] - l.termList = l.termList[:termListSize] + termDataSize := seriesDataOffset - termDataOffset + l.termData = data[termDataOffset:] + l.termData = l.termData[:termDataSize] // Save reference to series data. seriesDataSize := uint32(len(data)) - seriesDataOffset - SeriesListTrailerSize @@ -238,11 +238,11 @@ func (sw *SeriesListWriter) append(name string, tags models.Tags, deleted bool) // WriteTo computes the dictionary encoding of the series and writes to w. func (sw *SeriesListWriter) WriteTo(w io.Writer) (n int64, err error) { - terms := newTermList(sw.terms) + terms := NewTermList(sw.terms) // Write term dictionary. - termListOffset := n - nn, err := sw.writeDictionaryTo(w, terms) + termDataOffset := n + nn, err := sw.writeTermListTo(w, terms) n += nn if err != nil { return n, err @@ -257,7 +257,7 @@ func (sw *SeriesListWriter) WriteTo(w io.Writer) (n int64, err error) { } // Write trailer. - nn, err = sw.writeTrailerTo(w, uint32(termListOffset), uint32(seriesDataOffset)) + nn, err = sw.writeTrailerTo(w, uint32(termDataOffset), uint32(seriesDataOffset)) n += nn if err != nil { return n, err @@ -266,12 +266,12 @@ func (sw *SeriesListWriter) WriteTo(w io.Writer) (n int64, err error) { return n, nil } -// writeDictionaryTo writes the terms to w. -func (sw *SeriesListWriter) writeDictionaryTo(w io.Writer, terms *termList) (n int64, err error) { +// writeTermListTo writes the terms to w. +func (sw *SeriesListWriter) writeTermListTo(w io.Writer, terms *TermList) (n int64, err error) { buf := make([]byte, binary.MaxVarintLen32) // Write term count. 
- binary.BigEndian.PutUint32(buf[:4], uint32(terms.len())) + binary.BigEndian.PutUint32(buf[:4], uint32(terms.Len())) nn, err := w.Write(buf[:4]) n += int64(nn) if err != nil { @@ -306,7 +306,7 @@ func (sw *SeriesListWriter) writeDictionaryTo(w io.Writer, terms *termList) (n i } // writeSeriesTo writes dictionary-encoded series to w in sorted order. -func (sw *SeriesListWriter) writeSeriesTo(w io.Writer, terms *termList, offset uint32) (n int64, err error) { +func (sw *SeriesListWriter) writeSeriesTo(w io.Writer, terms *TermList, offset uint32) (n int64, err error) { buf := make([]byte, binary.MaxVarintLen32+1) // Write series count. @@ -331,7 +331,7 @@ func (sw *SeriesListWriter) writeSeriesTo(w io.Writer, terms *termList, offset u s.offset = uint32(offset + uint32(n)) // Write encoded series to a separate buffer. - seriesBuf = terms.appendEncodedSeries(seriesBuf[:0], s.name, s.tags) + seriesBuf = terms.AppendEncodedSeries(seriesBuf[:0], s.name, s.tags) // Join flag, varint(length), & dictionary-encoded series in buffer. buf[0] = 0 // TODO(benbjohnson): series tombstone @@ -350,9 +350,9 @@ func (sw *SeriesListWriter) writeSeriesTo(w io.Writer, terms *termList, offset u } // writeTrailerTo writes offsets to the end of the series list. -func (sw *SeriesListWriter) writeTrailerTo(w io.Writer, termListOffset, seriesDataOffset uint32) (n int64, err error) { +func (sw *SeriesListWriter) writeTrailerTo(w io.Writer, termDataOffset, seriesDataOffset uint32) (n int64, err error) { // Write offset of term list. - if err := binary.Write(w, binary.BigEndian, termListOffset); err != nil { + if err := binary.Write(w, binary.BigEndian, termDataOffset); err != nil { return n, err } n += 4 @@ -383,112 +383,3 @@ func (a series) Less(i, j int) bool { } panic("TODO: CompareTags(a[i].tags, a[j].tags)") } - -// termList represents a list of terms sorted by frequency. 
-type termList struct { - m map[string]int // terms by index - a []termListElem // sorted terms -} - -// newTermList computes a term list based on a map of term frequency. -func newTermList(m map[string]int) *termList { - if len(m) == 0 { - return &termList{} - } - - l := &termList{ - a: make([]termListElem, 0, len(m)), - m: make(map[string]int, len(m)), - } - - // Insert elements into slice. - for term, freq := range m { - l.a = append(l.a, termListElem{term: term, freq: freq}) - } - sort.Sort(termListElems(l.a)) - - // Create lookup of terms to indices. - for i, e := range l.a { - l.m[e.term] = i - } - - return l -} - -// len returns the length of the list. -func (l *termList) len() int { return len(l.a) } - -// offset returns the offset for a given term. Returns zero if term doesn't exist. -func (l *termList) offset(v []byte) uint32 { - i, ok := l.m[string(v)] - if !ok { - return 0 - } - return l.a[i].offset -} - -// offsetString returns the offset for a given term. Returns zero if term doesn't exist. -func (l *termList) offsetString(v string) uint32 { - i, ok := l.m[v] - if !ok { - return 0 - } - return l.a[i].offset -} - -// appendEncodedSeries dictionary encodes a series and appends it to the buffer. -func (l *termList) appendEncodedSeries(dst []byte, name string, tags models.Tags) []byte { - var buf [binary.MaxVarintLen32]byte - - // Encode name. - offset := l.offsetString(name) - if offset == 0 { - panic("name not in term list: " + name) - } - n := binary.PutUvarint(buf[:], uint64(offset)) - dst = append(dst, buf[:n]...) - - // Encode tag count. - n = binary.PutUvarint(buf[:], uint64(len(tags))) - dst = append(dst, buf[:n]...) - - // Encode tags. - for _, t := range tags { - // Encode tag key. - offset := l.offset(t.Key) - if offset == 0 { - panic("tag key not in term list: " + string(t.Key)) - } - n := binary.PutUvarint(buf[:], uint64(offset)) - dst = append(dst, buf[:n]...) - - // Encode tag value. 
- offset = l.offset(t.Value) - if offset == 0 { - panic("tag value not in term list: " + string(t.Value)) - } - n = binary.PutUvarint(buf[:], uint64(offset)) - dst = append(dst, buf[:n]...) - } - - return dst -} - -// termListElem represents an element in a term list. -type termListElem struct { - term string // term value - freq int // term frequency - offset uint32 // position in file -} - -// termListElems represents a list of elements sorted by descending frequency. -type termListElems []termListElem - -func (a termListElems) Len() int { return len(a) } -func (a termListElems) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a termListElems) Less(i, j int) bool { - if a[i].freq != a[j].freq { - return a[i].freq > a[i].freq - } - return a[i].term < a[j].term -} diff --git a/tsdb/engine/tsi1/tag.go b/tsdb/engine/tsi1/tag.go deleted file mode 100644 index 92710d6d9e..0000000000 --- a/tsdb/engine/tsi1/tag.go +++ /dev/null @@ -1,5 +0,0 @@ -package tsi1 - -type TagSet struct{} -type TagValueBlock struct{} -type TagKeyBlock struct{} diff --git a/tsdb/engine/tsi1/tag_set.go b/tsdb/engine/tsi1/tag_set.go new file mode 100644 index 0000000000..312046c67b --- /dev/null +++ b/tsdb/engine/tsi1/tag_set.go @@ -0,0 +1,539 @@ +package tsi1 + +import ( + "bytes" + "encoding/binary" + "errors" + "io" + + "github.com/influxdata/influxdb/pkg/murmur3" + "github.com/influxdata/influxdb/pkg/rhh" +) + +// TagSetVersion is the version of the tag set block. +const TagSetVersion = 1 + +// Tag key flag constants. +const ( + TagKeyTombstoneFlag = 0x01 +) + +// Tag value flag constants. +const ( + TagValueTombstoneFlag = 0x01 +) + +// TagSet variable size constants. +const ( + // TagSet trailer fields + TagSetVersionSize = 2 + TagSetSize = 8 + TagSetHashOffsetSize = 8 + TagSetTrailerSize = TagSetVersionSize + TagSetSize + TagSetHashOffsetSize + + // TagSet key block fields. + TagKeyNSize = 4 + TagKeyOffsetSize = 8 + + // TagSet value block fields. 
+	TagValueNSize      = 4
+	TagValueOffsetSize = 8
+)
+
+// TagSet errors.
+var (
+	ErrUnsupportedTagSetVersion = errors.New("unsupported tag set version")
+	ErrTagSetSizeMismatch       = errors.New("tag set size mismatch")
+)
+
+// TagSet represents tag key/value data for a single measurement.
+type TagSet struct {
+	data     []byte
+	hashData []byte
+
+	hoff    uint64 // hash index offset
+	version int    // tag set version
+}
+
+// Version returns the encoding version parsed from the data.
+// Only valid after UnmarshalBinary() has been successfully invoked.
+func (ts *TagSet) Version() int { return ts.version }
+
+// TagValueSeriesN returns the number of series ids associated with a tag value.
+func (ts *TagSet) TagValueSeriesN(key, value []byte) int {
+	velem := ts.tagValueElem(key, value)
+	if velem.value == nil {
+		return 0
+	}
+	return int(velem.seriesN)
+}
+
+// TagValueSeriesIDs returns the series IDs associated with a tag value.
+func (ts *TagSet) TagValueSeriesIDs(key, value []byte) []uint32 {
+	// Find value element.
+	velem := ts.tagValueElem(key, value)
+	if velem.value == nil {
+		return nil
+	}
+
+	// Build slice of series ids.
+	a := make([]uint32, velem.seriesN)
+	for i := range a {
+		a[i] = velem.seriesID(i)
+	}
+	return a
+}
+
+// tagKeyElem returns an element for a tag key.
+// Returns an element with a nil key if not found.
+func (ts *TagSet) tagKeyElem(key []byte) tagKeyElem {
+	keyN := binary.BigEndian.Uint32(ts.hashData[:TagKeyNSize])
+	hash := hashKey(key)
+	pos := int(hash) % int(keyN)
+
+	// Track current distance
+	var d int
+
+	for {
+		// Find offset of tag key.
+		offset := binary.BigEndian.Uint64(ts.hashData[TagKeyNSize+(pos*TagKeyOffsetSize):])
+
+		// Parse into element.
+		var e tagKeyElem
+		e.UnmarshalBinary(ts.data[offset:])
+
+		// Return if keys match.
+		if bytes.Equal(e.key, key) {
+			return e
+		}
+
+		// Check if we've exceeded the probe distance.
+		if d > dist(hashKey(e.key), pos, int(keyN)) {
+			return tagKeyElem{}
+		}
+
+		// Move position forward.
+		pos = (pos + 1) % int(keyN)
+		d++
+	}
+}
+
+// tagValueElem returns an element for a tag value.
+// Returns an element with a nil value if not found.
+func (ts *TagSet) tagValueElem(key, value []byte) tagValueElem {
+	// Find key element, exit if not found.
+	kelem := ts.tagKeyElem(key)
+	if kelem.key == nil {
+		return tagValueElem{}
+	}
+
+	hashData := ts.data[kelem.valueOffset:]
+	valueN := binary.BigEndian.Uint32(hashData[:TagValueNSize])
+	hash := hashKey(value)
+	pos := int(hash) % int(valueN)
+
+	// Track current distance
+	var d int
+
+	for {
+		// Find offset of tag value.
+		offset := binary.BigEndian.Uint64(hashData[TagValueNSize+(pos*TagValueOffsetSize):])
+
+		// Parse into element.
+		var e tagValueElem
+		e.UnmarshalBinary(ts.data[offset:])
+
+		// Return if values match.
+		if bytes.Equal(e.value, value) {
+			return e
+		}
+
+		// Check if we've exceeded the probe distance.
+		if d > dist(hashKey(e.value), pos, int(valueN)) {
+			return tagValueElem{}
+		}
+
+		// Move position forward.
+		pos = (pos + 1) % int(valueN)
+		d++
+	}
+}
+
+// UnmarshalBinary unpacks data into the tag set. Tag set is not copied so data
+// should be retained and unchanged after being passed into this function.
+func (ts *TagSet) UnmarshalBinary(data []byte) error {
+	// Parse version. A valid block always carries the full trailer, so reject
+	// anything shorter than the trailer before computing offsets from the end.
+	if len(data) < TagSetTrailerSize {
+		return io.ErrShortBuffer
+	}
+	versionOffset := len(data) - TagSetVersionSize
+	ts.version = int(binary.BigEndian.Uint16(data[versionOffset:]))
+
+	// Ensure version matches.
+	if ts.version != TagSetVersion {
+		return ErrUnsupportedTagSetVersion
+	}
+
+	// Parse size & validate.
+	szOffset := versionOffset - TagSetSize
+	sz := binary.BigEndian.Uint64(data[szOffset:])
+	if uint64(len(data)) != sz+TagSetTrailerSize {
+		return ErrTagSetSizeMismatch
+	}
+
+	// Parse hash index offset.
+	hoffOffset := szOffset - TagSetHashOffsetSize
+	hoff := binary.BigEndian.Uint64(data[hoffOffset:])
+
+	// Save data block & hash block.
+	ts.data = data[:hoff]
+	ts.hashData = data[hoff:hoffOffset]
+
+	return nil
+}
+
+// tagKeyElem represents an internal tag key element.
+type tagKeyElem struct {
+	flag        byte
+	key         []byte
+	valueOffset uint64
+}
+
+// UnmarshalBinary unmarshals data into e.
+func (e *tagKeyElem) UnmarshalBinary(data []byte) {
+	// Parse flag data.
+	e.flag, data = data[0], data[1:]
+
+	// Parse value offset.
+	e.valueOffset, data = binary.BigEndian.Uint64(data), data[8:]
+
+	// Parse key.
+	sz, n := binary.Uvarint(data)
+	data = data[n:]
+	e.key = data[:sz]
+}
+
+// tagValueElem represents an internal tag value element.
+type tagValueElem struct {
+	flag       byte
+	value      []byte
+	seriesN    uint64
+	seriesData []byte
+}
+
+// seriesID returns series ID at an index.
+func (e *tagValueElem) seriesID(i int) uint32 {
+	return binary.BigEndian.Uint32(e.seriesData[i*SeriesIDSize:])
+}
+
+// UnmarshalBinary unmarshals data into e.
+func (e *tagValueElem) UnmarshalBinary(data []byte) {
+	// Parse flag data.
+	e.flag, data = data[0], data[1:]
+
+	// Parse value.
+	sz, n := binary.Uvarint(data)
+	e.value, data = data[n:n+int(sz)], data[n+int(sz):]
+
+	// Parse series count.
+	e.seriesN, n = binary.Uvarint(data)
+	data = data[n:]
+
+	// Save reference to series data.
+	e.seriesData = data[:e.seriesN*SeriesIDSize]
+}
+
+// TagSetWriter writes a TagSet section.
+type TagSetWriter struct {
+	sets map[string]tagSet
+
+	// Starting offset of the writer.
+	Offset int64
+}
+
+// NewTagSetWriter returns a new TagSetWriter.
+func NewTagSetWriter() *TagSetWriter {
+	return &TagSetWriter{
+		sets: make(map[string]tagSet),
+	}
+}
+
+// AddTagValueSeries associates series id with a tag/value pair.
+func (tsw *TagSetWriter) AddTagValueSeries(key, value []byte, seriesID uint32) {
+	ts := tsw.createTagSetIfNotExists(key)
+	tv := ts.values[string(value)]
+	tv.seriesIDs = append(tv.seriesIDs, seriesID)
+	ts.values[string(value)] = tv
+}
+
+// AddSeries associates series id with a map of key/value pairs.
+// This is not optimized and is only provided for ease of use. +func (tsw *TagSetWriter) AddSeries(m map[string]string, seriesID uint32) { + for k, v := range m { + tsw.AddTagValueSeries([]byte(k), []byte(v), seriesID) + } +} + +// DeleteTag marks a tag key as tombstoned. +// The tag must not be used after deleting it. +func (tsw *TagSetWriter) DeleteTag(key []byte) { + ts := tsw.sets[string(key)] + ts.deleted = true + tsw.sets[string(key)] = ts +} + +// DeleteTagValue marks a tag value as tombstoned. +func (tsw *TagSetWriter) DeleteTagValue(key, value []byte) { + ts := tsw.createTagSetIfNotExists(key) + tv := ts.values[string(value)] + tv.deleted = true + ts.values[string(value)] = tv +} + +// createTagSetIfNotExists returns the tag set for a key. +func (tsw *TagSetWriter) createTagSetIfNotExists(key []byte) tagSet { + ts, ok := tsw.sets[string(key)] + if ok { + return ts + } + + ts = tagSet{values: make(map[string]tagValue)} + tsw.sets[string(key)] = ts + return ts +} + +// WriteTo encodes the tag values & tag key blocks. +func (tsw *TagSetWriter) WriteTo(w io.Writer) (n int64, err error) { + // Build key hash map with an exact capacity. + m := rhh.NewHashMap(rhh.Options{ + Capacity: len(tsw.sets), + LoadFactor: 100, + }) + for key := range tsw.sets { + ts := tsw.sets[key] + m.Put([]byte(key), &ts) + } + + // Write value blocks in key map order. + for i := 0; i < m.Len(); i++ { + k, v := m.Elem(i) + if k == nil { + panic("rhh nil key") + } + ts := v.(*tagSet) + + // Write value block. + hoff, err := tsw.writeTagValueBlockTo(w, ts.values, &n) + if err != nil { + return n, err + } + + // Save offset of hash index so we can use it in the key block. + ts.offset = uint64(hoff) + } + + // Write key block to point to value blocks. + hoff, err := tsw.writeTagKeyBlockTo(w, m, &n) + if err != nil { + return n, err + } + + // Write trailer. 
+ err = tsw.writeTrailerTo(w, hoff, &n) + if err != nil { + return n, err + } + + return n, nil +} + +// writeTagValueBlockTo encodes values from a tag set into w. +// Returns the offset of the hash index (hoff). +func (tsw *TagSetWriter) writeTagValueBlockTo(w io.Writer, values map[string]tagValue, n *int64) (hoff int64, err error) { + // Build RHH map from tag values. + m := rhh.NewHashMap(rhh.Options{ + Capacity: len(values), + LoadFactor: 100, + }) + for value, tv := range values { + m.Put([]byte(value), tv) + } + + // Encode value list. + offsets := make([]int64, m.Len()) + for i := 0; i < m.Len(); i++ { + k, v := m.Elem(i) + if k == nil { + panic("rhh nil key") + } + tv := v.(tagValue) + + // Save current offset so we can use it in the hash index. + offsets[i] = *n + + // Write value block. + if err := tsw.writeTagValueTo(w, k, tv, n); err != nil { + return hoff, err + } + } + + // Save starting offset of hash index. + hoff = *n + + // Encode hash map length. + if err := writeUint32To(w, uint32(m.Len()), n); err != nil { + return hoff, err + } + + // Encode hash map offset entries. + for i := range offsets { + if err := writeUint64To(w, uint64(offsets[i]), n); err != nil { + return hoff, err + } + } + + return hoff, nil +} + +// writeTagValueTo encodes a single tag value entry into w. +func (tsw *TagSetWriter) writeTagValueTo(w io.Writer, v []byte, tv tagValue, n *int64) error { + // Write flag. + if err := writeUint8To(w, tv.flag(), n); err != nil { + return err + } + + // Write value. + if err := writeUvarintTo(w, uint64(len(v)), n); err != nil { + return err + } else if err := writeTo(w, v, n); err != nil { + return err + } + + // Write series count. + if err := writeUvarintTo(w, uint64(len(tv.seriesIDs)), n); err != nil { + return err + } + + // Write series ids. 
+ for _, seriesID := range tv.seriesIDs { + if err := writeUint32To(w, seriesID, n); err != nil { + return err + } + } + + return nil +} + +// writeTagKeyBlockTo encodes keys from a tag set into w. +func (tsw *TagSetWriter) writeTagKeyBlockTo(w io.Writer, m *rhh.HashMap, n *int64) (hoff int64, err error) { + // Encode key list. + offsets := make([]int64, m.Len()) + for i := 0; i < m.Len(); i++ { + k, v := m.Elem(i) + if k == nil { + panic("rhh nil key") + } + ts := v.(*tagSet) + + // Save current offset so we can use it in the hash index. + offsets[i] = *n + + // Write key entry. + if err := tsw.writeTagKeyTo(w, k, ts, n); err != nil { + return hoff, err + } + } + + // Save starting offset of hash index. + hoff = *n + + // Encode hash map length. + if err := writeUint32To(w, uint32(m.Len()), n); err != nil { + return hoff, err + } + + // Encode hash map offset entries. + for i := range offsets { + if err := writeUint64To(w, uint64(offsets[i]), n); err != nil { + return hoff, err + } + } + + return hoff, nil +} + +// writeTagKeyTo encodes a single tag key entry into w. +func (tsw *TagSetWriter) writeTagKeyTo(w io.Writer, k []byte, ts *tagSet, n *int64) error { + if err := writeUint8To(w, ts.flag(), n); err != nil { + return err + } + if err := writeUint64To(w, ts.offset, n); err != nil { + return err + } + if err := writeUvarintTo(w, uint64(len(k)), n); err != nil { + return err + } + if err := writeTo(w, k, n); err != nil { + return err + } + return nil +} + +// writeTrailerTo encodes the trailer containing sizes and offsets to w. +func (tsw *TagSetWriter) writeTrailerTo(w io.Writer, hoff int64, n *int64) error { + // Save current size of the write. 
+ sz := *n + + // Write hash index offset, total size, and v + if err := writeUint64To(w, uint64(hoff), n); err != nil { + return err + } + if err := writeUint64To(w, uint64(sz), n); err != nil { + return err + } + if err := writeUint16To(w, TagSetVersion, n); err != nil { + return err + } + return nil +} + +type tagSet struct { + values map[string]tagValue + deleted bool + offset uint64 +} + +func (ts tagSet) flag() byte { + var flag byte + if ts.deleted { + flag |= TagKeyTombstoneFlag + } + return flag +} + +type tagValue struct { + seriesIDs []uint32 + deleted bool +} + +func (tv tagValue) flag() byte { + var flag byte + if tv.deleted { + flag |= TagValueTombstoneFlag + } + return flag +} + +// hashKey hashes a key using murmur3. +func hashKey(key []byte) uint32 { + h := murmur3.Sum32(key) + if h == 0 { + h = 1 + } + return h +} + +// dist returns the probe distance for a hash in a slot index. +func dist(hash uint32, i, capacity int) int { + return (i + capacity - (int(hash) % capacity)) % capacity +} diff --git a/tsdb/engine/tsi1/tag_set_test.go b/tsdb/engine/tsi1/tag_set_test.go new file mode 100644 index 0000000000..7597f382cc --- /dev/null +++ b/tsdb/engine/tsi1/tag_set_test.go @@ -0,0 +1,106 @@ +package tsi1_test + +import ( + "bytes" + "reflect" + "strconv" + "testing" + + "github.com/influxdata/influxdb/tsdb/engine/tsi1" +) + +// Ensure tag sets can be written and opened. +func TestTagSetWriter(t *testing.T) { + // Write 3 series to writer. + tsw := tsi1.NewTagSetWriter() + tsw.AddSeries(map[string]string{"region": "us-east", "host": "server0"}, 1) + tsw.AddSeries(map[string]string{"region": "us-east", "host": "server1"}, 2) + tsw.AddSeries(map[string]string{"region": "us-west", "host": "server2"}, 3) + + // Encode into buffer. + var buf bytes.Buffer + if n, err := tsw.WriteTo(&buf); err != nil { + t.Fatal(err) + } else if n == 0 { + t.Fatal("expected bytes written") + } + + // Unmarshal into a TagSet. 
+ var ts tsi1.TagSet + if err := ts.UnmarshalBinary(buf.Bytes()); err != nil { + t.Fatal(err) + } + + // Verify data in TagSet. + if a := ts.TagValueSeriesIDs([]byte("region"), []byte("us-east")); !reflect.DeepEqual(a, []uint32{1, 2}) { + t.Fatalf("unexpected series ids: %#v", a) + } else if a := ts.TagValueSeriesIDs([]byte("region"), []byte("us-west")); !reflect.DeepEqual(a, []uint32{3}) { + t.Fatalf("unexpected series ids: %#v", a) + } + if a := ts.TagValueSeriesIDs([]byte("host"), []byte("server0")); !reflect.DeepEqual(a, []uint32{1}) { + t.Fatalf("unexpected series ids: %#v", a) + } else if a := ts.TagValueSeriesIDs([]byte("host"), []byte("server1")); !reflect.DeepEqual(a, []uint32{2}) { + t.Fatalf("unexpected series ids: %#v", a) + } else if a := ts.TagValueSeriesIDs([]byte("host"), []byte("server2")); !reflect.DeepEqual(a, []uint32{3}) { + t.Fatalf("unexpected series ids: %#v", a) + } +} + +var benchmarkTagSet10x1000 *tsi1.TagSet +var benchmarkTagSet100x1000 *tsi1.TagSet +var benchmarkTagSet1000x1000 *tsi1.TagSet + +func BenchmarkTagSet_SeriesN_10_1000(b *testing.B) { + benchmarkTagSet_SeriesN(b, 10, 1000, &benchmarkTagSet10x1000) +} +func BenchmarkTagSet_SeriesN_100_1000(b *testing.B) { + benchmarkTagSet_SeriesN(b, 100, 1000, &benchmarkTagSet100x1000) +} +func BenchmarkTagSet_SeriesN_1000_1000(b *testing.B) { + benchmarkTagSet_SeriesN(b, 1000, 1000, &benchmarkTagSet1000x1000) +} + +func benchmarkTagSet_SeriesN(b *testing.B, tagN, valueN int, ts **tsi1.TagSet) { + if (*ts) == nil { + tsw := tsi1.NewTagSetWriter() + + // Write tagset block. 
+ var seriesID uint32 + var kbuf, vbuf [20]byte + for i := 0; i < tagN; i++ { + for j := 0; j < valueN; j++ { + k := strconv.AppendInt(kbuf[:0], int64(i), 10) + v := strconv.AppendInt(vbuf[:0], int64(j), 10) + tsw.AddTagValueSeries(k, v, seriesID) + seriesID++ + } + } + tsw.AddSeries(map[string]string{"region": "us-east", "host": "server0"}, 1) + tsw.AddSeries(map[string]string{"region": "us-east", "host": "server1"}, 2) + tsw.AddSeries(map[string]string{"region": "us-west", "host": "server2"}, 3) + + // Encode into buffer. + var buf bytes.Buffer + if _, err := tsw.WriteTo(&buf); err != nil { + b.Fatal(err) + } + b.Log("size", buf.Len()) + + // Unmarshal into a TagSet. + *ts = &tsi1.TagSet{} + if err := (*ts).UnmarshalBinary(buf.Bytes()); err != nil { + b.Fatal(err) + } + } + + // Benchmark lookups. + b.ReportAllocs() + b.ResetTimer() + + key, value := []byte("0"), []byte("0") + for i := 0; i < b.N; i++ { + if n := (*ts).TagValueSeriesN(key, value); n != 1 { + b.Fatalf("unexpected series count: %d", n) + } + } +} diff --git a/tsdb/engine/tsi1/term_list.go b/tsdb/engine/tsi1/term_list.go new file mode 100644 index 0000000000..562dcfcbfa --- /dev/null +++ b/tsdb/engine/tsi1/term_list.go @@ -0,0 +1,117 @@ +package tsi1 + +import ( + "encoding/binary" + "sort" + + "github.com/influxdata/influxdb/models" +) + +// TermList represents a list of terms sorted by frequency. +type TermList struct { + m map[string]int // terms by index + a []termListElem // sorted terms +} + +// NewTermList computes a term list based on a map of term frequency. +func NewTermList(m map[string]int) *TermList { + if len(m) == 0 { + return &TermList{} + } + + l := &TermList{ + a: make([]termListElem, 0, len(m)), + m: make(map[string]int, len(m)), + } + + // Insert elements into slice. + for term, freq := range m { + l.a = append(l.a, termListElem{term: term, freq: freq}) + } + sort.Sort(termListElems(l.a)) + + // Create lookup of terms to indices. 
+ for i, e := range l.a { + l.m[e.term] = i + } + + return l +} + +// Len returns the length of the list. +func (l *TermList) Len() int { return len(l.a) } + +// Offset returns the offset for a given term. Returns zero if term doesn't exist. +func (l *TermList) Offset(v []byte) uint32 { + i, ok := l.m[string(v)] + if !ok { + return 0 + } + return l.a[i].offset +} + +// OffsetString returns the offset for a given term. Returns zero if term doesn't exist. +func (l *TermList) OffsetString(v string) uint32 { + i, ok := l.m[v] + if !ok { + return 0 + } + return l.a[i].offset +} + +// AppendEncodedSeries dictionary encodes a series and appends it to the buffer. +func (l *TermList) AppendEncodedSeries(dst []byte, name string, tags models.Tags) []byte { + var buf [binary.MaxVarintLen32]byte + + // Encode name. + offset := l.OffsetString(name) + if offset == 0 { + panic("name not in term list: " + name) + } + n := binary.PutUvarint(buf[:], uint64(offset)) + dst = append(dst, buf[:n]...) + + // Encode tag count. + n = binary.PutUvarint(buf[:], uint64(len(tags))) + dst = append(dst, buf[:n]...) + + // Encode tags. + for _, t := range tags { + // Encode tag key. + offset := l.Offset(t.Key) + if offset == 0 { + panic("tag key not in term list: " + string(t.Key)) + } + n := binary.PutUvarint(buf[:], uint64(offset)) + dst = append(dst, buf[:n]...) + + // Encode tag value. + offset = l.Offset(t.Value) + if offset == 0 { + panic("tag value not in term list: " + string(t.Value)) + } + n = binary.PutUvarint(buf[:], uint64(offset)) + dst = append(dst, buf[:n]...) + } + + return dst +} + +// termListElem represents an element in a term list. +type termListElem struct { + term string // term value + freq int // term frequency + offset uint32 // position in file +} + +// termListElems represents a list of elements sorted by descending frequency. 
+type termListElems []termListElem
+
+func (a termListElems) Len() int { return len(a) }
+func (a termListElems) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
+func (a termListElems) Less(i, j int) bool {
+	if a[i].freq != a[j].freq {
+		return a[i].freq > a[j].freq
+	}
+	return a[i].term < a[j].term
+}
diff --git a/tsdb/engine/tsi1/tsi1.go b/tsdb/engine/tsi1/tsi1.go
new file mode 100644
index 0000000000..f82842c41e
--- /dev/null
+++ b/tsdb/engine/tsi1/tsi1.go
@@ -0,0 +1,74 @@
+package tsi1
+
+import (
+	"encoding/binary"
+	"fmt"
+	"io"
+	"os"
+)
+
+// writeTo writes v into w. Updates n.
+func writeTo(w io.Writer, v []byte, n *int64) error {
+	nn, err := w.Write(v)
+	*n += int64(nn)
+	return err
+}
+
+// writeUint8To writes v into w. Updates n.
+func writeUint8To(w io.Writer, v uint8, n *int64) error {
+	nn, err := w.Write([]byte{v})
+	*n += int64(nn)
+	return err
+}
+
+// writeUint16To writes v into w using big endian encoding. Updates n.
+func writeUint16To(w io.Writer, v uint16, n *int64) error {
+	var buf [2]byte
+	binary.BigEndian.PutUint16(buf[:], v)
+	nn, err := w.Write(buf[:])
+	*n += int64(nn)
+	return err
+}
+
+// writeUint32To writes v into w using big endian encoding. Updates n.
+func writeUint32To(w io.Writer, v uint32, n *int64) error {
+	var buf [4]byte
+	binary.BigEndian.PutUint32(buf[:], v)
+	nn, err := w.Write(buf[:])
+	*n += int64(nn)
+	return err
+}
+
+// writeUint64To writes v into w using big endian encoding. Updates n.
+func writeUint64To(w io.Writer, v uint64, n *int64) error {
+	var buf [8]byte
+	binary.BigEndian.PutUint64(buf[:], v)
+	nn, err := w.Write(buf[:])
+	*n += int64(nn)
+	return err
+}
+
+// writeUvarintTo writes v into w using variable length encoding. Updates n.
+func writeUvarintTo(w io.Writer, v uint64, n *int64) error { + var buf [binary.MaxVarintLen64]byte + i := binary.PutUvarint(buf[:], v) + nn, err := w.Write(buf[:i]) + *n += int64(nn) + return err +} + +func Hexdump(data []byte) { + addr := 0 + for len(data) > 0 { + n := len(data) + if n > 16 { + n = 16 + } + + fmt.Fprintf(os.Stderr, "%07x % x\n", addr, data[:n]) + + data = data[n:] + addr += n + } + fmt.Fprintln(os.Stderr, "") +}