feat(tsm1): Add Read<Type>ArrayBlock APIs to TSMReader and mmapAccessor
parent
b260a1dcd3
commit
790639d728
|
@ -43,6 +43,7 @@ import (
|
|||
//go:generate tmpl -data=@file_store.gen.go.tmpldata file_store.gen.go.tmpl
|
||||
//go:generate tmpl -data=@encoding.gen.go.tmpldata encoding.gen.go.tmpl
|
||||
//go:generate tmpl -data=@compact.gen.go.tmpldata compact.gen.go.tmpl
|
||||
//go:generate tmpl -data=@reader.gen.go.tmpldata reader.gen.go.tmpl
|
||||
|
||||
func init() {
|
||||
tsdb.RegisterEngine("tsm1", NewEngine)
|
||||
|
|
|
@ -46,10 +46,15 @@ type TSMFile interface {
|
|||
// ReadAt returns all the values in the block identified by entry.
|
||||
ReadAt(entry *IndexEntry, values []Value) ([]Value, error)
|
||||
ReadFloatBlockAt(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error)
|
||||
ReadFloatArrayBlockAt(entry *IndexEntry, values *tsdb.FloatArray) error
|
||||
ReadIntegerBlockAt(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error)
|
||||
ReadIntegerArrayBlockAt(entry *IndexEntry, values *tsdb.IntegerArray) error
|
||||
ReadUnsignedBlockAt(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error)
|
||||
ReadUnsignedArrayBlockAt(entry *IndexEntry, values *tsdb.UnsignedArray) error
|
||||
ReadStringBlockAt(entry *IndexEntry, values *[]StringValue) ([]StringValue, error)
|
||||
ReadStringArrayBlockAt(entry *IndexEntry, values *tsdb.StringArray) error
|
||||
ReadBooleanBlockAt(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error)
|
||||
ReadBooleanArrayBlockAt(entry *IndexEntry, values *tsdb.BooleanArray) error
|
||||
|
||||
// Entries returns the index entries for all blocks for the given key.
|
||||
Entries(key []byte) []IndexEntry
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
)
|
||||
|
||||
func TestNewMergeKeyIterator(t *testing.T) {
|
||||
|
@ -196,3 +197,23 @@ func (*mockTSMFile) ReadStringBlockAt(*IndexEntry, *[]StringValue) ([]StringValu
|
|||
func (*mockTSMFile) ReadBooleanBlockAt(*IndexEntry, *[]BooleanValue) ([]BooleanValue, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (*mockTSMFile) ReadFloatArrayBlockAt(*IndexEntry, *tsdb.FloatArray) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (*mockTSMFile) ReadIntegerArrayBlockAt(*IndexEntry, *tsdb.IntegerArray) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (*mockTSMFile) ReadUnsignedArrayBlockAt(*IndexEntry, *tsdb.UnsignedArray) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (*mockTSMFile) ReadStringArrayBlockAt(*IndexEntry, *tsdb.StringArray) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (*mockTSMFile) ReadBooleanArrayBlockAt(*IndexEntry, *tsdb.BooleanArray) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
|
|
@ -0,0 +1,285 @@
|
|||
// Generated by tmpl
|
||||
// https://github.com/benbjohnson/tmpl
|
||||
//
|
||||
// DO NOT EDIT!
|
||||
// Source: reader.gen.go.tmpl
|
||||
|
||||
package tsm1
|
||||
|
||||
import (
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
)
|
||||
|
||||
// ReadFloatBlockAt returns the float values corresponding to the given index entry.
|
||||
func (t *TSMReader) ReadFloatBlockAt(entry *IndexEntry, vals *[]FloatValue) ([]FloatValue, error) {
|
||||
t.mu.RLock()
|
||||
v, err := t.accessor.readFloatBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return v, err
|
||||
}
|
||||
|
||||
// ReadFloatArrayBlockAt fills vals with the float values corresponding to the given index entry.
|
||||
func (t *TSMReader) ReadFloatArrayBlockAt(entry *IndexEntry, vals *tsdb.FloatArray) error {
|
||||
t.mu.RLock()
|
||||
err := t.accessor.readFloatArrayBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return err
|
||||
}
|
||||
|
||||
// ReadIntegerBlockAt returns the integer values corresponding to the given index entry.
|
||||
func (t *TSMReader) ReadIntegerBlockAt(entry *IndexEntry, vals *[]IntegerValue) ([]IntegerValue, error) {
|
||||
t.mu.RLock()
|
||||
v, err := t.accessor.readIntegerBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return v, err
|
||||
}
|
||||
|
||||
// ReadIntegerArrayBlockAt fills vals with the integer values corresponding to the given index entry.
|
||||
func (t *TSMReader) ReadIntegerArrayBlockAt(entry *IndexEntry, vals *tsdb.IntegerArray) error {
|
||||
t.mu.RLock()
|
||||
err := t.accessor.readIntegerArrayBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return err
|
||||
}
|
||||
|
||||
// ReadUnsignedBlockAt returns the unsigned values corresponding to the given index entry.
|
||||
func (t *TSMReader) ReadUnsignedBlockAt(entry *IndexEntry, vals *[]UnsignedValue) ([]UnsignedValue, error) {
|
||||
t.mu.RLock()
|
||||
v, err := t.accessor.readUnsignedBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return v, err
|
||||
}
|
||||
|
||||
// ReadUnsignedArrayBlockAt fills vals with the unsigned values corresponding to the given index entry.
|
||||
func (t *TSMReader) ReadUnsignedArrayBlockAt(entry *IndexEntry, vals *tsdb.UnsignedArray) error {
|
||||
t.mu.RLock()
|
||||
err := t.accessor.readUnsignedArrayBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return err
|
||||
}
|
||||
|
||||
// ReadStringBlockAt returns the string values corresponding to the given index entry.
|
||||
func (t *TSMReader) ReadStringBlockAt(entry *IndexEntry, vals *[]StringValue) ([]StringValue, error) {
|
||||
t.mu.RLock()
|
||||
v, err := t.accessor.readStringBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return v, err
|
||||
}
|
||||
|
||||
// ReadStringArrayBlockAt fills vals with the string values corresponding to the given index entry.
|
||||
func (t *TSMReader) ReadStringArrayBlockAt(entry *IndexEntry, vals *tsdb.StringArray) error {
|
||||
t.mu.RLock()
|
||||
err := t.accessor.readStringArrayBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return err
|
||||
}
|
||||
|
||||
// ReadBooleanBlockAt returns the boolean values corresponding to the given index entry.
|
||||
func (t *TSMReader) ReadBooleanBlockAt(entry *IndexEntry, vals *[]BooleanValue) ([]BooleanValue, error) {
|
||||
t.mu.RLock()
|
||||
v, err := t.accessor.readBooleanBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return v, err
|
||||
}
|
||||
|
||||
// ReadBooleanArrayBlockAt fills vals with the boolean values corresponding to the given index entry.
|
||||
func (t *TSMReader) ReadBooleanArrayBlockAt(entry *IndexEntry, vals *tsdb.BooleanArray) error {
|
||||
t.mu.RLock()
|
||||
err := t.accessor.readBooleanArrayBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return err
|
||||
}
|
||||
|
||||
// blockAccessor abstracts a method of accessing blocks from a
|
||||
// TSM file.
|
||||
type blockAccessor interface {
|
||||
init() (*indirectIndex, error)
|
||||
read(key []byte, timestamp int64) ([]Value, error)
|
||||
readAll(key []byte) ([]Value, error)
|
||||
readBlock(entry *IndexEntry, values []Value) ([]Value, error)
|
||||
readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error)
|
||||
readFloatArrayBlock(entry *IndexEntry, values *tsdb.FloatArray) error
|
||||
readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error)
|
||||
readIntegerArrayBlock(entry *IndexEntry, values *tsdb.IntegerArray) error
|
||||
readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error)
|
||||
readUnsignedArrayBlock(entry *IndexEntry, values *tsdb.UnsignedArray) error
|
||||
readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error)
|
||||
readStringArrayBlock(entry *IndexEntry, values *tsdb.StringArray) error
|
||||
readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error)
|
||||
readBooleanArrayBlock(entry *IndexEntry, values *tsdb.BooleanArray) error
|
||||
readBytes(entry *IndexEntry, buf []byte) (uint32, []byte, error)
|
||||
rename(path string) error
|
||||
path() string
|
||||
close() error
|
||||
free() error
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) {
|
||||
m.incAccess()
|
||||
|
||||
m.mu.RLock()
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
m.mu.RUnlock()
|
||||
return nil, ErrTSMClosed
|
||||
}
|
||||
|
||||
a, err := DecodeFloatBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readFloatArrayBlock(entry *IndexEntry, values *tsdb.FloatArray) error {
|
||||
m.incAccess()
|
||||
|
||||
m.mu.RLock()
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
m.mu.RUnlock()
|
||||
return ErrTSMClosed
|
||||
}
|
||||
|
||||
err := DecodeFloatArrayBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) {
|
||||
m.incAccess()
|
||||
|
||||
m.mu.RLock()
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
m.mu.RUnlock()
|
||||
return nil, ErrTSMClosed
|
||||
}
|
||||
|
||||
a, err := DecodeIntegerBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readIntegerArrayBlock(entry *IndexEntry, values *tsdb.IntegerArray) error {
|
||||
m.incAccess()
|
||||
|
||||
m.mu.RLock()
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
m.mu.RUnlock()
|
||||
return ErrTSMClosed
|
||||
}
|
||||
|
||||
err := DecodeIntegerArrayBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error) {
|
||||
m.incAccess()
|
||||
|
||||
m.mu.RLock()
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
m.mu.RUnlock()
|
||||
return nil, ErrTSMClosed
|
||||
}
|
||||
|
||||
a, err := DecodeUnsignedBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readUnsignedArrayBlock(entry *IndexEntry, values *tsdb.UnsignedArray) error {
|
||||
m.incAccess()
|
||||
|
||||
m.mu.RLock()
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
m.mu.RUnlock()
|
||||
return ErrTSMClosed
|
||||
}
|
||||
|
||||
err := DecodeUnsignedArrayBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) {
|
||||
m.incAccess()
|
||||
|
||||
m.mu.RLock()
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
m.mu.RUnlock()
|
||||
return nil, ErrTSMClosed
|
||||
}
|
||||
|
||||
a, err := DecodeStringBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readStringArrayBlock(entry *IndexEntry, values *tsdb.StringArray) error {
|
||||
m.incAccess()
|
||||
|
||||
m.mu.RLock()
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
m.mu.RUnlock()
|
||||
return ErrTSMClosed
|
||||
}
|
||||
|
||||
err := DecodeStringArrayBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) {
|
||||
m.incAccess()
|
||||
|
||||
m.mu.RLock()
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
m.mu.RUnlock()
|
||||
return nil, ErrTSMClosed
|
||||
}
|
||||
|
||||
a, err := DecodeBooleanBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readBooleanArrayBlock(entry *IndexEntry, values *tsdb.BooleanArray) error {
|
||||
m.incAccess()
|
||||
|
||||
m.mu.RLock()
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
m.mu.RUnlock()
|
||||
return ErrTSMClosed
|
||||
}
|
||||
|
||||
err := DecodeBooleanArrayBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
return err
|
||||
}
|
|
@ -0,0 +1,77 @@
|
|||
package tsm1
|
||||
|
||||
import (
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
)
|
||||
|
||||
{{range .}}
|
||||
// Read{{.Name}}BlockAt returns the {{.name}} values corresponding to the given index entry.
|
||||
func (t *TSMReader) Read{{.Name}}BlockAt(entry *IndexEntry, vals *[]{{.Name}}Value) ([]{{.Name}}Value, error) {
|
||||
t.mu.RLock()
|
||||
v, err := t.accessor.read{{.Name}}Block(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return v, err
|
||||
}
|
||||
|
||||
// Read{{.Name}}ArrayBlockAt fills vals with the {{.name}} values corresponding to the given index entry.
|
||||
func (t *TSMReader) Read{{.Name}}ArrayBlockAt(entry *IndexEntry, vals *tsdb.{{.Name}}Array) error {
|
||||
t.mu.RLock()
|
||||
err := t.accessor.read{{.Name}}ArrayBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return err
|
||||
}
|
||||
{{end}}
|
||||
|
||||
// blockAccessor abstracts a method of accessing blocks from a
|
||||
// TSM file.
|
||||
type blockAccessor interface {
|
||||
init() (*indirectIndex, error)
|
||||
read(key []byte, timestamp int64) ([]Value, error)
|
||||
readAll(key []byte) ([]Value, error)
|
||||
readBlock(entry *IndexEntry, values []Value) ([]Value, error)
|
||||
{{- range .}}
|
||||
read{{.Name}}Block(entry *IndexEntry, values *[]{{.Name}}Value) ([]{{.Name}}Value, error)
|
||||
read{{.Name}}ArrayBlock(entry *IndexEntry, values *tsdb.{{.Name}}Array) error
|
||||
{{- end}}
|
||||
readBytes(entry *IndexEntry, buf []byte) (uint32, []byte, error)
|
||||
rename(path string) error
|
||||
path() string
|
||||
close() error
|
||||
free() error
|
||||
}
|
||||
|
||||
{{range .}}
|
||||
func (m *mmapAccessor) read{{.Name}}Block(entry *IndexEntry, values *[]{{.Name}}Value) ([]{{.Name}}Value, error) {
|
||||
m.incAccess()
|
||||
|
||||
m.mu.RLock()
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
m.mu.RUnlock()
|
||||
return nil, ErrTSMClosed
|
||||
}
|
||||
|
||||
a, err := Decode{{.Name}}Block(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) read{{.Name}}ArrayBlock(entry *IndexEntry, values *tsdb.{{.Name}}Array) error {
|
||||
m.incAccess()
|
||||
|
||||
m.mu.RLock()
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
m.mu.RUnlock()
|
||||
return ErrTSMClosed
|
||||
}
|
||||
|
||||
err := Decode{{.Name}}ArrayBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
return err
|
||||
}
|
||||
{{end}}
|
|
@ -0,0 +1,22 @@
|
|||
[
|
||||
{
|
||||
"Name":"Float",
|
||||
"name":"float"
|
||||
},
|
||||
{
|
||||
"Name":"Integer",
|
||||
"name":"integer"
|
||||
},
|
||||
{
|
||||
"Name":"Unsigned",
|
||||
"name":"unsigned"
|
||||
},
|
||||
{
|
||||
"Name":"String",
|
||||
"name":"string"
|
||||
},
|
||||
{
|
||||
"Name":"Boolean",
|
||||
"name":"boolean"
|
||||
}
|
||||
]
|
|
@ -207,25 +207,6 @@ func (b *BlockIterator) Err() error {
|
|||
return b.err
|
||||
}
|
||||
|
||||
// blockAccessor abstracts a method of accessing blocks from a
|
||||
// TSM file.
|
||||
type blockAccessor interface {
|
||||
init() (*indirectIndex, error)
|
||||
read(key []byte, timestamp int64) ([]Value, error)
|
||||
readAll(key []byte) ([]Value, error)
|
||||
readBlock(entry *IndexEntry, values []Value) ([]Value, error)
|
||||
readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error)
|
||||
readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error)
|
||||
readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error)
|
||||
readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error)
|
||||
readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error)
|
||||
readBytes(entry *IndexEntry, buf []byte) (uint32, []byte, error)
|
||||
rename(path string) error
|
||||
path() string
|
||||
close() error
|
||||
free() error
|
||||
}
|
||||
|
||||
// NewTSMReader returns a new TSMReader from the given file.
|
||||
func NewTSMReader(f *os.File) (*TSMReader, error) {
|
||||
t := &TSMReader{}
|
||||
|
@ -336,46 +317,6 @@ func (t *TSMReader) ReadAt(entry *IndexEntry, vals []Value) ([]Value, error) {
|
|||
return v, err
|
||||
}
|
||||
|
||||
// ReadFloatBlockAt returns the float values corresponding to the given index entry.
|
||||
func (t *TSMReader) ReadFloatBlockAt(entry *IndexEntry, vals *[]FloatValue) ([]FloatValue, error) {
|
||||
t.mu.RLock()
|
||||
v, err := t.accessor.readFloatBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return v, err
|
||||
}
|
||||
|
||||
// ReadIntegerBlockAt returns the integer values corresponding to the given index entry.
|
||||
func (t *TSMReader) ReadIntegerBlockAt(entry *IndexEntry, vals *[]IntegerValue) ([]IntegerValue, error) {
|
||||
t.mu.RLock()
|
||||
v, err := t.accessor.readIntegerBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return v, err
|
||||
}
|
||||
|
||||
// ReadUnsignedBlockAt returns the unsigned integer values corresponding to the given index entry.
|
||||
func (t *TSMReader) ReadUnsignedBlockAt(entry *IndexEntry, vals *[]UnsignedValue) ([]UnsignedValue, error) {
|
||||
t.mu.RLock()
|
||||
v, err := t.accessor.readUnsignedBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return v, err
|
||||
}
|
||||
|
||||
// ReadStringBlockAt returns the string values corresponding to the given index entry.
|
||||
func (t *TSMReader) ReadStringBlockAt(entry *IndexEntry, vals *[]StringValue) ([]StringValue, error) {
|
||||
t.mu.RLock()
|
||||
v, err := t.accessor.readStringBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return v, err
|
||||
}
|
||||
|
||||
// ReadBooleanBlockAt returns the boolean values corresponding to the given index entry.
|
||||
func (t *TSMReader) ReadBooleanBlockAt(entry *IndexEntry, vals *[]BooleanValue) ([]BooleanValue, error) {
|
||||
t.mu.RLock()
|
||||
v, err := t.accessor.readBooleanBlock(entry, vals)
|
||||
t.mu.RUnlock()
|
||||
return v, err
|
||||
}
|
||||
|
||||
// Read returns the values corresponding to the block at the given key and timestamp.
|
||||
func (t *TSMReader) Read(key []byte, timestamp int64) ([]Value, error) {
|
||||
t.mu.RLock()
|
||||
|
@ -1497,101 +1438,6 @@ func (m *mmapAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, er
|
|||
return values, nil
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) {
|
||||
m.incAccess()
|
||||
|
||||
m.mu.RLock()
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
m.mu.RUnlock()
|
||||
return nil, ErrTSMClosed
|
||||
}
|
||||
|
||||
a, err := DecodeFloatBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) {
|
||||
m.incAccess()
|
||||
|
||||
m.mu.RLock()
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
m.mu.RUnlock()
|
||||
return nil, ErrTSMClosed
|
||||
}
|
||||
|
||||
a, err := DecodeIntegerBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error) {
|
||||
m.incAccess()
|
||||
|
||||
m.mu.RLock()
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
m.mu.RUnlock()
|
||||
return nil, ErrTSMClosed
|
||||
}
|
||||
|
||||
a, err := DecodeUnsignedBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) {
|
||||
m.incAccess()
|
||||
|
||||
m.mu.RLock()
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
m.mu.RUnlock()
|
||||
return nil, ErrTSMClosed
|
||||
}
|
||||
|
||||
a, err := DecodeStringBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) {
|
||||
m.incAccess()
|
||||
|
||||
m.mu.RLock()
|
||||
if int64(len(m.b)) < entry.Offset+int64(entry.Size) {
|
||||
m.mu.RUnlock()
|
||||
return nil, ErrTSMClosed
|
||||
}
|
||||
|
||||
a, err := DecodeBooleanBlock(m.b[entry.Offset+4:entry.Offset+int64(entry.Size)], values)
|
||||
m.mu.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return a, nil
|
||||
}
|
||||
|
||||
func (m *mmapAccessor) readBytes(entry *IndexEntry, b []byte) (uint32, []byte, error) {
|
||||
m.incAccess()
|
||||
|
||||
|
|
Loading…
Reference in New Issue