add tsm1 wal quickcheck

This commit adds quickcheck testing for the tsm1 WAL.

parent b88dd21581
commit f2d23b070b
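The TestLog_Quick test added below is built on Go's standard testing/quick package: a property function is handed to quick.Check together with a quick.Config that caps the number of random cases (MaxCount) and supplies inputs through a custom Values generator. The following is a minimal, self-contained sketch of that pattern only; the reverse property and every name in it are illustrative and not part of this commit.

```go
package example_test

import (
	"math/rand"
	"reflect"
	"testing"
	"testing/quick"
)

// reverse returns a reversed copy of a slice of ints.
func reverse(a []int) []int {
	b := make([]int, len(a))
	for i, v := range a {
		b[len(a)-1-i] = v
	}
	return b
}

// TestReverse_Quick checks the property "reversing twice returns the input"
// against randomly generated slices. It mirrors the shape of TestLog_Quick
// below: skip under -short, cap the number of cases with MaxCount, and supply
// inputs through a custom Values generator.
func TestReverse_Quick(t *testing.T) {
	if testing.Short() {
		t.Skip("short mode")
	}

	if err := quick.Check(func(a []int) bool {
		return reflect.DeepEqual(reverse(reverse(a)), a)
	}, &quick.Config{
		MaxCount: 10,
		Values: func(values []reflect.Value, rand *rand.Rand) {
			// Generate a non-nil slice of up to 100 random ints.
			a := make([]int, rand.Intn(100))
			for i := range a {
				a[i] = rand.Int()
			}
			values[0] = reflect.ValueOf(a)
		},
	}); err != nil {
		t.Fatal(err)
	}
}
```

The WAL test in this commit follows the same skeleton, but generates whole PointsSlice values and checks cursor reads against an in-memory copy of what was flushed.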
@@ -59,6 +59,7 @@
 - [#4360](https://github.com/influxdb/influxdb/issues/4360): Aggregate Selectors overwrite values during post-processing
 - [#4421](https://github.com/influxdb/influxdb/issues/4421): Fix line protocol accepting tags with no values
 - [#4434](https://github.com/influxdb/influxdb/pull/4434): Allow 'E' for scientific values. Fixes [#4433](https://github.com/influxdb/influxdb/issues/4433)
+- [#4431](https://github.com/influxdb/influxdb/issues/4431): Add tsm1 WAL QuickCheck

 ## v0.9.4 [2015-09-14]

@@ -207,7 +207,7 @@ func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, err
 		if err != nil {
 			return nil, err
 		}
-		pt.time = time.Unix(0, ts*pt.GetPrecisionMultiplier(precision))
+		pt.time = time.Unix(0, ts*pt.GetPrecisionMultiplier(precision)).UTC()
 	}
 	return pt, nil
 }
@@ -1,13 +1,15 @@
 package tsm1_test

 import (
-	"github.com/influxdb/influxdb/tsdb"
-	"github.com/influxdb/influxdb/tsdb/engine/tsm1"
 	"math/rand"
 	"reflect"
 	"sort"
 	"testing"
 	"testing/quick"
 	"time"
+
+	"github.com/influxdb/influxdb/tsdb"
+	"github.com/influxdb/influxdb/tsdb/engine/tsm1"
 )

 func TestCombinedEngineCursor_Quick(t *testing.T) {
@@ -173,3 +175,30 @@ func MergeCursorItems(a, b []CursorItem) []CursorItem {
 	}
 	return items
 }
+
+// ReadAllCursor slurps all values from a cursor.
+func ReadAllCursor(c tsdb.Cursor) tsm1.Values {
+	var values tsm1.Values
+	for k, v := c.Next(); k != tsdb.EOF; k, v = c.Next() {
+		values = append(values, tsm1.NewValue(time.Unix(0, k).UTC(), v))
+	}
+	return values
+}
+
+// DedupeValues returns a list of values with duplicate times removed.
+func DedupeValues(a tsm1.Values) tsm1.Values {
+	other := make(tsm1.Values, 0, len(a))
+	m := map[int64]struct{}{}
+
+	for i := len(a) - 1; i >= 0; i-- {
+		value := a[i]
+		if _, ok := m[value.UnixNano()]; ok {
+			continue
+		}
+
+		other = append(other, value)
+		m[value.UnixNano()] = struct{}{}
+	}
+
+	return other
+}
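One behavioral detail of the DedupeValues helper added above: it walks the slice from the end and skips timestamps it has already seen, so when two values share a time the later element in the slice wins (mergeIndexCursor in the WAL test below relies on this by appending cursor values after the flushed index values). A minimal illustrative sketch of that behavior follows; the test name is hypothetical, not part of the commit, and it assumes the helpers above plus the tsm1.Value accessor Value().

```go
package tsm1_test

import (
	"testing"
	"time"

	"github.com/influxdb/influxdb/tsdb/engine/tsm1"
)

// TestDedupeValues_LastWins is an illustrative sketch (not part of this commit):
// DedupeValues iterates from the end of the slice and skips timestamps it has
// already seen, so the entry appearing later in the slice survives.
func TestDedupeValues_LastWins(t *testing.T) {
	ts := time.Unix(0, 42).UTC()
	a := tsm1.Values{
		tsm1.NewValue(ts, float64(1)), // earlier entry
		tsm1.NewValue(ts, float64(2)), // later entry with the same timestamp
	}

	a = DedupeValues(a)
	if len(a) != 1 {
		t.Fatalf("expected 1 value after dedupe, got %d", len(a))
	}
	if got := a[0].Value(); got != float64(2) {
		t.Fatalf("expected the later value to survive, got %v", got)
	}
}
```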
@@ -161,7 +161,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
 		MaxPointsPerBlock: DefaultMaxPointsPerBlock,
 		RotateBlockSize:   DefaultRotateBlockSize,
 	}
-	e.WAL.Index = e
+	e.WAL.IndexWriter = e

 	return e
 }
@@ -100,8 +100,8 @@ type Log struct {
 	// MaxMemorySizeThreshold specifies the limit at which writes to the WAL should be rejected
 	MaxMemorySizeThreshold int

-	// Index is the database series will be flushed to
-	Index IndexWriter
+	// IndexWriter is the database series will be flushed to
+	IndexWriter IndexWriter

 	// LoggingEnabled specifies if detailed logs should be output
 	LoggingEnabled bool
@@ -136,6 +136,9 @@ func NewLog(path string) *Log {
 	}
 }

+// Path returns the path the log was initialized with.
+func (l *Log) Path() string { return l.path }
+
 // Open opens and initializes the Log. Will recover from previous unclosed shutdowns
 func (l *Log) Open() error {
@@ -393,8 +396,8 @@ func (l *Log) readFileToCache(fileName string) error {
 			if err := json.Unmarshal(data, &d); err != nil {
 				return err
 			}
-			l.Index.MarkDeletes(d.Keys)
-			l.Index.MarkMeasurementDelete(d.MeasurementName)
+			l.IndexWriter.MarkDeletes(d.Keys)
+			l.IndexWriter.MarkMeasurementDelete(d.MeasurementName)
 			l.deleteKeysFromCache(d.Keys)
 			if d.MeasurementName != "" {
 				l.deleteMeasurementFromCache(d.MeasurementName)
@@ -505,28 +508,11 @@ func (l *Log) Close() error {
 	l.cache = nil
 	l.measurementFieldsCache = nil
 	l.seriesToCreateCache = nil
-	if l.currentSegmentFile == nil {
-		return nil
-	}
-	if err := l.currentSegmentFile.Close(); err != nil {
-		return err
-	}
-	l.currentSegmentFile = nil
-
-	return nil
-}
-
-// close all the open Log partitions and file handles
-func (l *Log) close() error {
-	l.cache = nil
-	l.cacheDirtySort = nil
-	if l.currentSegmentFile == nil {
-		return nil
+	if l.currentSegmentFile != nil {
+		l.currentSegmentFile.Close()
+		l.currentSegmentFile = nil
 	}
-	if err := l.currentSegmentFile.Close(); err != nil {
-		return err
-	}
-	l.currentSegmentFile = nil

 	return nil
 }
@@ -614,7 +600,7 @@ func (l *Log) flush(flush flushType) error {
 	}

 	startTime := time.Now()
-	if err := l.Index.Write(l.flushCache, mfc, scc); err != nil {
+	if err := l.IndexWriter.Write(l.flushCache, mfc, scc); err != nil {
 		return err
 	}
 	if l.LoggingEnabled {
@@ -2,30 +2,35 @@ package tsm1_test

 import (
 	"io/ioutil"
+	"math/rand"
 	"os"
 	"reflect"
+	"sort"
+	"strconv"
+	"sync"
 	"testing"
+	"testing/quick"
 	"time"

 	"github.com/influxdb/influxdb/influxql"
 	"github.com/influxdb/influxdb/models"
 	"github.com/influxdb/influxdb/tsdb"
 	"github.com/influxdb/influxdb/tsdb/engine/tsm1"
 )

-func TestWAL_TestWriteQueryOpen(t *testing.T) {
-	w := NewWAL()
-	defer w.Cleanup()
+func TestLog_TestWriteQueryOpen(t *testing.T) {
+	w := NewLog()
+	defer w.Close()

 	// Mock call to the index.
 	var vals map[string]tsm1.Values
 	var fields map[string]*tsdb.MeasurementFields
 	var series []*tsdb.SeriesCreate

-	w.Index = &MockIndexWriter{
-		fn: func(valuesByKey map[string]tsm1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
-			vals = valuesByKey
-			fields = measurementFieldsToSave
-			series = seriesToCreate
-			return nil
-		},
+	w.IndexWriter.WriteFn = func(valuesByKey map[string]tsm1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
+		vals = valuesByKey
+		fields = measurementFieldsToSave
+		series = seriesToCreate
+		return nil
 	}

 	if err := w.Open(); err != nil {
@@ -103,7 +108,7 @@ func TestWAL_TestWriteQueryOpen(t *testing.T) {
 	}

 	// ensure we close and after open it flushes to the index
-	if err := w.Close(); err != nil {
+	if err := w.Log.Close(); err != nil {
 		t.Fatalf("failed to close: %s", err.Error())
 	}

@@ -140,39 +145,277 @@ func TestWAL_TestWriteQueryOpen(t *testing.T) {
 	}
 }

-type Log struct {
-	*tsm1.Log
-	path string
+// Ensure the log can handle random data.
+func TestLog_Quick(t *testing.T) {
+	if testing.Short() {
+		t.Skip("short mode")
+	}
+
+	quick.Check(func(pointsSlice PointsSlice) bool {
+		l := NewLog()
+		l.FlushMemorySizeThreshold = 4096 // low threshold
+		defer l.Close()
+
+		var mu sync.Mutex
+		index := make(map[string]tsm1.Values)
+
+		// Ignore flush to the index.
+		l.IndexWriter.WriteFn = func(valuesByKey map[string]tsm1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
+			mu.Lock()
+			defer mu.Unlock()
+			for key, values := range valuesByKey {
+				index[key] = append(index[key], values...)
+			}
+
+			// Simulate slow index writes.
+			time.Sleep(100 * time.Millisecond)
+
+			return nil
+		}
+
+		// Open the log.
+		if err := l.Open(); err != nil {
+			t.Fatal(err)
+		}
+
+		// Generate fields and series to create.
+		fieldsToWrite := pointsSlice.MeasurementFields()
+		seriesToWrite := pointsSlice.SeriesCreate()
+
+		// Write each set of points separately.
+		for _, points := range pointsSlice {
+			if err := l.WritePoints(points.Encode(), fieldsToWrite, seriesToWrite); err != nil {
+				t.Fatal(err)
+			}
+		}
+
+		// Iterate over each series and read out cursor.
+		for _, series := range pointsSlice.Series() {
+			mu.Lock()
+			if got := mergeIndexCursor(series, l, index); !reflect.DeepEqual(got, series.Values) {
+				t.Fatalf("mismatch:\n\ngot=%v\n\nexp=%v\n\n", len(got), len(series.Values))
+			}
+			mu.Unlock()
+		}
+
+		// Reopen log.
+		if err := l.Reopen(); err != nil {
+			t.Fatal(err)
+		}
+
+		// Iterate over each series and read out cursor again.
+		for _, series := range pointsSlice.Series() {
+			mu.Lock()
+			if got := mergeIndexCursor(series, l, index); !reflect.DeepEqual(got, series.Values) {
+				t.Fatalf("mismatch(reopen):\n\ngot=%v\n\nexp=%v\n\n", len(got), len(series.Values))
+			}
+			mu.Unlock()
+		}
+
+		return true
+	}, &quick.Config{
+		MaxCount: 10,
+		Values: func(values []reflect.Value, rand *rand.Rand) {
+			values[0] = reflect.ValueOf(GeneratePointsSlice(rand))
+		},
+	})
 }

-func NewWAL() *Log {
-	dir, err := ioutil.TempDir("", "tsm1-test")
+func mergeIndexCursor(series *Series, l *Log, index map[string]tsm1.Values) tsm1.Values {
+	c := l.Cursor(series.Name, series.FieldsSlice(), &tsdb.FieldCodec{}, true)
+	a := ReadAllCursor(c)
+	a = append(index[series.Name+"#!~#value"], a...)
+	a = DedupeValues(a)
+	sort.Sort(a)
+	return a
+}
+
+type Log struct {
+	*tsm1.Log
+	IndexWriter IndexWriter
+}
+
+// NewLog returns a new instance of Log
+func NewLog() *Log {
+	path, err := ioutil.TempDir("", "tsm1-test")
 	if err != nil {
-		panic("couldn't get temp dir")
+		panic(err)
 	}

-	l := &Log{
-		Log: tsm1.NewLog(dir),
-		path: dir,
-	}
+	l := &Log{Log: tsm1.NewLog(path)}
+	l.Log.IndexWriter = &l.IndexWriter
 	l.LoggingEnabled = true

 	return l
 }

-func (l *Log) Cleanup() error {
-	l.Close()
-	os.RemoveAll(l.path)
+// Close closes the log and removes the underlying temporary path.
+func (l *Log) Close() error {
+	defer os.RemoveAll(l.Path())
+	return l.Log.Close()
+}
+
+// Reopen closes and reopens the log.
+func (l *Log) Reopen() error {
+	if err := l.Log.Close(); err != nil {
+		return err
+	}
+	if err := l.Log.Open(); err != nil {
+		return err
+	}
 	return nil
 }

-type MockIndexWriter struct {
-	fn func(valuesByKey map[string]tsm1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
+// IndexWriter represents a mock implementation of tsm1.IndexWriter.
+type IndexWriter struct {
+	WriteFn                 func(valuesByKey map[string]tsm1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
+	MarkDeletesFn           func(keys []string)
+	MarkMeasurementDeleteFn func(name string)
 }

-func (m *MockIndexWriter) Write(valuesByKey map[string]tsm1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
-	return m.fn(valuesByKey, measurementFieldsToSave, seriesToCreate)
+func (w *IndexWriter) Write(valuesByKey map[string]tsm1.Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
+	return w.WriteFn(valuesByKey, measurementFieldsToSave, seriesToCreate)
 }

-func (m *MockIndexWriter) MarkDeletes(keys []string) {}
+func (w *IndexWriter) MarkDeletes(keys []string) {
+	w.MarkDeletesFn(keys)
+}

-func (m *MockIndexWriter) MarkMeasurementDelete(name string) {}
+func (w *IndexWriter) MarkMeasurementDelete(name string) {
+	w.MarkMeasurementDeleteFn(name)
+}
+
+// PointsSlice represents a slice of point slices.
+type PointsSlice []Points
+
+// GeneratePointsSlice randomly generates a slice of slice of points.
+func GeneratePointsSlice(rand *rand.Rand) PointsSlice {
+	var pointsSlice PointsSlice
+	for i, pointsN := 0, rand.Intn(100); i < pointsN; i++ {
+		var points Points
+		for j, pointN := 0, rand.Intn(1000); j < pointN; j++ {
+			points = append(points, Point{
+				Name:   strconv.Itoa(rand.Intn(10)),
+				Fields: models.Fields{"value": rand.Int63n(100000)},
+				Time:   time.Unix(0, rand.Int63n(int64(24*time.Hour))).UTC(),
+			})
+		}
+		pointsSlice = append(pointsSlice, points)
+	}
+	return pointsSlice
+}
+
+// MeasurementFields returns a set of fields used across all points.
+func (a PointsSlice) MeasurementFields() map[string]*tsdb.MeasurementFields {
+	mfs := map[string]*tsdb.MeasurementFields{}
+	for _, points := range a {
+		for _, p := range points {
+			pp := p.Encode()
+
+			// Create measurement field, if not exists.
+			mf := mfs[string(pp.Key())]
+			if mf == nil {
+				mf = &tsdb.MeasurementFields{Fields: make(map[string]*tsdb.Field)}
+				mfs[string(pp.Key())] = mf
+			}
+
+			// Add all fields on the point.
+			for name, value := range p.Fields {
+				mf.CreateFieldIfNotExists(name, influxql.InspectDataType(value), false)
+			}
+		}
+	}
+	return mfs
+}
+
+// SeriesCreate returns a list of series to create across all points.
+func (a PointsSlice) SeriesCreate() []*tsdb.SeriesCreate {
+	// Create unique set of series.
+	m := map[string]*tsdb.SeriesCreate{}
+	for _, points := range a {
+		for _, p := range points {
+			if pp := p.Encode(); m[string(pp.Key())] == nil {
+				m[string(pp.Key())] = &tsdb.SeriesCreate{Measurement: pp.Name(), Series: tsdb.NewSeries(string(string(pp.Key())), pp.Tags())}
+			}
+		}
+	}
+
+	// Convert to slice.
+	slice := make([]*tsdb.SeriesCreate, 0, len(m))
+	for _, v := range m {
+		slice = append(slice, v)
+	}
+	return slice
+}
+
+// Series returns a set of per-series data.
+func (a PointsSlice) Series() map[string]*Series {
+	m := map[string]*Series{}
+	for _, points := range a {
+		for _, p := range points {
+			pp := p.Encode()
+
+			// Create series if not exists.
+			s := m[string(pp.Key())]
+			if s == nil {
+				s = &Series{
+					Name:   string(pp.Key()),
+					Fields: make(map[string]struct{}),
+				}
+				m[string(pp.Key())] = s
+			}
+
+			// Append point data.
+			s.Values = append(s.Values, tsm1.NewValue(p.Time, p.Fields["value"]))
+
+			// Add fields.
+			for k := range p.Fields {
+				s.Fields[k] = struct{}{}
+			}
+		}
+	}
+
+	// Deduplicate & sort items in each series.
+	for _, s := range m {
+		s.Values = DedupeValues(s.Values)
+		sort.Sort(s.Values)
+	}
+
+	return m
+}
+
+// Points represents a slice of points.
+type Points []Point
+
+func (a Points) Encode() []models.Point {
+	other := make([]models.Point, len(a))
+	for i := range a {
+		other[i] = a[i].Encode()
+	}
+	return other
+}
+
+// Point represents a test point
+type Point struct {
+	Name   string
+	Tags   models.Tags
+	Fields models.Fields
+	Time   time.Time
+}
+
+func (p *Point) Encode() models.Point { return models.NewPoint(p.Name, p.Tags, p.Fields, p.Time) }
+
+type Series struct {
+	Name   string
+	Fields map[string]struct{}
+	Values tsm1.Values
+}
+
+// FieldsSlice returns a list of field names.
+func (s *Series) FieldsSlice() []string {
+	a := make([]string, 0, len(s.Fields))
+	for k := range s.Fields {
+		a = append(a, k)
+	}
+	return a
+}