diff --git a/tsdb/query_executor_test.go b/tsdb/query_executor_test.go index 92b64eda83..fffd2542d9 100644 --- a/tsdb/query_executor_test.go +++ b/tsdb/query_executor_test.go @@ -11,19 +11,11 @@ import ( "github.com/influxdb/influxdb/meta" ) -func TestWritePointsAndExecuteQuery(t *testing.T) { - path, _ := ioutil.TempDir("", "") - defer os.RemoveAll(path) +var shardID = uint64(1) - store := NewStore(path) - err := store.Open() - if err != nil { - t.Fatalf("error opening store: %s", err.Error()) - } - database := "foo" - retentionPolicy := "bar" - shardID := uint64(1) - store.CreateShard(database, retentionPolicy, shardID) +func TestWritePointsAndExecuteQuery(t *testing.T) { + store, executor := testStoreAndExecutor() + defer os.RemoveAll(store.path) pt := NewPoint( "cpu", @@ -32,7 +24,7 @@ func TestWritePointsAndExecuteQuery(t *testing.T) { time.Unix(1, 2), ) - err = store.WriteToShard(shardID, []Point{pt}) + err := store.WriteToShard(shardID, []Point{pt}) if err != nil { t.Fatalf(err.Error()) } @@ -43,25 +35,84 @@ func TestWritePointsAndExecuteQuery(t *testing.T) { t.Fatalf(err.Error()) } + got := executeAndGetJSON("select * from cpu", executor) + exepected := `[{"series":[{"name":"cpu","columns":["time","value"],"values":[["1970-01-01T00:00:01.000000002Z",1],["1970-01-01T00:00:02.000000003Z",1]]}]}]` + if exepected != got { + t.Fatalf("exp: %s\ngot: %s", exepected, got) + } + + store.Close() + store = NewStore(store.path) + err = store.Open() + if err != nil { + t.Fatalf(err.Error()) + } + executor.store = store + + got = executeAndGetJSON("select * from cpu", executor) + if exepected != got { + t.Fatalf("exp: %s\ngot: %s", exepected, got) + } +} + +func TestDropSeriesStatement(t *testing.T) { + store, executor := testStoreAndExecutor() + defer os.RemoveAll(store.path) + + pt := NewPoint( + "cpu", + map[string]string{"host": "server"}, + map[string]interface{}{"value": 1.0}, + time.Unix(1, 2), + ) + + err := store.WriteToShard(shardID, []Point{pt}) + if err != nil { + t.Fatalf(err.Error()) + } + + got := executeAndGetJSON("select * from cpu", executor) + exepected := `[{"series":[{"name":"cpu","columns":["time","value"],"values":[["1970-01-01T00:00:01.000000002Z",1]]}]}]` + if exepected != got { + t.Fatalf("exp: %s\ngot: %s", exepected, got) + } + + store.Close() + store = NewStore(store.path) + store.Open() +} + +func testStoreAndExecutor() (*Store, *QueryExecutor) { + path, _ := ioutil.TempDir("", "") + + store := NewStore(path) + err := store.Open() + if err != nil { + panic(err) + } + database := "foo" + retentionPolicy := "bar" + shardID := uint64(1) + store.CreateShard(database, retentionPolicy, shardID) + executor := NewQueryExecutor(store) executor.MetaStore = &testMetastore{} executor.Stats = &fakeStats{} - ch, err := executor.ExecuteQuery(mustParseQuery("select * from cpu"), "foo", 20) + return store, executor +} + +func executeAndGetJSON(query string, executor *QueryExecutor) string { + ch, err := executor.ExecuteQuery(mustParseQuery(query), "foo", 20) if err != nil { - t.Fatalf(err.Error()) + panic(err.Error()) } var results []*influxql.Result for r := range ch { results = append(results, r) } - - exepected := `[{"series":[{"name":"cpu","columns":["time","value"],"values":[["1970-01-01T00:00:01.000000002Z",1],["1970-01-01T00:00:02.000000003Z",1]]}]}]` - got := string(mustMarshalJSON(results)) - if exepected != got { - t.Fatalf("exp: %s\ngot: %s", exepected, got) - } + return string(mustMarshalJSON(results)) } type testMetastore struct{} diff --git a/tsdb/shard.go b/tsdb/shard.go index 27c40192d8..d5658563b1 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -339,6 +339,7 @@ func (s *Shard) loadMetadataIndex() error { m.FieldNames[name] = struct{}{} } mf.codec = newFieldCodec(mf.Fields) + s.measurementFields[string(k)] = mf } // load series metadata diff --git a/tsdb/store.go b/tsdb/store.go index e45de41da5..43df666542 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -57,6 +57,9 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er shardPath := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10)) shard := NewShard(db, shardPath) + if err := shard.Open(); err != nil { + return err + } s.shards[shardID] = shard @@ -111,7 +114,6 @@ func (s *Store) loadIndexes() error { func (s *Store) loadShards() error { // loop through the current database indexes for db := range s.databaseIndexes { - rps, err := ioutil.ReadDir(filepath.Join(s.path, db)) if err != nil { return err @@ -139,6 +141,7 @@ func (s *Store) loadShards() error { } shard := NewShard(s.databaseIndexes[db], path) + shard.Open() s.shards[shardID] = shard } } @@ -176,19 +179,8 @@ func (s *Store) WriteToShard(shardID uint64, points []Point) error { if !ok { return ErrShardNotFound } - fmt.Printf("> WriteShard %d, %d points\n", shardID, len(points)) - - // Lazily open shards when written. If the shard is already open, - // this will do nothing. - if err := sh.Open(); err != nil { - return err - } - - if err := sh.WritePoints(points); err != nil { - return err - } - return nil + return sh.WritePoints(points) } func (s *Store) Close() error {