remove Engine.Flush()
parent
a7f50ae03c
commit
cc0607a5cf
|
@ -32,8 +32,6 @@ type Engine interface {
|
|||
DeleteSeries(keys []string) error
|
||||
DeleteMeasurement(name string, seriesKeys []string) error
|
||||
SeriesCount() (n int, err error)
|
||||
|
||||
Flush(partitionFlushDelay time.Duration) error
|
||||
}
|
||||
|
||||
// NewEngineFunc creates a new engine.
|
||||
|
|
|
@ -61,63 +61,6 @@ func TestWritePointsAndExecuteQuery(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure that points can be written and flushed even after a restart.
|
||||
func TestWritePointsAndExecuteQuery_FlushRestart(t *testing.T) {
|
||||
store, executor := testStoreAndExecutor()
|
||||
defer os.RemoveAll(store.Path())
|
||||
|
||||
// Write first point.
|
||||
if err := store.WriteToShard(shardID, []tsdb.Point{tsdb.NewPoint(
|
||||
"cpu",
|
||||
map[string]string{"host": "server"},
|
||||
map[string]interface{}{"value": 1.0},
|
||||
time.Unix(1, 2),
|
||||
)}); err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
// Write second point.
|
||||
if err := store.WriteToShard(shardID, []tsdb.Point{tsdb.NewPoint(
|
||||
"cpu",
|
||||
map[string]string{"host": "server"},
|
||||
map[string]interface{}{"value": 1.0},
|
||||
time.Unix(2, 3),
|
||||
)}); err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
// Restart the store.
|
||||
if err := store.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err = store.Open(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Flush WAL data to the index.
|
||||
if err := store.Flush(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
got := executeAndGetJSON("select * from cpu", executor)
|
||||
exepected := `[{"series":[{"name":"cpu","tags":{"host":"server"},"columns":["time","value"],"values":[["1970-01-01T00:00:01.000000002Z",1],["1970-01-01T00:00:02.000000003Z",1]]}]}]`
|
||||
if exepected != got {
|
||||
t.Fatalf("exp: %s\ngot: %s", exepected, got)
|
||||
}
|
||||
|
||||
store.Close()
|
||||
store = tsdb.NewStore(store.Path())
|
||||
if err := store.Open(); err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
executor.Store = store
|
||||
executor.ShardMapper = &testShardMapper{store: store}
|
||||
|
||||
got = executeAndGetJSON("select * from cpu", executor)
|
||||
if exepected != got {
|
||||
t.Fatalf("exp: %s\ngot: %s", exepected, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDropSeriesStatement(t *testing.T) {
|
||||
store, executor := testStoreAndExecutor()
|
||||
defer os.RemoveAll(store.Path())
|
||||
|
|
|
@ -281,18 +281,6 @@ func (s *Store) WriteToShard(shardID uint64, points []Point) error {
|
|||
return sh.WritePoints(points)
|
||||
}
|
||||
|
||||
// Flush forces all shards to write their WAL data to the index.
|
||||
func (s *Store) Flush() error {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
for shardID, sh := range s.shards {
|
||||
if err := sh.engine.Flush(s.EngineOptions.WALPartitionFlushDelay); err != nil {
|
||||
return fmt.Errorf("flush: shard=%d, err=%s", shardID, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) CreateMapper(shardID uint64, query string, chunkSize int) (Mapper, error) {
|
||||
q, err := influxql.NewParser(strings.NewReader(query)).ParseStatement()
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue