parent
bf65e967aa
commit
6722e9ff14
|
@ -38,7 +38,7 @@ type DevEngine struct {
|
|||
MaxFileSize uint32
|
||||
MaxPointsPerBlock int
|
||||
|
||||
MinCacheFlushThreshold uint64
|
||||
CacheFlushMemorySizeThreshold uint64
|
||||
}
|
||||
|
||||
// NewDevEngine returns a new instance of Engine.
|
||||
|
@ -72,7 +72,7 @@ func NewDevEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engi
|
|||
MaxFileSize: MaxDataFileSize,
|
||||
MaxPointsPerBlock: DefaultMaxPointsPerBlock,
|
||||
|
||||
MinCacheFlushThreshold: uint64(opt.Config.WALFlushMemorySizeThreshold),
|
||||
CacheFlushMemorySizeThreshold: uint64(opt.Config.WALFlushMemorySizeThreshold),
|
||||
}
|
||||
|
||||
return e
|
||||
|
@ -185,9 +185,6 @@ func (e *DevEngine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseInd
|
|||
// WritePoints writes metadata and point data into the engine.
|
||||
// Returns an error if new points are added to an existing key.
|
||||
func (e *DevEngine) WritePoints(points []models.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
|
||||
e.mu.RLock()
|
||||
defer e.mu.RUnlock()
|
||||
|
||||
values := map[string][]Value{}
|
||||
for _, p := range points {
|
||||
for k, v := range p.Fields() {
|
||||
|
@ -196,6 +193,9 @@ func (e *DevEngine) WritePoints(points []models.Point, measurementFieldsToSave m
|
|||
}
|
||||
}
|
||||
|
||||
e.mu.RLock()
|
||||
defer e.mu.RUnlock()
|
||||
|
||||
// first try to write to the cache
|
||||
err := e.Cache.WriteMulti(values)
|
||||
if err != nil {
|
||||
|
@ -279,7 +279,7 @@ func (e *DevEngine) writeSnapshot() error {
|
|||
|
||||
func (e *DevEngine) compact() {
|
||||
for {
|
||||
if e.Cache.Size() > e.MinCacheFlushThreshold {
|
||||
if e.Cache.Size() > e.CacheFlushMemorySizeThreshold {
|
||||
err := e.writeSnapshot()
|
||||
if err != nil {
|
||||
e.logger.Printf("error writing snapshot: %v", err)
|
||||
|
|
|
@ -70,6 +70,53 @@ func TestDevEngine_CacheQueryAscending(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure an engine containing cached values responds correctly to queries.
|
||||
func Test_DevEngineCacheQueryDescending(t *testing.T) {
|
||||
// TODO: fix the cursor so this test passes
|
||||
t.Skip("pending")
|
||||
|
||||
// Generate temporary file.
|
||||
f, _ := ioutil.TempFile("", "tsm1dev")
|
||||
f.Close()
|
||||
os.Remove(f.Name())
|
||||
walPath := filepath.Join(f.Name(), "wal")
|
||||
os.MkdirAll(walPath, 0777)
|
||||
defer os.RemoveAll(f.Name())
|
||||
|
||||
// Create a few points.
|
||||
p1 := parsePoint("cpu,host=A value=1.1 1000000000")
|
||||
p2 := parsePoint("cpu,host=A value=1.2 2000000000")
|
||||
p3 := parsePoint("cpu,host=A value=1.3 3000000000")
|
||||
|
||||
// Write those points to the engine.
|
||||
e := NewDevEngine(f.Name(), walPath, tsdb.NewEngineOptions())
|
||||
if err := e.Open(); err != nil {
|
||||
t.Fatalf("failed to open tsm1dev engine: %s", err.Error())
|
||||
}
|
||||
if err := e.WritePoints([]models.Point{p1, p2, p3}, nil, nil); err != nil {
|
||||
t.Fatalf("failed to write points: %s", err.Error())
|
||||
}
|
||||
|
||||
// Start a query transactions and get a cursor.
|
||||
tx := devTx{engine: e.(*DevEngine)}
|
||||
descCursor := tx.Cursor("cpu,host=A", []string{"value"}, nil, false)
|
||||
|
||||
k, v := descCursor.SeekTo(4000000000)
|
||||
if k != 3000000000 {
|
||||
t.Fatalf("failed to seek to before last key: %v %v", k, v)
|
||||
}
|
||||
|
||||
k, v = descCursor.Next()
|
||||
if k != 2000000000 {
|
||||
t.Fatalf("failed to get next key: %v %v", k, v)
|
||||
}
|
||||
|
||||
k, v = descCursor.SeekTo(1)
|
||||
if k != -1 {
|
||||
t.Fatalf("failed to seek to after first key: %v %v", k, v)
|
||||
}
|
||||
}
|
||||
|
||||
func parsePoints(buf string) []models.Point {
|
||||
points, err := models.ParsePointsString(buf)
|
||||
if err != nil {
|
||||
|
|
|
@ -371,3 +371,43 @@ func BenchmarkWALSegmentWriter(b *testing.B) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkWALSegmentReader(b *testing.B) {
|
||||
points := map[string][]tsm1.Value{}
|
||||
for i := 0; i < 5000; i++ {
|
||||
k := "cpu,host=A#!~#value"
|
||||
points[k] = append(points[k], tsm1.NewValue(time.Unix(int64(i), 0), 1.1))
|
||||
}
|
||||
|
||||
dir := MustTempDir()
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
f := MustTempFile(dir)
|
||||
w := tsm1.NewWALSegmentWriter(f)
|
||||
|
||||
write := &tsm1.WriteWALEntry{
|
||||
Values: points,
|
||||
}
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
if err := w.Write(write); err != nil {
|
||||
b.Fatalf("unexpected error writing entry: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
r := tsm1.NewWALSegmentReader(f)
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
f.Seek(0, os.SEEK_SET)
|
||||
b.StartTimer()
|
||||
|
||||
for r.Next() {
|
||||
_, err := r.Read()
|
||||
if err != nil {
|
||||
b.Fatalf("unexpected error reading entry: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue