Make writes to historical areas possible

pull/4308/head
Paul Dix 2015-09-25 17:11:27 -04:00
parent 982c28b947
commit 0770ccc87d
4 changed files with 151 additions and 32 deletions


@@ -1,8 +1,6 @@
 package pd1

 import (
-	"encoding/binary"
-	"math"
 	"time"

 	"github.com/dgryski/go-tsz"
@@ -10,9 +8,8 @@ import (
 )

 type Value interface {
-	TimeBytes() []byte
-	ValueBytes() []byte
 	Time() time.Time
+	UnixNano() int64
 	Value() interface{}
 	Size() int
 }
@@ -34,8 +31,7 @@ func NewValue(t time.Time, value interface{}) Value {
 type EmptyValue struct {
 }

-func (e *EmptyValue) TimeBytes() []byte { return nil }
-func (e *EmptyValue) ValueBytes() []byte { return nil }
+func (e *EmptyValue) UnixNano() int64 { return tsdb.EOF }
 func (e *EmptyValue) Time() time.Time { return time.Unix(0, tsdb.EOF) }
 func (e *EmptyValue) Value() interface{} { return nil }
 func (e *EmptyValue) Size() int { return 0 }
@@ -93,20 +89,14 @@ func (f *FloatValue) Time() time.Time {
 	return f.time
 }

+func (f *FloatValue) UnixNano() int64 {
+	return f.time.UnixNano()
+}
+
 func (f *FloatValue) Value() interface{} {
 	return f.value
 }

-func (f *FloatValue) TimeBytes() []byte {
-	return u64tob(uint64(f.Time().UnixNano()))
-}
-
-func (f *FloatValue) ValueBytes() []byte {
-	buf := make([]byte, 8)
-	binary.BigEndian.PutUint64(buf, math.Float64bits(f.value))
-	return buf
-}
-
 func (f *FloatValue) Size() int {
 	return 16
 }
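
Note: the diff above narrows the Value interface to Time, UnixNano, Value, and Size. A minimal, self-contained sketch of a type satisfying that revised interface follows; BoolValue and the sample timestamp are hypothetical and not part of this commit.

package main

import (
	"fmt"
	"time"
)

// Value mirrors the revised interface from the diff above.
type Value interface {
	Time() time.Time
	UnixNano() int64
	Value() interface{}
	Size() int
}

// BoolValue is a hypothetical implementation used only for illustration.
type BoolValue struct {
	time  time.Time
	value bool
}

func (b *BoolValue) Time() time.Time    { return b.time }
func (b *BoolValue) UnixNano() int64    { return b.time.UnixNano() }
func (b *BoolValue) Value() interface{} { return b.value }
func (b *BoolValue) Size() int          { return 9 } // 8-byte timestamp + 1-byte bool

func main() {
	var v Value = &BoolValue{time: time.Unix(1, 0), value: true}
	fmt.Println(v.UnixNano(), v.Value(), v.Size())
}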


@@ -53,7 +53,7 @@
 	// DefaultBlockSize is the default size of uncompressed points blocks.
 	DefaultBlockSize = 512 * 1024 // 512KB
-	DefaultMaxFileSize = 10 * 1024 * 1024 // 10MB
+	DefaultRotateFileSize = 10 * 1024 * 1024 // 10MB
 	DefaultMaxPointsPerBlock = 1000
@@ -86,6 +86,8 @@ type Engine struct {
 	WAL *Log

+	RotateFileSize uint32
+
 	filesLock sync.RWMutex
 	files dataFiles
 	currentFileID int
@@ -106,6 +108,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
 		// TODO: this is the function where we can inject a check against the in memory collisions
 		HashSeriesField: hashSeriesField,
 		WAL: w,
+		RotateFileSize: DefaultRotateFileSize,
 	}

 	e.WAL.Index = e
@@ -172,6 +175,13 @@ func (e *Engine) Close() error {
 	return nil
 }

+// DataFileCount returns the number of data files in the database
+func (e *Engine) DataFileCount() int {
+	e.filesLock.RLock()
+	defer e.filesLock.RUnlock()
+	return len(e.files)
+}
+
 // SetLogOutput is a no-op.
 func (e *Engine) SetLogOutput(w io.Writer) {}
@@ -296,32 +306,89 @@ func (e *Engine) WriteAndCompact(pointsByKey map[string]Values, measurementField
 		}
 	}

-	// TODO: handle values written in the past that force an old data file to get rewritten
-
-	// we keep track of the newest data file and if it should be
-	// rewritten with new data.
-	var newestDataFile *dataFile
-	overwriteNewestFile := false
-	if len(e.files) > 0 {
-		newestDataFile = e.files[len(e.files)-1]
-		overwriteNewestFile = newestDataFile.size < DefaultMaxFileSize
-	}
-
-	// flush values by id to either a new file or rewrite the old one
-	if overwriteNewestFile {
-		if err := e.rewriteFile(newestDataFile, valuesByID); err != nil {
-			return err
-		}
-	} else if err := e.rewriteFile(nil, valuesByID); err != nil {
-		return err
-	}
-
-	return nil
+	if len(e.files) == 0 {
+		return e.rewriteFile(nil, valuesByID)
+	}
+
+	maxTime := int64(math.MaxInt64)
+	// reverse through the data files and write in the data
+	for i := len(e.files) - 1; i >= 0; i-- {
+		f := e.files[i]
+		// max times are exclusive, so add 1 to it
+		fileMax := f.MaxTime() + 1
+		fileMin := f.MinTime()
+		// if the file is < rotate, write all data between fileMin and maxTime
+		if f.size < e.RotateFileSize {
+			if err := e.rewriteFile(f, e.filterDataBetweenTimes(valuesByID, fileMin, maxTime)); err != nil {
+				return err
+			}
+			continue
+		}
+		// if the file is > rotate:
+		//   write all data between fileMax and maxTime into new file
+		//   write all data between fileMin and fileMax into old file
+		if err := e.rewriteFile(nil, e.filterDataBetweenTimes(valuesByID, fileMax, maxTime)); err != nil {
+			return err
+		}
+		if err := e.rewriteFile(f, e.filterDataBetweenTimes(valuesByID, fileMin, fileMax)); err != nil {
+			return err
+		}
+		maxTime = fileMin
+	}
+	// for any data leftover, write into a new file since it's all older
+	// than any file we currently have
+	return e.rewriteFile(nil, valuesByID)
+}
+
+// filterDataBetweenTimes will create a new map with data between
+// the minTime (inclusive) and maxTime (exclusive) while removing that
+// data from the passed in map. It is assumed that the Values arrays
+// are sorted in time ascending order
+func (e *Engine) filterDataBetweenTimes(valuesByID map[uint64]Values, minTime, maxTime int64) map[uint64]Values {
+	filteredValues := make(map[uint64]Values)
+	for id, values := range valuesByID {
+		maxIndex := len(values)
+		minIndex := 0
+		// find the index of the first value in the range
+		for i, v := range values {
+			t := v.UnixNano()
+			if t >= minTime && t < maxTime {
+				minIndex = i
+				break
+			}
+		}
+		// go backwards to find the index of the last value in the range
+		for i := len(values) - 1; i >= 0; i-- {
+			t := values[i].UnixNano()
+			if t < maxTime {
+				maxIndex = i + 1
+				break
+			}
+		}
+		// write into the result map and filter the passed in map
+		filteredValues[id] = values[minIndex:maxIndex]
+
+		// if we grabbed all the values, remove them from the passed in map
+		if minIndex == len(values) || (minIndex == 0 && maxIndex == len(values)) {
+			delete(valuesByID, id)
+			continue
+		}
+
+		valuesByID[id] = values[0:minIndex]
+		if maxIndex < len(values) {
+			valuesByID[id] = append(valuesByID[id], values[maxIndex:]...)
+		}
+	}
+	return filteredValues
 }

 // rewriteFile will read in the old data file, if provided and merge the values
 // in the passed map into a new data file
 func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) error {
+	if len(valuesByID) == 0 {
+		return nil
+	}
+
 	// we need the values in sorted order so that we can merge them into the
 	// new file as we read the old file
 	ids := make([]uint64, 0, len(valuesByID))
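
Note: in the rewritten WriteAndCompact above, files are walked newest to oldest, each claiming the half-open range [fileMin, maxTime) before maxTime is lowered to fileMin; whatever remains is older than every file and goes into a new one. The self-contained sketch below illustrates that partitioning on bare int64 timestamps; the timestamps and file boundaries are invented for illustration and this is not code from the commit.

package main

import (
	"fmt"
	"math"
)

// filterBetween mimics filterDataBetweenTimes on bare timestamps: it returns
// the values in the half-open range [minTime, maxTime) and the remainder,
// preserving the ascending order of the input.
func filterBetween(ts []int64, minTime, maxTime int64) (in, rest []int64) {
	for _, t := range ts {
		if t >= minTime && t < maxTime {
			in = append(in, t)
		} else {
			rest = append(rest, t)
		}
	}
	return in, rest
}

func main() {
	// Incoming write with both recent and historical timestamps (ns).
	incoming := []int64{1000000000, 3000000000, 4000000000, 5000000000}

	// Minimum times of two existing data files, oldest first.
	fileMins := []int64{1000000000, 4000000000}

	maxTime := int64(math.MaxInt64)
	// Walk the files newest-to-oldest, claiming [fileMin, maxTime) each time.
	for i := len(fileMins) - 1; i >= 0; i-- {
		var claimed []int64
		claimed, incoming = filterBetween(incoming, fileMins[i], maxTime)
		fmt.Printf("file %d gets %v\n", i, claimed)
		maxTime = fileMins[i]
	}
	// Anything left is older than every file and would go into a new file.
	fmt.Printf("leftover for new file: %v\n", incoming)
}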


@@ -125,6 +125,65 @@ func TestEngine_WriteAndReadFloats(t *testing.T) {
 func TestEngine_WriteIndexWithCollision(t *testing.T) {
 }

+func TestEngine_WriteIndexQueryAcrossDataFiles(t *testing.T) {
+	e := OpenDefaultEngine()
+	defer e.Cleanup()
+
+	e.Shard = newFieldCodecMock(map[string]influxql.DataType{"value": influxql.Float})
+	e.RotateFileSize = 10
+
+	p1 := parsePoint("cpu,host=A value=1.1 1000000000")
+	p2 := parsePoint("cpu,host=B value=1.1 1000000000")
+	p3 := parsePoint("cpu,host=A value=2.4 4000000000")
+	p4 := parsePoint("cpu,host=B value=2.4 4000000000")
+
+	if err := e.WritePoints([]models.Point{p1, p2, p3, p4}, nil, nil); err != nil {
+		t.Fatalf("failed to write points: %s", err.Error())
+	}
+
+	p5 := parsePoint("cpu,host=A value=1.5 5000000000")
+	p6 := parsePoint("cpu,host=B value=2.5 5000000000")
+	p7 := parsePoint("cpu,host=A value=1.3 3000000000")
+	p8 := parsePoint("cpu,host=B value=2.3 3000000000")
+
+	if err := e.WritePoints([]models.Point{p5, p6, p7, p8}, nil, nil); err != nil {
+		t.Fatalf("failed to write points: %s", err.Error())
+	}
+
+	if count := e.DataFileCount(); count != 2 {
+		t.Fatalf("expected 2 data files to exist but got %d", count)
+	}
+
+	fields := []string{"value"}
+	var codec *tsdb.FieldCodec
+
+	verify := func(series string, points []models.Point, seek int64) {
+		c := e.Cursor(series, fields, codec, true)
+
+		// if we want to seek, do it and verify the first point matches
+		if seek != 0 {
+			k, v := c.SeekTo(seek)
+			p := points[0]
+			val := p.Fields()["value"]
+			if p.UnixNano() != k || val != v {
+				t.Fatalf("expected to seek to first point\n\texp: %d %f\n\tgot: %d %f", p.UnixNano(), val, k, v)
+			}
+			points = points[1:]
+		}
+
+		for _, p := range points {
+			k, v := c.Next()
+			val := p.Fields()["value"]
+			if p.UnixNano() != k || val != v {
+				t.Fatalf("expected to seek to first point\n\texp: %d %f\n\tgot: %d %f", p.UnixNano(), val, k, v.(float64))
+			}
+		}
+	}
+
+	verify("cpu,host=A", []models.Point{p1, p7, p3, p5}, 0)
+	verify("cpu,host=B", []models.Point{p2, p8, p4, p6}, 0)
+}
+
 func TestEngine_WriteIndexBenchmarkNames(t *testing.T) {
 	t.Skip("whatevs")


@@ -521,6 +521,9 @@ func (l *Log) flush(flush flushType) error {
 	// copy the cache items to new maps so we can empty them out
 	l.flushCache = l.cache
 	l.cache = make(map[string]Values)
+	for k, _ := range l.cacheDirtySort {
+		sort.Sort(l.flushCache[k])
+	}
 	l.cacheDirtySort = make(map[string]bool)

 	valuesByKey := make(map[string]Values)
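
Note: the loop added above exists because historical writes can land in the WAL cache out of time order, so keys marked dirty are sorted once at flush time rather than on every write. A rough, self-contained sketch of that idea, using a simplified stand-in for the package's Values type and invented sample data:

package main

import (
	"fmt"
	"sort"
)

// value and values are simplified stand-ins for the pd1 package's Value and
// Values; the timestamps and cache key below are hypothetical.
type value struct {
	unixNano int64
	v        float64
}

type values []value

func (a values) Len() int           { return len(a) }
func (a values) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a values) Less(i, j int) bool { return a[i].unixNano < a[j].unixNano }

func main() {
	// A historical write put an older point behind a newer one in the cache.
	cache := map[string]values{
		"cpu,host=A#value": {{5000000000, 1.5}, {3000000000, 1.3}},
	}
	dirty := map[string]bool{"cpu,host=A#value": true}

	// Mirror the flush path: sort only the keys marked dirty.
	for k := range dirty {
		sort.Sort(cache[k])
	}
	fmt.Println(cache["cpu,host=A#value"]) // ascending by time again
}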