reduce allocations in query execution

This commit removes some heap objects by converting them from
pointer references to non-pointers or by reusing buffers.
pull/6035/head
Ben Johnson 2016-03-16 14:03:53 -06:00
parent a82778c75c
commit 6e1c1da25b
10 changed files with 196 additions and 154 deletions

View File

@ -836,7 +836,11 @@ func (itr *floatReduceFloatIterator) reduce() []FloatPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]
@ -968,7 +972,11 @@ func (itr *floatReduceIntegerIterator) reduce() []IntegerPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]
@ -1100,7 +1108,11 @@ func (itr *floatReduceStringIterator) reduce() []StringPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]
@ -1232,7 +1244,11 @@ func (itr *floatReduceBooleanIterator) reduce() []BooleanPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]
@ -2271,7 +2287,11 @@ func (itr *integerReduceFloatIterator) reduce() []FloatPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]
@ -2403,7 +2423,11 @@ func (itr *integerReduceIntegerIterator) reduce() []IntegerPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]
@ -2535,7 +2559,11 @@ func (itr *integerReduceStringIterator) reduce() []StringPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]
@ -2667,7 +2695,11 @@ func (itr *integerReduceBooleanIterator) reduce() []BooleanPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]
@ -3706,7 +3738,11 @@ func (itr *stringReduceFloatIterator) reduce() []FloatPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]
@ -3838,7 +3874,11 @@ func (itr *stringReduceIntegerIterator) reduce() []IntegerPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]
@ -3970,7 +4010,11 @@ func (itr *stringReduceStringIterator) reduce() []StringPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]
@ -4102,7 +4146,11 @@ func (itr *stringReduceBooleanIterator) reduce() []BooleanPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]
@ -5141,7 +5189,11 @@ func (itr *booleanReduceFloatIterator) reduce() []FloatPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]
@ -5273,7 +5325,11 @@ func (itr *booleanReduceIntegerIterator) reduce() []IntegerPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]
@ -5405,7 +5461,11 @@ func (itr *booleanReduceStringIterator) reduce() []StringPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]
@ -5537,7 +5597,11 @@ func (itr *booleanReduceBooleanIterator) reduce() []BooleanPoint {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]

View File

@ -685,10 +685,6 @@ func (itr *{{$k.name}}AuxIterator) SeriesKeys(opt IteratorOptions) (SeriesList,
return nil, errors.New("not implemented")
}
func (itr *{{.name}}AuxIterator) ExpandSources(sources Sources) (Sources, error) {
return nil, errors.New("not implemented")
}
func (itr *{{.name}}AuxIterator) stream() {
for {
// Read next point.
@ -837,7 +833,11 @@ func (itr *{{$k.name}}Reduce{{$v.Name}}Iterator) reduce() []{{$v.Name}}Point {
continue
}
tags := curr.Tags.Subset(itr.opt.Dimensions)
id := curr.Name + "\x00" + tags.ID()
id := curr.Name
if len(tags.m) > 0 {
id += "\x00" + tags.ID()
}
// Retrieve the aggregator for this name/tag combination or create one.
rp := m[id]

View File

@ -17,12 +17,7 @@ const (
)
// BooleanEncoder encodes a series of booleans to an in-memory buffer.
type BooleanEncoder interface {
Write(b bool)
Bytes() ([]byte, error)
}
type booleanEncoder struct {
type BooleanEncoder struct {
// The encoded bytes
bytes []byte
@ -38,10 +33,10 @@ type booleanEncoder struct {
// NewBooleanEncoder returns a new instance of BooleanEncoder.
func NewBooleanEncoder() BooleanEncoder {
return &booleanEncoder{}
return BooleanEncoder{}
}
func (e *booleanEncoder) Write(b bool) {
func (e *BooleanEncoder) Write(b bool) {
// If we have filled the current byte, flush it
if e.i >= 8 {
e.flush()
@ -60,7 +55,7 @@ func (e *booleanEncoder) Write(b bool) {
e.n++
}
func (e *booleanEncoder) flush() {
func (e *BooleanEncoder) flush() {
// Pad remaining byte w/ 0s
for e.i < 8 {
e.b = e.b << 1
@ -75,7 +70,7 @@ func (e *booleanEncoder) flush() {
}
}
func (e *booleanEncoder) Bytes() ([]byte, error) {
func (e *BooleanEncoder) Bytes() ([]byte, error) {
// Ensure the current byte is flushed
e.flush()
b := make([]byte, 10+1)
@ -92,13 +87,7 @@ func (e *booleanEncoder) Bytes() ([]byte, error) {
}
// BooleanDecoder decodes a series of booleans from an in-memory buffer.
type BooleanDecoder interface {
Next() bool
Read() bool
Error() error
}
type booleanDecoder struct {
type BooleanDecoder struct {
b []byte
i int
n int
@ -111,15 +100,15 @@ func NewBooleanDecoder(b []byte) BooleanDecoder {
// currently ignore for now.
b = b[1:]
count, n := binary.Uvarint(b)
return &booleanDecoder{b: b[n:], i: -1, n: int(count)}
return BooleanDecoder{b: b[n:], i: -1, n: int(count)}
}
func (e *booleanDecoder) Next() bool {
func (e *BooleanDecoder) Next() bool {
e.i++
return e.i < e.n
}
func (e *booleanDecoder) Read() bool {
func (e *BooleanDecoder) Read() bool {
// Index into the byte slice
idx := e.i / 8
@ -136,6 +125,6 @@ func (e *booleanDecoder) Read() bool {
return v&mask == mask
}
func (e *booleanDecoder) Error() error {
func (e *BooleanDecoder) Error() error {
return e.err
}

View File

@ -32,7 +32,8 @@ type TSMFile interface {
ReadBooleanBlockAt(entry *IndexEntry, values []BooleanValue) ([]BooleanValue, error)
// Entries returns the index entries for all blocks for the given key.
Entries(key string) []*IndexEntry
Entries(key string) []IndexEntry
ReadEntries(key string, entries *[]IndexEntry)
// Returns true if the TSMFile may contain a value with the specified
// key and time
@ -493,8 +494,8 @@ func (f *FileStore) BlockCount(path string, idx int) int {
// locations returns the files and index blocks for a key and time. ascending indicates
// whether the key will be scanned in ascending time order or descending time order.
func (f *FileStore) locations(key string, t int64, ascending bool) []*location {
var locations []*location
func (f *FileStore) locations(key string, t int64, ascending bool) []location {
var locations []location
f.mu.RLock()
filesSnapshot := make([]TSMFile, len(f.files))
@ -503,6 +504,7 @@ func (f *FileStore) locations(key string, t int64, ascending bool) []*location {
}
f.mu.RUnlock()
var entries []IndexEntry
for _, fd := range filesSnapshot {
minTime, maxTime := fd.TimeRange()
@ -518,7 +520,8 @@ func (f *FileStore) locations(key string, t int64, ascending bool) []*location {
// This file could potentially contain points we are looking for so find the blocks for
// the given key.
for _, ie := range fd.Entries(key) {
fd.ReadEntries(key, &entries)
for _, ie := range entries {
// If we are ascending and the max time of a block is before where we are looking, skip
// it since the data is out of our range
if ascending && ie.MaxTime < t {
@ -530,7 +533,7 @@ func (f *FileStore) locations(key string, t int64, ascending bool) []*location {
}
// Otherwise, add this file and block location
locations = append(locations, &location{
locations = append(locations, location{
r: fd,
entry: ie,
})
@ -565,12 +568,12 @@ type KeyCursor struct {
fs *FileStore
// seeks is all the file locations that we need to return during iteration.
seeks []*location
seeks []location
// current is the set of blocks possibly containing the next set of points.
// Normally this is just one entry, but there may be multiple if points have
// been overwritten.
current []*location
current []location
buf []Value
// pos is the index within seeks. Based on ascending, it will increment or
@ -587,7 +590,7 @@ type KeyCursor struct {
type location struct {
r TSMFile
entry *IndexEntry
entry IndexEntry
// Has this location been read before
read bool
@ -705,7 +708,7 @@ func (c *KeyCursor) nextAscending() {
}
// Append the first matching block
c.current = []*location{c.seeks[c.pos]}
c.current = []location{c.seeks[c.pos]}
// We're done if there are no overlapping blocks.
if !c.duplicates {
@ -736,7 +739,7 @@ func (c *KeyCursor) nextDescending() {
}
// Append the first matching block
c.current = []*location{c.seeks[c.pos]}
c.current = []location{c.seeks[c.pos]}
// We're done if there are no overlapping blocks.
if !c.duplicates {
@ -764,7 +767,7 @@ func (c *KeyCursor) ReadFloatBlock(buf []FloatValue) ([]FloatValue, error) {
// First block is the oldest block containing the points we're searching for.
first := c.current[0]
values, err := first.r.ReadFloatBlockAt(first.entry, buf[:0])
values, err := first.r.ReadFloatBlockAt(&first.entry, buf[:0])
first.read = true
// Only one block with this key and time range so return it
@ -779,7 +782,7 @@ func (c *KeyCursor) ReadFloatBlock(buf []FloatValue) ([]FloatValue, error) {
if c.ascending && !cur.read {
cur.read = true
c.pos++
v, err := cur.r.ReadFloatBlockAt(cur.entry, nil)
v, err := cur.r.ReadFloatBlockAt(&cur.entry, nil)
if err != nil {
return nil, err
}
@ -788,7 +791,7 @@ func (c *KeyCursor) ReadFloatBlock(buf []FloatValue) ([]FloatValue, error) {
cur.read = true
c.pos--
v, err := cur.r.ReadFloatBlockAt(cur.entry, nil)
v, err := cur.r.ReadFloatBlockAt(&cur.entry, nil)
if err != nil {
return nil, err
}
@ -808,7 +811,7 @@ func (c *KeyCursor) ReadIntegerBlock(buf []IntegerValue) ([]IntegerValue, error)
// First block is the oldest block containing the points we're searching for.
first := c.current[0]
values, err := first.r.ReadIntegerBlockAt(first.entry, buf[:0])
values, err := first.r.ReadIntegerBlockAt(&first.entry, buf[:0])
first.read = true
// Only one block with this key and time range so return it
@ -823,7 +826,7 @@ func (c *KeyCursor) ReadIntegerBlock(buf []IntegerValue) ([]IntegerValue, error)
if c.ascending && !cur.read {
cur.read = true
c.pos++
v, err := cur.r.ReadIntegerBlockAt(cur.entry, nil)
v, err := cur.r.ReadIntegerBlockAt(&cur.entry, nil)
if err != nil {
return nil, err
}
@ -832,7 +835,7 @@ func (c *KeyCursor) ReadIntegerBlock(buf []IntegerValue) ([]IntegerValue, error)
cur.read = true
c.pos--
v, err := cur.r.ReadIntegerBlockAt(cur.entry, nil)
v, err := cur.r.ReadIntegerBlockAt(&cur.entry, nil)
if err != nil {
return nil, err
}
@ -852,7 +855,7 @@ func (c *KeyCursor) ReadStringBlock(buf []StringValue) ([]StringValue, error) {
// First block is the oldest block containing the points we're searching for.
first := c.current[0]
values, err := first.r.ReadStringBlockAt(first.entry, buf[:0])
values, err := first.r.ReadStringBlockAt(&first.entry, buf[:0])
first.read = true
// Only one block with this key and time range so return it
@ -867,7 +870,7 @@ func (c *KeyCursor) ReadStringBlock(buf []StringValue) ([]StringValue, error) {
if c.ascending && !cur.read {
cur.read = true
c.pos++
v, err := cur.r.ReadStringBlockAt(cur.entry, nil)
v, err := cur.r.ReadStringBlockAt(&cur.entry, nil)
if err != nil {
return nil, err
}
@ -876,7 +879,7 @@ func (c *KeyCursor) ReadStringBlock(buf []StringValue) ([]StringValue, error) {
cur.read = true
c.pos--
v, err := cur.r.ReadStringBlockAt(cur.entry, nil)
v, err := cur.r.ReadStringBlockAt(&cur.entry, nil)
if err != nil {
return nil, err
}
@ -896,7 +899,7 @@ func (c *KeyCursor) ReadBooleanBlock(buf []BooleanValue) ([]BooleanValue, error)
// First block is the oldest block containing the points we're searching for.
first := c.current[0]
values, err := first.r.ReadBooleanBlockAt(first.entry, buf[:0])
values, err := first.r.ReadBooleanBlockAt(&first.entry, buf[:0])
first.read = true
// Only one block with this key and time range so return it
@ -911,7 +914,7 @@ func (c *KeyCursor) ReadBooleanBlock(buf []BooleanValue) ([]BooleanValue, error)
if c.ascending && !cur.read {
cur.read = true
c.pos++
v, err := cur.r.ReadBooleanBlockAt(cur.entry, nil)
v, err := cur.r.ReadBooleanBlockAt(&cur.entry, nil)
if err != nil {
return nil, err
}
@ -920,7 +923,7 @@ func (c *KeyCursor) ReadBooleanBlock(buf []BooleanValue) ([]BooleanValue, error)
cur.read = true
c.pos--
v, err := cur.r.ReadBooleanBlockAt(cur.entry, nil)
v, err := cur.r.ReadBooleanBlockAt(&cur.entry, nil)
if err != nil {
return nil, err
}

View File

@ -138,17 +138,17 @@ type FloatDecoder struct {
err error
}
func NewFloatDecoder(b []byte) (*FloatDecoder, error) {
func NewFloatDecoder(b []byte) (FloatDecoder, error) {
// first byte is the compression type but we currently just have gorilla
// compression
br := bitstream.NewReader(bytes.NewReader(b[1:]))
v, err := br.ReadBits(64)
if err != nil {
return nil, err
return FloatDecoder{}, err
}
return &FloatDecoder{
return FloatDecoder{
val: math.Float64frombits(v),
first: true,
br: br,

View File

@ -37,29 +37,17 @@ const (
)
// IntegerEncoder encodes int64s into byte slices
type IntegerEncoder interface {
Write(v int64)
Bytes() ([]byte, error)
}
// IntegerDecoder decodes a byte slice into int64s
type IntegerDecoder interface {
Next() bool
Read() int64
Error() error
}
type integerEncoder struct {
type IntegerEncoder struct {
prev int64
rle bool
values []uint64
}
func NewIntegerEncoder() IntegerEncoder {
return &integerEncoder{rle: true}
return IntegerEncoder{rle: true}
}
func (e *integerEncoder) Write(v int64) {
func (e *IntegerEncoder) Write(v int64) {
// Delta-encode each value as it's written. This happens before
// ZigZagEncoding because the deltas could be negative.
delta := v - e.prev
@ -72,7 +60,7 @@ func (e *integerEncoder) Write(v int64) {
e.values = append(e.values, enc)
}
func (e *integerEncoder) Bytes() ([]byte, error) {
func (e *IntegerEncoder) Bytes() ([]byte, error) {
// Only run-length encode if it could reduce storage size
if e.rle && len(e.values) > 2 {
return e.encodeRLE()
@ -88,7 +76,7 @@ func (e *integerEncoder) Bytes() ([]byte, error) {
return e.encodePacked()
}
func (e *integerEncoder) encodeRLE() ([]byte, error) {
func (e *IntegerEncoder) encodeRLE() ([]byte, error) {
// Large varints can take up to 10 bytes
b := make([]byte, 1+10*3)
@ -107,7 +95,7 @@ func (e *integerEncoder) encodeRLE() ([]byte, error) {
return b[:i], nil
}
func (e *integerEncoder) encodePacked() ([]byte, error) {
func (e *IntegerEncoder) encodePacked() ([]byte, error) {
if len(e.values) == 0 {
return nil, nil
}
@ -133,7 +121,7 @@ func (e *integerEncoder) encodePacked() ([]byte, error) {
return b, nil
}
func (e *integerEncoder) encodeUncompressed() ([]byte, error) {
func (e *IntegerEncoder) encodeUncompressed() ([]byte, error) {
if len(e.values) == 0 {
return nil, nil
}
@ -148,7 +136,8 @@ func (e *integerEncoder) encodeUncompressed() ([]byte, error) {
return b, nil
}
type integerDecoder struct {
// IntegerDecoder decodes a byte slice into int64s.
type IntegerDecoder struct {
values []uint64
bytes []byte
i int
@ -166,7 +155,7 @@ type integerDecoder struct {
}
func NewIntegerDecoder(b []byte) IntegerDecoder {
d := &integerDecoder{
d := IntegerDecoder{
// 240 is the maximum number of values that can be encoded into a single uint64 using simple8b
values: make([]uint64, 240),
}
@ -175,7 +164,7 @@ func NewIntegerDecoder(b []byte) IntegerDecoder {
return d
}
func (d *integerDecoder) SetBytes(b []byte) {
func (d *IntegerDecoder) SetBytes(b []byte) {
if len(b) > 0 {
d.encoding = b[0] >> 4
d.bytes = b[1:]
@ -185,7 +174,7 @@ func (d *integerDecoder) SetBytes(b []byte) {
d.n = 0
}
func (d *integerDecoder) Next() bool {
func (d *IntegerDecoder) Next() bool {
if d.i >= d.n && len(d.bytes) == 0 {
return false
}
@ -207,11 +196,11 @@ func (d *integerDecoder) Next() bool {
return d.i < d.n
}
func (d *integerDecoder) Error() error {
func (d *IntegerDecoder) Error() error {
return d.err
}
func (d *integerDecoder) Read() int64 {
func (d *IntegerDecoder) Read() int64 {
switch d.encoding {
case intCompressedRLE:
return ZigZagDecode(d.rleFirst + uint64(d.i)*d.rleDelta)
@ -225,7 +214,7 @@ func (d *integerDecoder) Read() int64 {
}
}
func (d *integerDecoder) decodeRLE() {
func (d *IntegerDecoder) decodeRLE() {
if len(d.bytes) == 0 {
return
}
@ -256,7 +245,7 @@ func (d *integerDecoder) decodeRLE() {
d.bytes = nil
}
func (d *integerDecoder) decodePacked() {
func (d *IntegerDecoder) decodePacked() {
if len(d.bytes) == 0 {
return
}
@ -281,7 +270,7 @@ func (d *integerDecoder) decodePacked() {
d.bytes = d.bytes[8:]
}
func (d *integerDecoder) decodeUncompressed() {
func (d *IntegerDecoder) decodeUncompressed() {
if len(d.bytes) == 0 {
return
}

View File

@ -498,9 +498,8 @@ func BenchmarkIntegerDecoderPackedSimple(b *testing.B) {
b.ResetTimer()
dec := NewIntegerDecoder(bytes)
for i := 0; i < b.N; i++ {
dec.(byteSetter).SetBytes(bytes)
dec.SetBytes(bytes)
for dec.Next() {
}
}
@ -520,7 +519,7 @@ func BenchmarkIntegerDecoderRLE(b *testing.B) {
dec := NewIntegerDecoder(bytes)
for i := 0; i < b.N; i++ {
dec.(byteSetter).SetBytes(bytes)
dec.SetBytes(bytes)
for dec.Next() {
}
}

View File

@ -42,7 +42,7 @@ type BlockIterator struct {
n int
key string
entries []*IndexEntry
entries []IndexEntry
err error
}
@ -82,7 +82,7 @@ func (b *BlockIterator) Read() (string, int64, int64, []byte, error) {
return "", 0, 0, nil, b.err
}
buf, err := b.r.readBytes(b.entries[0], nil)
buf, err := b.r.readBytes(&b.entries[0], nil)
if err != nil {
return "", 0, 0, nil, err
}
@ -193,7 +193,7 @@ func (t *TSMReader) Keys() []string {
return t.index.Keys()
}
func (t *TSMReader) Key(index int) (string, []*IndexEntry) {
func (t *TSMReader) Key(index int) (string, []IndexEntry) {
return t.index.Key(index)
}
@ -314,10 +314,14 @@ func (t *TSMReader) KeyCount() int {
return t.index.KeyCount()
}
func (t *TSMReader) Entries(key string) []*IndexEntry {
func (t *TSMReader) Entries(key string) []IndexEntry {
return t.index.Entries(key)
}
func (t *TSMReader) ReadEntries(key string, entries *[]IndexEntry) {
t.index.ReadEntries(key, entries)
}
func (t *TSMReader) IndexSize() uint32 {
return t.index.Size()
}
@ -477,7 +481,7 @@ func (d *indirectIndex) search(key []byte) int {
}
// Entries returns all index entries for a key.
func (d *indirectIndex) Entries(key string) []*IndexEntry {
func (d *indirectIndex) Entries(key string) []IndexEntry {
d.mu.RLock()
defer d.mu.RUnlock()
@ -499,10 +503,9 @@ func (d *indirectIndex) Entries(key string) []*IndexEntry {
// Read and return all the entries
ofs += n
_, entries, err := readEntries(d.b[ofs:])
if err != nil {
var entries indexEntries
if _, err := readEntries(d.b[ofs:], &entries); err != nil {
panic(fmt.Sprintf("error reading entries: %v", err))
}
return entries.entries
}
@ -511,13 +514,18 @@ func (d *indirectIndex) Entries(key string) []*IndexEntry {
return nil
}
// ReadEntries returns all index entries for a key.
func (d *indirectIndex) ReadEntries(key string, entries *[]IndexEntry) {
*entries = d.Entries(key)
}
// Entry returns the index entry for the specified key and timestamp. If no entry
// matches the key and timestamp, nil is returned.
func (d *indirectIndex) Entry(key string, timestamp int64) *IndexEntry {
entries := d.Entries(key)
for _, entry := range entries {
if entry.Contains(timestamp) {
return entry
return &entry
}
}
return nil
@ -535,7 +543,7 @@ func (d *indirectIndex) Keys() []string {
return keys
}
func (d *indirectIndex) Key(idx int) (string, []*IndexEntry) {
func (d *indirectIndex) Key(idx int) (string, []IndexEntry) {
d.mu.RLock()
defer d.mu.RUnlock()
@ -543,7 +551,9 @@ func (d *indirectIndex) Key(idx int) (string, []*IndexEntry) {
return "", nil
}
n, key, _ := readKey(d.b[d.offsets[idx]:])
_, entries, _ := readEntries(d.b[int(d.offsets[idx])+n:])
var entries indexEntries
readEntries(d.b[int(d.offsets[idx])+n:], &entries)
return string(key), entries.entries
}
@ -892,7 +902,7 @@ func (f *fileAccessor) readAll(key string) ([]Value, error) {
b := make([]byte, 16*1024)
for _, block := range blocks {
b, err := f.readBytes(block, b)
b, err := f.readBytes(&block, b)
if err != nil {
return nil, err
}
@ -1133,7 +1143,7 @@ func (m *mmapAccessor) close() error {
type indexEntries struct {
Type byte
entries []*IndexEntry
entries []IndexEntry
}
func (a *indexEntries) Len() int { return len(a.entries) }
@ -1142,10 +1152,6 @@ func (a *indexEntries) Less(i, j int) bool {
return a.entries[i].MinTime < a.entries[j].MinTime
}
func (a *indexEntries) Append(entry ...*IndexEntry) {
a.entries = append(a.entries, entry...)
}
func (a *indexEntries) MarshalBinary() ([]byte, error) {
buf := make([]byte, len(a.entries)*indexEntrySize)
@ -1183,25 +1189,22 @@ func readKey(b []byte) (n int, key []byte, err error) {
return
}
func readEntries(b []byte) (n int, entries *indexEntries, err error) {
func readEntries(b []byte, entries *indexEntries) (n int, err error) {
// 1 byte block type
blockType := b[n]
entries = &indexEntries{
Type: blockType,
entries: []*IndexEntry{},
}
entries.Type = b[n]
n++
// 2 byte count of index entries
count := int(binary.BigEndian.Uint16(b[n : n+indexCountSize]))
n += indexCountSize
entries.entries = make([]IndexEntry, count)
for i := 0; i < count; i++ {
ie := &IndexEntry{}
var ie IndexEntry
if err := ie.UnmarshalBinary(b[i*indexEntrySize+indexCountSize+indexTypeSize : i*indexEntrySize+indexCountSize+indexEntrySize+indexTypeSize]); err != nil {
return 0, nil, fmt.Errorf("readEntries: unmarshal error: %v", err)
return 0, fmt.Errorf("readEntries: unmarshal error: %v", err)
}
entries.Append(ie)
entries.entries[i] = ie
n += indexEntrySize
}
return

View File

@ -21,27 +21,16 @@ const (
stringCompressedSnappy = 1
)
type StringEncoder interface {
Write(s string)
Bytes() ([]byte, error)
}
type StringDecoder interface {
Next() bool
Read() string
Error() error
}
type stringEncoder struct {
type StringEncoder struct {
// The encoded bytes
bytes []byte
}
func NewStringEncoder() StringEncoder {
return &stringEncoder{}
return StringEncoder{}
}
func (e *stringEncoder) Write(s string) {
func (e *StringEncoder) Write(s string) {
b := make([]byte, 10)
// Append the length of the string using variable byte encoding
i := binary.PutUvarint(b, uint64(len(s)))
@ -51,14 +40,14 @@ func (e *stringEncoder) Write(s string) {
e.bytes = append(e.bytes, s...)
}
func (e *stringEncoder) Bytes() ([]byte, error) {
func (e *StringEncoder) Bytes() ([]byte, error) {
// Compress the currently appended bytes using snappy and prefix with
// a 1 byte header for future extension
data := snappy.Encode(nil, e.bytes)
return append([]byte{stringCompressedSnappy << 4}, data...), nil
}
type stringDecoder struct {
type StringDecoder struct {
b []byte
l int
i int
@ -70,18 +59,18 @@ func NewStringDecoder(b []byte) (StringDecoder, error) {
// currently so ignore for now.
data, err := snappy.Decode(nil, b[1:])
if err != nil {
return nil, fmt.Errorf("failed to decode string block: %v", err.Error())
return StringDecoder{}, fmt.Errorf("failed to decode string block: %v", err.Error())
}
return &stringDecoder{b: data}, nil
return StringDecoder{b: data}, nil
}
func (e *stringDecoder) Next() bool {
func (e *StringDecoder) Next() bool {
e.i += e.l
return e.i < len(e.b)
}
func (e *stringDecoder) Read() string {
func (e *StringDecoder) Read() string {
// Read the length of the string
length, n := binary.Uvarint(e.b[e.i:])
@ -91,6 +80,6 @@ func (e *stringDecoder) Read() string {
return string(e.b[e.i+n : e.i+n+int(length)])
}
func (e *stringDecoder) Error() error {
func (e *StringDecoder) Error() error {
return e.err
}

View File

@ -148,7 +148,8 @@ type TSMIndex interface {
ContainsValue(key string, timestamp int64) bool
// Entries returns all index entries for a key.
Entries(key string) []*IndexEntry
Entries(key string) []IndexEntry
ReadEntries(key string, entries *[]IndexEntry)
// Entry returns the index entry for the specified key and timestamp. If no entry
// matches the key and timestamp, nil is returned.
@ -158,7 +159,7 @@ type TSMIndex interface {
Keys() []string
// Key returns the key in the index at the given position.
Key(index int) (string, []*IndexEntry)
Key(index int) (string, []IndexEntry)
// KeyAt returns the key in the index at the given position.
KeyAt(index int) string
@ -280,7 +281,7 @@ func (d *directIndex) Add(key string, blockType byte, minTime, maxTime int64, of
// size of the count of entries stored in the index
d.size += indexCountSize
}
entries.Append(&IndexEntry{
entries.entries = append(entries.entries, IndexEntry{
MinTime: minTime,
MaxTime: maxTime,
Offset: offset,
@ -291,7 +292,7 @@ func (d *directIndex) Add(key string, blockType byte, minTime, maxTime int64, of
d.size += indexEntrySize
}
func (d *directIndex) Entries(key string) []*IndexEntry {
func (d *directIndex) Entries(key string) []IndexEntry {
d.mu.RLock()
defer d.mu.RUnlock()
@ -302,6 +303,10 @@ func (d *directIndex) Entries(key string) []*IndexEntry {
return d.blocks[key].entries
}
func (d *directIndex) ReadEntries(key string, entries *[]IndexEntry) {
*entries = d.Entries(key)
}
func (d *directIndex) Entry(key string, t int64) *IndexEntry {
d.mu.RLock()
defer d.mu.RUnlock()
@ -309,7 +314,7 @@ func (d *directIndex) Entry(key string, t int64) *IndexEntry {
entries := d.Entries(key)
for _, entry := range entries {
if entry.Contains(t) {
return entry
return &entry
}
}
return nil
@ -354,7 +359,7 @@ func (d *directIndex) Keys() []string {
return keys
}
func (d *directIndex) Key(idx int) (string, []*IndexEntry) {
func (d *directIndex) Key(idx int) (string, []IndexEntry) {
if idx < 0 || idx >= len(d.blocks) {
return "", nil
}
@ -408,7 +413,7 @@ func (d *directIndex) addEntries(key string, entries *indexEntries) {
d.blocks[key] = entries
return
}
existing.Append(entries.entries...)
existing.entries = append(existing.entries, entries.entries...)
}
func (d *directIndex) Write(w io.Writer) error {
@ -483,13 +488,14 @@ func (d *directIndex) UnmarshalBinary(b []byte) error {
}
pos += n
n, entries, err := readEntries(b[pos:])
var entries indexEntries
n, err = readEntries(b[pos:], &entries)
if err != nil {
return fmt.Errorf("readIndex: read entries error: %v", err)
}
pos += n
d.addEntries(string(key), entries)
d.addEntries(string(key), &entries)
}
return nil
}