Update godoc for tsm1 package

pull/7781/head
Mark Rushakoff 2017-01-02 06:58:11 -08:00
parent 4a774eb600
commit 41415cf2fb
16 changed files with 398 additions and 171 deletions

View File

@ -1,3 +1,6 @@
// Package engine can be imported to initialize and register all available TSDB engines.
//
// Alternatively, you can import any individual subpackage underneath engine.
package engine // import "github.com/influxdata/influxdb/tsdb/engine"
import (

View File

@ -47,7 +47,8 @@ func (r *BitReader) ReadBit() (bool, error) {
return v != 0, err
}
// ReadBits reads nbits from the underlying data.
// ReadBits reads nbits from the underlying data into a uint64.
// nbits must be from 1 to 64, inclusive.
func (r *BitReader) ReadBits(nbits uint) (uint64, error) {
// Return EOF if there is no more data.
if r.buf.n == 0 {

View File

@ -41,6 +41,7 @@ func NewBooleanEncoder(sz int) BooleanEncoder {
}
}
// Reset sets the encoder to its initial state.
func (e *BooleanEncoder) Reset() {
e.bytes = e.bytes[:0]
e.b = 0
@ -48,6 +49,7 @@ func (e *BooleanEncoder) Reset() {
e.n = 0
}
// Write encodes b to the underlying buffer.
func (e *BooleanEncoder) Write(b bool) {
// If we have filled the current byte, flush it
if e.i >= 8 {
@ -82,6 +84,7 @@ func (e *BooleanEncoder) flush() {
}
}
// Bytes returns a new byte slice containing the encoded booleans from previous calls to Write.
func (e *BooleanEncoder) Bytes() ([]byte, error) {
// Ensure the current byte is flushed
e.flush()
@ -132,6 +135,9 @@ func (e *BooleanDecoder) SetBytes(b []byte) {
}
}
// Next returns whether there are any bits remaining in the decoder.
// It returns false if there was an error decoding.
// The error is available on the Error method.
func (e *BooleanDecoder) Next() bool {
if e.err != nil {
return false
@ -141,6 +147,7 @@ func (e *BooleanDecoder) Next() bool {
return e.i < e.n
}
// Read returns the next bit from the decoder.
func (e *BooleanDecoder) Read() bool {
// Index into the byte slice
idx := e.i >> 3 // integer division by 8
@ -158,6 +165,7 @@ func (e *BooleanDecoder) Read() bool {
return v&mask == mask
}
// Error returns the error encountered during decoding, if one occurred.
func (e *BooleanDecoder) Error() error {
return e.err
}

View File

@ -21,10 +21,12 @@ import (
const ringShards = 128
var (
ErrCacheInvalidCheckpoint = fmt.Errorf("invalid checkpoint")
ErrSnapshotInProgress = fmt.Errorf("snapshot in progress")
// ErrSnapshotInProgress is returned if a snapshot is attempted while one is already running.
ErrSnapshotInProgress = fmt.Errorf("snapshot in progress")
)
// ErrCacheMemorySizeLimitExceeded returns an error indicating an operation
// could not be completed due to exceeding the cache-max-memory-size setting.
func ErrCacheMemorySizeLimitExceeded(n, limit uint64) error {
return fmt.Errorf("cache-max-memory-size exceeded: (%d/%d)", n, limit)
}
@ -42,8 +44,8 @@ type entry struct {
// newEntryValues returns a new instance of entry with the given values. If the
// values are not valid, an error is returned.
//
// newEntryValues takes an optional hint, which is only respected if it's
// positive.
// newEntryValues takes an optional hint to indicate the initial buffer size.
// The hint is only respected if it's positive.
func newEntryValues(values []Value, hint int) (*entry, error) {
// Ensure we start off with a reasonably sized values slice.
if hint < 32 {
@ -110,8 +112,8 @@ func (e *entry) add(values []Value) error {
return nil
}
// deduplicate sorts and orders the entry's values. If values are already deduped and
// and sorted, the function does no work and simply returns.
// deduplicate sorts and orders the entry's values. If values are already deduped and sorted,
// the function does no work and simply returns.
func (e *entry) deduplicate() {
e.mu.Lock()
defer e.mu.Unlock()
@ -122,7 +124,7 @@ func (e *entry) deduplicate() {
e.values = e.values.Deduplicate()
}
// count returns number of values for this entry
// count returns the number of values in this entry.
func (e *entry) count() int {
e.mu.RLock()
n := len(e.values)
@ -130,14 +132,14 @@ func (e *entry) count() int {
return n
}
// filter removes all values between min and max inclusive
// filter removes all values with timestamps between min and max inclusive.
func (e *entry) filter(min, max int64) {
e.mu.Lock()
e.values = e.values.Exclude(min, max)
e.mu.Unlock()
}
// size returns the size of this entry in bytes
// size returns the size of this entry in bytes.
func (e *entry) size() int {
e.mu.RLock()
sz := e.values.Size()
@ -191,7 +193,7 @@ type Cache struct {
}
// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
// Only used for engine caches, never for snapshots
// Only used for engine caches, never for snapshots.
func NewCache(maxSize uint64, path string) *Cache {
store, _ := newring(ringShards)
c := &Cache{
@ -241,7 +243,7 @@ func (c *Cache) Statistics(tags map[string]string) []models.Statistic {
}
// Write writes the set of values for the key to the cache. This function is goroutine-safe.
// It returns an error if the cache will exceeded its max size by adding the new values.
// It returns an error if the cache will exceed its max size by adding the new values.
func (c *Cache) Write(key string, values []Value) error {
addedSize := uint64(Values(values).Size())
@ -318,8 +320,8 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
return werr
}
// Snapshot will take a snapshot of the current cache, add it to the slice of caches that
// are being flushed, and reset the current cache with new values
// Snapshot takes a snapshot of the current cache, adds it to the slice of caches that
// are being flushed, and resets the current cache with new values.
func (c *Cache) Snapshot() (*Cache, error) {
c.mu.Lock()
defer c.mu.Unlock()
@ -378,7 +380,7 @@ func (c *Cache) Snapshot() (*Cache, error) {
}
// Deduplicate sorts the snapshot before returning it. The compactor and any queries
// coming in while it writes will need the values sorted
// coming in while it writes will need the values sorted.
func (c *Cache) Deduplicate() {
c.mu.RLock()
store := c.store
@ -389,8 +391,8 @@ func (c *Cache) Deduplicate() {
_ = store.apply(func(_ string, e *entry) error { e.deduplicate(); return nil })
}
// ClearSnapshot will remove the snapshot cache from the list of flushing caches and
// adjust the size
// ClearSnapshot removes the snapshot cache from the list of flushing caches and
// adjusts the size.
func (c *Cache) ClearSnapshot(success bool) {
c.mu.Lock()
defer c.mu.Unlock()
@ -506,13 +508,13 @@ func (c *Cache) Values(key string) Values {
return values
}
// Delete will remove the keys from the cache
// Delete removes all values for the given keys from the cache.
func (c *Cache) Delete(keys []string) {
c.DeleteRange(keys, math.MinInt64, math.MaxInt64)
}
// DeleteRange will remove the values for all keys containing points
// between min and max from the cache.
// DeleteRange removes the values for all keys containing points
// with timestamps between between min and max from the cache.
//
// TODO(edd): Lock usage could possibly be optimised if necessary.
func (c *Cache) DeleteRange(keys []string, min, max int64) {
@ -545,6 +547,7 @@ func (c *Cache) DeleteRange(keys []string, min, max int64) {
atomic.StoreInt64(&c.stats.MemSizeBytes, int64(c.Size()))
}
// SetMaxSize updates the memory limit of the cache.
func (c *Cache) SetMaxSize(size uint64) {
c.mu.Lock()
c.maxSize = size
@ -641,11 +644,12 @@ func (cl *CacheLoader) Load(cache *Cache) error {
return nil
}
// WithLogger sets the logger on the CacheLoader.
func (cl *CacheLoader) WithLogger(log zap.Logger) {
cl.Logger = log.With(zap.String("service", "cacheloader"))
}
// Updates the age statistic
// UpdateAge updates the age statistic based on the current time.
func (c *Cache) UpdateAge() {
c.mu.RLock()
defer c.mu.RUnlock()
@ -653,17 +657,17 @@ func (c *Cache) UpdateAge() {
atomic.StoreInt64(&c.stats.CacheAgeMs, ageStat)
}
// Updates WAL compaction time statistic
// UpdateCompactTime updates WAL compaction time statistic based on d.
func (c *Cache) UpdateCompactTime(d time.Duration) {
atomic.AddInt64(&c.stats.WALCompactionTimeMs, int64(d/time.Millisecond))
}
// Update the cachedBytes counter
// updateCachedBytes increases the cachedBytes counter by b.
func (c *Cache) updateCachedBytes(b uint64) {
atomic.AddInt64(&c.stats.CachedBytes, int64(b))
}
// Update the memSize level
// updateMemSize updates the memSize level by b.
func (c *Cache) updateMemSize(b int64) {
atomic.AddInt64(&c.stats.MemSizeBytes, b)
}
@ -683,7 +687,7 @@ func valueType(v Value) int {
}
}
// Update the snapshotsCount and the diskSize levels
// updateSnapshots updates the snapshotsCount and the diskSize levels.
func (c *Cache) updateSnapshots() {
// Update disk stats
atomic.StoreInt64(&c.stats.DiskSizeBytes, int64(atomic.LoadUint64(&c.snapshotSize)))

View File

@ -8,7 +8,7 @@ package tsm1
// smaller TSM files need to be merged to reduce file counts and improve
// compression ratios.
//
// The the compaction process is stream-oriented using multiple readers and
// The compaction process is stream-oriented using multiple readers and
// iterators. The resulting stream is written sorted and chunked to allow for
// one-pass writing of a new TSM file.
@ -27,8 +27,11 @@ import (
const maxTSMFileSize = uint32(2048 * 1024 * 1024) // 2GB
const (
// CompactionTempExtension is the extension used for temporary files created during compaction.
CompactionTempExtension = "tmp"
TSMFileExtension = "tsm"
// TSMFileExtension is the extension used for TSM files.
TSMFileExtension = "tsm"
)
var (
@ -39,11 +42,7 @@ var (
errCompactionInProgress = fmt.Errorf("compaction in progress")
)
var (
MaxTime = time.Unix(0, math.MaxInt64)
MinTime = time.Unix(0, 0)
)
// CompactionGroup represents a list of files eligible to be compacted together.
type CompactionGroup []string
// CompactionPlanner determines what TSM files and WAL segments to include in a
@ -90,7 +89,7 @@ type tsmGeneration struct {
files []FileStat
}
// size returns the total size of the generation
// size returns the total size of the files in the generation.
func (t *tsmGeneration) size() uint64 {
var n uint64
for _, f := range t.files {
@ -99,7 +98,7 @@ func (t *tsmGeneration) size() uint64 {
return n
}
// compactionLevel returns the level of the files in this generation
// compactionLevel returns the level of the files in this generation.
func (t *tsmGeneration) level() int {
// Level 0 is always created from the result of a cache compaction. It generates
// 1 file with a sequence num of 1. Level 2 is generated by compacting multiple
@ -123,12 +122,12 @@ func (t *tsmGeneration) lastModified() int64 {
return max
}
// count return then number of files in the generation
// count returns the number of files in the generation.
func (t *tsmGeneration) count() int {
return len(t.files)
}
// hasTombstones returns true if there a keys removed for any of the files
// hasTombstones returns true if there are keys removed for any of the files.
func (t *tsmGeneration) hasTombstones() bool {
for _, f := range t.files {
if f.HasTombstone {
@ -138,7 +137,7 @@ func (t *tsmGeneration) hasTombstones() bool {
return false
}
// PlanLevel returns a set of TSM files to rewrite for a specific level
// PlanLevel returns a set of TSM files to rewrite for a specific level.
func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
// Determine the generations from all files on disk. We need to treat
// a generation conceptually as a single file even though it may be
@ -415,7 +414,6 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
startIndex++
continue
}
}
if skipGroup {
@ -462,8 +460,8 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
return tsmFiles
}
// findGenerations groups all the TSM files by they generation based
// on their filename then returns the generations in descending order (newest first)
// findGenerations groups all the TSM files by generation based
// on their filename, then returns the generations in descending order (newest first).
func (c *DefaultPlanner) findGenerations() tsmGenerations {
c.mu.RLock()
last := c.lastFindGenerations
@ -507,7 +505,7 @@ func (c *DefaultPlanner) findGenerations() tsmGenerations {
}
// Compactor merges multiple TSM files into new files or
// writes a Cache into 1 or more TSM files
// writes a Cache into 1 or more TSM files.
type Compactor struct {
Dir string
Size int
@ -523,6 +521,7 @@ type Compactor struct {
files map[string]struct{}
}
// Open initializes the Compactor.
func (c *Compactor) Open() {
c.mu.Lock()
defer c.mu.Unlock()
@ -535,6 +534,7 @@ func (c *Compactor) Open() {
c.files = make(map[string]struct{})
}
// Close disables the Compactor.
func (c *Compactor) Close() {
c.mu.Lock()
defer c.mu.Unlock()
@ -545,31 +545,35 @@ func (c *Compactor) Close() {
c.compactionsEnabled = false
}
// DisableSnapshots disables the compactor from performing snapshots.
func (c *Compactor) DisableSnapshots() {
c.mu.Lock()
c.snapshotsEnabled = false
c.mu.Unlock()
}
// EnableSnapshots allows the compactor to perform snapshots.
func (c *Compactor) EnableSnapshots() {
c.mu.Lock()
c.snapshotsEnabled = true
c.mu.Unlock()
}
// DisableSnapshots disables the compactor from performing compactions.
func (c *Compactor) DisableCompactions() {
c.mu.Lock()
c.compactionsEnabled = false
c.mu.Unlock()
}
// EnableCompactions allows the compactor to perform compactions.
func (c *Compactor) EnableCompactions() {
c.mu.Lock()
c.compactionsEnabled = true
c.mu.Unlock()
}
// WriteSnapshot will write a Cache snapshot to a new TSM files.
// WriteSnapshot writes a Cache snapshot to one or more new TSM files.
func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
c.mu.RLock()
enabled := c.snapshotsEnabled
@ -594,7 +598,7 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
return files, err
}
// Compact will write multiple smaller TSM files into 1 or more larger files
// compact writes multiple smaller TSM files into 1 or more larger files.
func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
size := c.Size
if size <= 0 {
@ -648,7 +652,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
return c.writeNewFiles(maxGeneration, maxSequence, tsm)
}
// Compact will write multiple smaller TSM files into 1 or more larger files
// CompactFull writes multiple smaller TSM files into 1 or more larger files.
func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
c.mu.RLock()
enabled := c.compactionsEnabled
@ -677,7 +681,7 @@ func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
return files, err
}
// Compact will write multiple smaller TSM files into 1 or more larger files
// CompactFast writes multiple smaller TSM files into 1 or more larger files.
func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) {
c.mu.RLock()
enabled := c.compactionsEnabled
@ -707,8 +711,8 @@ func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) {
}
// writeNewFiles will write from the iterator into new TSM files, rotating
// to a new file when we've reached the max TSM file size
// writeNewFiles writes from the iterator into new TSM files, rotating
// to a new file once it has reached the max TSM file size.
func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([]string, error) {
// These are the new TSM files written
var files []string
@ -837,8 +841,14 @@ func (c *Compactor) remove(files []string) {
// KeyIterator allows iteration over set of keys and values in sorted order.
type KeyIterator interface {
// Next returns true if there are any values remaining in the iterator.
Next() bool
Read() (string, int64, int64, []byte, error)
// Read returns the key, time range, and raw data for the next block,
// or any error that occurred.
Read() (key string, minTime int64, maxTime int64, data []byte, err error)
// Close closes the iterator.
Close() error
}
@ -931,6 +941,8 @@ func (a blocks) Less(i, j int) bool {
func (a blocks) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// NewTSMKeyIterator returns a new TSM key iterator from readers.
// size indicates the maximum number of values to encode in a single block.
func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator, error) {
var iter []*BlockIterator
for _, r := range readers {
@ -948,6 +960,7 @@ func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator,
}, nil
}
// Next returns true if there are any values remaining in the iterator.
func (k *tsmKeyIterator) Next() bool {
// Any merged blocks pending?
if len(k.merged) > 0 {
@ -1055,7 +1068,7 @@ func (k *tsmKeyIterator) Next() bool {
return len(k.merged) > 0
}
// merge combines the next set of blocks into merged blocks
// merge combines the next set of blocks into merged blocks.
func (k *tsmKeyIterator) merge() {
// No blocks left, or pending merged values, we're done
if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.mergedValues) == 0 {
@ -1278,6 +1291,7 @@ type cacheKeyIterator struct {
err error
}
// NewCacheKeyIterator returns a new KeyIterator from a Cache.
func NewCacheKeyIterator(cache *Cache, size int) KeyIterator {
keys := cache.Keys()

View File

@ -12,16 +12,16 @@ import (
)
const (
// BlockFloat64 designates a block encodes float64 values
// BlockFloat64 designates a block encodes float64 values.
BlockFloat64 = byte(0)
// BlockInteger designates a block encodes int64 values
// BlockInteger designates a block encodes int64 values.
BlockInteger = byte(1)
// BlockBoolean designates a block encodes boolean values
// BlockBoolean designates a block encodes boolean values.
BlockBoolean = byte(2)
// BlockString designates a block encodes string values
// BlockString designates a block encodes string values.
BlockString = byte(3)
// encodedBlockHeaderSize is the size of the header for an encoded block. There is one
@ -30,7 +30,7 @@ const (
)
func init() {
// Prime the pools with with at one encoder/decoder for each available CPU
// Prime the pools with one encoder/decoder for each available CPU.
vals := make([]interface{}, 0, runtime.NumCPU())
for _, p := range []*pool.Generic{
timeEncoderPool, timeDecoderPool,
@ -54,6 +54,7 @@ func init() {
var (
// encoder pools
timeEncoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} {
return NewTimeEncoder(sz)
})
@ -71,6 +72,7 @@ var (
})
// decoder pools
timeDecoderPool = pool.NewGeneric(runtime.NumCPU(), func(sz int) interface{} {
return &TimeDecoder{}
})
@ -88,15 +90,26 @@ var (
})
)
// Value represents a TSM-encoded value.
type Value interface {
// UnixNano returns the timestamp of the value in nanoseconds since unix epoch.
UnixNano() int64
// Value returns the underlying value.
Value() interface{}
// Size returns the number of bytes necessary to represent the value and its timestamp.
Size() int
// String returns the string representation of the value and its timestamp.
String() string
// internalOnly is unexported to ensure implementations of Value
// can only originate in this package.
internalOnly()
}
// NewValue returns a new Value with the underlying type dependent on value.
func NewValue(t int64, value interface{}) Value {
switch v := value.(type) {
case int64:
@ -111,28 +124,40 @@ func NewValue(t int64, value interface{}) Value {
return EmptyValue{}
}
// NewIntegerValue returns a new integer value.
func NewIntegerValue(t int64, v int64) Value {
return IntegerValue{unixnano: t, value: v}
}
// NewFloatValue returns a new float value.
func NewFloatValue(t int64, v float64) Value {
return FloatValue{unixnano: t, value: v}
}
// NewBooleanValue returns a new boolean value.
func NewBooleanValue(t int64, v bool) Value {
return BooleanValue{unixnano: t, value: v}
}
// NewStringValue returns a new string value.
func NewStringValue(t int64, v string) Value {
return StringValue{unixnano: t, value: v}
}
// EmptyValue is used when there is no appropriate other value.
type EmptyValue struct{}
func (e EmptyValue) UnixNano() int64 { return tsdb.EOF }
// UnixNano returns tsdb.EOF.
func (e EmptyValue) UnixNano() int64 { return tsdb.EOF }
// Value returns nil.
func (e EmptyValue) Value() interface{} { return nil }
func (e EmptyValue) Size() int { return 0 }
func (e EmptyValue) String() string { return "" }
// Size returns 0.
func (e EmptyValue) Size() int { return 0 }
// String returns the empty string.
func (e EmptyValue) String() string { return "" }
func (_ EmptyValue) internalOnly() {}
func (_ StringValue) internalOnly() {}
@ -193,6 +218,7 @@ func BlockType(block []byte) (byte, error) {
}
}
// BlockCount returns the number of timestamps encoded in block.
func BlockCount(block []byte) int {
if len(block) <= encodedBlockHeaderSize {
panic(fmt.Sprintf("count of short block: got %v, exp %v", len(block), encodedBlockHeaderSize))
@ -205,7 +231,7 @@ func BlockCount(block []byte) int {
return CountTimestamps(tb)
}
// DecodeBlock takes a byte array and will decode into values of the appropriate type
// DecodeBlock takes a byte slice and decodes it into values of the appropriate type
// based on the block.
func DecodeBlock(block []byte, vals []Value) ([]Value, error) {
if len(block) <= encodedBlockHeaderSize {
@ -266,23 +292,28 @@ func DecodeBlock(block []byte, vals []Value) ([]Value, error) {
}
}
// FloatValue represents a float64 value.
type FloatValue struct {
unixnano int64
value float64
}
// UnixNano returns the timestamp of the value.
func (f FloatValue) UnixNano() int64 {
return f.unixnano
}
// Value returns the underlying float64 value.
func (f FloatValue) Value() interface{} {
return f.value
}
// Size returns the number of bytes necessary to represent the value and its timestamp.
func (f FloatValue) Size() int {
return 16
}
// String returns the string representation of the value and its timestamp.
func (f FloatValue) String() string {
return fmt.Sprintf("%v %v", time.Unix(0, f.unixnano), f.value)
}
@ -335,6 +366,8 @@ func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) {
return b, err
}
// DecodeFloatBlock decodes the float block from the byte slice
// and appends the float values to a.
func DecodeFloatBlock(block []byte, a *[]FloatValue) ([]FloatValue, error) {
// Block type is the next block, make sure we actually have a float block
blockType := block[0]
@ -394,23 +427,28 @@ func DecodeFloatBlock(block []byte, a *[]FloatValue) ([]FloatValue, error) {
return (*a)[:i], err
}
// BooleanValue represents a boolean value.
type BooleanValue struct {
unixnano int64
value bool
}
// Size returns the number of bytes necessary to represent the value and its timestamp.
func (b BooleanValue) Size() int {
return 9
}
// UnixNano returns the timestamp of the value in nanoseconds since unix epoch.
func (b BooleanValue) UnixNano() int64 {
return b.unixnano
}
// Value returns the underlying boolean value.
func (b BooleanValue) Value() interface{} {
return b.value
}
// String returns the string representation of the value and its timestamp.
func (f BooleanValue) String() string {
return fmt.Sprintf("%v %v", time.Unix(0, f.unixnano), f.Value())
}
@ -458,6 +496,8 @@ func encodeBooleanBlock(buf []byte, values []Value) ([]byte, error) {
return b, err
}
// DecodeBooleanBlock decodes the boolean block from the byte slice
// and appends the boolean values to a.
func DecodeBooleanBlock(block []byte, a *[]BooleanValue) ([]BooleanValue, error) {
// Block type is the next block, make sure we actually have a float block
blockType := block[0]
@ -513,23 +553,28 @@ func DecodeBooleanBlock(block []byte, a *[]BooleanValue) ([]BooleanValue, error)
return (*a)[:i], err
}
// FloatValue represents an int64 value.
type IntegerValue struct {
unixnano int64
value int64
}
// Value returns the underlying int64 value.
func (v IntegerValue) Value() interface{} {
return v.value
}
// UnixNano returns the timestamp of the value.
func (v IntegerValue) UnixNano() int64 {
return v.unixnano
}
// Size returns the number of bytes necessary to represent the value and its timestamp.
func (v IntegerValue) Size() int {
return 16
}
// String returns the string representation of the value and its timestamp.
func (f IntegerValue) String() string {
return fmt.Sprintf("%v %v", time.Unix(0, f.unixnano), f.Value())
}
@ -568,6 +613,8 @@ func encodeIntegerBlock(buf []byte, values []Value) ([]byte, error) {
return b, err
}
// DecodeIntegerBlock decodes the integer block from the byte slice
// and appends the integer values to a.
func DecodeIntegerBlock(block []byte, a *[]IntegerValue) ([]IntegerValue, error) {
blockType := block[0]
if blockType != BlockInteger {
@ -624,23 +671,28 @@ func DecodeIntegerBlock(block []byte, a *[]IntegerValue) ([]IntegerValue, error)
return (*a)[:i], err
}
// StringValue represents a string value.
type StringValue struct {
unixnano int64
value string
}
// Value returns the underlying string value.
func (v StringValue) Value() interface{} {
return v.value
}
// UnixNano returns the timestamp of the value.
func (v StringValue) UnixNano() int64 {
return v.unixnano
}
// Size returns the number of bytes necessary to represent the value and its timestamp.
func (v StringValue) Size() int {
return 8 + len(v.value)
}
// String returns the string representation of the value and its timestamp.
func (f StringValue) String() string {
return fmt.Sprintf("%v %v", time.Unix(0, f.unixnano), f.Value())
}
@ -680,6 +732,8 @@ func encodeStringBlock(buf []byte, values []Value) ([]byte, error) {
return b, err
}
// DecodeStringBlock decodes the string block from the byte slice
// and appends the string values to a.
func DecodeStringBlock(block []byte, a *[]StringValue) ([]StringValue, error) {
blockType := block[0]
if blockType != BlockString {
@ -781,12 +835,12 @@ func unpackBlock(buf []byte) (ts, values []byte, err error) {
}
// ZigZagEncode converts a int64 to a uint64 by zig zagging negative and positive values
// across even and odd numbers. Eg. [0,-1,1,-2] becomes [0, 1, 2, 3]
// across even and odd numbers. Eg. [0,-1,1,-2] becomes [0, 1, 2, 3].
func ZigZagEncode(x int64) uint64 {
return uint64(uint64(x<<1) ^ uint64((int64(x) >> 63)))
}
// ZigZagDecode converts a previously zigzag encoded uint64 back to a int64
// ZigZagDecode converts a previously zigzag encoded uint64 back to a int64.
func ZigZagDecode(v uint64) int64 {
return int64((v >> 1) ^ uint64((int64(v&1)<<63)>>63))
}

View File

@ -1,3 +1,4 @@
// Package tsm1 provides a TSDB in the Time Structured Merge tree format.
package tsm1 // import "github.com/influxdata/influxdb/tsdb/engine/tsm1"
import (
@ -171,6 +172,7 @@ func NewEngine(id uint64, path string, walPath string, opt tsdb.EngineOptions) t
return e
}
// SetEnabled sets whether the engine is enabled.
func (e *Engine) SetEnabled(enabled bool) {
e.enableCompactionsOnOpen = enabled
e.SetCompactionsEnabled(enabled)
@ -217,10 +219,10 @@ func (e *Engine) enableLevelCompactions(wait bool) {
go func() { defer e.wg.Done(); e.compactTSMLevel(false, 3, quit) }()
}
// disableLevelCompactions will stop level compactions before returning
// disableLevelCompactions will stop level compactions before returning.
//
// If 'wait' is set to true, then a corresponding call to enableLevelCompactions(true) will be
// required before level compactions will start back up again
// required before level compactions will start back up again.
func (e *Engine) disableLevelCompactions(wait bool) {
e.mu.Lock()
old := e.levelWorkers
@ -306,7 +308,7 @@ func (e *Engine) MeasurementFields(measurement string) *tsdb.MeasurementFields {
return m
}
// Format returns the format type of this engine
// Format returns the format type of this engine.
func (e *Engine) Format() tsdb.EngineFormat {
return tsdb.TSM1Format
}
@ -424,6 +426,7 @@ func (e *Engine) Close() error {
return e.WAL.Close()
}
// WithLogger sets the logger for the engine.
func (e *Engine) WithLogger(log zap.Logger) {
e.logger = log.With(zap.String("engine", "tsm1"))
@ -473,7 +476,7 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index *tsdb.DatabaseIndex) er
return nil
}
// Backup will write a tar archive of any TSM files modified since the passed
// Backup writes a tar archive of any TSM files modified since the passed
// in time to the passed in writer. The basePath will be prepended to the names
// of the files in the archive. It will force a snapshot of the WAL first
// then perform the backup with a read lock against the file store. This means
@ -524,8 +527,8 @@ func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
return nil
}
// writeFileToBackup will copy the file into the tar archive. Files will use the shardRelativePath
// in their names. This should be the <db>/<retention policy>/<id> part of the path
// writeFileToBackup copies the file into the tar archive. Files will use the shardRelativePath
// in their names. This should be the <db>/<retention policy>/<id> part of the path.
func (e *Engine) writeFileToBackup(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
h := &tar.Header{
Name: filepath.ToSlash(filepath.Join(shardRelativePath, f.Name())),
@ -548,7 +551,7 @@ func (e *Engine) writeFileToBackup(f os.FileInfo, shardRelativePath, fullPath st
return err
}
// Restore will read a tar archive generated by Backup().
// Restore reads a tar archive generated by Backup().
// Only files that match basePath will be copied into the directory. This obtains
// a write lock so no operations can be performed while restoring.
func (e *Engine) Restore(r io.Reader, basePath string) error {
@ -621,8 +624,8 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string) er
return renameFile(tmp, destPath)
}
// addToIndexFromKey will pull the measurement name, series key, and field name from a composite key and add it to the
// database index and measurement fields
// addToIndexFromKey extracts the measurement name, series key, and field name from a composite key, and adds it to the
// database index and measurement fields.
func (e *Engine) addToIndexFromKey(shardID uint64, key []byte, fieldType influxql.DataType, index *tsdb.DatabaseIndex) error {
seriesKey, field := SeriesAndFieldFromCompositeKey(key)
measurement := tsdb.MeasurementFromSeriesKey(string(seriesKey))
@ -662,7 +665,7 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key []byte, fieldType influxq
}
// WritePoints writes metadata and point data into the engine.
// Returns an error if new points are added to an existing key.
// It returns an error if new points are added to an existing key.
func (e *Engine) WritePoints(points []models.Point) error {
values := make(map[string][]Value, len(points))
var keyBuf []byte
@ -819,7 +822,7 @@ func (e *Engine) SeriesCount() (n int, err error) {
return e.index.SeriesN(), nil
}
// LastModified returns the time when this shard was last modified
// LastModified returns the time when this shard was last modified.
func (e *Engine) LastModified() time.Time {
walTime := e.WAL.LastWriteTime()
fsTime := e.FileStore.LastModified()
@ -831,6 +834,7 @@ func (e *Engine) LastModified() time.Time {
return fsTime
}
// WriteTo is not implemented.
func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") }
// WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done.
@ -886,7 +890,7 @@ func (e *Engine) WriteSnapshot() error {
}
// CreateSnapshot will create a temp directory that holds
// temporary hardlinks to the underylyng shard files
// temporary hardlinks to the underylyng shard files.
func (e *Engine) CreateSnapshot() (string, error) {
if err := e.WriteSnapshot(); err != nil {
return "", err
@ -898,9 +902,8 @@ func (e *Engine) CreateSnapshot() (string, error) {
return e.FileStore.CreateSnapshot()
}
// writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments
// writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments.
func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (err error) {
defer func() {
if err != nil {
e.Cache.ClearSnapshot(false)
@ -932,7 +935,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (
return nil
}
// compactCache continually checks if the WAL cache should be written to disk
// compactCache continually checks if the WAL cache should be written to disk.
func (e *Engine) compactCache(quit <-chan struct{}) {
t := time.NewTimer(time.Second)
defer t.Stop()
@ -961,7 +964,7 @@ func (e *Engine) compactCache(quit <-chan struct{}) {
}
// ShouldCompactCache returns true if the Cache is over its flush threshold
// or if the passed in lastWriteTime is older than the write cold threshold
// or if the passed in lastWriteTime is older than the write cold threshold.
func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool {
sz := e.Cache.Size()
@ -1221,12 +1224,14 @@ func (e *Engine) cleanupTempTSMFiles() error {
return nil
}
// KeyCursor returns a KeyCursor for the given key starting at time t.
func (e *Engine) KeyCursor(key string, t int64, ascending bool) *KeyCursor {
e.mu.RLock()
defer e.mu.RUnlock()
return e.FileStore.KeyCursor(key, t, ascending)
}
// CreateIterator returns an iterator based on opt.
func (e *Engine) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) {
if call, ok := opt.Expr.(*influxql.Call); ok {
refOpt := opt
@ -1618,7 +1623,7 @@ func (e *Engine) buildBooleanCursor(measurement, seriesKey, field string, opt in
return newBooleanCursor(opt.SeekTime(), opt.Ascending, cacheValues, keyCursor)
}
// SeriesFieldKey combine a series key and field name for a unique string to be hashed to a numeric ID
// SeriesFieldKey combine a series key and field name for a unique string to be hashed to a numeric ID.
func SeriesFieldKey(seriesKey, field string) string {
return seriesKey + keyFieldSeparator + field
}
@ -1638,6 +1643,7 @@ func tsmFieldTypeToInfluxQLDataType(typ byte) (influxql.DataType, error) {
}
}
// SeriesAndFieldFromCompositeKey returns the series key and the field key extracted from the composite key.
func SeriesAndFieldFromCompositeKey(key []byte) ([]byte, string) {
sep := bytes.Index(key, []byte(keyFieldSeparator))
if sep == -1 {

View File

@ -18,12 +18,13 @@ import (
"go.uber.org/zap"
)
// TSMFile represents an on-disk TSM file.
type TSMFile interface {
// Path returns the underlying file path for the TSMFile. If the file
// has not be written or loaded from disk, the zero value is returned.
Path() string
// Read returns all the values in the block where time t resides
// Read returns all the values in the block where time t resides.
Read(key string, t int64) ([]Value, error)
// ReadAt returns all the values in the block identified by entry.
@ -38,7 +39,7 @@ type TSMFile interface {
ReadEntries(key string, entries *[]IndexEntry)
// Returns true if the TSMFile may contain a value with the specified
// key and time
// key and time.
ContainsValue(key string, t int64) bool
// Contains returns true if the file contains any values for the given
@ -54,10 +55,10 @@ type TSMFile interface {
// KeyRange returns the min and max keys in the file.
KeyRange() (string, string)
// KeyCount returns the number of distict keys in the file.
// KeyCount returns the number of distinct keys in the file.
KeyCount() int
// KeyAt returns the key located at index position idx
// KeyAt returns the key located at index position idx.
KeyAt(idx int) ([]byte, byte)
// Type returns the block type of the values stored for the key. Returns one of
@ -68,7 +69,7 @@ type TSMFile interface {
// Delete removes the keys from the set of keys available in this file.
Delete(keys []string) error
// DeleteRange removes the values for keys between min and max.
// DeleteRange removes the values for keys between timestamps min and max.
DeleteRange(keys []string, min, max int64) error
// HasTombstones returns true if file contains values that have been deleted.
@ -78,7 +79,7 @@ type TSMFile interface {
// written for this file.
TombstoneFiles() []FileStat
// Close the underlying file resources
// Close closes the underlying file resources.
Close() error
// Size returns the size of the file on disk in bytes.
@ -88,23 +89,23 @@ type TSMFile interface {
// file name. Index and Reader state are not re-initialized.
Rename(path string) error
// Remove deletes the file from the filesystem
// Remove deletes the file from the filesystem.
Remove() error
// Returns true if the file is currently in use by queries
// InUse returns true if the file is currently in use by queries.
InUse() bool
// Ref records that this file is actively in use
// Ref records that this file is actively in use.
Ref()
// Unref records that this file is no longer in user
// Unref records that this file is no longer in use.
Unref()
// Stats returns summary information about the TSM file.
Stats() FileStat
// BlockIterator returns an iterator pointing to the first block in the file and
// allows sequential iteration to each every block.
// allows sequential iteration to each and every block.
BlockIterator() *BlockIterator
// Removes mmap references held by another object.
@ -121,6 +122,7 @@ const (
statFileStoreCount = "numFiles"
)
// FileStore is an abstraction around multiple TSM files.
type FileStore struct {
mu sync.RWMutex
lastModified time.Time
@ -146,6 +148,7 @@ type FileStore struct {
dereferencer dereferencer
}
// FileStat holds information about a TSM file on disk.
type FileStat struct {
Path string
HasTombstone bool
@ -155,18 +158,22 @@ type FileStat struct {
MinKey, MaxKey string
}
// OverlapsTimeRange returns true if the time range of the file intersect min and max.
func (f FileStat) OverlapsTimeRange(min, max int64) bool {
return f.MinTime <= max && f.MaxTime >= min
}
// OverlapsKeyRange returns true if the min and max keys of the file overlap the arguments min and max.
func (f FileStat) OverlapsKeyRange(min, max string) bool {
return min != "" && max != "" && f.MinKey <= max && f.MaxKey >= min
}
// ContainsKey returns true if the min and max keys of the file overlap the arguments min and max.
func (f FileStat) ContainsKey(key string) bool {
return f.MinKey >= key || key <= f.MaxKey
}
// NewFileStore returns a new instance of FileStore based on the given directory.
func NewFileStore(dir string) *FileStore {
logger := zap.New(zap.NullEncoder())
fs := &FileStore{
@ -192,6 +199,7 @@ func (f *FileStore) enableTraceLogging(enabled bool) {
}
}
// WithLogger sets the logger on the file store.
func (f *FileStore) WithLogger(log zap.Logger) {
f.logger = log.With(zap.String("service", "filestore"))
f.purger.logger = f.logger
@ -219,28 +227,28 @@ func (f *FileStore) Statistics(tags map[string]string) []models.Statistic {
}}
}
// Returns the number of TSM files currently loaded
// Count returns the number of TSM files currently loaded.
func (f *FileStore) Count() int {
f.mu.RLock()
defer f.mu.RUnlock()
return len(f.files)
}
// Files returns TSM files currently loaded.
// Files returns the slice of TSM files currently loaded.
func (f *FileStore) Files() []TSMFile {
f.mu.RLock()
defer f.mu.RUnlock()
return f.files
}
// CurrentGeneration returns the current generation of the TSM files
// CurrentGeneration returns the current generation of the TSM files.
func (f *FileStore) CurrentGeneration() int {
f.mu.RLock()
defer f.mu.RUnlock()
return f.currentGeneration
}
// NextGeneration returns the max file ID + 1
// NextGeneration increments the max file ID and returns the new value.
func (f *FileStore) NextGeneration() int {
f.mu.Lock()
defer f.mu.Unlock()
@ -248,6 +256,7 @@ func (f *FileStore) NextGeneration() int {
return f.currentGeneration
}
// Add adds the given files to the file store's list of files.
func (f *FileStore) Add(files ...TSMFile) {
f.mu.Lock()
defer f.mu.Unlock()
@ -305,7 +314,7 @@ func (f *FileStore) WalkKeys(fn func(key []byte, typ byte) error) error {
return nil
}
// Keys returns all keys and types for all files
// Keys returns all keys and types for all files in the file store.
func (f *FileStore) Keys() map[string]byte {
f.mu.RLock()
defer f.mu.RUnlock()
@ -321,6 +330,7 @@ func (f *FileStore) Keys() map[string]byte {
return uniqueKeys
}
// Type returns the type of values store at the block for key.
func (f *FileStore) Type(key string) (byte, error) {
f.mu.RLock()
defer f.mu.RUnlock()
@ -333,11 +343,12 @@ func (f *FileStore) Type(key string) (byte, error) {
return 0, fmt.Errorf("unknown type for %v", key)
}
// Delete removes the keys from the set of keys available in this file.
func (f *FileStore) Delete(keys []string) error {
return f.DeleteRange(keys, math.MinInt64, math.MaxInt64)
}
// DeleteRange removes the values for keys between min and max.
// DeleteRange removes the values for keys between timestamps min and max.
func (f *FileStore) DeleteRange(keys []string, min, max int64) error {
f.mu.Lock()
f.lastModified = time.Now().UTC()
@ -348,6 +359,7 @@ func (f *FileStore) DeleteRange(keys []string, min, max int64) error {
})
}
// Open loads all the TSM files in the configured directory.
func (f *FileStore) Open() error {
f.mu.Lock()
defer f.mu.Unlock()
@ -440,6 +452,7 @@ func (f *FileStore) Open() error {
return nil
}
// Close closes the file store.
func (f *FileStore) Close() error {
f.mu.Lock()
defer f.mu.Unlock()
@ -457,6 +470,8 @@ func (f *FileStore) Close() error {
return nil
}
// Read returns the slice of values for the given key and the given timestamp,
// if any file matches those constraints.
func (f *FileStore) Read(key string, t int64) ([]Value, error) {
f.mu.RLock()
defer f.mu.RUnlock()
@ -480,12 +495,14 @@ func (f *FileStore) Read(key string, t int64) ([]Value, error) {
return nil, nil
}
// KeyCursor returns a KeyCursor for key and t across the files in the FileStore.
func (f *FileStore) KeyCursor(key string, t int64, ascending bool) *KeyCursor {
f.mu.RLock()
defer f.mu.RUnlock()
return newKeyCursor(f, key, t, ascending)
}
// Stats returns the stats of the underlying files, preferring the cached version if it is still valid.
func (f *FileStore) Stats() []FileStat {
f.mu.RLock()
if len(f.lastFileStats) > 0 {
@ -511,6 +528,7 @@ func (f *FileStore) Stats() []FileStat {
return f.lastFileStats
}
// Replace replaces oldFiles with newFiles.
func (f *FileStore) Replace(oldFiles, newFiles []string) error {
if len(oldFiles) == 0 && len(newFiles) == 0 {
return nil
@ -651,7 +669,7 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
}
// LastModified returns the last time the file store was updated with new
// TSM files or a delete
// TSM files or a delete.
func (f *FileStore) LastModified() time.Time {
f.mu.RLock()
defer f.mu.RUnlock()
@ -686,7 +704,7 @@ func (f *FileStore) BlockCount(path string, idx int) int {
return 0
}
// walkFiles calls fn for every files in filestore in parallel
// walkFiles calls fn for each file in filestore in parallel.
func (f *FileStore) walkFiles(fn func(f TSMFile) error) error {
// Copy the current TSM files to prevent a slow walker from
// blocking other operations.
@ -792,8 +810,8 @@ func (f *FileStore) locations(key string, t int64, ascending bool) []*location {
return locations
}
// CreateSnapshot will create hardlinks for all tsm and tombstone files
// in the path provided
// CreateSnapshot creates hardlinks for all tsm and tombstone files
// in the path provided.
func (f *FileStore) CreateSnapshot() (string, error) {
f.traceLogger.Info(fmt.Sprintf("Creating snapshot in %s", f.dir))
files := f.Files()
@ -857,6 +875,7 @@ func ParseTSMFileName(name string) (int, int, error) {
return int(generation), int(sequence), nil
}
// KeyCursor allows iteration through keys in a set of files within a FileStore.
type KeyCursor struct {
key string
fs *FileStore

View File

@ -30,7 +30,7 @@ const (
// uvnan is the constant returned from math.NaN().
const uvnan = 0x7FF8000000000001
// FloatEncoder encodes multiple float64s into a byte slice
// FloatEncoder encodes multiple float64s into a byte slice.
type FloatEncoder struct {
val float64
err error
@ -45,6 +45,7 @@ type FloatEncoder struct {
finished bool
}
// NewFloatEncoder returns a new FloatEncoder.
func NewFloatEncoder() *FloatEncoder {
s := FloatEncoder{
first: true,
@ -55,9 +56,9 @@ func NewFloatEncoder() *FloatEncoder {
s.buf.WriteByte(floatCompressedGorilla << 4)
return &s
}
// Reset sets the encoder back to its initial state.
func (s *FloatEncoder) Reset() {
s.val = 0
s.err = nil
@ -72,10 +73,12 @@ func (s *FloatEncoder) Reset() {
s.first = true
}
// Bytes returns a copy of the underlying byte buffer used in the encoder.
func (s *FloatEncoder) Bytes() ([]byte, error) {
return s.buf.Bytes(), s.err
}
// Finish indicates there are no more values to encode.
func (s *FloatEncoder) Finish() {
if !s.finished {
// write an end-of-stream record
@ -85,6 +88,7 @@ func (s *FloatEncoder) Finish() {
}
}
// Push encodes v to the underlying buffer.
func (s *FloatEncoder) Push(v float64) {
// Only allow NaN as a sentinel value
if math.IsNaN(v) && !s.finished {
@ -139,7 +143,7 @@ func (s *FloatEncoder) Push(v float64) {
s.val = v
}
// FloatDecoder decodes a byte slice into multipe float64 values
// FloatDecoder decodes a byte slice into multiple float64 values.
type FloatDecoder struct {
val uint64
@ -184,6 +188,7 @@ func (it *FloatDecoder) SetBytes(b []byte) error {
return nil
}
// Next returns true if there are remaining values to read.
func (it *FloatDecoder) Next() bool {
if it.err != nil || it.finished {
return false
@ -269,10 +274,12 @@ func (it *FloatDecoder) Next() bool {
return true
}
// Values returns the current float64 value.
func (it *FloatDecoder) Values() float64 {
return math.Float64frombits(it.val)
}
// Error returns the current decoding error.
func (it *FloatDecoder) Error() error {
return it.err
}

View File

@ -8,10 +8,10 @@ package tsm1
// https://developers.google.com/protocol-buffers/docs/encoding?hl=en#signed-integers
// for more information.
//
// If all the zig zag encoded values less than 1 << 60 - 1, they are compressed using
// simple8b encoding. If any values is larger than 1 << 60 - 1, the values are stored uncompressed.
// If all the zig zag encoded values are less than 1 << 60 - 1, they are compressed using
// simple8b encoding. If any value is larger than 1 << 60 - 1, the values are stored uncompressed.
//
// Each encoded byte slice, contains a 1 byte header followed by multiple 8 byte packed integers
// Each encoded byte slice contains a 1 byte header followed by multiple 8 byte packed integers
// or 8 byte uncompressed integers. The 4 high bits of the first byte indicate the encoding type
// for the remaining bytes.
//
@ -36,13 +36,14 @@ const (
intCompressedRLE = 2
)
// IntegerEncoder encoders int64 into byte slices
// IntegerEncoder encodes int64s into byte slices.
type IntegerEncoder struct {
prev int64
rle bool
values []uint64
}
// NewIntegerEncoder returns a new integer encoder with an initial buffer of values sized at sz.
func NewIntegerEncoder(sz int) IntegerEncoder {
return IntegerEncoder{
rle: true,
@ -50,12 +51,14 @@ func NewIntegerEncoder(sz int) IntegerEncoder {
}
}
// Reset sets the encoder back to its initial state.
func (e *IntegerEncoder) Reset() {
e.prev = 0
e.rle = true
e.values = e.values[:0]
}
// Write encodes v to the underlying buffers.
func (e *IntegerEncoder) Write(v int64) {
// Delta-encode each value as it's written. This happens before
// ZigZagEncoding because the deltas could be negative.
@ -69,8 +72,9 @@ func (e *IntegerEncoder) Write(v int64) {
e.values = append(e.values, enc)
}
// Bytes returns a copy of the underlying buffer.
func (e *IntegerEncoder) Bytes() ([]byte, error) {
// Only run-length encode if it could be reduce storage size
// Only run-length encode if it could reduce storage size.
if e.rle && len(e.values) > 2 {
return e.encodeRLE()
}
@ -165,6 +169,7 @@ type IntegerDecoder struct {
err error
}
// SetBytes sets the underlying byte slice of the decoder.
func (d *IntegerDecoder) SetBytes(b []byte) {
if len(b) > 0 {
d.encoding = b[0] >> 4
@ -184,6 +189,7 @@ func (d *IntegerDecoder) SetBytes(b []byte) {
d.err = nil
}
// Next returns true if there are any values remaining to be decoded.
func (d *IntegerDecoder) Next() bool {
if d.i >= d.n && len(d.bytes) == 0 {
return false
@ -206,10 +212,12 @@ func (d *IntegerDecoder) Next() bool {
return d.err == nil && d.i < d.n
}
// Error returns the last error encountered by the decoder.
func (d *IntegerDecoder) Error() error {
return d.err
}
// Read returns the next value from the decoder.
func (d *IntegerDecoder) Read() int64 {
switch d.encoding {
case intCompressedRLE:
@ -220,7 +228,6 @@ func (d *IntegerDecoder) Read() int64 {
v = v + d.prev
d.prev = v
return v
}
}

View File

@ -12,15 +12,17 @@ import (
"sync/atomic"
)
// ErrFileInUse is returned when attempting to remove or close a TSM file that is still being used.
var ErrFileInUse = fmt.Errorf("file still in use")
// TSMReader is a reader for a TSM file.
type TSMReader struct {
// refs is the count of active references to this reader
// refs is the count of active references to this reader.
refs int64
mu sync.RWMutex
// accessor provides access and decoding of blocks for the reader
// accessor provides access and decoding of blocks for the reader.
accessor blockAccessor
// index is the index of all blocks.
@ -39,7 +41,6 @@ type TSMReader struct {
// TSMIndex represent the index section of a TSM file. The index records all
// blocks, their locations, sizes, min and max times.
type TSMIndex interface {
// Delete removes the given keys from the index.
Delete(keys []string)
@ -49,9 +50,9 @@ type TSMIndex interface {
// Contains return true if the given key exists in the index.
Contains(key string) bool
// ContainsValue returns true if key and time might exists in this file. This function could
// ContainsValue returns true if key and time might exist in this file. This function could
// return true even though the actual point does not exists. For example, the key may
// exists in this file, but not have point exactly at time t.
// exist in this file, but not have a point exactly at time t.
ContainsValue(key string, timestamp int64) bool
// Entries returns all index entries for a key.
@ -64,16 +65,16 @@ type TSMIndex interface {
// matches the key and timestamp, nil is returned.
Entry(key string, timestamp int64) *IndexEntry
// Key returns the key in the index at the given postion.
// Key returns the key in the index at the given position.
Key(index int) (string, []IndexEntry)
// KeyAt returns the key in the index at the given postion.
// KeyAt returns the key in the index at the given position.
KeyAt(index int) ([]byte, byte)
// KeyCount returns the count of unique keys in the index.
KeyCount() int
// Size returns the size of a the current index in bytes
// Size returns the size of the current index in bytes.
Size() uint32
// TimeRange returns the min and max time across all keys in the file.
@ -111,6 +112,7 @@ type BlockIterator struct {
err error
}
// PeekNext returns the next key to be iterated or an empty string.
func (b *BlockIterator) PeekNext() string {
if len(b.entries) > 1 {
return b.key
@ -121,6 +123,7 @@ func (b *BlockIterator) PeekNext() string {
return ""
}
// Next returns true if there are more blocks to iterate through.
func (b *BlockIterator) Next() bool {
if b.n-b.i == 0 && len(b.entries) == 0 {
return false
@ -145,11 +148,12 @@ func (b *BlockIterator) Next() bool {
return false
}
func (b *BlockIterator) Read() (string, int64, int64, uint32, []byte, error) {
// Read reads information about the next block to be iterated.
func (b *BlockIterator) Read() (key string, minTime int64, maxTime int64, checksum uint32, buf []byte, err error) {
if b.err != nil {
return "", 0, 0, 0, nil, b.err
}
checksum, buf, err := b.r.readBytes(&b.entries[0], nil)
checksum, buf, err = b.r.readBytes(&b.entries[0], nil)
if err != nil {
return "", 0, 0, 0, nil, err
}
@ -173,6 +177,7 @@ type blockAccessor interface {
close() error
}
// NewTSMReader returns a new TSMReader from the given file.
func NewTSMReader(f *os.File) (*TSMReader, error) {
t := &TSMReader{}
@ -231,6 +236,7 @@ func (t *TSMReader) applyTombstones() error {
return nil
}
// Path returns the path of the file the TSMReader was initialized with.
func (t *TSMReader) Path() string {
t.mu.RLock()
p := t.accessor.path()
@ -238,6 +244,7 @@ func (t *TSMReader) Path() string {
return p
}
// Key returns the key and the underlying entry at the numeric index.
func (t *TSMReader) Key(index int) (string, []IndexEntry) {
return t.index.Key(index)
}
@ -247,6 +254,7 @@ func (t *TSMReader) KeyAt(idx int) ([]byte, byte) {
return t.index.KeyAt(idx)
}
// ReadAt returns the values corresponding to the given index entry.
func (t *TSMReader) ReadAt(entry *IndexEntry, vals []Value) ([]Value, error) {
t.mu.RLock()
v, err := t.accessor.readBlock(entry, vals)
@ -254,6 +262,7 @@ func (t *TSMReader) ReadAt(entry *IndexEntry, vals []Value) ([]Value, error) {
return v, err
}
// ReadFloatBlockAt returns the float values corresponding to the given index entry.
func (t *TSMReader) ReadFloatBlockAt(entry *IndexEntry, vals *[]FloatValue) ([]FloatValue, error) {
t.mu.RLock()
v, err := t.accessor.readFloatBlock(entry, vals)
@ -261,6 +270,7 @@ func (t *TSMReader) ReadFloatBlockAt(entry *IndexEntry, vals *[]FloatValue) ([]F
return v, err
}
// ReadIntegerBlockAt returns the integer values corresponding to the given index entry.
func (t *TSMReader) ReadIntegerBlockAt(entry *IndexEntry, vals *[]IntegerValue) ([]IntegerValue, error) {
t.mu.RLock()
v, err := t.accessor.readIntegerBlock(entry, vals)
@ -268,6 +278,7 @@ func (t *TSMReader) ReadIntegerBlockAt(entry *IndexEntry, vals *[]IntegerValue)
return v, err
}
// ReadStringBlockAt returns the string values corresponding to the given index entry.
func (t *TSMReader) ReadStringBlockAt(entry *IndexEntry, vals *[]StringValue) ([]StringValue, error) {
t.mu.RLock()
v, err := t.accessor.readStringBlock(entry, vals)
@ -275,6 +286,7 @@ func (t *TSMReader) ReadStringBlockAt(entry *IndexEntry, vals *[]StringValue) ([
return v, err
}
// ReadBooleanBlockAt returns the boolean values corresponding to the given index entry.
func (t *TSMReader) ReadBooleanBlockAt(entry *IndexEntry, vals *[]BooleanValue) ([]BooleanValue, error) {
t.mu.RLock()
v, err := t.accessor.readBooleanBlock(entry, vals)
@ -282,6 +294,7 @@ func (t *TSMReader) ReadBooleanBlockAt(entry *IndexEntry, vals *[]BooleanValue)
return v, err
}
// Read returns the values corresponding to the block at the given key and timestamp.
func (t *TSMReader) Read(key string, timestamp int64) ([]Value, error) {
t.mu.RLock()
v, err := t.accessor.read(key, timestamp)
@ -304,10 +317,12 @@ func (t *TSMReader) readBytes(e *IndexEntry, b []byte) (uint32, []byte, error) {
return n, v, err
}
// Type returns the type of values stored at the given key.
func (t *TSMReader) Type(key string) (byte, error) {
return t.index.Type(key)
}
// Close closes the TSMReader.
func (t *TSMReader) Close() error {
t.mu.Lock()
defer t.mu.Unlock()
@ -337,6 +352,7 @@ func (t *TSMReader) Unref() {
atomic.AddInt64(&t.refs, -1)
}
// InUse returns whether the TSMReader currently has any active references.
func (t *TSMReader) InUse() bool {
refs := atomic.LoadInt64(&t.refs)
return refs > 0
@ -349,6 +365,7 @@ func (t *TSMReader) Remove() error {
return t.remove()
}
// Rename renames the underlying file to the new path.
func (t *TSMReader) Rename(path string) error {
t.mu.Lock()
defer t.mu.Unlock()
@ -373,18 +390,19 @@ func (t *TSMReader) remove() error {
return nil
}
// Contains returns whether the given key is present in the index.
func (t *TSMReader) Contains(key string) bool {
return t.index.Contains(key)
}
// ContainsValue returns true if key and time might exists in this file. This function could
// return true even though the actual point does not exists. For example, the key may
// exists in this file, but not have point exactly at time t.
// return true even though the actual point does not exist. For example, the key may
// exist in this file, but not have a point exactly at time t.
func (t *TSMReader) ContainsValue(key string, ts int64) bool {
return t.index.ContainsValue(key, ts)
}
// DeleteRange removes the given points for keys between minTime and maxTime
// DeleteRange removes the given points for keys between minTime and maxTime.
func (t *TSMReader) DeleteRange(keys []string, minTime, maxTime int64) error {
if err := t.tombstoner.AddRange(keys, minTime, maxTime); err != nil {
return err
@ -394,6 +412,7 @@ func (t *TSMReader) DeleteRange(keys []string, minTime, maxTime int64) error {
return nil
}
// Delete deletes blocks indicated by keys.
func (t *TSMReader) Delete(keys []string) error {
if err := t.tombstoner.Add(keys); err != nil {
return err
@ -413,22 +432,27 @@ func (t *TSMReader) KeyRange() (string, string) {
return t.index.KeyRange()
}
// KeyCount returns the count of unique keys in the TSMReader.
func (t *TSMReader) KeyCount() int {
return t.index.KeyCount()
}
// Entries returns all index entries for key.
func (t *TSMReader) Entries(key string) []IndexEntry {
return t.index.Entries(key)
}
// ReadEntries reads the index entries for key into entries.
func (t *TSMReader) ReadEntries(key string, entries *[]IndexEntry) {
t.index.ReadEntries(key, entries)
}
// IndexSize returns the size of the index in bytes.
func (t *TSMReader) IndexSize() uint32 {
return t.index.Size()
}
// Size returns the size of the underlying file in bytes.
func (t *TSMReader) Size() uint32 {
t.mu.RLock()
size := t.size
@ -436,6 +460,7 @@ func (t *TSMReader) Size() uint32 {
return uint32(size)
}
// LastModified returns the last time the underlying file was modified.
func (t *TSMReader) LastModified() int64 {
t.mu.RLock()
lm := t.lastModified
@ -467,6 +492,7 @@ func (t *TSMReader) TombstoneRange(key string) []TimeRange {
return tr
}
// Stats returns the FileStat for the TSMReader's underlying file.
func (t *TSMReader) Stats() FileStat {
minTime, maxTime := t.index.TimeRange()
minKey, maxKey := t.index.KeyRange()
@ -482,6 +508,7 @@ func (t *TSMReader) Stats() FileStat {
}
}
// BlockIterator returns a BlockIterator for the underlying TSM file.
func (t *TSMReader) BlockIterator() *BlockIterator {
return &BlockIterator{
r: t,
@ -555,10 +582,12 @@ type indirectIndex struct {
tombstones map[string][]TimeRange
}
// TimeRange holds a min and max timestamp.
type TimeRange struct {
Min, Max int64
}
// NewIndirectIndex returns a new indirect index.
func NewIndirectIndex() *indirectIndex {
return &indirectIndex{
tombstones: make(map[string][]TimeRange),
@ -655,6 +684,7 @@ func (d *indirectIndex) Entry(key string, timestamp int64) *IndexEntry {
return nil
}
// Key returns the key in the index at the given position.
func (d *indirectIndex) Key(idx int) (string, []IndexEntry) {
d.mu.RLock()
defer d.mu.RUnlock()
@ -674,6 +704,7 @@ func (d *indirectIndex) Key(idx int) (string, []IndexEntry) {
return string(key), entries.entries
}
// KeyAt returns the key in the index at the given position.
func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) {
d.mu.RLock()
defer d.mu.RUnlock()
@ -685,6 +716,7 @@ func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) {
return key, d.b[d.offsets[idx]+int32(n)]
}
// KeyCount returns the count of unique keys in the index.
func (d *indirectIndex) KeyCount() int {
d.mu.RLock()
defer d.mu.RUnlock()
@ -692,6 +724,7 @@ func (d *indirectIndex) KeyCount() int {
return len(d.offsets)
}
// Delete removes the given keys from the index.
func (d *indirectIndex) Delete(keys []string) {
if len(keys) == 0 {
return
@ -717,6 +750,7 @@ func (d *indirectIndex) Delete(keys []string) {
d.offsets = offsets
}
// DeleteRange removes the given keys with data between minTime and maxTime from the index.
func (d *indirectIndex) DeleteRange(keys []string, minTime, maxTime int64) {
// No keys, nothing to do
if len(keys) == 0 {
@ -738,7 +772,6 @@ func (d *indirectIndex) DeleteRange(keys []string, minTime, maxTime int64) {
tombstones := map[string][]TimeRange{}
for _, k := range keys {
// Is the range passed in outside the time range for this key?
entries := d.Entries(k)
@ -770,9 +803,9 @@ func (d *indirectIndex) DeleteRange(keys []string, minTime, maxTime int64) {
d.tombstones[k] = append(d.tombstones[k], v...)
}
d.mu.Unlock()
}
// TombstoneRange returns ranges of time that are deleted for the given key.
func (d *indirectIndex) TombstoneRange(key string) []TimeRange {
d.mu.RLock()
r := d.tombstones[key]
@ -780,10 +813,12 @@ func (d *indirectIndex) TombstoneRange(key string) []TimeRange {
return r
}
// Contains return true if the given key exists in the index.
func (d *indirectIndex) Contains(key string) bool {
return len(d.Entries(key)) > 0
}
// ContainsValue returns true if key and time might exist in this file.
func (d *indirectIndex) ContainsValue(key string, timestamp int64) bool {
entry := d.Entry(key, timestamp)
if entry == nil {
@ -802,6 +837,7 @@ func (d *indirectIndex) ContainsValue(key string, timestamp int64) bool {
return true
}
// Type returns the block type of the values stored for the key.
func (d *indirectIndex) Type(key string) (byte, error) {
d.mu.RLock()
defer d.mu.RUnlock()
@ -820,10 +856,12 @@ func (d *indirectIndex) Type(key string) (byte, error) {
return 0, fmt.Errorf("key does not exist: %v", key)
}
// KeyRange returns the min and max keys in the index.
func (d *indirectIndex) KeyRange() (string, string) {
return d.minKey, d.maxKey
}
// TimeRange returns the min and max time across all keys in the index.
func (d *indirectIndex) TimeRange() (int64, int64) {
return d.minTime, d.maxTime
}
@ -918,6 +956,7 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
return nil
}
// Size returns the size of the current index in bytes.
func (d *indirectIndex) Size() uint32 {
d.mu.RLock()
defer d.mu.RUnlock()
@ -1125,7 +1164,7 @@ func (m *mmapAccessor) readBytes(entry *IndexEntry, b []byte) (uint32, []byte, e
return binary.BigEndian.Uint32(m.b[entry.Offset : entry.Offset+4]), m.b[entry.Offset+4 : entry.Offset+int64(entry.Size)], nil
}
// ReadAll returns all values for a key in all blocks.
// readAll returns all values for a key in all blocks.
func (m *mmapAccessor) readAll(key string) ([]Value, error) {
blocks := m.index.Entries(key)
if len(blocks) == 0 {

View File

@ -21,21 +21,25 @@ const (
stringCompressedSnappy = 1
)
// StringEncoder encodes multiple strings into a byte slice.
type StringEncoder struct {
// The encoded bytes
bytes []byte
}
// NewStringEncoder returns a new StringEncoder with an initial buffer ready to hold sz bytes.
func NewStringEncoder(sz int) StringEncoder {
return StringEncoder{
bytes: make([]byte, 0, sz),
}
}
// Reset sets the encoder back to its initial state.
func (e *StringEncoder) Reset() {
e.bytes = e.bytes[:0]
}
// Write encodes s to the underlying buffer.
func (e *StringEncoder) Write(s string) {
b := make([]byte, 10)
// Append the length of the string using variable byte encoding
@ -46,6 +50,7 @@ func (e *StringEncoder) Write(s string) {
e.bytes = append(e.bytes, s...)
}
// Bytes returns a copy of the underlying buffer.
func (e *StringEncoder) Bytes() ([]byte, error) {
// Compress the currently appended bytes using snappy and prefix with
// a 1 byte header for future extension
@ -53,6 +58,7 @@ func (e *StringEncoder) Bytes() ([]byte, error) {
return append([]byte{stringCompressedSnappy << 4}, data...), nil
}
// StringDecoder decodes a byte slice into strings.
type StringDecoder struct {
b []byte
l int
@ -82,6 +88,7 @@ func (e *StringDecoder) SetBytes(b []byte) error {
return nil
}
// Next returns true if there are any values remaining to be decoded.
func (e *StringDecoder) Next() bool {
if e.err != nil {
return false
@ -91,6 +98,7 @@ func (e *StringDecoder) Next() bool {
return e.i < len(e.b)
}
// Read returns the next value from the decoder.
func (e *StringDecoder) Read() string {
// Read the length of the string
length, n := binary.Uvarint(e.b[e.i:])
@ -116,6 +124,7 @@ func (e *StringDecoder) Read() string {
return string(e.b[lower:upper])
}
// Error returns the last error encountered by the decoder.
func (e *StringDecoder) Error() error {
return e.err
}

View File

@ -5,7 +5,7 @@ package tsm1
// as well as falling back to no compression if needed.
//
// Timestamp values to be encoded should be sorted before encoding. When encoded, the values are
// first delta-encoded. The first value is the starting timestamp, subsequent values are the difference.
// first delta-encoded. The first value is the starting timestamp, subsequent values are the difference
// from the prior value.
//
// Timestamp resolution can also be in the nanosecond. Many timestamps are monotonically increasing
@ -16,11 +16,11 @@ package tsm1
//
// Using these adjusted values, if all the deltas are the same, the time range is stored using run
// length encoding. If run length encoding is not possible and all values are less than 1 << 60 - 1
// (~36.5 yrs in nanosecond resolution), then the timestamps are encoded using simple8b encoding. If
// (~36.5 yrs in nanosecond resolution), then the timestamps are encoded using simple8b encoding. If
// any value exceeds the maximum values, the deltas are stored uncompressed using 8b each.
//
// Each compressed byte slice has a 1 byte header indicating the compression type. The 4 high bits
// indicated the encoding type. The 4 low bits are used by the encoding type.
// indicate the encoding type. The 4 low bits are used by the encoding type.
//
// For run-length encoding, the 4 low bits store the log10 of the scaling factor. The next 8 bytes are
// the starting timestamp, next 1-10 bytes is the delta value using variable-length encoding, finally the
@ -62,7 +62,7 @@ type encoder struct {
enc *simple8b.Encoder
}
// NewTimeEncoder returns a TimeEncoder
// NewTimeEncoder returns a TimeEncoder with an initial buffer ready to hold sz bytes.
func NewTimeEncoder(sz int) TimeEncoder {
return &encoder{
ts: make([]uint64, 0, sz),
@ -70,13 +70,14 @@ func NewTimeEncoder(sz int) TimeEncoder {
}
}
// Reset sets the encoder back to its initial state.
func (e *encoder) Reset() {
e.ts = e.ts[:0]
e.bytes = e.bytes[:0]
e.enc.Reset()
}
// Write adds a time.Time to the compressed stream.
// Write adds a timestamp to the compressed stream.
func (e *encoder) Write(t int64) {
e.ts = append(e.ts, uint64(t))
}
@ -206,6 +207,7 @@ func (e *encoder) encodeRLE(first, delta, div uint64, n int) ([]byte, error) {
return b[:i], nil
}
// TimeDecoder decodes a byte slice into timestamps.
type TimeDecoder struct {
v int64
i, n int
@ -219,6 +221,7 @@ type TimeDecoder struct {
encoding byte
}
// Init initializes the decoder with bytes to read from.
func (d *TimeDecoder) Init(b []byte) {
d.v = 0
d.i = 0
@ -231,6 +234,7 @@ func (d *TimeDecoder) Init(b []byte) {
d.decode(b)
}
// Next returns true if there are any timestamps remaining to be decoded.
func (d *TimeDecoder) Next() bool {
if d.err != nil {
return false
@ -253,10 +257,12 @@ func (d *TimeDecoder) Next() bool {
return true
}
// Read returns the next timestamp from the decoder.
func (d *TimeDecoder) Read() int64 {
return d.v
}
// Error returns the last error encountered by the decoder.
func (d *TimeDecoder) Error() error {
return d.err
}

View File

@ -16,6 +16,7 @@ const (
v2headerSize = 4
)
// Tombstoner records tombstones when entries are deleted.
type Tombstoner struct {
mu sync.RWMutex
@ -30,16 +31,17 @@ type Tombstoner struct {
statsLoaded bool
}
// Tombstone represents an individual deletion.
type Tombstone struct {
// Key is the tombstoned series key
// Key is the tombstoned series key.
Key string
// Min and Max are the min and max unix nanosecond time ranges of Key that are deleted. If
// the full range is deleted, both values are -1
// the full range is deleted, both values are -1.
Min, Max int64
}
// Add add the all keys to the tombstone
// Add adds the all keys, across all timestamps, to the tombstone.
func (t *Tombstoner) Add(keys []string) error {
return t.AddRange(keys, math.MinInt64, math.MaxInt64)
}
@ -77,10 +79,12 @@ func (t *Tombstoner) AddRange(keys []string, min, max int64) error {
return t.writeTombstone(tombstones)
}
// ReadAll returns all the tombstones in the Tombstoner's directory.
func (t *Tombstoner) ReadAll() ([]Tombstone, error) {
return t.readTombstone()
}
// Delete removes all the tombstone files from disk.
func (t *Tombstoner) Delete() error {
t.mu.Lock()
defer t.mu.Unlock()
@ -97,7 +101,7 @@ func (t *Tombstoner) HasTombstones() bool {
return len(files) > 0 && files[0].Size > 0
}
// TombstoneFiles returns any tombstone files associated with this TSM file.
// TombstoneFiles returns any tombstone files associated with Tombstoner's TSM file.
func (t *Tombstoner) TombstoneFiles() []FileStat {
t.mu.RLock()
if t.statsLoaded {
@ -131,6 +135,7 @@ func (t *Tombstoner) TombstoneFiles() []FileStat {
return stats
}
// Walk calls fn for every Tombstone under the Tombstoner.
func (t *Tombstoner) Walk(fn func(t Tombstone) error) error {
f, err := os.Open(t.tombstonePath())
if os.IsNotExist(err) {

View File

@ -23,12 +23,13 @@ import (
)
const (
// DefaultSegmentSize of 10MB is the size at which segment files will be rolled over
// DefaultSegmentSize of 10MB is the size at which segment files will be rolled over.
DefaultSegmentSize = 10 * 1024 * 1024
// FileExtension is the file extension we expect for wal segments
// WALFileExtension is the file extension we expect for wal segments.
WALFileExtension = "wal"
// WALFilePrefix is the prefix on all wal segment files.
WALFilePrefix = "_"
defaultBufLen = 1024 << 10 // 1MB (sized for batches of 5000 points)
@ -48,18 +49,27 @@ type SegmentInfo struct {
id int
}
// WalEntryType is a byte written to a wal segment file that indicates what the following compressed block contains
// WalEntryType is a byte written to a wal segment file that indicates what the following compressed block contains.
type WalEntryType byte
const (
WriteWALEntryType WalEntryType = 0x01
DeleteWALEntryType WalEntryType = 0x02
// WriteWALEntryType indicates a write entry.
WriteWALEntryType WalEntryType = 0x01
// DeleteWALEntryType indicates a delete entry.
DeleteWALEntryType WalEntryType = 0x02
// DeleteRangeWALEntryType indicates a delete range entry.
DeleteRangeWALEntryType WalEntryType = 0x03
)
var (
ErrWALClosed = fmt.Errorf("WAL closed")
ErrWALCorrupt = fmt.Errorf("corrupted WAL entry")
// ErrWALClosed is returned when attempting to write to a closed WAL file.
ErrWALClosed = fmt.Errorf("WAL closed")
// ErrWALCorrupt is returned when reading a corrupt WAL entry.
ErrWALCorrupt = fmt.Errorf("corrupted WAL entry")
defaultWaitingWALWrites = runtime.NumCPU()
)
@ -71,6 +81,7 @@ const (
statWriteErr = "writeErr"
)
// WAL represents the write-ahead log used for writing TSM files.
type WAL struct {
mu sync.RWMutex
lastWriteTime time.Time
@ -97,6 +108,7 @@ type WAL struct {
limiter limiter.Fixed
}
// NewWAL initializes a new WAL at the given directory.
func NewWAL(path string) *WAL {
logger := zap.New(zap.NullEncoder())
return &WAL{
@ -120,6 +132,7 @@ func (l *WAL) enableTraceLogging(enabled bool) {
}
}
// WithLogger sets the WAL's logger.
func (l *WAL) WithLogger(log zap.Logger) {
l.logger = log.With(zap.String("service", "wal"))
@ -150,14 +163,14 @@ func (l *WAL) Statistics(tags map[string]string) []models.Statistic {
}}
}
// Path returns the path the log was initialized with.
// Path returns the directory the log was initialized with.
func (l *WAL) Path() string {
l.mu.RLock()
defer l.mu.RUnlock()
return l.path
}
// Open opens and initializes the Log. Will recover from previous unclosed shutdowns
// Open opens and initializes the Log. Open can recover from previous unclosed shutdowns.
func (l *WAL) Open() error {
l.mu.Lock()
defer l.mu.Unlock()
@ -215,7 +228,7 @@ func (l *WAL) Open() error {
return nil
}
// WritePoints writes the given points to the WAL. Returns the WAL segment ID to
// WritePoints writes the given points to the WAL. It returns the WAL segment ID to
// which the points were written. If an error is returned the segment ID should
// be ignored.
func (l *WAL) WritePoints(values map[string][]Value) (int, error) {
@ -233,6 +246,7 @@ func (l *WAL) WritePoints(values map[string][]Value) (int, error) {
return id, nil
}
// ClosedSegments returns a slice of the names of the closed segment files.
func (l *WAL) ClosedSegments() ([]string, error) {
l.mu.RLock()
defer l.mu.RUnlock()
@ -264,6 +278,7 @@ func (l *WAL) ClosedSegments() ([]string, error) {
return closedFiles, nil
}
// Remove deletes the given segment file paths from disk and cleans up any associated objects.
func (l *WAL) Remove(files []string) error {
l.mu.Lock()
defer l.mu.Unlock()
@ -292,7 +307,7 @@ func (l *WAL) Remove(files []string) error {
return nil
}
// LastWriteTime is the last time anything was written to the WAL
// LastWriteTime is the last time anything was written to the WAL.
func (l *WAL) LastWriteTime() time.Time {
l.mu.RLock()
defer l.mu.RUnlock()
@ -347,8 +362,8 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) {
return l.currentSegmentID, l.currentSegmentWriter.sync()
}
// rollSegment closes the current segment and opens a new one if the current segment is over
// the max segment size.
// rollSegment checks if the current segment is due to roll over to a new segment;
// and if so, opens a new segment file for future writes.
func (l *WAL) rollSegment() error {
if l.currentSegmentWriter == nil || l.currentSegmentWriter.size > DefaultSegmentSize {
if err := l.newSegmentFile(); err != nil {
@ -393,7 +408,8 @@ func (l *WAL) Delete(keys []string) (int, error) {
return id, nil
}
// Delete deletes the given keys, returning the segment ID for the operation.
// DeleteRange deletes the given keys within the given time range,
// returning the segment ID for the operation.
func (l *WAL) DeleteRange(keys []string, min, max int64) (int, error) {
if len(keys) == 0 {
return 0, nil
@ -411,7 +427,7 @@ func (l *WAL) DeleteRange(keys []string, min, max int64) (int, error) {
return id, nil
}
// Close will finish any flush that is currently in process and close file handles
// Close will finish any flush that is currently in progress and close file handles.
func (l *WAL) Close() error {
l.mu.Lock()
defer l.mu.Unlock()
@ -428,7 +444,7 @@ func (l *WAL) Close() error {
return nil
}
// segmentFileNames will return all files that are WAL segment files in sorted order by ascending ID
// segmentFileNames will return all files that are WAL segment files in sorted order by ascending ID.
func segmentFileNames(dir string) ([]string, error) {
names, err := filepath.Glob(filepath.Join(dir, fmt.Sprintf("%s*.%s", WALFilePrefix, WALFileExtension)))
if err != nil {
@ -438,7 +454,7 @@ func segmentFileNames(dir string) ([]string, error) {
return names, nil
}
// newSegmentFile will close the current segment file and open a new one, updating bookkeeping info on the log
// newSegmentFile will close the current segment file and open a new one, updating bookkeeping info on the log.
func (l *WAL) newSegmentFile() error {
l.currentSegmentID++
if l.currentSegmentWriter != nil {
@ -607,12 +623,14 @@ func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) {
return dst[:n], nil
}
// MarshalBinary returns a binary representation of the entry in a new byte slice.
func (w *WriteWALEntry) MarshalBinary() ([]byte, error) {
// Temp buffer to write marshaled points into
b := make([]byte, defaultBufLen)
return w.Encode(b)
}
// UnmarshalBinary deserializes the byte slice into w.
func (w *WriteWALEntry) UnmarshalBinary(b []byte) error {
var i int
for i < len(b) {
@ -749,6 +767,7 @@ func (w *WriteWALEntry) UnmarshalBinary(b []byte) error {
return nil
}
// Type returns WriteWALEntryType.
func (w *WriteWALEntry) Type() WalEntryType {
return WriteWALEntryType
}
@ -758,16 +777,19 @@ type DeleteWALEntry struct {
Keys []string
}
// MarshalBinary returns a binary representation of the entry in a new byte slice.
func (w *DeleteWALEntry) MarshalBinary() ([]byte, error) {
b := make([]byte, defaultBufLen)
return w.Encode(b)
}
// UnmarshalBinary deserializes the byte slice into w.
func (w *DeleteWALEntry) UnmarshalBinary(b []byte) error {
w.Keys = strings.Split(string(b), "\n")
return nil
}
// Encode converts the DeleteWALEntry into a byte slice, appending to dst.
func (w *DeleteWALEntry) Encode(dst []byte) ([]byte, error) {
var n int
for _, k := range w.Keys {
@ -785,6 +807,7 @@ func (w *DeleteWALEntry) Encode(dst []byte) ([]byte, error) {
return []byte(dst[:n-1]), nil
}
// Type returns DeleteWALEntryType.
func (w *DeleteWALEntry) Type() WalEntryType {
return DeleteWALEntryType
}
@ -795,11 +818,13 @@ type DeleteRangeWALEntry struct {
Min, Max int64
}
// MarshalBinary returns a binary representation of the entry in a new byte slice.
func (w *DeleteRangeWALEntry) MarshalBinary() ([]byte, error) {
b := make([]byte, defaultBufLen)
return w.Encode(b)
}
// UnmarshalBinary deserializes the byte slice into w.
func (w *DeleteRangeWALEntry) UnmarshalBinary(b []byte) error {
if len(b) < 16 {
return ErrWALCorrupt
@ -825,6 +850,7 @@ func (w *DeleteRangeWALEntry) UnmarshalBinary(b []byte) error {
return nil
}
// Encode converts the DeleteRangeWALEntry into a byte slice, appending to b.
func (w *DeleteRangeWALEntry) Encode(b []byte) ([]byte, error) {
sz := 16
for _, k := range w.Keys {
@ -849,6 +875,7 @@ func (w *DeleteRangeWALEntry) Encode(b []byte) ([]byte, error) {
return b[:i], nil
}
// Type returns DeleteRangeWALEntryType.
func (w *DeleteRangeWALEntry) Type() WalEntryType {
return DeleteRangeWALEntryType
}
@ -859,6 +886,7 @@ type WALSegmentWriter struct {
size int
}
// NewWALSegmentWriter returns a new WALSegmentWriter writing to w.
func NewWALSegmentWriter(w io.WriteCloser) *WALSegmentWriter {
return &WALSegmentWriter{
w: w,
@ -872,8 +900,8 @@ func (w *WALSegmentWriter) path() string {
return ""
}
// Write writes entryType and the buffer containing compressed entry data.
func (w *WALSegmentWriter) Write(entryType WalEntryType, compressed []byte) error {
var buf [5]byte
buf[0] = byte(entryType)
binary.BigEndian.PutUint32(buf[1:5], uint32(len(compressed)))
@ -891,7 +919,8 @@ func (w *WALSegmentWriter) Write(entryType WalEntryType, compressed []byte) erro
return nil
}
// Sync flushes the file systems in-memory copy of recently written data to disk.
// Sync flushes the file systems in-memory copy of recently written data to disk,
// if w is writing to an os.File.
func (w *WALSegmentWriter) sync() error {
if f, ok := w.w.(*os.File); ok {
return f.Sync()
@ -911,13 +940,14 @@ type WALSegmentReader struct {
err error
}
// NewWALSegmentReader returns a new WALSegmentReader reading from r.
func NewWALSegmentReader(r io.ReadCloser) *WALSegmentReader {
return &WALSegmentReader{
r: r,
}
}
// Next indicates if there is a value to read
// Next indicates if there is a value to read.
func (r *WALSegmentReader) Next() bool {
b := getBuf(defaultBufLen)
defer putBuf(b)
@ -989,6 +1019,7 @@ func (r *WALSegmentReader) Next() bool {
return true
}
// Read returns the next entry in the reader.
func (r *WALSegmentReader) Read() (WALEntry, error) {
if r.err != nil {
return nil, r.err
@ -1003,15 +1034,17 @@ func (r *WALSegmentReader) Count() int64 {
return r.n
}
// Error returns the last error encountered by the reader.
func (r *WALSegmentReader) Error() error {
return r.err
}
// Close closes the underlying io.Reader.
func (r *WALSegmentReader) Close() error {
return r.r.Close()
}
// idFromFileName parses the segment file ID from its name
// idFromFileName parses the segment file ID from its name.
func idFromFileName(name string) (int, error) {
parts := strings.Split(filepath.Base(name), ".")
if len(parts) != 2 {

View File

@ -80,6 +80,7 @@ const (
// identify the file as a tsm1 formatted file
MagicNumber uint32 = 0x16D116D1
// Version indicates the version of the TSM file format.
Version byte = 1
// Size in bytes of an index entry
@ -99,19 +100,26 @@ const (
)
var (
ErrNoValues = fmt.Errorf("no values written")
ErrTSMClosed = fmt.Errorf("tsm file closed")
//ErrNoValues is returned when TSMWriter.WriteIndex is called and there are no values to write.
ErrNoValues = fmt.Errorf("no values written")
// ErrTSMClosed is returned when performing an operation against a closed TSM file.
ErrTSMClosed = fmt.Errorf("tsm file closed")
// ErrMaxKeyLengthExceeded is returned when attempting to write a key that is too long.
ErrMaxKeyLengthExceeded = fmt.Errorf("max key length exceeded")
ErrMaxBlocksExceeded = fmt.Errorf("max blocks exceeded")
// ErrMaxBlocksExceeded is returned when attempting to write a block past the allowed number.
ErrMaxBlocksExceeded = fmt.Errorf("max blocks exceeded")
)
// TSMWriter writes TSM formatted key and values.
type TSMWriter interface {
// Write writes a new block for key containing and values. Writes append
// blocks in the order that the Write function is called. The caller is
// responsible for ensuring keys and blocks or sorted appropriately.
// responsible for ensuring keys and blocks are sorted appropriately.
// Values are encoded as a full block. The caller is responsible for
// ensuring a fixed number of values are encoded in each block as wells as
// ensuring a fixed number of values are encoded in each block as well as
// ensuring the Values are sorted. The first and last timestamp values are
// used as the minimum and maximum values for the index entry.
Write(key string, values Values) error
@ -126,14 +134,14 @@ type TSMWriter interface {
// WriteIndex finishes the TSM write streams and writes the index.
WriteIndex() error
// Closes any underlying file resources.
// Close closes any underlying file resources.
Close() error
// Size returns the current size in bytes of the file
// Size returns the current size in bytes of the file.
Size() uint32
}
// IndexWriter writes a TSMIndex
// IndexWriter writes a TSMIndex.
type IndexWriter interface {
// Add records a new block entry for a key in the index.
Add(key string, blockType byte, minTime, maxTime int64, offset int64, size uint32)
@ -147,19 +155,18 @@ type IndexWriter interface {
// KeyCount returns the count of unique keys in the index.
KeyCount() int
// Size returns the size of a the current index in bytes
// Size returns the size of a the current index in bytes.
Size() uint32
// MarshalBinary returns a byte slice encoded version of the index.
MarshalBinary() ([]byte, error)
// WriteTo writes the index contents to a writer
// WriteTo writes the index contents to a writer.
WriteTo(w io.Writer) (int64, error)
}
// IndexEntry is the index information for a given block in a TSM file.
type IndexEntry struct {
// The min and max time of all points stored in the block.
MinTime, MaxTime int64
@ -170,7 +177,7 @@ type IndexEntry struct {
Size uint32
}
// UnmarshalBinary decodes an IndexEntry from a byte slice
// UnmarshalBinary decodes an IndexEntry from a byte slice.
func (e *IndexEntry) UnmarshalBinary(b []byte) error {
if len(b) != indexEntrySize {
return fmt.Errorf("unmarshalBinary: short buf: %v != %v", indexEntrySize, len(b))
@ -182,8 +189,8 @@ func (e *IndexEntry) UnmarshalBinary(b []byte) error {
return nil
}
// AppendTo will write a binary-encoded version of IndexEntry to b, allocating
// and returning a new slice, if necessary
// AppendTo writes a binary-encoded version of IndexEntry to b, allocating
// and returning a new slice, if necessary.
func (e *IndexEntry) AppendTo(b []byte) []byte {
if len(b) < indexEntrySize {
if cap(b) < indexEntrySize {
@ -201,21 +208,24 @@ func (e *IndexEntry) AppendTo(b []byte) []byte {
return b
}
// Returns true if this IndexEntry may contain values for the given time. The min and max
// times are inclusive.
// Contains returns true if this IndexEntry may contain values for the given time.
// The min and max times are inclusive.
func (e *IndexEntry) Contains(t int64) bool {
return e.MinTime <= t && e.MaxTime >= t
}
// OverlapsTimeRange returns true if the given time ranges are completely within the entry's time bounds.
func (e *IndexEntry) OverlapsTimeRange(min, max int64) bool {
return e.MinTime <= max && e.MaxTime >= min
}
// String returns a string representation of the entry.
func (e *IndexEntry) String() string {
return fmt.Sprintf("min=%s max=%s ofs=%d siz=%d",
time.Unix(0, e.MinTime).UTC(), time.Unix(0, e.MaxTime).UTC(), e.Offset, e.Size)
}
// NewIndexWriter returns a new IndexWriter.
func NewIndexWriter() IndexWriter {
return &directIndex{
blocks: map[string]*indexEntries{},
@ -418,6 +428,7 @@ type tsmWriter struct {
n int64
}
// NewTSMWriter returns a new TSMWriter writing to w.
func NewTSMWriter(w io.Writer) (TSMWriter, error) {
index := &directIndex{
blocks: map[string]*indexEntries{},
@ -439,6 +450,7 @@ func (t *tsmWriter) writeHeader() error {
return nil
}
// Write writes a new block containing key and values.
func (t *tsmWriter) Write(key string, values Values) error {
// Nothing to write
if len(values) == 0 {
@ -537,7 +549,7 @@ func (t *tsmWriter) WriteBlock(key string, minTime, maxTime int64, block []byte)
}
// WriteIndex writes the index section of the file. If there are no index entries to write,
// this returns ErrNoValues
// this returns ErrNoValues.
func (t *tsmWriter) WriteIndex() error {
indexPos := t.n
@ -579,7 +591,7 @@ func (t *tsmWriter) Size() uint32 {
return uint32(t.n) + t.index.Size()
}
// verifyVersion will verify that the reader's bytes are a TSM byte
// verifyVersion verifies that the reader's bytes are a TSM byte
// stream of the correct version (1)
func verifyVersion(r io.ReadSeeker) error {
_, err := r.Seek(0, 0)