From 6f58149052326165911a3c701d5c8b2512eba226 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 10 Apr 2017 09:17:57 -0600 Subject: [PATCH 01/10] Increase tsi compaction factor. --- tsdb/index/tsi1/index.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 69cb7ad24e..c43b03082f 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -27,7 +27,7 @@ const IndexName = "tsi1" // Default compaction thresholds. const ( DefaultMaxLogFileSize = 5 * 1024 * 1024 - DefaultCompactionFactor = 1.8 + DefaultCompactionFactor = 10 // 1.8 ) func init() { From f3e08c58711e95d6494d450d4123cd29db2023f8 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 12 Apr 2017 10:38:24 -0600 Subject: [PATCH 02/10] Delta encode tag and measurement block series data. --- tsdb/index/tsi1/index_file.go | 1 + tsdb/index/tsi1/measurement_block.go | 59 ++++++++++++++++----- tsdb/index/tsi1/tag_block.go | 78 ++++++++++++++-------------- 3 files changed, 86 insertions(+), 52 deletions(-) diff --git a/tsdb/index/tsi1/index_file.go b/tsdb/index/tsi1/index_file.go index 6c6723b44e..c31b4f1896 100644 --- a/tsdb/index/tsi1/index_file.go +++ b/tsdb/index/tsi1/index_file.go @@ -265,6 +265,7 @@ func (f *IndexFile) TagValueSeriesIterator(name, key, value []byte) SeriesIterat return newSeriesDecodeIterator( &f.sblk, &rawSeriesIDIterator{ + n: ve.(*TagBlockValueElem).series.n, data: ve.(*TagBlockValueElem).series.data, }, ) diff --git a/tsdb/index/tsi1/measurement_block.go b/tsdb/index/tsi1/measurement_block.go index 205b5620fc..ed10e9717f 100644 --- a/tsdb/index/tsi1/measurement_block.go +++ b/tsdb/index/tsi1/measurement_block.go @@ -148,7 +148,7 @@ func (blk *MeasurementBlock) seriesIDIterator(name []byte) seriesIDIterator { if !ok { return &rawSeriesIDIterator{} } - return &rawSeriesIDIterator{data: e.series.data} + return &rawSeriesIDIterator{n: e.series.n, data: e.series.data} } // blockMeasurementIterator iterates over a list measurements in a block. @@ -175,6 +175,8 @@ func (itr *blockMeasurementIterator) Next() MeasurementElem { // rawSeriesIterator iterates over a list of raw series data. type rawSeriesIDIterator struct { + prev uint64 + n uint64 data []byte } @@ -184,9 +186,12 @@ func (itr *rawSeriesIDIterator) next() uint64 { return 0 } - id := binary.BigEndian.Uint64(itr.data) - itr.data = itr.data[SeriesIDSize:] - return id + delta, n := binary.Uvarint(itr.data) + itr.data = itr.data[n:] + + seriesID := itr.prev + delta + itr.prev = seriesID + return seriesID } // MeasurementBlockTrailer represents meta data at the end of a MeasurementBlock. @@ -334,9 +339,15 @@ func (e *MeasurementBlockElem) SeriesID(i int) uint64 { // SeriesIDs returns a list of decoded series ids. func (e *MeasurementBlockElem) SeriesIDs() []uint64 { - a := make([]uint64, e.series.n) - for i := 0; i < int(e.series.n); i++ { - a[i] = e.SeriesID(i) + a := make([]uint64, 0, e.series.n) + var prev uint64 + for data := e.series.data; len(data) > 0; { + delta, n := binary.Uvarint(data) + data = data[n:] + + seriesID := prev + delta + a = append(a, seriesID) + prev = seriesID } return a } @@ -362,7 +373,9 @@ func (e *MeasurementBlockElem) UnmarshalBinary(data []byte) error { // Parse series data. v, n := binary.Uvarint(data) e.series.n, data = uint64(v), data[n:] - e.series.data, data = data[:e.series.n*SeriesIDSize], data[e.series.n*SeriesIDSize:] + sz, n = binary.Uvarint(data) + data = data[n:] + e.series.data, data = data[:sz], data[sz:] // Save length of elem. 
e.size = start - len(data) @@ -372,6 +385,7 @@ func (e *MeasurementBlockElem) UnmarshalBinary(data []byte) error { // MeasurementBlockWriter writes a measurement block. type MeasurementBlockWriter struct { + buf bytes.Buffer mms map[string]measurement // Measurement sketch and tombstoned measurement sketch. @@ -518,14 +532,33 @@ func (mw *MeasurementBlockWriter) writeMeasurementTo(w io.Writer, name []byte, m return err } - // Write series count & ids. + // Write series data to buffer. + mw.buf.Reset() + var prev uint64 + for _, seriesID := range mm.seriesIDs { + delta := seriesID - prev + + var buf [binary.MaxVarintLen64]byte + i := binary.PutUvarint(buf[:], delta) + if _, err := mw.buf.Write(buf[:i]); err != nil { + return err + } + + prev = seriesID + } + + // Write series count. if err := writeUvarintTo(w, uint64(len(mm.seriesIDs)), n); err != nil { return err } - for _, seriesID := range mm.seriesIDs { - if err := writeUint64To(w, seriesID, n); err != nil { - return err - } + + // Write data size & buffer. + if err := writeUvarintTo(w, uint64(mw.buf.Len()), n); err != nil { + return err + } + nn, err := mw.buf.WriteTo(w) + if *n += nn; err != nil { + return err } return nil diff --git a/tsdb/index/tsi1/tag_block.go b/tsdb/index/tsi1/tag_block.go index b22b8a1933..db14ced6a3 100644 --- a/tsdb/index/tsi1/tag_block.go +++ b/tsdb/index/tsi1/tag_block.go @@ -326,9 +326,15 @@ func (e *TagBlockValueElem) SeriesID(i int) uint64 { // SeriesIDs returns a list decoded series ids. func (e *TagBlockValueElem) SeriesIDs() []uint64 { - a := make([]uint64, e.series.n) - for i := 0; i < int(e.series.n); i++ { - a[i] = e.SeriesID(i) + a := make([]uint64, 0, e.series.n) + var prev uint64 + for data := e.series.data; len(data) > 0; { + delta, n := binary.Uvarint(data) + data = data[n:] + + seriesID := prev + delta + a = append(a, seriesID) + prev = seriesID } return a } @@ -351,9 +357,13 @@ func (e *TagBlockValueElem) unmarshal(buf []byte) { e.series.n, n = binary.Uvarint(buf) buf = buf[n:] + // Parse data block size. + sz, n = binary.Uvarint(buf) + buf = buf[n:] + // Save reference to series data. - e.series.data = buf[:e.series.n*SeriesIDSize] - buf = buf[e.series.n*SeriesIDSize:] + e.series.data = buf[:sz] + buf = buf[sz:] // Save length of elem. e.size = start - len(buf) @@ -457,7 +467,8 @@ func ReadTagBlockTrailer(data []byte) (TagBlockTrailer, error) { // TagBlockEncoder encodes a tags to a TagBlock section. type TagBlockEncoder struct { - w io.Writer + w io.Writer + buf bytes.Buffer // Track value offsets. offsets *rhh.HashMap @@ -542,16 +553,33 @@ func (enc *TagBlockEncoder) EncodeValue(value []byte, deleted bool, seriesIDs [] return err } + // Build series data in buffer. + enc.buf.Reset() + var prev uint64 + for _, seriesID := range seriesIDs { + delta := seriesID - prev + + var buf [binary.MaxVarintLen64]byte + i := binary.PutUvarint(buf[:], delta) + if _, err := enc.buf.Write(buf[:i]); err != nil { + return err + } + + prev = seriesID + } + // Write series count. if err := writeUvarintTo(enc.w, uint64(len(seriesIDs)), &enc.n); err != nil { return err } - // Write series ids. - for _, seriesID := range seriesIDs { - if err := writeUint64To(enc.w, seriesID, &enc.n); err != nil { - return err - } + // Write data size & buffer. 
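Patch 02 replaces the fixed 8-byte series IDs in both the measurement block and the tag block with uvarint-encoded deltas between successive (sorted) IDs, which is why both writers now stage the IDs in a bytes.Buffer and record the buffer size alongside the count. A minimal, self-contained sketch of the round trip; the encodeSeriesIDs/decodeSeriesIDs names are illustrative and not part of the patch:

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeSeriesIDs writes sorted series IDs as uvarint deltas, mirroring the
// buffer-building loops added to MeasurementBlockWriter and TagBlockEncoder.
func encodeSeriesIDs(ids []uint64) []byte {
	var out []byte
	var prev uint64
	for _, id := range ids {
		var tmp [binary.MaxVarintLen64]byte
		n := binary.PutUvarint(tmp[:], id-prev)
		out = append(out, tmp[:n]...)
		prev = id
	}
	return out
}

// decodeSeriesIDs reverses the encoding, the same way rawSeriesIDIterator.next
// and the SeriesIDs methods now walk the data.
func decodeSeriesIDs(data []byte) []uint64 {
	var ids []uint64
	var prev uint64
	for len(data) > 0 {
		delta, n := binary.Uvarint(data)
		data = data[n:]
		prev += delta
		ids = append(ids, prev)
	}
	return ids
}

func main() {
	enc := encodeSeriesIDs([]uint64{10, 12, 500, 100000})
	// 7 bytes instead of 32: small deltas encode in one to three bytes each.
	fmt.Println(len(enc), decodeSeriesIDs(enc))
}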
+ if err := writeUvarintTo(enc.w, uint64(enc.buf.Len()), &enc.n); err != nil { + return err + } + nn, err := enc.buf.WriteTo(enc.w) + if enc.n += nn; err != nil { + return err } return nil @@ -721,31 +749,3 @@ func encodeTagValueFlag(deleted bool) byte { } return flag } - -/* -type tagSet struct { - deleted bool - data struct { - offset int64 - size int64 - } - hashIndex struct { - offset int64 - size int64 - } - values map[string]tagValue - - offset int64 -} - -func (ts tagSet) flag() byte { return encodeTagKeyFlag(ts.deleted) } - -type tagValue struct { - seriesIDs []uint64 - deleted bool - - offset int64 -} - -func (tv tagValue) flag() byte { return encodeTagValueFlag(tv.deleted) } -*/ From 48a06432dfbe7d1f8e8b5b95c324555b2a5c87fd Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 17 Apr 2017 15:32:20 -0600 Subject: [PATCH 03/10] Add tsi1 bloom filter. --- pkg/bloom/bloom.go | 113 +++++++++++++++++++++++++++ pkg/bloom/bloom_test.go | 29 +++++++ tsdb/index/tsi1/index_files.go | 11 ++- tsdb/index/tsi1/log_file.go | 8 +- tsdb/index/tsi1/series_block.go | 64 ++++++++++++++- tsdb/index/tsi1/series_block_test.go | 2 +- 6 files changed, 223 insertions(+), 4 deletions(-) create mode 100644 pkg/bloom/bloom.go create mode 100644 pkg/bloom/bloom_test.go diff --git a/pkg/bloom/bloom.go b/pkg/bloom/bloom.go new file mode 100644 index 0000000000..e3693f516c --- /dev/null +++ b/pkg/bloom/bloom.go @@ -0,0 +1,113 @@ +package bloom + +// NOTE: +// This package implements a limited bloom filter implementation based on +// Will Fitzgerald's bloom & bitset packages. It's implemented locally to +// support zero-copy memory-mapped slices. +// +// This also optimizes the filter by always using a bitset size with a power of 2. + +import ( + "fmt" + "math" + + "github.com/spaolacci/murmur3" +) + +// Filter represents a bloom filter. +type Filter struct { + k uint64 + b []byte + mask uint64 +} + +// NewFilter returns a new instance of Filter using m bits and k hash functions. +// If m is not a power of two then it is rounded to the next highest power of 2. +func NewFilter(m uint64, k uint64) *Filter { + m = pow2(m) + + return &Filter{ + k: k, + b: make([]byte, m/8), + mask: m - 1, + } +} + +// NewFilterBuffer returns a new instance of a filter using a backing buffer. +// The buffer length MUST be a power of 2. +func NewFilterBuffer(buf []byte, k uint64) (*Filter, error) { + m := pow2(uint64(len(buf)) * 8) + if m != uint64(len(buf))*8 { + return nil, fmt.Errorf("bloom.Filter: buffer bit count must a power of two: %d/%d", len(buf)*8, m) + } + + return &Filter{ + k: k, + b: buf, + mask: m - 1, + }, nil +} + +// Len returns the number of bits used in the filter. +func (f *Filter) Len() uint { return uint(len(f.b)) } + +// K returns the number of hash functions used in the filter. +func (f *Filter) K() uint64 { return f.k } + +// Bytes returns the underlying backing slice. +func (f *Filter) Bytes() []byte { return f.b } + +// Insert inserts data to the filter. +func (f *Filter) Insert(v []byte) { + h := hash(v) + for i := uint64(0); i < f.k; i++ { + loc := f.location(h, i) + f.b[loc/8] |= 1 << (loc % 8) + } +} + +// Contains returns true if the filter possibly contains v. +// Returns false if the filter definitely does not contain v. +func (f *Filter) Contains(v []byte) bool { + h := hash(v) + for i := uint64(0); i < f.k; i++ { + loc := f.location(h, i) + if f.b[loc/8]&(1<<(loc%8)) == 0 { + return false + } + } + return true +} + +// location returns the ith hashed location using the four base hash values. 
+func (f *Filter) location(h [4]uint64, i uint64) uint { + return uint((h[i%2] + i*h[2+(((i+(i%2))%4)/2)]) & f.mask) +} + +// Estimate returns an estimated bit count and hash count given the element count and false positive rate. +func Estimate(n uint64, p float64) (m uint64, k uint64) { + m = uint64(math.Ceil(-1 * float64(n) * math.Log(p) / math.Pow(math.Log(2), 2))) + k = uint64(math.Ceil(math.Log(2) * float64(m) / float64(n))) + return m, k +} + +// pow2 returns the number that is the next highest power of 2. +// Returns v if it is a power of 2. +func pow2(v uint64) uint64 { + for i := uint64(8); i < 1<<62; i *= 2 { + if i >= v { + return i + } + } + panic("unreachable") +} + +// hash returns a set of 4 based hashes. +func hash(data []byte) [4]uint64 { + h := murmur3.New128() + h.Write(data) + v1, v2 := h.Sum128() + h.Write([]byte{1}) + v3, v4 := h.Sum128() + return [4]uint64{v1, v2, v3, v4} +} diff --git a/pkg/bloom/bloom_test.go b/pkg/bloom/bloom_test.go new file mode 100644 index 0000000000..33182da6b2 --- /dev/null +++ b/pkg/bloom/bloom_test.go @@ -0,0 +1,29 @@ +package bloom_test + +import ( + "testing" + + "github.com/influxdata/influxdb/pkg/bloom" +) + +// Ensure filter can insert values and verify they exist. +func TestFilter_InsertContains(t *testing.T) { + f := bloom.NewFilter(1000, 4) + + // Insert value and validate. + f.Insert([]byte("Bess")) + if !f.Contains([]byte("Bess")) { + t.Fatal("expected true") + } + + // Insert another value and test. + f.Insert([]byte("Emma")) + if !f.Contains([]byte("Emma")) { + t.Fatal("expected true") + } + + // Validate that a non-existent value doesn't exist. + if f.Contains([]byte("Jane")) { + t.Fatal("expected false") + } +} diff --git a/tsdb/index/tsi1/index_files.go b/tsdb/index/tsi1/index_files.go index 5a9d3a8da5..21865a5e5d 100644 --- a/tsdb/index/tsi1/index_files.go +++ b/tsdb/index/tsi1/index_files.go @@ -8,6 +8,7 @@ import ( "sort" "time" + "github.com/influxdata/influxdb/pkg/estimator/hll" "github.com/influxdata/influxdb/pkg/mmap" ) @@ -187,8 +188,16 @@ func (p IndexFiles) WriteTo(w io.Writer) (n int64, err error) { } func (p IndexFiles) writeSeriesBlockTo(w io.Writer, info *indexCompactInfo, n *int64) error { + // Estimate series cardinality. + sketch := hll.NewDefaultPlus() + for _, f := range p { + if err := f.MergeSeriesSketches(sketch, sketch); err != nil { + return err + } + } + itr := p.SeriesIterator() - enc := NewSeriesBlockEncoder(w) + enc := NewSeriesBlockEncoder(w, sketch.Count()) // Write all series. for e := itr.Next(); e != nil; e = itr.Next() { diff --git a/tsdb/index/tsi1/log_file.go b/tsdb/index/tsi1/log_file.go index 4198f2dd35..240d9484a3 100644 --- a/tsdb/index/tsi1/log_file.go +++ b/tsdb/index/tsi1/log_file.go @@ -808,8 +808,14 @@ func (f *LogFile) WriteTo(w io.Writer) (n int64, err error) { } func (f *LogFile) writeSeriesBlockTo(w io.Writer, names []string, info *logFileCompactInfo, n *int64) error { + // Determine series count. + var seriesN uint64 + for _, mm := range f.mms { + seriesN += uint64(len(mm.series)) + } + // Write all series. - enc := NewSeriesBlockEncoder(w) + enc := NewSeriesBlockEncoder(w, seriesN) // Add series from measurements. 
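Patch 03's pkg/bloom is sized from a target false-positive rate: Estimate(n, p) gives m = ceil(-n·ln(p) / ln²2) bits and k = ceil(ln2 · m/n) hashes, and NewFilter then rounds m up to the next power of two so the bit mask in location() works. For n = 1,000,000 series at the 2% rate used by the series block that is roughly 8.14 million bits and k = 6, rounded up to 2^23 bits (1 MiB). A short sketch of the API as added above; the series keys are made up:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/pkg/bloom"
)

func main() {
	// Size a filter for ~1M series at the series block's 2% false-positive target.
	m, k := bloom.Estimate(1000000, 0.02)
	fmt.Println(m, k) // ~8.14M bits, 6 hash functions

	// NewFilter rounds m up to the next power of two (2^23 bits = 1MiB here).
	f := bloom.NewFilter(m, k)
	f.Insert([]byte("cpu,host=server01"))
	fmt.Println(f.Contains([]byte("cpu,host=server01"))) // always true
	fmt.Println(f.Contains([]byte("cpu,host=server02"))) // false, barring a false positive
}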
for _, name := range names { diff --git a/tsdb/index/tsi1/series_block.go b/tsdb/index/tsi1/series_block.go index a7b8d17332..4ac1b825aa 100644 --- a/tsdb/index/tsi1/series_block.go +++ b/tsdb/index/tsi1/series_block.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/bloom" "github.com/influxdata/influxdb/pkg/estimator" "github.com/influxdata/influxdb/pkg/estimator/hll" "github.com/influxdata/influxdb/pkg/mmap" @@ -20,12 +21,16 @@ import ( // ErrSeriesOverflow is returned when too many series are added to a series writer. var ErrSeriesOverflow = errors.New("series overflow") +// BloomFalsePositiveRate is the false positive rate of the series bloom filter. +const BloomFalsePositiveRate = 0.02 + // Series list field size constants. const ( // Series list trailer field sizes. SeriesBlockTrailerSize = 0 + 8 + 8 + // series data offset/size 8 + 8 + 8 + // series index offset/size/capacity + 8 + 8 + 8 + // bloom filter false positive rate, offset/size 8 + 8 + // series sketch offset/size 8 + 8 + // tombstone series sketch offset/size 8 + 8 + // series count and tombstone count @@ -61,6 +66,9 @@ type SeriesBlock struct { seriesN int64 tombstoneN int64 + // Bloom filter used for fast series existence check. + filter *bloom.Filter + // Series block sketch and tombstone sketch for cardinality estimation. // While we have exact counts for the block, these sketches allow us to // estimate cardinality across multiple blocks (which might contain @@ -97,6 +105,14 @@ func (blk *SeriesBlock) Offset(name []byte, tags models.Tags, buf []byte) (offse buf = AppendSeriesKey(buf[:0], name, tags) bufN := uint64(len(buf)) + // Quickly check the bloom filter. + // If the key doesn't exist then we know for sure that it doesn't exist. + // If it does exist then we need to do a hash index check to verify. False + // positives are possible with a bloom filter. + if !blk.filter.Contains(buf) { + return 0, false + } + // Find the correct partition. // Use previous index unless an exact match on the min value. i := sort.Search(len(blk.seriesIndexes), func(i int) bool { @@ -196,6 +212,13 @@ func (blk *SeriesBlock) UnmarshalBinary(data []byte) error { return fmt.Errorf("data remaining in index list buffer: %d", len(buf)) } + // Initialize bloom filter. + filter, err := bloom.NewFilterBuffer(data[t.Bloom.Offset:][:t.Bloom.Size], t.Bloom.K) + if err != nil { + return err + } + blk.filter = filter + // Initialise sketches. We're currently using HLL+. var s, ts = hll.NewDefaultPlus(), hll.NewDefaultPlus() if err := s.UnmarshalBinary(data[t.Sketch.Offset:][:t.Sketch.Size]); err != nil { @@ -510,13 +533,18 @@ type SeriesBlockEncoder struct { indexMin []byte indexes []seriesBlockIndexEncodeInfo + // Bloom filter to check for series existance. + filter *bloom.Filter + // Series sketch and tombstoned series sketch. These must be // set before calling WriteTo. sketch, tSketch estimator.Sketch } // NewSeriesBlockEncoder returns a new instance of SeriesBlockEncoder. 
-func NewSeriesBlockEncoder(w io.Writer) *SeriesBlockEncoder { +func NewSeriesBlockEncoder(w io.Writer, n uint64) *SeriesBlockEncoder { + m, k := bloom.Estimate(n, BloomFalsePositiveRate) + return &SeriesBlockEncoder{ w: w, @@ -525,6 +553,8 @@ func NewSeriesBlockEncoder(w io.Writer) *SeriesBlockEncoder { LoadFactor: LoadFactor, }), + filter: bloom.NewFilter(m, k), + sketch: hll.NewDefaultPlus(), tSketch: hll.NewDefaultPlus(), } @@ -574,6 +604,9 @@ func (enc *SeriesBlockEncoder) Encode(name []byte, tags models.Tags, deleted boo // Key is copied by the RHH map. enc.offsets.Put(buf[1:], uint64(offset)) + // Update bloom filter. + enc.filter.Insert(buf[1:]) + // Update sketches & trailer. if deleted { enc.trailer.TombstoneN++ @@ -609,6 +642,14 @@ func (enc *SeriesBlockEncoder) Close() error { } enc.trailer.Series.Index.Size = enc.n - enc.trailer.Series.Index.Offset + // Flush bloom filter. + enc.trailer.Bloom.K = enc.filter.K() + enc.trailer.Bloom.Offset = enc.n + if err := writeTo(enc.w, enc.filter.Bytes(), &enc.n); err != nil { + return err + } + enc.trailer.Bloom.Size = enc.n - enc.trailer.Bloom.Offset + // Write the sketches out. enc.trailer.Sketch.Offset = enc.n if err := writeSketchTo(enc.w, enc.sketch, &enc.n); err != nil { @@ -774,6 +815,11 @@ func ReadSeriesBlockTrailer(data []byte) SeriesBlockTrailer { t.Series.Index.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] t.Series.Index.N, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] + // Read bloom filter info. + t.Bloom.K, buf = binary.BigEndian.Uint64(buf[0:8]), buf[8:] + t.Bloom.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] + t.Bloom.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] + // Read series sketch info. t.Sketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] t.Sketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] @@ -803,6 +849,13 @@ type SeriesBlockTrailer struct { } } + // Bloom filter info. + Bloom struct { + K uint64 + Offset int64 + Size int64 + } + // Offset and size of cardinality sketch for measurements. Sketch struct { Offset int64 @@ -834,6 +887,15 @@ func (t SeriesBlockTrailer) WriteTo(w io.Writer) (n int64, err error) { return n, err } + // Write bloom filter info. + if err := writeUint64To(w, t.Bloom.K, &n); err != nil { + return n, err + } else if err := writeUint64To(w, uint64(t.Bloom.Offset), &n); err != nil { + return n, err + } else if err := writeUint64To(w, uint64(t.Bloom.Size), &n); err != nil { + return n, err + } + // Write measurement sketch info. if err := writeUint64To(w, uint64(t.Sketch.Offset), &n); err != nil { return n, err diff --git a/tsdb/index/tsi1/series_block_test.go b/tsdb/index/tsi1/series_block_test.go index b43980371e..5ae8dc98cd 100644 --- a/tsdb/index/tsi1/series_block_test.go +++ b/tsdb/index/tsi1/series_block_test.go @@ -56,7 +56,7 @@ func CreateSeriesBlock(a []Series) (*tsi1.SeriesBlock, error) { var buf bytes.Buffer // Create writer and sketches. Add series. - enc := tsi1.NewSeriesBlockEncoder(&buf) + enc := tsi1.NewSeriesBlockEncoder(&buf, uint64(len(a))) for i, s := range a { if err := enc.Encode(s.Name, s.Tags, s.Deleted); err != nil { return nil, fmt.Errorf("SeriesBlockWriter.Add(): i=%d, err=%s", i, err) From 79edc0979cc85a807384c8fb6cc8c6931b0f488a Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 19 Apr 2017 07:38:29 -0600 Subject: [PATCH 04/10] Add temporary debugging stats for offset lookups. 
--- tsdb/index/tsi1/series_block.go | 36 +++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tsdb/index/tsi1/series_block.go b/tsdb/index/tsi1/series_block.go index 4ac1b825aa..ee94b89bfa 100644 --- a/tsdb/index/tsi1/series_block.go +++ b/tsdb/index/tsi1/series_block.go @@ -8,6 +8,8 @@ import ( "io" "os" "sort" + "sync/atomic" + "time" "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" @@ -18,6 +20,34 @@ import ( "github.com/influxdata/influxdb/pkg/rhh" ) +// TEMP +var ( + offsetCount uint64 + offsetFilterFalseCount uint64 + offsetFilterFalsePositiveCount uint64 +) + +// TEMP +func init() { + go func() { + ticker := time.NewTicker(10 * time.Second) + for range ticker.C { + // Read values. + ofc := atomic.LoadUint64(&offsetCount) + offc := atomic.LoadUint64(&offsetFilterFalseCount) + offpc := atomic.LoadUint64(&offsetFilterFalsePositiveCount) + + // Clear values. + atomic.StoreUint64(&offsetCount, 0) + atomic.StoreUint64(&offsetFilterFalseCount, 0) + atomic.StoreUint64(&offsetFilterFalsePositiveCount, 0) + + // Report values. + println("dbg/OFFSET.STATS>>>", ofc, offc, offpc) + } + }() +} + // ErrSeriesOverflow is returned when too many series are added to a series writer. var ErrSeriesOverflow = errors.New("series overflow") @@ -105,6 +135,8 @@ func (blk *SeriesBlock) Offset(name []byte, tags models.Tags, buf []byte) (offse buf = AppendSeriesKey(buf[:0], name, tags) bufN := uint64(len(buf)) + atomic.AddUint64(&offsetCount, 1) // TEMP + // Quickly check the bloom filter. // If the key doesn't exist then we know for sure that it doesn't exist. // If it does exist then we need to do a hash index check to verify. False @@ -112,6 +144,7 @@ func (blk *SeriesBlock) Offset(name []byte, tags models.Tags, buf []byte) (offse if !blk.filter.Contains(buf) { return 0, false } + atomic.AddUint64(&offsetFilterFalseCount, 1) // TEMP // Find the correct partition. // Use previous index unless an exact match on the min value. @@ -134,6 +167,7 @@ func (blk *SeriesBlock) Offset(name []byte, tags models.Tags, buf []byte) (offse // Find offset of series. offset := binary.BigEndian.Uint64(seriesIndex.data[pos*SeriesIDSize:]) if offset == 0 { + atomic.AddUint64(&offsetFilterFalsePositiveCount, 1) // TEMP return 0, false } @@ -146,6 +180,7 @@ func (blk *SeriesBlock) Offset(name []byte, tags models.Tags, buf []byte) (offse // Check if we've exceeded the probe distance. 
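One small caveat on the temporary stats in patch 04: each counter is snapshotted with a LoadUint64 followed by a StoreUint64(..., 0), so increments that land between the two calls are silently dropped. That is harmless for throwaway debug output, but sync/atomic can capture and clear in a single step. A sketch; snapshotAndReset is an illustrative helper, not part of the patch:

package main

import (
	"fmt"
	"sync/atomic"
)

var offsetCount uint64 // stand-in for the counters above

// snapshotAndReset returns the current value and zeroes the counter in one
// atomic operation, so no concurrent increments are lost.
func snapshotAndReset(ctr *uint64) uint64 {
	return atomic.SwapUint64(ctr, 0)
}

func main() {
	atomic.AddUint64(&offsetCount, 3)
	fmt.Println(snapshotAndReset(&offsetCount))  // 3
	fmt.Println(atomic.LoadUint64(&offsetCount)) // 0
}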
max := rhh.Dist(rhh.HashKey(key), pos, n) if d > max { + atomic.AddUint64(&offsetFilterFalsePositiveCount, 1) // TEMP return 0, false } @@ -154,6 +189,7 @@ func (blk *SeriesBlock) Offset(name []byte, tags models.Tags, buf []byte) (offse d++ if d > n { + atomic.AddUint64(&offsetFilterFalsePositiveCount, 1) // TEMP return 0, false } } From 1975940f767a72746a3df0c6047751356e97032a Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 25 Apr 2017 10:26:45 -0600 Subject: [PATCH 05/10] intermediate compaction commit --- pkg/bloom/bloom.go | 18 +++ tsdb/index/internal/file_set.go | 11 +- tsdb/index/tsi1/file_set.go | 199 +++++++++++++++++++------------ tsdb/index/tsi1/file_set_test.go | 2 +- tsdb/index/tsi1/index.go | 141 ++++++++++++++++------ tsdb/index/tsi1/index_file.go | 23 ++-- tsdb/index/tsi1/index_files.go | 2 +- tsdb/index/tsi1/log_file.go | 17 ++- 8 files changed, 287 insertions(+), 126 deletions(-) diff --git a/pkg/bloom/bloom.go b/pkg/bloom/bloom.go index e3693f516c..1316b66043 100644 --- a/pkg/bloom/bloom.go +++ b/pkg/bloom/bloom.go @@ -79,6 +79,24 @@ func (f *Filter) Contains(v []byte) bool { return true } +// Merge performs an in-place union of other into f. +// Returns an error if m or k of the filters differs. +func (f *Filter) Merge(other *Filter) error { + // Ensure m & k fields match. + if len(f.b) != len(other.b) { + return fmt.Errorf("bloom.Filter.Merge(): m mismatch: %d <> %d", len(f.b), len(other.b)) + } else if f.k != other.k { + return fmt.Errorf("bloom.Filter.Merge(): k mismatch: %d <> %d", f.b, other.b) + } + + // Perform union of each byte. + for i := range f.b { + f.b[i] |= other.b[i] + } + + return nil +} + // location returns the ith hashed location using the four base hash values. func (f *Filter) location(h [4]uint64, i uint64) uint { return uint((h[i%2] + i*h[2+(((i+(i%2))%4)/2)]) & f.mask) diff --git a/tsdb/index/internal/file_set.go b/tsdb/index/internal/file_set.go index a6a9b03ef4..dd3debddff 100644 --- a/tsdb/index/internal/file_set.go +++ b/tsdb/index/internal/file_set.go @@ -2,6 +2,7 @@ package internal import ( "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/bloom" "github.com/influxdata/influxdb/pkg/estimator" "github.com/influxdata/influxdb/tsdb/index/tsi1" ) @@ -10,6 +11,8 @@ import ( type File struct { Closef func() error Pathf func() string + IDf func() int + Levelf func() int FilterNameTagsf func(names [][]byte, tagsSlice []models.Tags) ([][]byte, []models.Tags) Measurementf func(name []byte) tsi1.MeasurementElem MeasurementIteratorf func() tsi1.MeasurementIterator @@ -28,10 +31,13 @@ type File struct { MergeMeasurementsSketchesf func(s, t estimator.Sketch) error Retainf func() Releasef func() + Filterf func() *bloom.Filter } func (f *File) Close() error { return f.Closef() } func (f *File) Path() string { return f.Pathf() } +func (f *File) ID() int { return f.IDf() } +func (f *File) Level() int { return f.Levelf() } func (f *File) FilterNamesTags(names [][]byte, tagsSlice []models.Tags) ([][]byte, []models.Tags) { return f.FilterNameTagsf(names, tagsSlice) } @@ -64,5 +70,6 @@ func (f *File) MergeSeriesSketches(s, t estimator.Sketch) error { return f.Merge func (f *File) MergeMeasurementsSketches(s, t estimator.Sketch) error { return f.MergeMeasurementsSketchesf(s, t) } -func (f *File) Retain() { f.Retainf() } -func (f *File) Release() { f.Releasef() } +func (f *File) Retain() { f.Retainf() } +func (f *File) Release() { f.Releasef() } +func (f *File) Filter() *bloom.Filter { return f.Filterf() } diff --git 
a/tsdb/index/tsi1/file_set.go b/tsdb/index/tsi1/file_set.go index 00771f96c1..22ea269a76 100644 --- a/tsdb/index/tsi1/file_set.go +++ b/tsdb/index/tsi1/file_set.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/bloom" "github.com/influxdata/influxdb/pkg/bytesutil" "github.com/influxdata/influxdb/pkg/estimator" "github.com/influxdata/influxdb/pkg/estimator/hll" @@ -15,7 +16,18 @@ import ( ) // FileSet represents a collection of files. -type FileSet []File +type FileSet struct { + levels []CompactionLevel + files []File + filters []*bloom.Filter // per-level filters +} + +// NewFileSet returns a new instance of FileSet. +func NewFileSet(levels []CompactionLevel, files []File) *FileSet { + fs := &FileSet{levels: levels, files: files} + fs.buildFilters() + return fs +} // Close closes all the files in the file set. func (p FileSet) Close() error { @@ -29,55 +41,55 @@ func (p FileSet) Close() error { } // Retain adds a reference count to all files. -func (p FileSet) Retain() { - for _, f := range p { +func (fs *FileSet) Retain() { + for _, f := range fs.files { f.Retain() } } // Release removes a reference count from all files. -func (p FileSet) Release() { - for _, f := range p { +func (fs *FileSet) Release() { + for _, f := range fs.files { f.Release() } } // MustReplace swaps a list of files for a single file and returns a new file set. // The caller should always guarentee that the files exist and are contiguous. -func (p FileSet) MustReplace(oldFiles []File, newFile File) FileSet { +func (fs *FileSet) MustReplace(oldFiles []File, newFile File) *FileSet { assert(len(oldFiles) > 0, "cannot replace empty files") // Find index of first old file. var i int - for ; i < len(p); i++ { - if p[i] == oldFiles[0] { + for ; i < len(fs.files); i++ { + if fs.files[i] == oldFiles[0] { break - } else if i == len(p)-1 { + } else if i == len(fs.files)-1 { panic("first replacement file not found") } } // Ensure all old files are contiguous. for j := range oldFiles { - if p[i+j] != oldFiles[j] { + if fs.files[i+j] != oldFiles[j] { panic("cannot replace non-contiguous files") } } // Copy to new fileset. - other := make([]File, len(p)-len(oldFiles)+1) - copy(other[:i], p[:i]) + other := make([]File, len(fs.files)-len(oldFiles)+1) + copy(other[:i], fs.files[:i]) other[i] = newFile - copy(other[i+1:], p[i+len(oldFiles):]) + copy(other[i+1:], fs.files[i+len(oldFiles):]) - return other + return NewFileSet(fs.levels, other) } // MaxID returns the highest file identifier. -func (fs FileSet) MaxID() int { +func (fs *FileSet) MaxID() int { var max int - for _, f := range fs { - if i := ParseFileID(f.Path()); i > max { + for _, f := range fs.files { + if i := f.ID(); i > max { max = i } } @@ -85,9 +97,9 @@ func (fs FileSet) MaxID() int { } // LogFiles returns all log files from the file set. -func (fs FileSet) LogFiles() []*LogFile { +func (fs *FileSet) LogFiles() []*LogFile { var a []*LogFile - for _, f := range fs { + for _, f := range fs.files { if f, ok := f.(*LogFile); ok { a = append(a, f) } @@ -96,9 +108,9 @@ func (fs FileSet) LogFiles() []*LogFile { } // IndexFiles returns all index files from the file set. 
-func (fs FileSet) IndexFiles() []*IndexFile { +func (fs *FileSet) IndexFiles() []*IndexFile { var a []*IndexFile - for _, f := range fs { + for _, f := range fs.files { if f, ok := f.(*IndexFile); ok { a = append(a, f) } @@ -107,9 +119,9 @@ func (fs FileSet) IndexFiles() []*IndexFile { } // SeriesIterator returns an iterator over all series in the index. -func (fs FileSet) SeriesIterator() SeriesIterator { - a := make([]SeriesIterator, 0, len(fs)) - for _, f := range fs { +func (fs *FileSet) SeriesIterator() SeriesIterator { + a := make([]SeriesIterator, 0, len(fs.files)) + for _, f := range fs.files { itr := f.SeriesIterator() if itr == nil { continue @@ -120,8 +132,8 @@ func (fs FileSet) SeriesIterator() SeriesIterator { } // Measurement returns a measurement by name. -func (fs FileSet) Measurement(name []byte) MeasurementElem { - for _, f := range fs { +func (fs *FileSet) Measurement(name []byte) MeasurementElem { + for _, f := range fs.files { if e := f.Measurement(name); e == nil { continue } else if e.Deleted() { @@ -134,9 +146,9 @@ func (fs FileSet) Measurement(name []byte) MeasurementElem { } // MeasurementIterator returns an iterator over all measurements in the index. -func (fs FileSet) MeasurementIterator() MeasurementIterator { - a := make([]MeasurementIterator, 0, len(fs)) - for _, f := range fs { +func (fs *FileSet) MeasurementIterator() MeasurementIterator { + a := make([]MeasurementIterator, 0, len(fs.files)) + for _, f := range fs.files { itr := f.MeasurementIterator() if itr != nil { a = append(a, itr) @@ -147,9 +159,9 @@ func (fs FileSet) MeasurementIterator() MeasurementIterator { // MeasurementSeriesIterator returns an iterator over all non-tombstoned series // in the index for the provided measurement. -func (fs FileSet) MeasurementSeriesIterator(name []byte) SeriesIterator { - a := make([]SeriesIterator, 0, len(fs)) - for _, f := range fs { +func (fs *FileSet) MeasurementSeriesIterator(name []byte) SeriesIterator { + a := make([]SeriesIterator, 0, len(fs.files)) + for _, f := range fs.files { itr := f.MeasurementSeriesIterator(name) if itr != nil { a = append(a, itr) @@ -159,9 +171,9 @@ func (fs FileSet) MeasurementSeriesIterator(name []byte) SeriesIterator { } // TagKeyIterator returns an iterator over all tag keys for a measurement. -func (fs FileSet) TagKeyIterator(name []byte) TagKeyIterator { - a := make([]TagKeyIterator, 0, len(fs)) - for _, f := range fs { +func (fs *FileSet) TagKeyIterator(name []byte) TagKeyIterator { + a := make([]TagKeyIterator, 0, len(fs.files)) + for _, f := range fs.files { itr := f.TagKeyIterator(name) if itr != nil { a = append(a, itr) @@ -171,7 +183,7 @@ func (fs FileSet) TagKeyIterator(name []byte) TagKeyIterator { } // MeasurementTagKeysByExpr extracts the tag keys wanted by the expression. -func (fs FileSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { +func (fs *FileSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) { switch e := expr.(type) { case *influxql.BinaryExpr: switch e.Op { @@ -231,7 +243,7 @@ func (fs FileSet) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map } // tagKeysByFilter will filter the tag keys for the measurement. 
-func (fs FileSet) tagKeysByFilter(name []byte, op influxql.Token, val []byte, regex *regexp.Regexp) map[string]struct{} { +func (fs *FileSet) tagKeysByFilter(name []byte, op influxql.Token, val []byte, regex *regexp.Regexp) map[string]struct{} { ss := make(map[string]struct{}) itr := fs.TagKeyIterator(name) for e := itr.Next(); e != nil; e = itr.Next() { @@ -256,9 +268,9 @@ func (fs FileSet) tagKeysByFilter(name []byte, op influxql.Token, val []byte, re } // TagKeySeriesIterator returns a series iterator for all values across a single key. -func (fs FileSet) TagKeySeriesIterator(name, key []byte) SeriesIterator { - a := make([]SeriesIterator, 0, len(fs)) - for _, f := range fs { +func (fs *FileSet) TagKeySeriesIterator(name, key []byte) SeriesIterator { + a := make([]SeriesIterator, 0, len(fs.files)) + for _, f := range fs.files { itr := f.TagKeySeriesIterator(name, key) if itr != nil { a = append(a, itr) @@ -268,8 +280,8 @@ func (fs FileSet) TagKeySeriesIterator(name, key []byte) SeriesIterator { } // HasTagKey returns true if the tag key exists. -func (fs FileSet) HasTagKey(name, key []byte) bool { - for _, f := range fs { +func (fs *FileSet) HasTagKey(name, key []byte) bool { + for _, f := range fs.files { if e := f.TagKey(name, key); e != nil { return !e.Deleted() } @@ -278,8 +290,8 @@ func (fs FileSet) HasTagKey(name, key []byte) bool { } // HasTagValue returns true if the tag value exists. -func (fs FileSet) HasTagValue(name, key, value []byte) bool { - for _, f := range fs { +func (fs *FileSet) HasTagValue(name, key, value []byte) bool { + for _, f := range fs.files { if e := f.TagValue(name, key, value); e != nil { return !e.Deleted() } @@ -288,9 +300,9 @@ func (fs FileSet) HasTagValue(name, key, value []byte) bool { } // TagValueIterator returns a value iterator for a tag key. -func (fs FileSet) TagValueIterator(name, key []byte) TagValueIterator { - a := make([]TagValueIterator, 0, len(fs)) - for _, f := range fs { +func (fs *FileSet) TagValueIterator(name, key []byte) TagValueIterator { + a := make([]TagValueIterator, 0, len(fs.files)) + for _, f := range fs.files { itr := f.TagValueIterator(name, key) if itr != nil { a = append(a, itr) @@ -300,9 +312,9 @@ func (fs FileSet) TagValueIterator(name, key []byte) TagValueIterator { } // TagValueSeriesIterator returns a series iterator for a single tag value. -func (fs FileSet) TagValueSeriesIterator(name, key, value []byte) SeriesIterator { - a := make([]SeriesIterator, 0, len(fs)) - for _, f := range fs { +func (fs *FileSet) TagValueSeriesIterator(name, key, value []byte) SeriesIterator { + a := make([]SeriesIterator, 0, len(fs.files)) + for _, f := range fs.files { itr := f.TagValueSeriesIterator(name, key, value) if itr != nil { a = append(a, itr) @@ -313,7 +325,7 @@ func (fs FileSet) TagValueSeriesIterator(name, key, value []byte) SeriesIterator // MatchTagValueSeriesIterator returns a series iterator for tags which match value. // If matches is false, returns iterators which do not match value. 
-func (fs FileSet) MatchTagValueSeriesIterator(name, key []byte, value *regexp.Regexp, matches bool) SeriesIterator { +func (fs *FileSet) MatchTagValueSeriesIterator(name, key []byte, value *regexp.Regexp, matches bool) SeriesIterator { matchEmpty := value.MatchString("") if matches { @@ -329,7 +341,7 @@ func (fs FileSet) MatchTagValueSeriesIterator(name, key []byte, value *regexp.Re return FilterUndeletedSeriesIterator(fs.matchTagValueNotEqualNotEmptySeriesIterator(name, key, value)) } -func (fs FileSet) matchTagValueEqualEmptySeriesIterator(name, key []byte, value *regexp.Regexp) SeriesIterator { +func (fs *FileSet) matchTagValueEqualEmptySeriesIterator(name, key []byte, value *regexp.Regexp) SeriesIterator { vitr := fs.TagValueIterator(name, key) if vitr == nil { return fs.MeasurementSeriesIterator(name) @@ -348,7 +360,7 @@ func (fs FileSet) matchTagValueEqualEmptySeriesIterator(name, key []byte, value ) } -func (fs FileSet) matchTagValueEqualNotEmptySeriesIterator(name, key []byte, value *regexp.Regexp) SeriesIterator { +func (fs *FileSet) matchTagValueEqualNotEmptySeriesIterator(name, key []byte, value *regexp.Regexp) SeriesIterator { vitr := fs.TagValueIterator(name, key) if vitr == nil { return nil @@ -363,7 +375,7 @@ func (fs FileSet) matchTagValueEqualNotEmptySeriesIterator(name, key []byte, val return MergeSeriesIterators(itrs...) } -func (fs FileSet) matchTagValueNotEqualEmptySeriesIterator(name, key []byte, value *regexp.Regexp) SeriesIterator { +func (fs *FileSet) matchTagValueNotEqualEmptySeriesIterator(name, key []byte, value *regexp.Regexp) SeriesIterator { vitr := fs.TagValueIterator(name, key) if vitr == nil { return nil @@ -378,7 +390,7 @@ func (fs FileSet) matchTagValueNotEqualEmptySeriesIterator(name, key []byte, val return MergeSeriesIterators(itrs...) } -func (fs FileSet) matchTagValueNotEqualNotEmptySeriesIterator(name, key []byte, value *regexp.Regexp) SeriesIterator { +func (fs *FileSet) matchTagValueNotEqualNotEmptySeriesIterator(name, key []byte, value *regexp.Regexp) SeriesIterator { vitr := fs.TagValueIterator(name, key) if vitr == nil { return fs.MeasurementSeriesIterator(name) @@ -397,7 +409,7 @@ func (fs FileSet) matchTagValueNotEqualNotEmptySeriesIterator(name, key []byte, ) } -func (fs FileSet) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) { +func (fs *FileSet) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) { // Return filtered list if expression exists. if expr != nil { return fs.measurementNamesByExpr(expr) @@ -412,7 +424,7 @@ func (fs FileSet) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) { return names, nil } -func (fs FileSet) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error) { +func (fs *FileSet) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error) { if expr == nil { return nil, nil } @@ -479,7 +491,7 @@ func (fs FileSet) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error) { } // measurementNamesByNameFilter returns matching measurement names in sorted order. 
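The MatchTagValueSeriesIterator helpers above branch on two independent flags: whether the regex is supposed to match (matches) and whether it matches the empty string (matchEmpty), which decides whether series that lack the tag key qualify. As implemented above: matches with matchEmpty returns the measurement's series minus those whose tag value does not match, so series without the key are kept; matches without matchEmpty is the union of series for each matching tag value; not-matches with matchEmpty is the union of series for each non-matching tag value; and not-matches without matchEmpty is the measurement's series minus those whose value matches, so series without the key again qualify.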
-func (fs FileSet) measurementNamesByNameFilter(op influxql.Token, val string, regex *regexp.Regexp) [][]byte { +func (fs *FileSet) measurementNamesByNameFilter(op influxql.Token, val string, regex *regexp.Regexp) [][]byte { var names [][]byte itr := fs.MeasurementIterator() for e := itr.Next(); e != nil; e = itr.Next() { @@ -503,7 +515,7 @@ func (fs FileSet) measurementNamesByNameFilter(op influxql.Token, val string, re return names } -func (fs FileSet) measurementNamesByTagFilter(op influxql.Token, key, val string, regex *regexp.Regexp) [][]byte { +func (fs *FileSet) measurementNamesByTagFilter(op influxql.Token, key, val string, regex *regexp.Regexp) [][]byte { var names [][]byte mitr := fs.MeasurementIterator() @@ -548,8 +560,8 @@ func (fs FileSet) measurementNamesByTagFilter(op influxql.Token, key, val string } // HasSeries returns true if the series exists and is not tombstoned. -func (fs FileSet) HasSeries(name []byte, tags models.Tags, buf []byte) bool { - for _, f := range fs { +func (fs *FileSet) HasSeries(name []byte, tags models.Tags, buf []byte) bool { + for _, f := range fs.files { if exists, tombstoned := f.HasSeries(name, tags, buf); exists { return !tombstoned } @@ -559,19 +571,19 @@ func (fs FileSet) HasSeries(name []byte, tags models.Tags, buf []byte) bool { // FilterNamesTags filters out any series which already exist. It modifies the // provided slices of names and tags. -func (fs FileSet) FilterNamesTags(names [][]byte, tagsSlice []models.Tags) ([][]byte, []models.Tags) { - for _, f := range fs { +func (fs *FileSet) FilterNamesTags(names [][]byte, tagsSlice []models.Tags) ([][]byte, []models.Tags) { + for _, f := range fs.files { names, tagsSlice = f.FilterNamesTags(names, tagsSlice) } return names, tagsSlice } // SeriesSketches returns the merged series sketches for the FileSet. -func (fs FileSet) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { +func (fs *FileSet) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { sketch, tsketch := hll.NewDefaultPlus(), hll.NewDefaultPlus() // Iterate over all the files and merge the sketches into the result. - for _, f := range fs { + for _, f := range fs.files { if err := f.MergeSeriesSketches(sketch, tsketch); err != nil { return nil, nil, err } @@ -580,11 +592,11 @@ func (fs FileSet) SeriesSketches() (estimator.Sketch, estimator.Sketch, error) { } // MeasurementsSketches returns the merged measurement sketches for the FileSet. -func (fs FileSet) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { +func (fs *FileSet) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) { sketch, tsketch := hll.NewDefaultPlus(), hll.NewDefaultPlus() // Iterate over all the files and merge the sketches into the result. - for _, f := range fs { + for _, f := range fs.files { if err := f.MergeMeasurementsSketches(sketch, tsketch); err != nil { return nil, nil, err } @@ -595,7 +607,7 @@ func (fs FileSet) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, er // MeasurementSeriesByExprIterator returns a series iterator for a measurement // that is filtered by expr. If expr only contains time expressions then this // call is equivalent to MeasurementSeriesIterator(). 
-func (fs FileSet) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr, fieldset *tsdb.MeasurementFieldSet) (SeriesIterator, error) { +func (fs *FileSet) MeasurementSeriesByExprIterator(name []byte, expr influxql.Expr, fieldset *tsdb.MeasurementFieldSet) (SeriesIterator, error) { // Return all series for the measurement if there are no tag expressions. if expr == nil || influxql.OnlyTimeExpr(expr) { return fs.MeasurementSeriesIterator(name), nil @@ -604,7 +616,7 @@ func (fs FileSet) MeasurementSeriesByExprIterator(name []byte, expr influxql.Exp } // MeasurementSeriesKeysByExpr returns a list of series keys matching expr. -func (fs FileSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr, fieldset *tsdb.MeasurementFieldSet) ([][]byte, error) { +func (fs *FileSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr, fieldset *tsdb.MeasurementFieldSet) ([][]byte, error) { // Create iterator for all matching series. itr, err := fs.MeasurementSeriesByExprIterator(name, expr, fieldset) if err != nil { @@ -627,7 +639,7 @@ func (fs FileSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr, f return keys, nil } -func (fs FileSet) seriesByExprIterator(name []byte, expr influxql.Expr, mf *tsdb.MeasurementFields) (SeriesIterator, error) { +func (fs *FileSet) seriesByExprIterator(name []byte, expr influxql.Expr, mf *tsdb.MeasurementFields) (SeriesIterator, error) { switch expr := expr.(type) { case *influxql.BinaryExpr: switch expr.Op { @@ -665,7 +677,7 @@ func (fs FileSet) seriesByExprIterator(name []byte, expr influxql.Expr, mf *tsdb } // seriesByBinaryExprIterator returns a series iterator and a filtering expression. -func (fs FileSet) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExpr, mf *tsdb.MeasurementFields) (SeriesIterator, error) { +func (fs *FileSet) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExpr, mf *tsdb.MeasurementFields) (SeriesIterator, error) { // If this binary expression has another binary expression, then this // is some expression math and we should just pass it to the underlying query. if _, ok := n.LHS.(*influxql.BinaryExpr); ok { @@ -716,7 +728,7 @@ func (fs FileSet) seriesByBinaryExprIterator(name []byte, n *influxql.BinaryExpr } } -func (fs FileSet) seriesByBinaryExprStringIterator(name, key, value []byte, op influxql.Token) (SeriesIterator, error) { +func (fs *FileSet) seriesByBinaryExprStringIterator(name, key, value []byte, op influxql.Token) (SeriesIterator, error) { // Special handling for "_name" to match measurement name. if bytes.Equal(key, []byte("_name")) { if (op == influxql.EQ && bytes.Equal(value, name)) || (op == influxql.NEQ && !bytes.Equal(value, name)) { @@ -750,7 +762,7 @@ func (fs FileSet) seriesByBinaryExprStringIterator(name, key, value []byte, op i return fs.TagKeySeriesIterator(name, key), nil } -func (fs FileSet) seriesByBinaryExprRegexIterator(name, key []byte, value *regexp.Regexp, op influxql.Token) (SeriesIterator, error) { +func (fs *FileSet) seriesByBinaryExprRegexIterator(name, key []byte, value *regexp.Regexp, op influxql.Token) (SeriesIterator, error) { // Special handling for "_name" to match measurement name. 
if bytes.Equal(key, []byte("_name")) { match := value.Match(name) @@ -762,7 +774,7 @@ func (fs FileSet) seriesByBinaryExprRegexIterator(name, key []byte, value *regex return fs.MatchTagValueSeriesIterator(name, key, value, op == influxql.EQREGEX), nil } -func (fs FileSet) seriesByBinaryExprVarRefIterator(name, key []byte, value *influxql.VarRef, op influxql.Token) (SeriesIterator, error) { +func (fs *FileSet) seriesByBinaryExprVarRefIterator(name, key []byte, value *influxql.VarRef, op influxql.Token) (SeriesIterator, error) { if op == influxql.EQ { return IntersectSeriesIterators( fs.TagKeySeriesIterator(name, key), @@ -776,11 +788,47 @@ func (fs FileSet) seriesByBinaryExprVarRefIterator(name, key []byte, value *infl ), nil } +// buildFilters builds a series existence filter for each compaction level. +func (fs *FileSet) buildFilters() { + if len(fs.filters) == 0 { + fs.filters = nil + return + } + + // Generate enough filters for each level. + maxLevel := fs.files[len(fs.files)-1].Level() + fs.filters = make([]*bloom.Filter, maxLevel+1) + + // Merge filters at each level. + for _, f := range fs.files { + level := f.Level() + + // Skip if file has no bloom filter. + if f.Filter() == nil { + continue + } + + // Initialize a filter if it doesn't exist. + if fs.filters[level] == nil { + lvl := fs.levels[level] + fs.filters[level] = bloom.NewFilter(lvl.M, lvl.K) + } + + // Merge filter. + if err := fs.filters[level].Merge(f.Filter()); err != nil { + panic(err) + } + } +} + // File represents a log or index file. type File interface { Close() error Path() string + ID() int + Level() int + FilterNamesTags(names [][]byte, tagsSlice []models.Tags) ([][]byte, []models.Tags) Measurement(name []byte) MeasurementElem MeasurementIterator() MeasurementIterator @@ -804,6 +852,9 @@ type File interface { MergeSeriesSketches(s, t estimator.Sketch) error MergeMeasurementsSketches(s, t estimator.Sketch) error + // Series existence bloom filter. + Filter() *bloom.Filter + // Reference counting. Retain() Release() diff --git a/tsdb/index/tsi1/file_set_test.go b/tsdb/index/tsi1/file_set_test.go index 88ef5cce42..5bc4490d11 100644 --- a/tsdb/index/tsi1/file_set_test.go +++ b/tsdb/index/tsi1/file_set_test.go @@ -270,7 +270,7 @@ func TestFileSet_TagKeyIterator(t *testing.T) { func TestFileSet_FilterNamesTags(t *testing.T) { var mf internal.File - fs := tsi1.FileSet{&mf} + fs := tsi1.NewFileSet(nil, []tsi1.File{&mf}) var ( names [][]byte diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index c43b03082f..9bac961edb 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -26,8 +26,7 @@ const IndexName = "tsi1" // Default compaction thresholds. const ( - DefaultMaxLogFileSize = 5 * 1024 * 1024 - DefaultCompactionFactor = 10 // 1.8 + DefaultMaxLogFileSize = 5 * 1024 * 1024 ) func init() { @@ -60,9 +59,10 @@ type Index struct { opened bool options tsdb.EngineOptions - activeLogFile *LogFile // current log file - fileSet FileSet // current file set - seq int // file id sequence + activeLogFile *LogFile // current log file + fileSet *FileSet // current file set + levels []CompactionLevel // compaction levels + seq int // file id sequence // Close management. once sync.Once @@ -79,8 +79,7 @@ type Index struct { Path string // Log file compaction thresholds. - MaxLogFileSize int64 - CompactionFactor float64 + MaxLogFileSize int64 // Frequency of compaction checks. CompactionEnabled bool @@ -118,12 +117,17 @@ func (i *Index) Open() error { // Read manifest file. 
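buildFilters unions the per-file bloom filters of a compaction level into a single filter per level, which only works because every file in a level is built with the same m and k (that is what CompactionLevel pins down). A minimal sketch of that union using only the Merge method added in this commit; the sizes and keys are arbitrary:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/pkg/bloom"
)

func main() {
	// Two filters with identical sizing, e.g. two index files from the same level.
	a := bloom.NewFilter(1<<16, 6)
	b := bloom.NewFilter(1<<16, 6)
	a.Insert([]byte("cpu,host=a"))
	b.Insert([]byte("mem,host=b"))

	// Union b into a; afterwards a answers for keys inserted into either filter.
	if err := a.Merge(b); err != nil {
		panic(err)
	}
	fmt.Println(a.Contains([]byte("cpu,host=a")), a.Contains([]byte("mem,host=b"))) // true true

	// Filters with a different m (or k) refuse to merge.
	c := bloom.NewFilter(1<<17, 6)
	fmt.Println(a.Merge(c) != nil) // true
}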
m, err := ReadManifestFile(filepath.Join(i.Path, ManifestFileName)) if os.IsNotExist(err) { - m = &Manifest{} + m = NewManifest() } else if err != nil { return err } + // Copy compaction levels to the index. + i.levels = make([]CompactionLevel, len(m.Levels)) + copy(i.levels, m.Levels) + // Open each file in the manifest. + var files []File for _, filename := range m.Files { switch filepath.Ext(filename) { case LogFileExt: @@ -131,7 +135,7 @@ func (i *Index) Open() error { if err != nil { return err } - i.fileSet = append(i.fileSet, f) + files = append(files, f) // Make first log file active, if within threshold. sz, _ := f.Stat() @@ -144,9 +148,10 @@ func (i *Index) Open() error { if err != nil { return err } - i.fileSet = append(i.fileSet, f) + files = append(files, f) } } + i.fileSet = NewFileSet(i.levels, files) // Set initial sequnce number. i.seq = i.fileSet.MaxID() @@ -230,10 +235,10 @@ func (i *Index) Close() error { defer i.mu.Unlock() // Close log files. - for _, f := range i.fileSet { + for _, f := range i.fileSet.files { f.Close() } - i.fileSet = nil + i.fileSet.files = nil return nil } @@ -258,10 +263,11 @@ func (i *Index) ManifestPath() string { // Manifest returns a manifest for the index. func (i *Index) Manifest() *Manifest { m := &Manifest{ - Files: make([]string, len(i.fileSet)), + Levels: i.levels, + Files: make([]string, len(i.fileSet.files)), } - for j, f := range i.fileSet { + for j, f := range i.fileSet.files { m.Files[j] = filepath.Base(f.Path()) } @@ -281,21 +287,21 @@ func (i *Index) SetFieldSet(fs *tsdb.MeasurementFieldSet) { } // RetainFileSet returns the current fileset and adds a reference count. -func (i *Index) RetainFileSet() FileSet { +func (i *Index) RetainFileSet() *FileSet { i.mu.RLock() fs := i.retainFileSet() i.mu.RUnlock() return fs } -func (i *Index) retainFileSet() FileSet { +func (i *Index) retainFileSet() *FileSet { fs := i.fileSet fs.Retain() return fs } // FileN returns the active files in the file set. -func (i *Index) FileN() int { return len(i.fileSet) } +func (i *Index) FileN() int { return len(i.fileSet.files) } // prependActiveLogFile adds a new log file so that the current log file can be compacted. func (i *Index) prependActiveLogFile() error { @@ -305,7 +311,7 @@ func (i *Index) prependActiveLogFile() error { return err } i.activeLogFile = f - i.fileSet = append([]File{f}, i.fileSet...) + i.fileSet.files = append([]File{f}, i.fileSet.files...) // Write new manifest. if err := i.writeManifestFile(); err != nil { @@ -562,7 +568,7 @@ func (i *Index) SeriesN() int64 { defer fs.Release() var total int64 - for _, f := range fs { + for _, f := range fs.files { total += int64(f.SeriesN()) } return total @@ -721,7 +727,7 @@ func (i *Index) SnapshotTo(path string) error { } // Link files in directory. - for _, f := range fs { + for _, f := range fs.files { if err := os.Link(f.Path(), filepath.Join(path, "index", filepath.Base(f.Path()))); err != nil { return fmt.Errorf("error creating tsi hard link: %q", err) } @@ -788,19 +794,19 @@ func (i *Index) compact() { // All groups will have at least two files and the total size is more than the // largest file times the compaction factor. For example, if the compaction // factor is 2 then the total size will be at least double the max file size. 
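The grouping predicate itself (isCompactableGroup) is not part of this diff, so the sketch below is an assumption about its shape based on the comment above: at least two files whose combined size exceeds the largest file times the factor. Note that patch 01 bumps the default factor from 1.8 to 10 and patch 05 adds a temporary CompactionFactor constant of 10; the example uses the factor 2 from the comment.

package main

import "fmt"

// isCompactableGroupSketch mirrors the criterion described in the comment
// above; treat it as an assumption, not the actual implementation.
func isCompactableGroupSketch(sizes []int64, factor float64) bool {
	if len(sizes) < 2 {
		return false
	}
	var total, max int64
	for _, sz := range sizes {
		total += sz
		if sz > max {
			max = sz
		}
	}
	return float64(total) > float64(max)*factor
}

func main() {
	const mb = 1 << 20
	fmt.Println(isCompactableGroupSketch([]int64{60 * mb, 80 * mb, 100 * mb}, 2)) // true: 240MB > 2*100MB
	fmt.Println(isCompactableGroupSketch([]int64{10 * mb, 100 * mb}, 2))          // false: 110MB < 200MB
}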
-func (i *Index) compactionGroups(fileSet FileSet) [][]*IndexFile { - log.Printf("%s: checking for compaction groups: n=%d", IndexName, len(fileSet)) +func (i *Index) compactionGroups(fileSet *FileSet) [][]*IndexFile { + log.Printf("%s: checking for compaction groups: n=%d", IndexName, len(fileSet.files)) var groups [][]*IndexFile // Loop over all files to find contiguous group of compactable files. var group []*IndexFile - for _, f := range fileSet { + for _, f := range fileSet.files { indexFile, ok := f.(*IndexFile) // Skip over log files. They compact themselves. if !ok { - if isCompactableGroup(group, i.CompactionFactor) { + if isCompactableGroup(group, CompactionFactor) { group, groups = nil, append(groups, group) } else { group = nil @@ -810,7 +816,7 @@ func (i *Index) compactionGroups(fileSet FileSet) [][]*IndexFile { // If file is currently compacting then stop current group. if indexFile.Compacting() { - if isCompactableGroup(group, i.CompactionFactor) { + if isCompactableGroup(group, CompactionFactor) { group, groups = nil, append(groups, group) } else { group = nil @@ -820,7 +826,7 @@ func (i *Index) compactionGroups(fileSet FileSet) [][]*IndexFile { // Stop current group if adding file will invalidate group. // This can happen when appending a large file to a group of small files. - if isCompactableGroup(group, i.CompactionFactor) && !isCompactableGroup(append(group, indexFile), i.CompactionFactor) { + if isCompactableGroup(group, CompactionFactor) && !isCompactableGroup(append(group, indexFile), CompactionFactor) { group, groups = []*IndexFile{indexFile}, append(groups, group) continue } @@ -830,7 +836,7 @@ func (i *Index) compactionGroups(fileSet FileSet) [][]*IndexFile { } // Append final group, if compactable. - if isCompactableGroup(group, i.CompactionFactor) { + if isCompactableGroup(group, CompactionFactor) { groups = append(groups, group) } @@ -868,7 +874,7 @@ func (i *Index) compactGroup(files []*IndexFile) { start := time.Now() // Create new index file. - path := filepath.Join(i.Path, FormatIndexFileName(i.NextSequence())) + path := filepath.Join(i.Path, FormatIndexFileName(i.NextSequence(), 1)) // TODO f, err := os.Create(path) if err != nil { log.Printf("%s: error creating compaction files: %s", IndexName, err) @@ -985,11 +991,11 @@ func (i *Index) compactLogFile(logFile *LogFile) { log.Printf("tsi1: compacting log file: file=%s", logFile.Path()) // Retrieve identifier from current path. - id := ParseFileID(logFile.Path()) + id := logFile.ID() assert(id != 0, "cannot parse log file id: %s", logFile.Path()) // Create new index file. - path := filepath.Join(i.Path, FormatIndexFileName(id)) + path := filepath.Join(i.Path, FormatIndexFileName(id, 1)) f, err := os.Create(path) if err != nil { log.Printf("tsi1: error creating index file: %s", err) @@ -1054,7 +1060,7 @@ func (i *Index) compactLogFile(logFile *LogFile) { // seriesPointIterator adapts SeriesIterator to an influxql.Iterator. type seriesPointIterator struct { once sync.Once - fs FileSet + fs *FileSet fieldset *tsdb.MeasurementFieldSet mitr MeasurementIterator sitr SeriesIterator @@ -1064,7 +1070,7 @@ type seriesPointIterator struct { } // newSeriesPointIterator returns a new instance of seriesPointIterator. 
-func newSeriesPointIterator(fs FileSet, fieldset *tsdb.MeasurementFieldSet, opt influxql.IteratorOptions) *seriesPointIterator { +func newSeriesPointIterator(fs *FileSet, fieldset *tsdb.MeasurementFieldSet, opt influxql.IteratorOptions) *seriesPointIterator { return &seriesPointIterator{ fs: fs, fieldset: fieldset, @@ -1153,24 +1159,35 @@ func intersectStringSets(a, b map[string]struct{}) map[string]struct{} { return other } -var fileIDRegex = regexp.MustCompile(`^(\d+)\..+$`) +var fileIDRegex = regexp.MustCompile(`^L(\d+)-(\d+)\..+$`) -// ParseFileID extracts the numeric id from a log or index file path. +// ParseFilename extracts the numeric id from a log or index file path. // Returns 0 if it cannot be parsed. -func ParseFileID(name string) int { +func ParseFilename(name string) (level, id int) { a := fileIDRegex.FindStringSubmatch(filepath.Base(name)) if a == nil { - return 0 + return 0, 0 } - i, _ := strconv.Atoi(a[1]) - return i + level, _ = strconv.Atoi(a[1]) + id, _ = strconv.Atoi(a[2]) + return id, level } // Manifest represents the list of log & index files that make up the index. // The files are listed in time order, not necessarily ID order. type Manifest struct { - Files []string `json:"files,omitempty"` + Levels []CompactionLevel `json:"levels,omitempty` + Files []string `json:"files,omitempty"` +} + +// NewManifest returns a new instance of Manifest with default compaction levels. +func NewManifest() *Manifest { + m := &Manifest{ + Levels: make([]CompactionLevel, len(DefaultCompactionLevels)), + } + copy(m.Levels, DefaultCompactionLevels[:]) + return m } // HasFile returns true if name is listed in the log files or index files. @@ -1195,6 +1212,7 @@ func ReadManifestFile(path string) (*Manifest, error) { if err := json.Unmarshal(buf, &m); err != nil { return nil, err } + return &m, nil } @@ -1220,3 +1238,48 @@ func joinIntSlice(a []int, sep string) string { } return strings.Join(other, sep) } + +// CompactionLevel represents a grouping of index files based on size and +// bloom filter settings. By having the same bloom filter settings, the filters +// can be merged and evaluated at a higher level. +type CompactionLevel struct { + // Minimum expected index size + MinSize int64 `json:"minSize,omitempty"` + + // Bloom filter bit size & hash count + M uint64 `json:"m,omitempty"` + K uint64 `json:"k,omitempty"` +} + +// DefaultCompactionLevels is the default settings used by the index. +var DefaultCompactionLevels = []CompactionLevel{ + // Log files, no filter. + {M: 0, K: 0}, + + // Initial compaction, 4MB filter + { + MinSize: 0, + M: 1 << 25, + K: 6, + }, + + // 200MB min file, 33MB filter + { + MinSize: 200 * (1 << 20), + M: 1 << 28, + K: 6, + }, + + // 2GB min file, 134MB filter + { + MinSize: 2 * (1 << 30), + M: 1 << 30, + K: 6, + }, +} + +// MaxIndexFileSize is the maximum expected size of an index file. +const MaxIndexFileSize = 4 * (1 << 30) + +// TEMP +const CompactionFactor = 10 diff --git a/tsdb/index/tsi1/index_file.go b/tsdb/index/tsi1/index_file.go index c31b4f1896..279510296d 100644 --- a/tsdb/index/tsi1/index_file.go +++ b/tsdb/index/tsi1/index_file.go @@ -9,6 +9,7 @@ import ( "sync" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/bloom" "github.com/influxdata/influxdb/pkg/estimator" "github.com/influxdata/influxdb/pkg/mmap" ) @@ -52,7 +53,8 @@ type IndexFile struct { mblk MeasurementBlock // Sortable identifier & filepath to the log file. 
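A quick check on the DefaultCompactionLevels comments: M is a bit count, so 1<<25 bits is 4,194,304 bytes (the "4MB filter"), 1<<28 bits is 33,554,432 bytes ("33MB"), and 1<<30 bits is 134,217,728 bytes ("134MB"). At roughly 8.14 bits per element for a 2% false-positive target, those sizes cover on the order of 4 million, 33 million, and 132 million series per level respectively, consistent with the k = 6 that Estimate produces at that rate.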
- ID int + level int + id int // Counters seriesN int64 // Number of unique series in this indexFile. @@ -72,10 +74,8 @@ func NewIndexFile() *IndexFile { // Open memory maps the data file at the file's path. func (f *IndexFile) Open() error { - // Extract identifier from path name, if possible. - if id := ParseFileID(f.Path()); id > 0 { - f.ID = id - } + // Extract identifier from path name. + f.id, f.level = ParseFilename(f.Path()) data, err := mmap.Map(f.Path()) if err != nil { @@ -97,12 +97,21 @@ func (f *IndexFile) Close() error { return mmap.Unmap(f.data) } +// ID returns the file sequence identifier. +func (f *IndexFile) ID() int { return f.id } + // Path returns the file path. func (f *IndexFile) Path() string { return f.path } // SetPath sets the file's path. func (f *IndexFile) SetPath(path string) { f.path = path } +// Level returns the compaction level for the file. +func (f *IndexFile) Level() int { return f.level } + +// Filter returns the series existence filter for the file. +func (f *IndexFile) Filter() *bloom.Filter { return f.sblk.filter } + // Retain adds a reference count to the file. func (f *IndexFile) Retain() { f.wg.Add(1) } @@ -439,6 +448,6 @@ func (t *IndexFileTrailer) WriteTo(w io.Writer) (n int64, err error) { } // FormatIndexFileName generates an index filename for the given index. -func FormatIndexFileName(i int) string { - return fmt.Sprintf("%08d%s", i, IndexFileExt) +func FormatIndexFileName(id, level int) string { + return fmt.Sprintf("L%d-%08d%s", level, id, IndexFileExt) } diff --git a/tsdb/index/tsi1/index_files.go b/tsdb/index/tsi1/index_files.go index 21865a5e5d..ebe5a26457 100644 --- a/tsdb/index/tsi1/index_files.go +++ b/tsdb/index/tsi1/index_files.go @@ -19,7 +19,7 @@ type IndexFiles []*IndexFile func (p IndexFiles) IDs() []int { a := make([]int, len(p)) for i, f := range p { - a[i] = f.ID + a[i] = f.ID() } return a } diff --git a/tsdb/index/tsi1/log_file.go b/tsdb/index/tsi1/log_file.go index 240d9484a3..58005db669 100644 --- a/tsdb/index/tsi1/log_file.go +++ b/tsdb/index/tsi1/log_file.go @@ -17,6 +17,7 @@ import ( "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/bloom" "github.com/influxdata/influxdb/pkg/estimator" "github.com/influxdata/influxdb/pkg/mmap" ) @@ -38,6 +39,7 @@ const ( type LogFile struct { mu sync.RWMutex wg sync.WaitGroup // ref count + id int // file sequence identifier data []byte // mmap file *os.File // writer w *bufio.Writer // buffered writer @@ -78,6 +80,8 @@ func (f *LogFile) Open() error { } func (f *LogFile) open() error { + f.id, _ = ParseFilename(f.path) + // Open file for appending. file, err := os.OpenFile(f.Path(), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) if err != nil { @@ -162,12 +166,21 @@ func (f *LogFile) Flush() error { return nil } +// ID returns the file sequence identifier. +func (f *LogFile) ID() int { return f.id } + // Path returns the file path. func (f *LogFile) Path() string { return f.path } // SetPath sets the log file's path. func (f *LogFile) SetPath(path string) { f.path = path } +// Level returns the log level of the file. +func (f *LogFile) Level() int { return 0 } + +// Filter returns the bloom filter for the file. +func (f *LogFile) Filter() *bloom.Filter { return nil } + // Retain adds a reference count to the file. func (f *LogFile) Retain() { f.wg.Add(1) } @@ -1400,6 +1413,6 @@ func (itr *logSeriesIterator) Next() (e SeriesElem) { } // FormatLogFileName generates a log filename for the given index. 
-func FormatLogFileName(i int) string { - return fmt.Sprintf("%08d%s", i, LogFileExt) +func FormatLogFileName(id int) string { + return fmt.Sprintf("L0-%08d%s", id, LogFileExt) } From e7f39c06ab92e3fc21da669771fb06430f4c6131 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 26 Apr 2017 11:02:48 -0600 Subject: [PATCH 06/10] Refactor TSI1 compaction. --- tsdb/index/internal/file_set.go | 12 +- tsdb/index/tsi1/file_set.go | 72 +++++++++- tsdb/index/tsi1/file_set_test.go | 5 +- tsdb/index/tsi1/index.go | 203 ++++++++++++++------------- tsdb/index/tsi1/index_file.go | 14 -- tsdb/index/tsi1/index_file_test.go | 4 +- tsdb/index/tsi1/index_files.go | 8 +- tsdb/index/tsi1/index_files_test.go | 2 +- tsdb/index/tsi1/index_test.go | 3 + tsdb/index/tsi1/log_file.go | 8 +- tsdb/index/tsi1/log_file_test.go | 6 +- tsdb/index/tsi1/series_block.go | 43 +----- tsdb/index/tsi1/series_block_test.go | 2 +- 13 files changed, 194 insertions(+), 188 deletions(-) diff --git a/tsdb/index/internal/file_set.go b/tsdb/index/internal/file_set.go index dd3debddff..383310e135 100644 --- a/tsdb/index/internal/file_set.go +++ b/tsdb/index/internal/file_set.go @@ -13,7 +13,6 @@ type File struct { Pathf func() string IDf func() int Levelf func() int - FilterNameTagsf func(names [][]byte, tagsSlice []models.Tags) ([][]byte, []models.Tags) Measurementf func(name []byte) tsi1.MeasurementElem MeasurementIteratorf func() tsi1.MeasurementIterator HasSeriesf func(name []byte, tags models.Tags, buf []byte) (exists, tombstoned bool) @@ -34,13 +33,10 @@ type File struct { Filterf func() *bloom.Filter } -func (f *File) Close() error { return f.Closef() } -func (f *File) Path() string { return f.Pathf() } -func (f *File) ID() int { return f.IDf() } -func (f *File) Level() int { return f.Levelf() } -func (f *File) FilterNamesTags(names [][]byte, tagsSlice []models.Tags) ([][]byte, []models.Tags) { - return f.FilterNameTagsf(names, tagsSlice) -} +func (f *File) Close() error { return f.Closef() } +func (f *File) Path() string { return f.Pathf() } +func (f *File) ID() int { return f.IDf() } +func (f *File) Level() int { return f.Levelf() } func (f *File) Measurement(name []byte) tsi1.MeasurementElem { return f.Measurementf(name) } func (f *File) MeasurementIterator() tsi1.MeasurementIterator { return f.MeasurementIteratorf() } func (f *File) HasSeries(name []byte, tags models.Tags, buf []byte) (exists, tombstoned bool) { diff --git a/tsdb/index/tsi1/file_set.go b/tsdb/index/tsi1/file_set.go index 22ea269a76..219e04cadb 100644 --- a/tsdb/index/tsi1/file_set.go +++ b/tsdb/index/tsi1/file_set.go @@ -54,6 +54,11 @@ func (fs *FileSet) Release() { } } +// Prepend returns a new file set with f added at the beginning. +func (fs *FileSet) Prepend(f File) *FileSet { + return NewFileSet(fs.levels, append([]File{f}, fs.files...)) +} + // MustReplace swaps a list of files for a single file and returns a new file set. // The caller should always guarentee that the files exist and are contiguous. func (fs *FileSet) MustReplace(oldFiles []File, newFile File) *FileSet { @@ -118,6 +123,17 @@ func (fs *FileSet) IndexFiles() []*IndexFile { return a } +// IndexFilesByLevel returns all index files for a given level. +func (fs *FileSet) IndexFilesByLevel(level int) []*IndexFile { + var a []*IndexFile + for _, f := range fs.files { + if f, ok := f.(*IndexFile); ok && f.Level() == level { + a = append(a, f) + } + } + return a +} + // SeriesIterator returns an iterator over all series in the index. 
func (fs *FileSet) SeriesIterator() SeriesIterator { a := make([]SeriesIterator, 0, len(fs.files)) @@ -572,10 +588,54 @@ func (fs *FileSet) HasSeries(name []byte, tags models.Tags, buf []byte) bool { // FilterNamesTags filters out any series which already exist. It modifies the // provided slices of names and tags. func (fs *FileSet) FilterNamesTags(names [][]byte, tagsSlice []models.Tags) ([][]byte, []models.Tags) { - for _, f := range fs.files { + buf := make([]byte, 4096) + + // Filter across all log files. + // Log files obtain a read lock and should be done in bulk for performance. + for _, f := range fs.LogFiles() { names, tagsSlice = f.FilterNamesTags(names, tagsSlice) } - return names, tagsSlice + + // Filter across remaining index files. + indexFiles := fs.IndexFiles() + newNames, newTagsSlice := names[:0], tagsSlice[:0] + for i := range names { + name, tags := names[i], tagsSlice[i] + currentLevel, skipLevel := -1, false + + var exists, tombstoned bool + for j := 0; j < len(indexFiles); j++ { + f := indexFiles[j] + + // Check for existence on the level when it changes. + if level := f.Level(); currentLevel != level { + currentLevel, skipLevel = level, false + + if filter := fs.filters[level]; filter != nil { + if !filter.Contains(AppendSeriesKey(buf[:0], name, tags)) { + skipLevel = true + } + } + } + + // Skip file if in level where it doesn't exist. + if skipLevel { + continue + } + + // Stop once we find the series in a file. + if exists, tombstoned = f.HasSeries(name, tags, buf); exists { + break + } + } + + // If the series doesn't exist or it has been tombstoned then add it. + if !exists || tombstoned { + newNames = append(newNames, name) + newTagsSlice = append(newTagsSlice, tags) + } + } + return newNames, newTagsSlice } // SeriesSketches returns the merged series sketches for the FileSet. @@ -790,14 +850,13 @@ func (fs *FileSet) seriesByBinaryExprVarRefIterator(name, key []byte, value *inf // buildFilters builds a series existence filter for each compaction level. func (fs *FileSet) buildFilters() { - if len(fs.filters) == 0 { + if len(fs.levels) == 0 { fs.filters = nil return } - // Generate enough filters for each level. - maxLevel := fs.files[len(fs.files)-1].Level() - fs.filters = make([]*bloom.Filter, maxLevel+1) + // Generate filters for each level. + fs.filters = make([]*bloom.Filter, len(fs.levels)) // Merge filters at each level. for _, f := range fs.files { @@ -829,7 +888,6 @@ type File interface { ID() int Level() int - FilterNamesTags(names [][]byte, tagsSlice []models.Tags) ([][]byte, []models.Tags) Measurement(name []byte) MeasurementElem MeasurementIterator() MeasurementIterator HasSeries(name []byte, tags models.Tags, buf []byte) (exists, tombstoned bool) diff --git a/tsdb/index/tsi1/file_set_test.go b/tsdb/index/tsi1/file_set_test.go index 5bc4490d11..022a73d5a1 100644 --- a/tsdb/index/tsi1/file_set_test.go +++ b/tsdb/index/tsi1/file_set_test.go @@ -2,12 +2,9 @@ package tsi1_test import ( "fmt" - "reflect" "testing" "github.com/influxdata/influxdb/models" - "github.com/influxdata/influxdb/tsdb/index/internal" - "github.com/influxdata/influxdb/tsdb/index/tsi1" ) // Ensure fileset can return an iterator over all series in the index. 
@@ -268,6 +265,7 @@ func TestFileSet_TagKeyIterator(t *testing.T) { }) } +/* func TestFileSet_FilterNamesTags(t *testing.T) { var mf internal.File fs := tsi1.NewFileSet(nil, []tsi1.File{&mf}) @@ -361,6 +359,7 @@ func TestFileSet_FilterNamesTags(t *testing.T) { t.Fatalf("got %v, expected %v", got, exp) } } +*/ var ( byteSliceResult [][]byte diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 9bac961edb..1639f3bc71 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -59,10 +59,13 @@ type Index struct { opened bool options tsdb.EngineOptions - activeLogFile *LogFile // current log file - fileSet *FileSet // current file set - levels []CompactionLevel // compaction levels - seq int // file id sequence + activeLogFile *LogFile // current log file + fileSet *FileSet // current file set + seq int // file id sequence + + // Compaction management + levels []CompactionLevel // compaction levels + levelCompacting []bool // level compaction status // Close management. once sync.Once @@ -126,6 +129,9 @@ func (i *Index) Open() error { i.levels = make([]CompactionLevel, len(m.Levels)) copy(i.levels, m.Levels) + // Set up flags to track whether a level is compacting. + i.levelCompacting = make([]bool, len(i.levels)) + // Open each file in the manifest. var files []File for _, filename := range m.Files { @@ -311,7 +317,9 @@ func (i *Index) prependActiveLogFile() error { return err } i.activeLogFile = f - i.fileSet.files = append([]File{f}, i.fileSet.files...) + + // Prepend and generate new fileset. + i.fileSet = i.fileSet.Prepend(f) // Write new manifest. if err := i.writeManifestFile(); err != nil { @@ -768,102 +776,68 @@ func (i *Index) compact() { fs := i.retainFileSet() defer fs.Release() - // Return contiguous groups of files that are available for compaction. - for _, group := range i.compactionGroups(fs) { - // Mark files in group as compacting. - for _, f := range group { - f.Retain() - f.setCompacting(true) + // Iterate over each level we are going to compact. + // We skip the first level (0) because it is log files and they are compacted separately. + // We skip the last level because the files have no higher level to compact into. + minLevel, maxLevel := 1, len(i.levels)-2 + for level := minLevel; level <= maxLevel; level++ { + // Skip level if it is currently compacting. + if i.levelCompacting[level] { + // log.Printf("tsi1: SKIP, already compacting: level=%d", level) + continue } + // Collect files for the level. + files := fs.IndexFilesByLevel(level) + + // Calculate total size. Skip level if it doesn't meet min size of next level. + var size int64 + for _, f := range files { + size += f.Size() + } + if size < i.levels[level+1].MinSize { + // log.Printf("tsi1: SKIP, too small: level=%d, size=%d, target=%d", level, size, i.levels[level+1].MinSize) + continue + } + + // Limit the number of files that can be merged at once. + if len(files) > MaxIndexMergeCount { + files = files[len(files)-MaxIndexMergeCount:] + } + + // Retain files during compaction. + IndexFiles(files).Retain() + + // Mark the level as compacting. + i.levelCompacting[level] = true + // Execute in closure to save reference to the group within the loop. - func(group []*IndexFile) { + func(files []*IndexFile, level int) { // Start compacting in a separate goroutine. i.wg.Add(1) go func() { defer i.wg.Done() - i.compactGroup(group) - i.Compact() // check for new compactions + + // Compact to a new level. 
+ i.compactToLevel(files, level+1) + + // Ensure compaction lock for the level is released. + i.mu.Lock() + i.levelCompacting[level] = false + i.mu.Unlock() + + // Check for new compactions + i.Compact() }() - }(group) + }(files, level) } } -// compactionGroups returns contiguous groups of index files that can be compacted. -// -// All groups will have at least two files and the total size is more than the -// largest file times the compaction factor. For example, if the compaction -// factor is 2 then the total size will be at least double the max file size. -func (i *Index) compactionGroups(fileSet *FileSet) [][]*IndexFile { - log.Printf("%s: checking for compaction groups: n=%d", IndexName, len(fileSet.files)) - - var groups [][]*IndexFile - - // Loop over all files to find contiguous group of compactable files. - var group []*IndexFile - for _, f := range fileSet.files { - indexFile, ok := f.(*IndexFile) - - // Skip over log files. They compact themselves. - if !ok { - if isCompactableGroup(group, CompactionFactor) { - group, groups = nil, append(groups, group) - } else { - group = nil - } - continue - } - - // If file is currently compacting then stop current group. - if indexFile.Compacting() { - if isCompactableGroup(group, CompactionFactor) { - group, groups = nil, append(groups, group) - } else { - group = nil - } - continue - } - - // Stop current group if adding file will invalidate group. - // This can happen when appending a large file to a group of small files. - if isCompactableGroup(group, CompactionFactor) && !isCompactableGroup(append(group, indexFile), CompactionFactor) { - group, groups = []*IndexFile{indexFile}, append(groups, group) - continue - } - - // Otherwise append to the current group. - group = append(group, indexFile) - } - - // Append final group, if compactable. - if isCompactableGroup(group, CompactionFactor) { - groups = append(groups, group) - } - - return groups -} - -// isCompactableGroup returns true if total file size is greater than max file size times factor. -func isCompactableGroup(files []*IndexFile, factor float64) bool { - if len(files) < 2 { - return false - } - - var max, total int64 - for _, f := range files { - sz := f.Size() - if sz > max { - max = sz - } - total += sz - } - return total >= int64(float64(max)*factor) -} - -// compactGroup compacts files into a new file. Replaces old files with +// compactToLevel compacts a set of files into a new file. Replaces old files with // compacted file on successful completion. This runs in a separate goroutine. -func (i *Index) compactGroup(files []*IndexFile) { +func (i *Index) compactToLevel(files []*IndexFile, level int) { assert(len(files) >= 2, "at least two index files are required for compaction") + assert(level > 0, "cannot compact level zero") // Files have already been retained by caller. // Ensure files are released only once. @@ -874,7 +848,7 @@ func (i *Index) compactGroup(files []*IndexFile) { start := time.Now() // Create new index file. 
- path := filepath.Join(i.Path, FormatIndexFileName(i.NextSequence(), 1)) // TODO + path := filepath.Join(i.Path, FormatIndexFileName(i.NextSequence(), level)) f, err := os.Create(path) if err != nil { log.Printf("%s: error creating compaction files: %s", IndexName, err) @@ -883,10 +857,11 @@ func (i *Index) compactGroup(files []*IndexFile) { defer f.Close() srcIDs := joinIntSlice(IndexFiles(files).IDs(), ",") - log.Printf("%s: performing full compaction: src=%s, path=%s", IndexName, srcIDs, path) + log.Printf("%s: performing full compaction: src=%s, path=%s", IndexName, srcIDs, filepath.Base(path)) // Compact all index files to new index file. - n, err := IndexFiles(files).WriteTo(f) + lvl := i.levels[level] + n, err := IndexFiles(files).WriteTo(f, lvl.M, lvl.K) if err != nil { log.Printf("%s: error compacting index files: src=%s, path=%s, err=%s", IndexName, srcIDs, path, err) return @@ -988,7 +963,6 @@ func (i *Index) checkLogFile() error { // compacted then the manifest is updated and the log file is discarded. func (i *Index) compactLogFile(logFile *LogFile) { start := time.Now() - log.Printf("tsi1: compacting log file: file=%s", logFile.Path()) // Retrieve identifier from current path. id := logFile.ID() @@ -1004,8 +978,8 @@ func (i *Index) compactLogFile(logFile *LogFile) { defer f.Close() // Compact log file to new index file. - n, err := logFile.WriteTo(f) - if err != nil { + lvl := i.levels[1] + if _, err := logFile.WriteTo(f, lvl.M, lvl.K); err != nil { log.Printf("%s: error compacting log file: path=%s, err=%s", IndexName, logFile.Path(), err) return } @@ -1042,10 +1016,9 @@ func (i *Index) compactLogFile(logFile *LogFile) { log.Printf("%s: error updating manifest: %s", IndexName, err) return } - log.Printf("%s: finished compacting log file: file=%s, t=%v, sz=%d", IndexName, logFile.Path(), time.Since(start), n) + log.Printf("%s: log file compacted: file=%s, t=%0.03fs", IndexName, filepath.Base(logFile.Path()), time.Since(start).Seconds()) // Closing the log file will automatically wait until the ref count is zero. - log.Printf("%s: removing log file: file=%s", IndexName, logFile.Path()) if err := logFile.Close(); err != nil { log.Printf("%s: error closing log file: %s", IndexName, err) return @@ -1263,13 +1236,41 @@ var DefaultCompactionLevels = []CompactionLevel{ K: 6, }, - // 200MB min file, 33MB filter + // 24MB min file, 4MB filter { - MinSize: 200 * (1 << 20), + MinSize: 24 * (1 << 20), + M: 1 << 25, + K: 6, + }, + + // 48MB min file, 8MB filter + { + MinSize: 48 * (1 << 20), + M: 1 << 26, + K: 6, + }, + + // 96MB min file, 8MB filter + { + MinSize: 96 * (1 << 20), + M: 1 << 27, + K: 6, + }, + + // 192MB min file, 33MB filter + { + MinSize: 192 * (1 << 20), M: 1 << 28, K: 6, }, + // 768MB min file, 66MB filter + { + MinSize: 768 * (1 << 20), + M: 1 << 29, + K: 6, + }, + // 2GB min file, 134MB filter { MinSize: 2 * (1 << 30), @@ -1278,8 +1279,8 @@ var DefaultCompactionLevels = []CompactionLevel{ }, } +// MaxIndexMergeCount is the maximum number of files that can be merged together at once. +const MaxIndexMergeCount = 2 + // MaxIndexFileSize is the maximum expected size of an index file. 
const MaxIndexFileSize = 4 * (1 << 30) - -// TEMP -const CompactionFactor = 10 diff --git a/tsdb/index/tsi1/index_file.go b/tsdb/index/tsi1/index_file.go index 279510296d..cc1c371825 100644 --- a/tsdb/index/tsi1/index_file.go +++ b/tsdb/index/tsi1/index_file.go @@ -368,20 +368,6 @@ func (f *IndexFile) MergeSeriesSketches(s, t estimator.Sketch) error { return t.Merge(f.sblk.tsketch) } -// FilterNamesTags filters out any series which already exist. It modifies the -// provided slices of names and tags. -func (f *IndexFile) FilterNamesTags(names [][]byte, tagsSlice []models.Tags) ([][]byte, []models.Tags) { - buf := make([]byte, 1024) - newNames, newTagsSlice := names[:0], tagsSlice[:0] - for i := range names { - if exists, tombstoned := f.HasSeries(names[i], tagsSlice[i], buf); !exists || tombstoned { - newNames = append(newNames, names[i]) - newTagsSlice = append(newTagsSlice, tagsSlice[i]) - } - } - return newNames, newTagsSlice -} - // ReadIndexFileTrailer returns the index file trailer from data. func ReadIndexFileTrailer(data []byte) (IndexFileTrailer, error) { var t IndexFileTrailer diff --git a/tsdb/index/tsi1/index_file_test.go b/tsdb/index/tsi1/index_file_test.go index ca043522d2..47a4d4bf5c 100644 --- a/tsdb/index/tsi1/index_file_test.go +++ b/tsdb/index/tsi1/index_file_test.go @@ -76,7 +76,7 @@ func CreateIndexFile(series []Series) (*tsi1.IndexFile, error) { // Write index file to buffer. var buf bytes.Buffer - if _, err := lf.WriteTo(&buf); err != nil { + if _, err := lf.WriteTo(&buf, M, K); err != nil { return nil, err } @@ -99,7 +99,7 @@ func GenerateIndexFile(measurementN, tagN, valueN int) (*tsi1.IndexFile, error) // Compact log file to buffer. var buf bytes.Buffer - if _, err := lf.WriteTo(&buf); err != nil { + if _, err := lf.WriteTo(&buf, M, K); err != nil { return nil, err } diff --git a/tsdb/index/tsi1/index_files.go b/tsdb/index/tsi1/index_files.go index ebe5a26457..abcc2d5610 100644 --- a/tsdb/index/tsi1/index_files.go +++ b/tsdb/index/tsi1/index_files.go @@ -123,7 +123,7 @@ func (p IndexFiles) TagValueSeriesIterator(name, key, value []byte) SeriesIterat } // WriteTo merges all index files and writes them to w. -func (p IndexFiles) WriteTo(w io.Writer) (n int64, err error) { +func (p IndexFiles) WriteTo(w io.Writer, m, k uint64) (n int64, err error) { var t IndexFileTrailer // Wrap writer in buffered I/O. @@ -140,7 +140,7 @@ func (p IndexFiles) WriteTo(w io.Writer) (n int64, err error) { // Write combined series list. t.SeriesBlock.Offset = n - if err := p.writeSeriesBlockTo(bw, &info, &n); err != nil { + if err := p.writeSeriesBlockTo(bw, m, k, &info, &n); err != nil { return n, err } t.SeriesBlock.Size = n - t.SeriesBlock.Offset @@ -187,7 +187,7 @@ func (p IndexFiles) WriteTo(w io.Writer) (n int64, err error) { return n, nil } -func (p IndexFiles) writeSeriesBlockTo(w io.Writer, info *indexCompactInfo, n *int64) error { +func (p IndexFiles) writeSeriesBlockTo(w io.Writer, m, k uint64, info *indexCompactInfo, n *int64) error { // Estimate series cardinality. sketch := hll.NewDefaultPlus() for _, f := range p { @@ -197,7 +197,7 @@ func (p IndexFiles) writeSeriesBlockTo(w io.Writer, info *indexCompactInfo, n *i } itr := p.SeriesIterator() - enc := NewSeriesBlockEncoder(w, sketch.Count()) + enc := NewSeriesBlockEncoder(w, sketch.Count(), m, k) // Write all series. 
for e := itr.Next(); e != nil; e = itr.Next() { diff --git a/tsdb/index/tsi1/index_files_test.go b/tsdb/index/tsi1/index_files_test.go index 260117437b..4a838be262 100644 --- a/tsdb/index/tsi1/index_files_test.go +++ b/tsdb/index/tsi1/index_files_test.go @@ -32,7 +32,7 @@ func TestIndexFiles_WriteTo(t *testing.T) { // Compact the two together and write out to a buffer. var buf bytes.Buffer a := tsi1.IndexFiles{f0, f1} - if n, err := a.WriteTo(&buf); err != nil { + if n, err := a.WriteTo(&buf, M, K); err != nil { t.Fatal(err) } else if n == 0 { t.Fatal("expected data written") diff --git a/tsdb/index/tsi1/index_test.go b/tsdb/index/tsi1/index_test.go index fc2da4db1a..158eca2396 100644 --- a/tsdb/index/tsi1/index_test.go +++ b/tsdb/index/tsi1/index_test.go @@ -12,6 +12,9 @@ import ( "github.com/influxdata/influxdb/tsdb/index/tsi1" ) +// Bloom filter settings used in tests. +const M, K = 4096, 6 + // Ensure index can iterate over all measurement names. func TestIndex_ForEachMeasurementName(t *testing.T) { idx := MustOpenIndex() diff --git a/tsdb/index/tsi1/log_file.go b/tsdb/index/tsi1/log_file.go index 58005db669..1e39d43513 100644 --- a/tsdb/index/tsi1/log_file.go +++ b/tsdb/index/tsi1/log_file.go @@ -756,7 +756,7 @@ func (f *LogFile) MeasurementSeriesIterator(name []byte) SeriesIterator { } // WriteTo compacts the log file and writes it to w. -func (f *LogFile) WriteTo(w io.Writer) (n int64, err error) { +func (f *LogFile) WriteTo(w io.Writer, m, k uint64) (n int64, err error) { f.mu.RLock() defer f.mu.RUnlock() @@ -777,7 +777,7 @@ func (f *LogFile) WriteTo(w io.Writer) (n int64, err error) { // Write series list. t.SeriesBlock.Offset = n - if err := f.writeSeriesBlockTo(bw, names, info, &n); err != nil { + if err := f.writeSeriesBlockTo(bw, names, m, k, info, &n); err != nil { return n, err } t.SeriesBlock.Size = n - t.SeriesBlock.Offset @@ -820,7 +820,7 @@ func (f *LogFile) WriteTo(w io.Writer) (n int64, err error) { return n, nil } -func (f *LogFile) writeSeriesBlockTo(w io.Writer, names []string, info *logFileCompactInfo, n *int64) error { +func (f *LogFile) writeSeriesBlockTo(w io.Writer, names []string, m, k uint64, info *logFileCompactInfo, n *int64) error { // Determine series count. var seriesN uint64 for _, mm := range f.mms { @@ -828,7 +828,7 @@ func (f *LogFile) writeSeriesBlockTo(w io.Writer, names []string, info *logFileC } // Write all series. - enc := NewSeriesBlockEncoder(w, seriesN) + enc := NewSeriesBlockEncoder(w, seriesN, m, k) // Add series from measurements. for _, name := range names { diff --git a/tsdb/index/tsi1/log_file_test.go b/tsdb/index/tsi1/log_file_test.go index d9c1b31aef..de08f6205b 100644 --- a/tsdb/index/tsi1/log_file_test.go +++ b/tsdb/index/tsi1/log_file_test.go @@ -14,6 +14,7 @@ import ( "time" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/pkg/bloom" "github.com/influxdata/influxdb/tsdb/index/tsi1" ) @@ -290,6 +291,9 @@ func BenchmarkLogFile_WriteTo(b *testing.B) { f := MustOpenLogFile() defer f.Close() + // Estimate bloom filter size. + m, k := bloom.Estimate(uint64(seriesN), 0.02) + // Initialize log file with series data. for i := 0; i < seriesN; i++ { if err := f.AddSeries( @@ -311,7 +315,7 @@ func BenchmarkLogFile_WriteTo(b *testing.B) { // Compact log file. 
for i := 0; i < b.N; i++ { buf := bytes.NewBuffer(make([]byte, 0, 150*seriesN)) - if _, err := f.WriteTo(buf); err != nil { + if _, err := f.WriteTo(buf, m, k); err != nil { b.Fatal(err) } b.Logf("sz=%db", buf.Len()) diff --git a/tsdb/index/tsi1/series_block.go b/tsdb/index/tsi1/series_block.go index ee94b89bfa..4572cf1aa6 100644 --- a/tsdb/index/tsi1/series_block.go +++ b/tsdb/index/tsi1/series_block.go @@ -8,8 +8,6 @@ import ( "io" "os" "sort" - "sync/atomic" - "time" "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" @@ -20,40 +18,9 @@ import ( "github.com/influxdata/influxdb/pkg/rhh" ) -// TEMP -var ( - offsetCount uint64 - offsetFilterFalseCount uint64 - offsetFilterFalsePositiveCount uint64 -) - -// TEMP -func init() { - go func() { - ticker := time.NewTicker(10 * time.Second) - for range ticker.C { - // Read values. - ofc := atomic.LoadUint64(&offsetCount) - offc := atomic.LoadUint64(&offsetFilterFalseCount) - offpc := atomic.LoadUint64(&offsetFilterFalsePositiveCount) - - // Clear values. - atomic.StoreUint64(&offsetCount, 0) - atomic.StoreUint64(&offsetFilterFalseCount, 0) - atomic.StoreUint64(&offsetFilterFalsePositiveCount, 0) - - // Report values. - println("dbg/OFFSET.STATS>>>", ofc, offc, offpc) - } - }() -} - // ErrSeriesOverflow is returned when too many series are added to a series writer. var ErrSeriesOverflow = errors.New("series overflow") -// BloomFalsePositiveRate is the false positive rate of the series bloom filter. -const BloomFalsePositiveRate = 0.02 - // Series list field size constants. const ( // Series list trailer field sizes. @@ -135,8 +102,6 @@ func (blk *SeriesBlock) Offset(name []byte, tags models.Tags, buf []byte) (offse buf = AppendSeriesKey(buf[:0], name, tags) bufN := uint64(len(buf)) - atomic.AddUint64(&offsetCount, 1) // TEMP - // Quickly check the bloom filter. // If the key doesn't exist then we know for sure that it doesn't exist. // If it does exist then we need to do a hash index check to verify. False @@ -144,7 +109,6 @@ func (blk *SeriesBlock) Offset(name []byte, tags models.Tags, buf []byte) (offse if !blk.filter.Contains(buf) { return 0, false } - atomic.AddUint64(&offsetFilterFalseCount, 1) // TEMP // Find the correct partition. // Use previous index unless an exact match on the min value. @@ -167,7 +131,6 @@ func (blk *SeriesBlock) Offset(name []byte, tags models.Tags, buf []byte) (offse // Find offset of series. offset := binary.BigEndian.Uint64(seriesIndex.data[pos*SeriesIDSize:]) if offset == 0 { - atomic.AddUint64(&offsetFilterFalsePositiveCount, 1) // TEMP return 0, false } @@ -180,7 +143,6 @@ func (blk *SeriesBlock) Offset(name []byte, tags models.Tags, buf []byte) (offse // Check if we've exceeded the probe distance. max := rhh.Dist(rhh.HashKey(key), pos, n) if d > max { - atomic.AddUint64(&offsetFilterFalsePositiveCount, 1) // TEMP return 0, false } @@ -189,7 +151,6 @@ func (blk *SeriesBlock) Offset(name []byte, tags models.Tags, buf []byte) (offse d++ if d > n { - atomic.AddUint64(&offsetFilterFalsePositiveCount, 1) // TEMP return 0, false } } @@ -578,9 +539,7 @@ type SeriesBlockEncoder struct { } // NewSeriesBlockEncoder returns a new instance of SeriesBlockEncoder. 
-func NewSeriesBlockEncoder(w io.Writer, n uint64) *SeriesBlockEncoder { - m, k := bloom.Estimate(n, BloomFalsePositiveRate) - +func NewSeriesBlockEncoder(w io.Writer, n uint64, m, k uint64) *SeriesBlockEncoder { return &SeriesBlockEncoder{ w: w, diff --git a/tsdb/index/tsi1/series_block_test.go b/tsdb/index/tsi1/series_block_test.go index 5ae8dc98cd..c4fb39da5d 100644 --- a/tsdb/index/tsi1/series_block_test.go +++ b/tsdb/index/tsi1/series_block_test.go @@ -56,7 +56,7 @@ func CreateSeriesBlock(a []Series) (*tsi1.SeriesBlock, error) { var buf bytes.Buffer // Create writer and sketches. Add series. - enc := tsi1.NewSeriesBlockEncoder(&buf, uint64(len(a))) + enc := tsi1.NewSeriesBlockEncoder(&buf, uint64(len(a)), M, K) for i, s := range a { if err := enc.Encode(s.Name, s.Tags, s.Deleted); err != nil { return nil, fmt.Errorf("SeriesBlockWriter.Add(): i=%d, err=%s", i, err) From 57eeae03fc5419c1249a4a4b890f2e9c9cb9a501 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 3 May 2017 09:12:02 -0600 Subject: [PATCH 07/10] Add note about SeriesIDs() limitation. --- tsdb/index/tsi1/measurement_block.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tsdb/index/tsi1/measurement_block.go b/tsdb/index/tsi1/measurement_block.go index ed10e9717f..175657ac31 100644 --- a/tsdb/index/tsi1/measurement_block.go +++ b/tsdb/index/tsi1/measurement_block.go @@ -338,6 +338,9 @@ func (e *MeasurementBlockElem) SeriesID(i int) uint64 { } // SeriesIDs returns a list of decoded series ids. +// +// NOTE: This should be used for testing and diagnostics purposes only. +// It requires loading the entire list of series in-memory. func (e *MeasurementBlockElem) SeriesIDs() []uint64 { a := make([]uint64, 0, e.series.n) var prev uint64 From c744e2f56255faee2f8e4150ef292f05f5a9e892 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 22 May 2017 14:04:30 -0600 Subject: [PATCH 08/10] TSI pull request fixes. --- cmd/influx_inspect/dumptsi/dumptsi.go | 29 ++++++++++++++----------- tsdb/index/tsi1/file_set.go | 31 +++++++++++++++++++-------- tsdb/index/tsi1/index.go | 14 ++++++++---- 3 files changed, 49 insertions(+), 25 deletions(-) diff --git a/cmd/influx_inspect/dumptsi/dumptsi.go b/cmd/influx_inspect/dumptsi/dumptsi.go index a52813e918..830271a409 100644 --- a/cmd/influx_inspect/dumptsi/dumptsi.go +++ b/cmd/influx_inspect/dumptsi/dumptsi.go @@ -131,7 +131,7 @@ func (cmd *Command) run() error { return nil } -func (cmd *Command) readFileSet() (*tsi1.Index, tsi1.FileSet, error) { +func (cmd *Command) readFileSet() (*tsi1.Index, *tsi1.FileSet, error) { // If only one path exists and it's a directory then open as an index. if len(cmd.paths) == 1 { fi, err := os.Stat(cmd.paths[0]) @@ -149,7 +149,7 @@ func (cmd *Command) readFileSet() (*tsi1.Index, tsi1.FileSet, error) { } // Open each file and group into a fileset. 
- var fs tsi1.FileSet + var files []tsi1.File for _, path := range cmd.paths { switch ext := filepath.Ext(path); ext { case tsi1.LogFileExt: @@ -157,7 +157,7 @@ func (cmd *Command) readFileSet() (*tsi1.Index, tsi1.FileSet, error) { if err := f.Open(); err != nil { return nil, nil, err } - fs = append(fs, f) + files = append(files, f) case tsi1.IndexFileExt: f := tsi1.NewIndexFile() @@ -165,18 +165,23 @@ func (cmd *Command) readFileSet() (*tsi1.Index, tsi1.FileSet, error) { if err := f.Open(); err != nil { return nil, nil, err } - fs = append(fs, f) + files = append(files, f) default: return nil, nil, fmt.Errorf("unexpected file extension: %s", ext) } } + fs, err := tsi1.NewFileSet(nil, files) + if err != nil { + return nil, nil, err + } fs.Retain() + return nil, fs, nil } -func (cmd *Command) printMerged(fs tsi1.FileSet) error { +func (cmd *Command) printMerged(fs *tsi1.FileSet) error { if err := cmd.printSeries(fs); err != nil { return err } else if err := cmd.printMeasurements(fs); err != nil { @@ -185,7 +190,7 @@ func (cmd *Command) printMerged(fs tsi1.FileSet) error { return nil } -func (cmd *Command) printSeries(fs tsi1.FileSet) error { +func (cmd *Command) printSeries(fs *tsi1.FileSet) error { if !cmd.showSeries { return nil } @@ -215,7 +220,7 @@ func (cmd *Command) printSeries(fs tsi1.FileSet) error { return nil } -func (cmd *Command) printMeasurements(fs tsi1.FileSet) error { +func (cmd *Command) printMeasurements(fs *tsi1.FileSet) error { if !cmd.showMeasurements { return nil } @@ -245,7 +250,7 @@ func (cmd *Command) printMeasurements(fs tsi1.FileSet) error { return nil } -func (cmd *Command) printTagKeys(fs tsi1.FileSet, name []byte) error { +func (cmd *Command) printTagKeys(fs *tsi1.FileSet, name []byte) error { if !cmd.showTagKeys { return nil } @@ -272,7 +277,7 @@ func (cmd *Command) printTagKeys(fs tsi1.FileSet, name []byte) error { return nil } -func (cmd *Command) printTagValues(fs tsi1.FileSet, name, key []byte) error { +func (cmd *Command) printTagValues(fs *tsi1.FileSet, name, key []byte) error { if !cmd.showTagValues { return nil } @@ -299,7 +304,7 @@ func (cmd *Command) printTagValues(fs tsi1.FileSet, name, key []byte) error { return nil } -func (cmd *Command) printTagValueSeries(fs tsi1.FileSet, name, key, value []byte) error { +func (cmd *Command) printTagValueSeries(fs *tsi1.FileSet, name, key, value []byte) error { if !cmd.showTagValueSeries { return nil } @@ -322,8 +327,8 @@ func (cmd *Command) printTagValueSeries(fs tsi1.FileSet, name, key, value []byte return nil } -func (cmd *Command) printFileSummaries(fs tsi1.FileSet) error { - for _, f := range fs { +func (cmd *Command) printFileSummaries(fs *tsi1.FileSet) error { + for _, f := range fs.Files() { switch f := f.(type) { case *tsi1.LogFile: if err := cmd.printLogFileSummary(f); err != nil { diff --git a/tsdb/index/tsi1/file_set.go b/tsdb/index/tsi1/file_set.go index 219e04cadb..9abee8197f 100644 --- a/tsdb/index/tsi1/file_set.go +++ b/tsdb/index/tsi1/file_set.go @@ -23,16 +23,18 @@ type FileSet struct { } // NewFileSet returns a new instance of FileSet. -func NewFileSet(levels []CompactionLevel, files []File) *FileSet { +func NewFileSet(levels []CompactionLevel, files []File) (*FileSet, error) { fs := &FileSet{levels: levels, files: files} - fs.buildFilters() - return fs + if err := fs.buildFilters(); err != nil { + return nil, err + } + return fs, nil } // Close closes all the files in the file set. 
func (p FileSet) Close() error { var err error - for _, f := range p { + for _, f := range p.files { if e := f.Close(); e != nil && err == nil { err = e } @@ -55,7 +57,7 @@ func (fs *FileSet) Release() { } // Prepend returns a new file set with f added at the beginning. -func (fs *FileSet) Prepend(f File) *FileSet { +func (fs *FileSet) Prepend(f File) (*FileSet, error) { return NewFileSet(fs.levels, append([]File{f}, fs.files...)) } @@ -87,7 +89,11 @@ func (fs *FileSet) MustReplace(oldFiles []File, newFile File) *FileSet { other[i] = newFile copy(other[i+1:], fs.files[i+len(oldFiles):]) - return NewFileSet(fs.levels, other) + fs, err := NewFileSet(fs.levels, other) + if err != nil { + panic("cannot build file set: " + err.Error()) + } + return fs } // MaxID returns the highest file identifier. @@ -101,6 +107,11 @@ func (fs *FileSet) MaxID() int { return max } +// Files returns all files in the set. +func (fs *FileSet) Files() []File { + return fs.files +} + // LogFiles returns all log files from the file set. func (fs *FileSet) LogFiles() []*LogFile { var a []*LogFile @@ -849,10 +860,10 @@ func (fs *FileSet) seriesByBinaryExprVarRefIterator(name, key []byte, value *inf } // buildFilters builds a series existence filter for each compaction level. -func (fs *FileSet) buildFilters() { +func (fs *FileSet) buildFilters() error { if len(fs.levels) == 0 { fs.filters = nil - return + return nil } // Generate filters for each level. @@ -875,9 +886,11 @@ func (fs *FileSet) buildFilters() { // Merge filter. if err := fs.filters[level].Merge(f.Filter()); err != nil { - panic(err) + return err } } + + return nil } // File represents a log or index file. diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 1639f3bc71..278e28e5d3 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -97,7 +97,6 @@ func NewIndex() *Index { // Default compaction thresholds. MaxLogFileSize: DefaultMaxLogFileSize, CompactionEnabled: true, - CompactionFactor: DefaultCompactionFactor, } } @@ -157,7 +156,11 @@ func (i *Index) Open() error { files = append(files, f) } } - i.fileSet = NewFileSet(i.levels, files) + fs, err := NewFileSet(i.levels, files) + if err != nil { + return err + } + i.fileSet = fs // Set initial sequnce number. i.seq = i.fileSet.MaxID() @@ -319,7 +322,11 @@ func (i *Index) prependActiveLogFile() error { i.activeLogFile = f // Prepend and generate new fileset. - i.fileSet = i.fileSet.Prepend(f) + fs, err := i.fileSet.Prepend(f) + if err != nil { + return err + } + i.fileSet = fs // Write new manifest. if err := i.writeManifestFile(); err != nil { @@ -796,7 +803,6 @@ func (i *Index) compact() { size += f.Size() } if size < i.levels[level+1].MinSize { - // log.Printf("tsi1: SKIP, too small: level=%d, size=%d, target=%d", level, size, i.levels[level+1].MinSize) continue } From 2524df340519102cb15093dcb165124f83c8ae71 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 23 May 2017 09:48:13 -0600 Subject: [PATCH 09/10] Convert tsi1 series keys to uint32. 
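
Series are referenced within an index file by their byte offset into the
series block. Since MaxIndexFileSize is 4 * (1 << 30) (4GB), those offsets
always fit in 32 bits, so the series ID/offset type is narrowed from uint64
to uint32. This halves the fixed-width entries in the series hash indexes
and block trailers and shortens the varint-encoded deltas written by the
measurement and tag blocks.

For illustration, the delta + uvarint encoding applied to a sorted uint32
series ID list is equivalent to the sketch below. The helper names
encodeSeriesIDs/decodeSeriesIDs are illustrative only and do not appear in
the code:

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    // encodeSeriesIDs writes sorted uint32 IDs as uvarint-encoded deltas.
    func encodeSeriesIDs(ids []uint32) []byte {
        var out []byte
        var prev uint32
        for _, id := range ids {
            var buf [binary.MaxVarintLen32]byte
            n := binary.PutUvarint(buf[:], uint64(id-prev))
            out = append(out, buf[:n]...)
            prev = id
        }
        return out
    }

    // decodeSeriesIDs reverses encodeSeriesIDs.
    func decodeSeriesIDs(data []byte) []uint32 {
        var ids []uint32
        var prev uint32
        for len(data) > 0 {
            delta, n := binary.Uvarint(data)
            data = data[n:]
            prev += uint32(delta)
            ids = append(ids, prev)
        }
        return ids
    }

    func main() {
        enc := encodeSeriesIDs([]uint32{1, 3, 4, 100})
        fmt.Println(decodeSeriesIDs(enc)) // prints [1 3 4 100]
    }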
--- tsdb/index/tsi1/index_files.go | 12 +- tsdb/index/tsi1/log_file.go | 16 +- tsdb/index/tsi1/measurement_block.go | 36 ++-- tsdb/index/tsi1/measurement_block_test.go | 16 +- tsdb/index/tsi1/series_block.go | 192 +++++++++++----------- tsdb/index/tsi1/series_block_test.go | 2 +- tsdb/index/tsi1/tag_block.go | 27 +-- tsdb/index/tsi1/tag_block_test.go | 22 +-- tsdb/index/tsi1/tsi1.go | 8 +- 9 files changed, 169 insertions(+), 162 deletions(-) diff --git a/tsdb/index/tsi1/index_files.go b/tsdb/index/tsi1/index_files.go index abcc2d5610..85006a78bb 100644 --- a/tsdb/index/tsi1/index_files.go +++ b/tsdb/index/tsi1/index_files.go @@ -197,7 +197,7 @@ func (p IndexFiles) writeSeriesBlockTo(w io.Writer, m, k uint64, info *indexComp } itr := p.SeriesIterator() - enc := NewSeriesBlockEncoder(w, sketch.Count(), m, k) + enc := NewSeriesBlockEncoder(w, uint32(sketch.Count()), m, k) // Write all series. for e := itr.Next(); e != nil; e = itr.Next() { @@ -208,7 +208,7 @@ func (p IndexFiles) writeSeriesBlockTo(w io.Writer, m, k uint64, info *indexComp // Close and flush block. err := enc.Close() - *n += enc.N() + *n += int64(enc.N()) if err != nil { return err } @@ -247,7 +247,7 @@ func (p IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactIn for ve := vitr.Next(); ve != nil; ve = vitr.Next() { // Merge all series together. sitr := p.TagValueSeriesIterator(name, ke.Key(), ve.Value()) - var seriesIDs []uint64 + var seriesIDs []uint32 for se := sitr.Next(); se != nil; se = sitr.Next() { seriesID, _ := info.sblk.Offset(se.Name(), se.Tags(), seriesKey[:0]) if seriesID == 0 { @@ -255,7 +255,7 @@ func (p IndexFiles) writeTagsetTo(w io.Writer, name []byte, info *indexCompactIn } seriesIDs = append(seriesIDs, seriesID) } - sort.Sort(uint64Slice(seriesIDs)) + sort.Sort(uint32Slice(seriesIDs)) // Encode value. if err := enc.EncodeValue(ve.Value(), ve.Deleted(), seriesIDs); err != nil { @@ -294,7 +294,7 @@ func (p IndexFiles) writeMeasurementBlockTo(w io.Writer, info *indexCompactInfo, // Look-up series ids. itr := p.MeasurementSeriesIterator(name) - var seriesIDs []uint64 + var seriesIDs []uint32 for e := itr.Next(); e != nil; e = itr.Next() { seriesID, _ := info.sblk.Offset(e.Name(), e.Tags(), seriesKey[:0]) if seriesID == 0 { @@ -302,7 +302,7 @@ func (p IndexFiles) writeMeasurementBlockTo(w io.Writer, info *indexCompactInfo, } seriesIDs = append(seriesIDs, seriesID) } - sort.Sort(uint64Slice(seriesIDs)) + sort.Sort(uint32Slice(seriesIDs)) // Add measurement to writer. pos := info.tagSets[string(name)] diff --git a/tsdb/index/tsi1/log_file.go b/tsdb/index/tsi1/log_file.go index 1e39d43513..04a1abd549 100644 --- a/tsdb/index/tsi1/log_file.go +++ b/tsdb/index/tsi1/log_file.go @@ -822,9 +822,9 @@ func (f *LogFile) WriteTo(w io.Writer, m, k uint64) (n int64, err error) { func (f *LogFile) writeSeriesBlockTo(w io.Writer, names []string, m, k uint64, info *logFileCompactInfo, n *int64) error { // Determine series count. - var seriesN uint64 + var seriesN uint32 for _, mm := range f.mms { - seriesN += uint64(len(mm.series)) + seriesN += uint32(len(mm.series)) } // Write all series. @@ -851,7 +851,7 @@ func (f *LogFile) writeSeriesBlockTo(w io.Writer, names []string, m, k uint64, i // Close and flush series block. 
err := enc.Close() - *n += enc.N() + *n += int64(enc.N()) if err != nil { return err } @@ -874,7 +874,7 @@ func (f *LogFile) updateSeriesOffsets(w io.Writer, names []string, info *logFile for _, name := range names { mm := f.mms[name] mmInfo := info.createMeasurementInfoIfNotExists(name) - mmInfo.seriesIDs = make([]uint64, 0, len(mm.series)) + mmInfo.seriesIDs = make([]uint32, 0, len(mm.series)) for _, serie := range mm.series { // Lookup series offset. @@ -930,7 +930,7 @@ func (f *LogFile) writeTagsetTo(w io.Writer, name string, info *logFileCompactIn // Add each value. for v, value := range tag.tagValues { tagValueInfo := tagSetInfo.tagValues[v] - sort.Sort(uint64Slice(tagValueInfo.seriesIDs)) + sort.Sort(uint32Slice(tagValueInfo.seriesIDs)) if err := enc.EncodeValue(value.name, value.deleted, tagValueInfo.seriesIDs); err != nil { return err @@ -963,7 +963,7 @@ func (f *LogFile) writeMeasurementBlockTo(w io.Writer, names []string, info *log mmInfo := info.mms[name] assert(mmInfo != nil, "measurement info not found") - sort.Sort(uint64Slice(mmInfo.seriesIDs)) + sort.Sort(uint32Slice(mmInfo.seriesIDs)) mw.Add(mm.name, mm.deleted, mmInfo.offset, mmInfo.size, mmInfo.seriesIDs) } @@ -999,7 +999,7 @@ func (info *logFileCompactInfo) createMeasurementInfoIfNotExists(name string) *l type logFileMeasurementCompactInfo struct { offset int64 size int64 - seriesIDs []uint64 + seriesIDs []uint32 tagSet map[string]*logFileTagSetCompactInfo } @@ -1027,7 +1027,7 @@ func (info *logFileTagSetCompactInfo) createTagValueInfoIfNotExists(value []byte } type logFileTagValueCompactInfo struct { - seriesIDs []uint64 + seriesIDs []uint32 } // MergeSeriesSketches merges the series sketches belonging to this LogFile diff --git a/tsdb/index/tsi1/measurement_block.go b/tsdb/index/tsi1/measurement_block.go index 175657ac31..397adfd31d 100644 --- a/tsdb/index/tsi1/measurement_block.go +++ b/tsdb/index/tsi1/measurement_block.go @@ -175,13 +175,13 @@ func (itr *blockMeasurementIterator) Next() MeasurementElem { // rawSeriesIterator iterates over a list of raw series data. type rawSeriesIDIterator struct { - prev uint64 - n uint64 + prev uint32 + n uint32 data []byte } // next returns the next decoded series. -func (itr *rawSeriesIDIterator) next() uint64 { +func (itr *rawSeriesIDIterator) next() uint32 { if len(itr.data) == 0 { return 0 } @@ -189,7 +189,7 @@ func (itr *rawSeriesIDIterator) next() uint64 { delta, n := binary.Uvarint(itr.data) itr.data = itr.data[n:] - seriesID := itr.prev + delta + seriesID := itr.prev + uint32(delta) itr.prev = seriesID return seriesID } @@ -304,7 +304,7 @@ type MeasurementBlockElem struct { } series struct { - n uint64 // series count + n uint32 // series count data []byte // serialized series data } @@ -330,25 +330,25 @@ func (e *MeasurementBlockElem) TagBlockSize() int64 { return e.tagBlock.size } func (e *MeasurementBlockElem) SeriesData() []byte { return e.series.data } // SeriesN returns the number of series associated with the measurement. -func (e *MeasurementBlockElem) SeriesN() uint64 { return e.series.n } +func (e *MeasurementBlockElem) SeriesN() uint32 { return e.series.n } // SeriesID returns series ID at an index. -func (e *MeasurementBlockElem) SeriesID(i int) uint64 { - return binary.BigEndian.Uint64(e.series.data[i*SeriesIDSize:]) +func (e *MeasurementBlockElem) SeriesID(i int) uint32 { + return binary.BigEndian.Uint32(e.series.data[i*SeriesIDSize:]) } // SeriesIDs returns a list of decoded series ids. 
// // NOTE: This should be used for testing and diagnostics purposes only. // It requires loading the entire list of series in-memory. -func (e *MeasurementBlockElem) SeriesIDs() []uint64 { - a := make([]uint64, 0, e.series.n) - var prev uint64 +func (e *MeasurementBlockElem) SeriesIDs() []uint32 { + a := make([]uint32, 0, e.series.n) + var prev uint32 for data := e.series.data; len(data) > 0; { delta, n := binary.Uvarint(data) data = data[n:] - seriesID := prev + delta + seriesID := prev + uint32(delta) a = append(a, seriesID) prev = seriesID } @@ -375,7 +375,7 @@ func (e *MeasurementBlockElem) UnmarshalBinary(data []byte) error { // Parse series data. v, n := binary.Uvarint(data) - e.series.n, data = uint64(v), data[n:] + e.series.n, data = uint32(v), data[n:] sz, n = binary.Uvarint(data) data = data[n:] e.series.data, data = data[:sz], data[sz:] @@ -405,7 +405,7 @@ func NewMeasurementBlockWriter() *MeasurementBlockWriter { } // Add adds a measurement with series and tag set offset/size. -func (mw *MeasurementBlockWriter) Add(name []byte, deleted bool, offset, size int64, seriesIDs []uint64) { +func (mw *MeasurementBlockWriter) Add(name []byte, deleted bool, offset, size int64, seriesIDs []uint32) { mm := mw.mms[string(name)] mm.deleted = deleted mm.tagBlock.offset = offset @@ -537,12 +537,12 @@ func (mw *MeasurementBlockWriter) writeMeasurementTo(w io.Writer, name []byte, m // Write series data to buffer. mw.buf.Reset() - var prev uint64 + var prev uint32 for _, seriesID := range mm.seriesIDs { delta := seriesID - prev - var buf [binary.MaxVarintLen64]byte - i := binary.PutUvarint(buf[:], delta) + var buf [binary.MaxVarintLen32]byte + i := binary.PutUvarint(buf[:], uint64(delta)) if _, err := mw.buf.Write(buf[:i]); err != nil { return err } @@ -587,7 +587,7 @@ type measurement struct { offset int64 size int64 } - seriesIDs []uint64 + seriesIDs []uint32 offset int64 } diff --git a/tsdb/index/tsi1/measurement_block_test.go b/tsdb/index/tsi1/measurement_block_test.go index 9ec6323de0..939c6d77cb 100644 --- a/tsdb/index/tsi1/measurement_block_test.go +++ b/tsdb/index/tsi1/measurement_block_test.go @@ -104,9 +104,9 @@ func TestMeasurementBlockTrailer_WriteTo(t *testing.T) { // Ensure measurement blocks can be written and opened. func TestMeasurementBlockWriter(t *testing.T) { ms := Measurements{ - NewMeasurement([]byte("foo"), false, 100, 10, []uint64{1, 3, 4}), - NewMeasurement([]byte("bar"), false, 200, 20, []uint64{2}), - NewMeasurement([]byte("baz"), false, 300, 30, []uint64{5, 6}), + NewMeasurement([]byte("foo"), false, 100, 10, []uint32{1, 3, 4}), + NewMeasurement([]byte("bar"), false, 200, 20, []uint32{2}), + NewMeasurement([]byte("baz"), false, 300, 30, []uint32{5, 6}), } // Write the measurements to writer. 
@@ -134,7 +134,7 @@ func TestMeasurementBlockWriter(t *testing.T) { t.Fatal("expected element") } else if e.TagBlockOffset() != 100 || e.TagBlockSize() != 10 { t.Fatalf("unexpected offset/size: %v/%v", e.TagBlockOffset(), e.TagBlockSize()) - } else if !reflect.DeepEqual(e.SeriesIDs(), []uint64{1, 3, 4}) { + } else if !reflect.DeepEqual(e.SeriesIDs(), []uint32{1, 3, 4}) { t.Fatalf("unexpected series data: %#v", e.SeriesIDs()) } @@ -142,7 +142,7 @@ func TestMeasurementBlockWriter(t *testing.T) { t.Fatal("expected element") } else if e.TagBlockOffset() != 200 || e.TagBlockSize() != 20 { t.Fatalf("unexpected offset/size: %v/%v", e.TagBlockOffset(), e.TagBlockSize()) - } else if !reflect.DeepEqual(e.SeriesIDs(), []uint64{2}) { + } else if !reflect.DeepEqual(e.SeriesIDs(), []uint32{2}) { t.Fatalf("unexpected series data: %#v", e.SeriesIDs()) } @@ -150,7 +150,7 @@ func TestMeasurementBlockWriter(t *testing.T) { t.Fatal("expected element") } else if e.TagBlockOffset() != 300 || e.TagBlockSize() != 30 { t.Fatalf("unexpected offset/size: %v/%v", e.TagBlockOffset(), e.TagBlockSize()) - } else if !reflect.DeepEqual(e.SeriesIDs(), []uint64{5, 6}) { + } else if !reflect.DeepEqual(e.SeriesIDs(), []uint32{5, 6}) { t.Fatalf("unexpected series data: %#v", e.SeriesIDs()) } @@ -167,10 +167,10 @@ type Measurement struct { Deleted bool Offset int64 Size int64 - ids []uint64 + ids []uint32 } -func NewMeasurement(name []byte, deleted bool, offset, size int64, ids []uint64) Measurement { +func NewMeasurement(name []byte, deleted bool, offset, size int64, ids []uint32) Measurement { return Measurement{ Name: name, Deleted: deleted, diff --git a/tsdb/index/tsi1/series_block.go b/tsdb/index/tsi1/series_block.go index 4572cf1aa6..9a347a0d9c 100644 --- a/tsdb/index/tsi1/series_block.go +++ b/tsdb/index/tsi1/series_block.go @@ -25,17 +25,17 @@ var ErrSeriesOverflow = errors.New("series overflow") const ( // Series list trailer field sizes. SeriesBlockTrailerSize = 0 + - 8 + 8 + // series data offset/size - 8 + 8 + 8 + // series index offset/size/capacity - 8 + 8 + 8 + // bloom filter false positive rate, offset/size - 8 + 8 + // series sketch offset/size - 8 + 8 + // tombstone series sketch offset/size - 8 + 8 + // series count and tombstone count + 4 + 4 + // series data offset/size + 4 + 4 + 4 + // series index offset/size/capacity + 8 + 4 + 4 + // bloom filter false positive rate, offset/size + 4 + 4 + // series sketch offset/size + 4 + 4 + // tombstone series sketch offset/size + 4 + 4 + // series count and tombstone count 0 // Other field sizes - SeriesCountSize = 8 - SeriesIDSize = 8 + SeriesCountSize = 4 + SeriesIDSize = 4 ) // Series flag constants. @@ -60,8 +60,8 @@ type SeriesBlock struct { seriesIndexes []seriesBlockIndex // Exact series counts for this block. - seriesN int64 - tombstoneN int64 + seriesN int32 + tombstoneN int32 // Bloom filter used for fast series existence check. filter *bloom.Filter @@ -92,7 +92,7 @@ func (blk *SeriesBlock) Series(name []byte, tags models.Tags) SeriesElem { } // Offset returns the byte offset of the series within the block. -func (blk *SeriesBlock) Offset(name []byte, tags models.Tags, buf []byte) (offset uint64, tombstoned bool) { +func (blk *SeriesBlock) Offset(name []byte, tags models.Tags, buf []byte) (offset uint32, tombstoned bool) { // Exit if no series indexes exist. if len(blk.seriesIndexes) == 0 { return 0, false @@ -100,7 +100,7 @@ func (blk *SeriesBlock) Offset(name []byte, tags models.Tags, buf []byte) (offse // Compute series key. 
buf = AppendSeriesKey(buf[:0], name, tags) - bufN := uint64(len(buf)) + bufN := uint32(len(buf)) // Quickly check the bloom filter. // If the key doesn't exist then we know for sure that it doesn't exist. @@ -121,7 +121,7 @@ func (blk *SeriesBlock) Offset(name []byte, tags models.Tags, buf []byte) (offse seriesIndex := blk.seriesIndexes[i] // Search within partition. - n := seriesIndex.capacity + n := int64(seriesIndex.capacity) hash := rhh.HashKey(buf) pos := hash % n @@ -129,7 +129,7 @@ func (blk *SeriesBlock) Offset(name []byte, tags models.Tags, buf []byte) (offse var d int64 for { // Find offset of series. - offset := binary.BigEndian.Uint64(seriesIndex.data[pos*SeriesIDSize:]) + offset := binary.BigEndian.Uint32(seriesIndex.data[pos*SeriesIDSize:]) if offset == 0 { return 0, false } @@ -157,8 +157,8 @@ func (blk *SeriesBlock) Offset(name []byte, tags models.Tags, buf []byte) (offse } // SeriesCount returns the number of series. -func (blk *SeriesBlock) SeriesCount() uint64 { - return uint64(blk.seriesN + blk.tombstoneN) +func (blk *SeriesBlock) SeriesCount() uint32 { + return uint32(blk.seriesN + blk.tombstoneN) } // SeriesIterator returns an iterator over all the series. @@ -192,17 +192,17 @@ func (blk *SeriesBlock) UnmarshalBinary(data []byte) error { idx := &blk.seriesIndexes[i] // Read data block. - var offset, size uint64 - offset, buf = binary.BigEndian.Uint64(buf[:8]), buf[8:] - size, buf = binary.BigEndian.Uint64(buf[:8]), buf[8:] + var offset, size uint32 + offset, buf = binary.BigEndian.Uint32(buf[:4]), buf[4:] + size, buf = binary.BigEndian.Uint32(buf[:4]), buf[4:] idx.data = blk.data[offset : offset+size] // Read block capacity. - idx.capacity, buf = int64(binary.BigEndian.Uint64(buf[:8])), buf[8:] + idx.capacity, buf = int32(binary.BigEndian.Uint32(buf[:4])), buf[4:] // Read min key. - var n uint64 - n, buf = binary.BigEndian.Uint64(buf[:8]), buf[8:] + var n uint32 + n, buf = binary.BigEndian.Uint32(buf[:4]), buf[4:] idx.min, buf = buf[:n], buf[n:] } if len(buf) != 0 { @@ -238,13 +238,13 @@ func (blk *SeriesBlock) UnmarshalBinary(data []byte) error { type seriesBlockIndex struct { data []byte min []byte - capacity int64 + capacity int32 } // seriesBlockIterator is an iterator over a series ids in a series list. type seriesBlockIterator struct { - i, n uint64 - offset uint64 + i, n uint32 + offset uint32 sblk *SeriesBlock e SeriesBlockElem // buffer } @@ -263,8 +263,8 @@ func (itr *seriesBlockIterator) Next() SeriesElem { itr.offset++ // Read index capacity. - n := binary.BigEndian.Uint64(itr.sblk.data[itr.offset:]) - itr.offset += 8 + n := binary.BigEndian.Uint32(itr.sblk.data[itr.offset:]) + itr.offset += 4 // Skip over index. itr.offset += n * SeriesIDSize @@ -276,7 +276,7 @@ func (itr *seriesBlockIterator) Next() SeriesElem { // Move iterator and offset forward. itr.i++ - itr.offset += uint64(itr.e.size) + itr.offset += uint32(itr.e.size) return &itr.e } @@ -375,12 +375,12 @@ func AppendSeriesElem(dst []byte, flag byte, name []byte, tags models.Tags) []by // AppendSeriesKey serializes name and tags to a byte slice. // The total length is prepended as a uvarint. func AppendSeriesKey(dst []byte, name []byte, tags models.Tags) []byte { - buf := make([]byte, binary.MaxVarintLen64) + buf := make([]byte, binary.MaxVarintLen32) origLen := len(dst) // The tag count is variable encoded, so we need to know ahead of time what // the size of the tag count value will be. 
- tcBuf := make([]byte, binary.MaxVarintLen64) + tcBuf := make([]byte, binary.MaxVarintLen32) tcSz := binary.PutUvarint(tcBuf, uint64(len(tags))) // Size of name/tags. Does not include total length. @@ -539,7 +539,7 @@ type SeriesBlockEncoder struct { } // NewSeriesBlockEncoder returns a new instance of SeriesBlockEncoder. -func NewSeriesBlockEncoder(w io.Writer, n uint64, m, k uint64) *SeriesBlockEncoder { +func NewSeriesBlockEncoder(w io.Writer, n uint32, m, k uint64) *SeriesBlockEncoder { return &SeriesBlockEncoder{ w: w, @@ -597,7 +597,7 @@ func (enc *SeriesBlockEncoder) Encode(name []byte, tags models.Tags, deleted boo // Save offset to generate index later. // Key is copied by the RHH map. - enc.offsets.Put(buf[1:], uint64(offset)) + enc.offsets.Put(buf[1:], uint32(offset)) // Update bloom filter. enc.filter.Insert(buf[1:]) @@ -628,35 +628,35 @@ func (enc *SeriesBlockEncoder) Close() error { // Write dictionary-encoded series list. enc.trailer.Series.Data.Offset = 1 - enc.trailer.Series.Data.Size = enc.n - enc.trailer.Series.Data.Offset + enc.trailer.Series.Data.Size = int32(enc.n) - enc.trailer.Series.Data.Offset // Write dictionary-encoded series hash index. - enc.trailer.Series.Index.Offset = enc.n + enc.trailer.Series.Index.Offset = int32(enc.n) if err := enc.writeIndexEntries(); err != nil { return err } - enc.trailer.Series.Index.Size = enc.n - enc.trailer.Series.Index.Offset + enc.trailer.Series.Index.Size = int32(enc.n) - enc.trailer.Series.Index.Offset // Flush bloom filter. enc.trailer.Bloom.K = enc.filter.K() - enc.trailer.Bloom.Offset = enc.n + enc.trailer.Bloom.Offset = int32(enc.n) if err := writeTo(enc.w, enc.filter.Bytes(), &enc.n); err != nil { return err } - enc.trailer.Bloom.Size = enc.n - enc.trailer.Bloom.Offset + enc.trailer.Bloom.Size = int32(enc.n) - enc.trailer.Bloom.Offset // Write the sketches out. - enc.trailer.Sketch.Offset = enc.n + enc.trailer.Sketch.Offset = int32(enc.n) if err := writeSketchTo(enc.w, enc.sketch, &enc.n); err != nil { return err } - enc.trailer.Sketch.Size = enc.n - enc.trailer.Sketch.Offset + enc.trailer.Sketch.Size = int32(enc.n) - enc.trailer.Sketch.Offset - enc.trailer.TSketch.Offset = enc.n + enc.trailer.TSketch.Offset = int32(enc.n) if err := writeSketchTo(enc.w, enc.tSketch, &enc.n); err != nil { return err } - enc.trailer.TSketch.Size = enc.n - enc.trailer.TSketch.Offset + enc.trailer.TSketch.Size = int32(enc.n) - enc.trailer.TSketch.Offset // Write trailer. nn, err := enc.trailer.WriteTo(enc.w) @@ -670,23 +670,23 @@ func (enc *SeriesBlockEncoder) Close() error { // writeIndexEntries writes a list of series hash index entries. func (enc *SeriesBlockEncoder) writeIndexEntries() error { - enc.trailer.Series.Index.N = int64(len(enc.indexes)) + enc.trailer.Series.Index.N = int32(len(enc.indexes)) for _, idx := range enc.indexes { // Write offset/size. - if err := writeUint64To(enc.w, uint64(idx.offset), &enc.n); err != nil { + if err := writeUint32To(enc.w, uint32(idx.offset), &enc.n); err != nil { return err - } else if err := writeUint64To(enc.w, uint64(idx.size), &enc.n); err != nil { + } else if err := writeUint32To(enc.w, uint32(idx.size), &enc.n); err != nil { return err } // Write capacity. - if err := writeUint64To(enc.w, uint64(idx.capacity), &enc.n); err != nil { + if err := writeUint32To(enc.w, uint32(idx.capacity), &enc.n); err != nil { return err } // Write min key. 
- if err := writeUint64To(enc.w, uint64(len(idx.min)), &enc.n); err != nil { + if err := writeUint32To(enc.w, uint32(len(idx.min)), &enc.n); err != nil { return err } else if err := writeTo(enc.w, idx.min, &enc.n); err != nil { return err @@ -744,12 +744,12 @@ func (enc *SeriesBlockEncoder) flushIndex() error { } // Write index capacity. // This is used for skipping over when iterating sequentially. - if err := writeUint64To(enc.w, uint64(enc.offsets.Cap()), &enc.n); err != nil { + if err := writeUint32To(enc.w, uint32(enc.offsets.Cap()), &enc.n); err != nil { return err } // Determine size. - var sz int64 = enc.offsets.Cap() * 8 + var sz int64 = enc.offsets.Cap() * 4 // Save current position to ensure size is correct by the end. offset := enc.n @@ -757,9 +757,9 @@ func (enc *SeriesBlockEncoder) flushIndex() error { // Encode hash map offset entries. for i := int64(0); i < enc.offsets.Cap(); i++ { _, v := enc.offsets.Elem(i) - seriesOffset, _ := v.(uint64) + seriesOffset, _ := v.(uint32) - if err := writeUint64To(enc.w, uint64(seriesOffset), &enc.n); err != nil { + if err := writeUint32To(enc.w, uint32(seriesOffset), &enc.n); err != nil { return err } } @@ -774,9 +774,9 @@ func (enc *SeriesBlockEncoder) flushIndex() error { // Add to index entries. enc.indexes = append(enc.indexes, seriesBlockIndexEncodeInfo{ - offset: offset, - size: size, - capacity: uint64(enc.offsets.Cap()), + offset: uint32(offset), + size: uint32(size), + capacity: uint32(enc.offsets.Cap()), min: enc.indexMin, }) @@ -788,9 +788,9 @@ func (enc *SeriesBlockEncoder) flushIndex() error { // seriesBlockIndexEncodeInfo stores offset information for seriesBlockIndex structures. type seriesBlockIndexEncodeInfo struct { - offset int64 - size int64 - capacity uint64 + offset uint32 + size uint32 + capacity uint32 min []byte } @@ -802,30 +802,30 @@ func ReadSeriesBlockTrailer(data []byte) SeriesBlockTrailer { buf := data[len(data)-SeriesBlockTrailerSize:] // Read series data info. - t.Series.Data.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] - t.Series.Data.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] + t.Series.Data.Offset, buf = int32(binary.BigEndian.Uint32(buf[0:4])), buf[4:] + t.Series.Data.Size, buf = int32(binary.BigEndian.Uint32(buf[0:4])), buf[4:] // Read series hash index info. - t.Series.Index.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] - t.Series.Index.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] - t.Series.Index.N, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] + t.Series.Index.Offset, buf = int32(binary.BigEndian.Uint32(buf[0:4])), buf[4:] + t.Series.Index.Size, buf = int32(binary.BigEndian.Uint32(buf[0:4])), buf[4:] + t.Series.Index.N, buf = int32(binary.BigEndian.Uint32(buf[0:4])), buf[4:] // Read bloom filter info. t.Bloom.K, buf = binary.BigEndian.Uint64(buf[0:8]), buf[8:] - t.Bloom.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] - t.Bloom.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] + t.Bloom.Offset, buf = int32(binary.BigEndian.Uint32(buf[0:4])), buf[4:] + t.Bloom.Size, buf = int32(binary.BigEndian.Uint32(buf[0:4])), buf[4:] // Read series sketch info. - t.Sketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] - t.Sketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] + t.Sketch.Offset, buf = int32(binary.BigEndian.Uint32(buf[0:4])), buf[4:] + t.Sketch.Size, buf = int32(binary.BigEndian.Uint32(buf[0:4])), buf[4:] // Read tombstone series sketch info. 
- t.TSketch.Offset, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] - t.TSketch.Size, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] + t.TSketch.Offset, buf = int32(binary.BigEndian.Uint32(buf[0:4])), buf[4:] + t.TSketch.Size, buf = int32(binary.BigEndian.Uint32(buf[0:4])), buf[4:] // Read series & tombstone count. - t.SeriesN, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] - t.TombstoneN, buf = int64(binary.BigEndian.Uint64(buf[0:8])), buf[8:] + t.SeriesN, buf = int32(binary.BigEndian.Uint32(buf[0:4])), buf[4:] + t.TombstoneN, buf = int32(binary.BigEndian.Uint32(buf[0:4])), buf[4:] return t } @@ -834,81 +834,81 @@ func ReadSeriesBlockTrailer(data []byte) SeriesBlockTrailer { type SeriesBlockTrailer struct { Series struct { Data struct { - Offset int64 - Size int64 + Offset int32 + Size int32 } Index struct { - Offset int64 - Size int64 - N int64 + Offset int32 + Size int32 + N int32 } } // Bloom filter info. Bloom struct { K uint64 - Offset int64 - Size int64 + Offset int32 + Size int32 } // Offset and size of cardinality sketch for measurements. Sketch struct { - Offset int64 - Size int64 + Offset int32 + Size int32 } // Offset and size of cardinality sketch for tombstoned measurements. TSketch struct { - Offset int64 - Size int64 + Offset int32 + Size int32 } - SeriesN int64 - TombstoneN int64 + SeriesN int32 + TombstoneN int32 } func (t SeriesBlockTrailer) WriteTo(w io.Writer) (n int64, err error) { - if err := writeUint64To(w, uint64(t.Series.Data.Offset), &n); err != nil { + if err := writeUint32To(w, uint32(t.Series.Data.Offset), &n); err != nil { return n, err - } else if err := writeUint64To(w, uint64(t.Series.Data.Size), &n); err != nil { + } else if err := writeUint32To(w, uint32(t.Series.Data.Size), &n); err != nil { return n, err } - if err := writeUint64To(w, uint64(t.Series.Index.Offset), &n); err != nil { + if err := writeUint32To(w, uint32(t.Series.Index.Offset), &n); err != nil { return n, err - } else if err := writeUint64To(w, uint64(t.Series.Index.Size), &n); err != nil { + } else if err := writeUint32To(w, uint32(t.Series.Index.Size), &n); err != nil { return n, err - } else if err := writeUint64To(w, uint64(t.Series.Index.N), &n); err != nil { + } else if err := writeUint32To(w, uint32(t.Series.Index.N), &n); err != nil { return n, err } // Write bloom filter info. if err := writeUint64To(w, t.Bloom.K, &n); err != nil { return n, err - } else if err := writeUint64To(w, uint64(t.Bloom.Offset), &n); err != nil { + } else if err := writeUint32To(w, uint32(t.Bloom.Offset), &n); err != nil { return n, err - } else if err := writeUint64To(w, uint64(t.Bloom.Size), &n); err != nil { + } else if err := writeUint32To(w, uint32(t.Bloom.Size), &n); err != nil { return n, err } // Write measurement sketch info. - if err := writeUint64To(w, uint64(t.Sketch.Offset), &n); err != nil { + if err := writeUint32To(w, uint32(t.Sketch.Offset), &n); err != nil { return n, err - } else if err := writeUint64To(w, uint64(t.Sketch.Size), &n); err != nil { + } else if err := writeUint32To(w, uint32(t.Sketch.Size), &n); err != nil { return n, err } // Write tombstone measurement sketch info. - if err := writeUint64To(w, uint64(t.TSketch.Offset), &n); err != nil { + if err := writeUint32To(w, uint32(t.TSketch.Offset), &n); err != nil { return n, err - } else if err := writeUint64To(w, uint64(t.TSketch.Size), &n); err != nil { + } else if err := writeUint32To(w, uint32(t.TSketch.Size), &n); err != nil { return n, err } // Write series and tombstone count. 
- if err := writeUint64To(w, uint64(t.SeriesN), &n); err != nil { + if err := writeUint32To(w, uint32(t.SeriesN), &n); err != nil { return n, err - } else if err := writeUint64To(w, uint64(t.TombstoneN), &n); err != nil { + } else if err := writeUint32To(w, uint32(t.TombstoneN), &n); err != nil { return n, err } @@ -919,7 +919,7 @@ type serie struct { name []byte tags models.Tags deleted bool - offset uint64 + offset uint32 } func (s *serie) flag() uint8 { return encodeSerieFlag(s.deleted) } diff --git a/tsdb/index/tsi1/series_block_test.go b/tsdb/index/tsi1/series_block_test.go index c4fb39da5d..3455abce93 100644 --- a/tsdb/index/tsi1/series_block_test.go +++ b/tsdb/index/tsi1/series_block_test.go @@ -56,7 +56,7 @@ func CreateSeriesBlock(a []Series) (*tsi1.SeriesBlock, error) { var buf bytes.Buffer // Create writer and sketches. Add series. - enc := tsi1.NewSeriesBlockEncoder(&buf, uint64(len(a)), M, K) + enc := tsi1.NewSeriesBlockEncoder(&buf, uint32(len(a)), M, K) for i, s := range a { if err := enc.Encode(s.Name, s.Tags, s.Deleted); err != nil { return nil, fmt.Errorf("SeriesBlockWriter.Add(): i=%d, err=%s", i, err) diff --git a/tsdb/index/tsi1/tag_block.go b/tsdb/index/tsi1/tag_block.go index db14ced6a3..d3024896c1 100644 --- a/tsdb/index/tsi1/tag_block.go +++ b/tsdb/index/tsi1/tag_block.go @@ -300,7 +300,7 @@ type TagBlockValueElem struct { flag byte value []byte series struct { - n uint64 // Series count + n uint32 // Series count data []byte // Raw series data } @@ -314,25 +314,25 @@ func (e *TagBlockValueElem) Deleted() bool { return (e.flag & TagValueTombstoneF func (e *TagBlockValueElem) Value() []byte { return e.value } // SeriesN returns the series count. -func (e *TagBlockValueElem) SeriesN() uint64 { return e.series.n } +func (e *TagBlockValueElem) SeriesN() uint32 { return e.series.n } // SeriesData returns the raw series data. func (e *TagBlockValueElem) SeriesData() []byte { return e.series.data } // SeriesID returns series ID at an index. -func (e *TagBlockValueElem) SeriesID(i int) uint64 { - return binary.BigEndian.Uint64(e.series.data[i*SeriesIDSize:]) +func (e *TagBlockValueElem) SeriesID(i int) uint32 { + return binary.BigEndian.Uint32(e.series.data[i*SeriesIDSize:]) } // SeriesIDs returns a list decoded series ids. -func (e *TagBlockValueElem) SeriesIDs() []uint64 { - a := make([]uint64, 0, e.series.n) - var prev uint64 +func (e *TagBlockValueElem) SeriesIDs() []uint32 { + a := make([]uint32, 0, e.series.n) + var prev uint32 for data := e.series.data; len(data) > 0; { delta, n := binary.Uvarint(data) data = data[n:] - seriesID := prev + delta + seriesID := prev + uint32(delta) a = append(a, seriesID) prev = seriesID } @@ -354,7 +354,8 @@ func (e *TagBlockValueElem) unmarshal(buf []byte) { e.value, buf = buf[n:n+int(sz)], buf[n+int(sz):] // Parse series count. - e.series.n, n = binary.Uvarint(buf) + v, n := binary.Uvarint(buf) + e.series.n = uint32(v) buf = buf[n:] // Parse data block size. @@ -531,7 +532,7 @@ func (enc *TagBlockEncoder) EncodeKey(key []byte, deleted bool) error { // EncodeValue writes a tag value to the underlying writer. // The tag key must be lexicographical sorted after the previous encoded tag key. 
-func (enc *TagBlockEncoder) EncodeValue(value []byte, deleted bool, seriesIDs []uint64) error { +func (enc *TagBlockEncoder) EncodeValue(value []byte, deleted bool, seriesIDs []uint32) error { if len(enc.keys) == 0 { return fmt.Errorf("tag key must be encoded before encoding values") } else if len(value) == 0 { @@ -555,12 +556,12 @@ func (enc *TagBlockEncoder) EncodeValue(value []byte, deleted bool, seriesIDs [] // Build series data in buffer. enc.buf.Reset() - var prev uint64 + var prev uint32 for _, seriesID := range seriesIDs { delta := seriesID - prev - var buf [binary.MaxVarintLen64]byte - i := binary.PutUvarint(buf[:], delta) + var buf [binary.MaxVarintLen32]byte + i := binary.PutUvarint(buf[:], uint64(delta)) if _, err := enc.buf.Write(buf[:i]); err != nil { return err } diff --git a/tsdb/index/tsi1/tag_block_test.go b/tsdb/index/tsi1/tag_block_test.go index f69042a4f3..4de527e16d 100644 --- a/tsdb/index/tsi1/tag_block_test.go +++ b/tsdb/index/tsi1/tag_block_test.go @@ -17,19 +17,19 @@ func TestTagBlockWriter(t *testing.T) { if err := enc.EncodeKey([]byte("host"), false); err != nil { t.Fatal(err) - } else if err := enc.EncodeValue([]byte("server0"), false, []uint64{1}); err != nil { + } else if err := enc.EncodeValue([]byte("server0"), false, []uint32{1}); err != nil { t.Fatal(err) - } else if err := enc.EncodeValue([]byte("server1"), false, []uint64{2}); err != nil { + } else if err := enc.EncodeValue([]byte("server1"), false, []uint32{2}); err != nil { t.Fatal(err) - } else if err := enc.EncodeValue([]byte("server2"), false, []uint64{3}); err != nil { + } else if err := enc.EncodeValue([]byte("server2"), false, []uint32{3}); err != nil { t.Fatal(err) } if err := enc.EncodeKey([]byte("region"), false); err != nil { t.Fatal(err) - } else if err := enc.EncodeValue([]byte("us-east"), false, []uint64{1, 2}); err != nil { + } else if err := enc.EncodeValue([]byte("us-east"), false, []uint32{1, 2}); err != nil { t.Fatal(err) - } else if err := enc.EncodeValue([]byte("us-west"), false, []uint64{3}); err != nil { + } else if err := enc.EncodeValue([]byte("us-west"), false, []uint32{3}); err != nil { t.Fatal(err) } @@ -49,28 +49,28 @@ func TestTagBlockWriter(t *testing.T) { // Verify data. 
if e := blk.TagValueElem([]byte("region"), []byte("us-east")); e == nil { t.Fatal("expected element") - } else if a := e.(*tsi1.TagBlockValueElem).SeriesIDs(); !reflect.DeepEqual(a, []uint64{1, 2}) { + } else if a := e.(*tsi1.TagBlockValueElem).SeriesIDs(); !reflect.DeepEqual(a, []uint32{1, 2}) { t.Fatalf("unexpected series ids: %#v", a) } if e := blk.TagValueElem([]byte("region"), []byte("us-west")); e == nil { t.Fatal("expected element") - } else if a := e.(*tsi1.TagBlockValueElem).SeriesIDs(); !reflect.DeepEqual(a, []uint64{3}) { + } else if a := e.(*tsi1.TagBlockValueElem).SeriesIDs(); !reflect.DeepEqual(a, []uint32{3}) { t.Fatalf("unexpected series ids: %#v", a) } if e := blk.TagValueElem([]byte("host"), []byte("server0")); e == nil { t.Fatal("expected element") - } else if a := e.(*tsi1.TagBlockValueElem).SeriesIDs(); !reflect.DeepEqual(a, []uint64{1}) { + } else if a := e.(*tsi1.TagBlockValueElem).SeriesIDs(); !reflect.DeepEqual(a, []uint32{1}) { t.Fatalf("unexpected series ids: %#v", a) } if e := blk.TagValueElem([]byte("host"), []byte("server1")); e == nil { t.Fatal("expected element") - } else if a := e.(*tsi1.TagBlockValueElem).SeriesIDs(); !reflect.DeepEqual(a, []uint64{2}) { + } else if a := e.(*tsi1.TagBlockValueElem).SeriesIDs(); !reflect.DeepEqual(a, []uint32{2}) { t.Fatalf("unexpected series ids: %#v", a) } if e := blk.TagValueElem([]byte("host"), []byte("server2")); e == nil { t.Fatal("expected element") - } else if a := e.(*tsi1.TagBlockValueElem).SeriesIDs(); !reflect.DeepEqual(a, []uint64{3}) { + } else if a := e.(*tsi1.TagBlockValueElem).SeriesIDs(); !reflect.DeepEqual(a, []uint32{3}) { t.Fatalf("unexpected series ids: %#v", a) } } @@ -105,7 +105,7 @@ func benchmarkTagBlock_SeriesN(b *testing.B, tagN, valueN int, blk **tsi1.TagBlo } for j := 0; j < valueN; j++ { - if err := enc.EncodeValue([]byte(fmt.Sprintf("%08d", j)), false, []uint64{1}); err != nil { + if err := enc.EncodeValue([]byte(fmt.Sprintf("%08d", j)), false, []uint32{1}); err != nil { b.Fatal(err) } } diff --git a/tsdb/index/tsi1/tsi1.go b/tsdb/index/tsi1/tsi1.go index 9e8b9beb07..775ee1e99d 100644 --- a/tsdb/index/tsi1/tsi1.go +++ b/tsdb/index/tsi1/tsi1.go @@ -720,7 +720,7 @@ func (itr *seriesExprIterator) Next() SeriesElem { // seriesIDIterator represents a iterator over a list of series ids. type seriesIDIterator interface { - next() uint64 + next() uint32 } // writeTo writes write v into w. Updates n. @@ -773,6 +773,12 @@ func writeUvarintTo(w io.Writer, v uint64, n *int64) error { return err } +type uint32Slice []uint32 + +func (a uint32Slice) Len() int { return len(a) } +func (a uint32Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a uint32Slice) Less(i, j int) bool { return a[i] < a[j] } + type uint64Slice []uint64 func (a uint64Slice) Len() int { return len(a) } From 48456d80ad50b7d87a55f77a5cebcf8f6d9753d0 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 23 May 2017 10:24:37 -0600 Subject: [PATCH 10/10] Remove tsi commented code. --- tsdb/index/tsi1/index.go | 1 - 1 file changed, 1 deletion(-) diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 278e28e5d3..cc6ccdd08a 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -790,7 +790,6 @@ func (i *Index) compact() { for level := minLevel; level <= maxLevel; level++ { // Skip level if it is currently compacting. if i.levelCompacting[level] { - // log.Printf("tsi1: SKIP, already compacting: level=%d", level) continue }
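---

Editor's note: the patches above move the series data inside tag and measurement block elements from fixed-width 8-byte IDs to a uvarint series count, a uvarint payload size, and a payload of uvarint deltas between sorted 32-bit series IDs. The following is a small, self-contained sketch of that layout for reference only; it is not part of the patches, and the function names are illustrative rather than taken from the tsi1 package.

package main

import (
	"encoding/binary"
	"fmt"
)

// appendSeriesBlock encodes sorted 32-bit series IDs in the layout used above:
// a uvarint series count, a uvarint byte size of the payload, and then each ID
// stored as a uvarint delta from the previous ID.
func appendSeriesBlock(dst []byte, ids []uint32) []byte {
	// Build the delta-encoded payload first so its byte size is known.
	var payload []byte
	var tmp [binary.MaxVarintLen32]byte
	var prev uint32
	for _, id := range ids {
		n := binary.PutUvarint(tmp[:], uint64(id-prev))
		payload = append(payload, tmp[:n]...)
		prev = id
	}

	// Prepend series count and payload size, then the payload itself.
	var hdr [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(hdr[:], uint64(len(ids)))
	dst = append(dst, hdr[:n]...)
	n = binary.PutUvarint(hdr[:], uint64(len(payload)))
	dst = append(dst, hdr[:n]...)
	return append(dst, payload...)
}

// readSeriesBlock decodes the layout written by appendSeriesBlock by
// accumulating the deltas back into absolute series IDs.
func readSeriesBlock(data []byte) []uint32 {
	count, n := binary.Uvarint(data) // series count
	data = data[n:]
	sz, n := binary.Uvarint(data) // payload size in bytes
	payload := data[n : n+int(sz)]

	ids := make([]uint32, 0, count)
	var prev uint32
	for len(payload) > 0 {
		delta, n := binary.Uvarint(payload)
		payload = payload[n:]
		prev += uint32(delta)
		ids = append(ids, prev)
	}
	return ids
}

func main() {
	buf := appendSeriesBlock(nil, []uint32{1, 2, 3, 10, 500})
	fmt.Println(len(buf), readSeriesBlock(buf)) // compact buffer; round-trips the original IDs
}

Because the IDs within an element are sorted, the deltas stay small and typically encode in one or two bytes each, versus the eight bytes per ID written by the previous fixed-width layout.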