Merge pull request #4959 from influxdb/jw-tsm-query

TSM dev engine startup
pull/4960/head
Jason Wilder 2015-12-02 14:37:16 -07:00
commit 28b9fdc08d
5 changed files with 151 additions and 10 deletions

View File

@ -129,6 +129,12 @@ func ParsePointsString(buf string) ([]Point, error) {
return ParsePoints([]byte(buf))
}
func ParseKey(buf string) (string, Tags, error) {
_, keyBuf, err := scanKey([]byte(buf), 0)
tags := parseTags([]byte(buf))
return string(keyBuf), tags, err
}
// ParsePointsWithPrecision is similar to ParsePoints, but allows the
// caller to provide a precision for time.
func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) {
@ -1106,10 +1112,14 @@ func (p *point) SetTime(t time.Time) {
// Tags returns the tag set for the point
func (p *point) Tags() Tags {
return parseTags(p.key)
}
func parseTags(buf []byte) Tags {
tags := map[string]string{}
if len(p.key) != 0 {
pos, name := scanTo(p.key, 0, ',')
if len(buf) != 0 {
pos, name := scanTo(buf, 0, ',')
// it's an empyt key, so there are no tags
if len(name) == 0 {
@ -1119,11 +1129,11 @@ func (p *point) Tags() Tags {
i := pos + 1
var key, value []byte
for {
if i >= len(p.key) {
if i >= len(buf) {
break
}
i, key = scanTo(p.key, i, '=')
i, value = scanTagValue(p.key, i+1)
i, key = scanTo(buf, i, '=')
i, value = scanTagValue(buf, i+1)
if len(value) == 0 {
continue

View File

@ -55,6 +55,9 @@ const (
DefaultIndexMinCompactionInterval = time.Minute
DefaultIndexMinCompactionFileCount = 5
DefaultIndexCompactionFullAge = 5 * time.Minute
DefaultCacheMaxMemorySize = 0 // No max memory limit
)
type Config struct {
@ -99,6 +102,10 @@ type Config struct {
// Query logging
QueryLogEnabled bool `toml:"query-log-enabled"`
// compaction options for tsm1dev
CacheMaxMemorySize uint64 `toml:"cache-max-memory-size"`
}
func NewConfig() Config {
@ -128,6 +135,8 @@ func NewConfig() Config {
IndexMinCompactionInterval: DefaultIndexMinCompactionInterval,
QueryLogEnabled: true,
CacheMaxMemorySize: DefaultCacheMaxMemorySize,
}
}

View File

@ -603,7 +603,7 @@ func (d *indirectIndex) Type(key string) (byte, error) {
defer d.mu.RUnlock()
ofs := d.search(key)
if ofs < len(d.offsets) {
if ofs < len(d.b) {
n, _, err := readKey(d.b[ofs:])
if err != nil {
panic(fmt.Sprintf("error reading key: %v", err))

View File

@ -1,6 +1,7 @@
package tsm1
import (
"fmt"
"io"
"log"
"os"
@ -8,6 +9,7 @@ import (
"sync"
"time"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/tsdb"
)
@ -44,7 +46,7 @@ func NewDevEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engi
fs := NewFileStore(path)
cache := NewCache(uint64(opt.Config.WALMaxMemorySizeThreshold))
cache := NewCache(uint64(opt.Config.CacheMaxMemorySize))
c := &Compactor{
Dir: path,
@ -101,6 +103,10 @@ func (e *DevEngine) Open() error {
return err
}
if err := e.reloadCache(); err != nil {
return err
}
go e.compact()
return nil
@ -121,6 +127,57 @@ func (e *DevEngine) SetLogOutput(w io.Writer) {}
// LoadMetadataIndex loads the shard metadata into memory.
func (e *DevEngine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
keys := e.FileStore.Keys()
for _, k := range keys {
seriesKey, field := seriesAndFieldFromCompositeKey(k)
measurement := tsdb.MeasurementFromSeriesKey(seriesKey)
m := index.CreateMeasurementIndexIfNotExists(measurement)
m.SetFieldName(field)
typ, err := e.FileStore.Type(k)
if err != nil {
return err
}
mf := measurementFields[measurement]
if mf == nil {
mf = &tsdb.MeasurementFields{
Fields: map[string]*tsdb.Field{},
}
measurementFields[measurement] = mf
}
switch typ {
case BlockFloat64:
if err := mf.CreateFieldIfNotExists(field, influxql.Float, false); err != nil {
return err
}
case BlockInt64:
if err := mf.CreateFieldIfNotExists(field, influxql.Integer, false); err != nil {
return err
}
case BlockBool:
if err := mf.CreateFieldIfNotExists(field, influxql.Boolean, false); err != nil {
return err
}
case BlockString:
if err := mf.CreateFieldIfNotExists(field, influxql.String, false); err != nil {
return err
}
default:
return fmt.Errorf("unkown block type for: %v. got %v", k, typ)
}
_, tags, err := models.ParseKey(seriesKey)
if err == nil {
return err
}
s := tsdb.NewSeries(seriesKey, tags)
s.InitializeShards()
index.CreateSeriesIndexIfNotExists(measurement, s)
}
return nil
}
@ -146,12 +203,12 @@ func (e *DevEngine) WritePoints(points []models.Point, measurementFieldsToSave m
// DeleteSeries deletes the series from the engine.
func (e *DevEngine) DeleteSeries(seriesKeys []string) error {
panic("not implemented")
return fmt.Errorf("delete series not implemented")
}
// DeleteMeasurement deletes a measurement and all related series.
func (e *DevEngine) DeleteMeasurement(name string, seriesKeys []string) error {
panic("not implemented")
return fmt.Errorf("delete measurement not implemented")
}
// SeriesCount returns the number of series buckets on the shard.
@ -161,7 +218,7 @@ func (e *DevEngine) SeriesCount() (n int, err error) {
// Begin starts a new transaction on the engine.
func (e *DevEngine) Begin(writable bool) (tsdb.Tx, error) {
panic("not implemented")
return &devTx{engine: e}, nil
}
func (e *DevEngine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") }
@ -213,6 +270,50 @@ func (e *DevEngine) compact() {
}
}
func (e *DevEngine) reloadCache() error {
files, err := segmentFileNames(e.WAL.Path())
if err != nil {
return err
}
for _, fn := range files {
id, err := idFromFileName(fn)
if err != nil {
return err
}
f, err := os.Open(fn)
if err != nil {
return err
}
r := NewWALSegmentReader(f)
defer r.Close()
// Iterate over each reader in order. Later readers will overwrite earlier ones if values
// overlap.
for r.Next() {
entry, err := r.Read()
if err != nil {
return err
}
switch t := entry.(type) {
case *WriteWALEntry:
if err := e.Cache.WriteMulti(t.Values, uint64(id)); err != nil {
return err
}
case *DeleteWALEntry:
// FIXME: Implement this
// if err := e.Cache.Delete(t.Keys); err != nil {
// return err
// }
}
}
}
return nil
}
type devTx struct {
engine *DevEngine
}
@ -224,6 +325,10 @@ func (t *devTx) Cursor(series string, fields []string, dec *tsdb.FieldCodec, asc
ascending: ascending,
}
}
func (t *devTx) Rollback() error { return nil }
func (t *devTx) Size() int64 { panic("not implemented") }
func (t *devTx) Commit() error { panic("not implemented") }
func (t *devTx) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") }
// devCursor is a cursor that combines both TSM and cached data.
type devCursor struct {

View File

@ -36,6 +36,11 @@ type TSMFile interface {
// Keys returns all keys contained in the file.
Keys() []string
// Type returns the block type of the values stored for the key. Returns one of
// BlockFloat64, BlockInt64, BlockBool, BlockString. If key does not exist,
// an error is returned.
Type(key string) (byte, error)
// Delete removes the key from the set of keys available in this file.
Delete(key string) error
@ -153,6 +158,18 @@ func (f *FileStore) Keys() []string {
return keys
}
func (f *FileStore) Type(key string) (byte, error) {
f.mu.RLock()
defer f.mu.RUnlock()
for _, f := range f.files {
if f.Contains(key) {
return f.Type(key)
}
}
return 0, fmt.Errorf("unknown type for %v", key)
}
func (f *FileStore) Delete(key string) error {
f.mu.Lock()
defer f.mu.Unlock()