From 15d37fd3881aecdef2093af59841be540fa9c0cd Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Fri, 29 May 2015 17:15:05 -0400 Subject: [PATCH] Make store open every shard on load. Fix shard to set measurementFields on load. Fixes issue where queries wouldn't be able to hit anything because the index does't load until the shard is open. Fix an issue where field codecs weren't populated in the shard when loading. --- tsdb/query_executor_test.go | 93 ++++++++++++++++++++++++++++--------- tsdb/shard.go | 1 + tsdb/store.go | 18 ++----- 3 files changed, 78 insertions(+), 34 deletions(-) diff --git a/tsdb/query_executor_test.go b/tsdb/query_executor_test.go index 92b64eda83..fffd2542d9 100644 --- a/tsdb/query_executor_test.go +++ b/tsdb/query_executor_test.go @@ -11,19 +11,11 @@ import ( "github.com/influxdb/influxdb/meta" ) -func TestWritePointsAndExecuteQuery(t *testing.T) { - path, _ := ioutil.TempDir("", "") - defer os.RemoveAll(path) +var shardID = uint64(1) - store := NewStore(path) - err := store.Open() - if err != nil { - t.Fatalf("error opening store: %s", err.Error()) - } - database := "foo" - retentionPolicy := "bar" - shardID := uint64(1) - store.CreateShard(database, retentionPolicy, shardID) +func TestWritePointsAndExecuteQuery(t *testing.T) { + store, executor := testStoreAndExecutor() + defer os.RemoveAll(store.path) pt := NewPoint( "cpu", @@ -32,7 +24,7 @@ func TestWritePointsAndExecuteQuery(t *testing.T) { time.Unix(1, 2), ) - err = store.WriteToShard(shardID, []Point{pt}) + err := store.WriteToShard(shardID, []Point{pt}) if err != nil { t.Fatalf(err.Error()) } @@ -43,25 +35,84 @@ func TestWritePointsAndExecuteQuery(t *testing.T) { t.Fatalf(err.Error()) } + got := executeAndGetJSON("select * from cpu", executor) + exepected := `[{"series":[{"name":"cpu","columns":["time","value"],"values":[["1970-01-01T00:00:01.000000002Z",1],["1970-01-01T00:00:02.000000003Z",1]]}]}]` + if exepected != got { + t.Fatalf("exp: %s\ngot: %s", exepected, got) + } + + store.Close() + store = NewStore(store.path) + err = store.Open() + if err != nil { + t.Fatalf(err.Error()) + } + executor.store = store + + got = executeAndGetJSON("select * from cpu", executor) + if exepected != got { + t.Fatalf("exp: %s\ngot: %s", exepected, got) + } +} + +func TestDropSeriesStatement(t *testing.T) { + store, executor := testStoreAndExecutor() + defer os.RemoveAll(store.path) + + pt := NewPoint( + "cpu", + map[string]string{"host": "server"}, + map[string]interface{}{"value": 1.0}, + time.Unix(1, 2), + ) + + err := store.WriteToShard(shardID, []Point{pt}) + if err != nil { + t.Fatalf(err.Error()) + } + + got := executeAndGetJSON("select * from cpu", executor) + exepected := `[{"series":[{"name":"cpu","columns":["time","value"],"values":[["1970-01-01T00:00:01.000000002Z",1]]}]}]` + if exepected != got { + t.Fatalf("exp: %s\ngot: %s", exepected, got) + } + + store.Close() + store = NewStore(store.path) + store.Open() +} + +func testStoreAndExecutor() (*Store, *QueryExecutor) { + path, _ := ioutil.TempDir("", "") + + store := NewStore(path) + err := store.Open() + if err != nil { + panic(err) + } + database := "foo" + retentionPolicy := "bar" + shardID := uint64(1) + store.CreateShard(database, retentionPolicy, shardID) + executor := NewQueryExecutor(store) executor.MetaStore = &testMetastore{} executor.Stats = &fakeStats{} - ch, err := executor.ExecuteQuery(mustParseQuery("select * from cpu"), "foo", 20) + return store, executor +} + +func executeAndGetJSON(query string, executor *QueryExecutor) string { + ch, err := executor.ExecuteQuery(mustParseQuery(query), "foo", 20) if err != nil { - t.Fatalf(err.Error()) + panic(err.Error()) } var results []*influxql.Result for r := range ch { results = append(results, r) } - - exepected := `[{"series":[{"name":"cpu","columns":["time","value"],"values":[["1970-01-01T00:00:01.000000002Z",1],["1970-01-01T00:00:02.000000003Z",1]]}]}]` - got := string(mustMarshalJSON(results)) - if exepected != got { - t.Fatalf("exp: %s\ngot: %s", exepected, got) - } + return string(mustMarshalJSON(results)) } type testMetastore struct{} diff --git a/tsdb/shard.go b/tsdb/shard.go index 27c40192d8..d5658563b1 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -339,6 +339,7 @@ func (s *Shard) loadMetadataIndex() error { m.FieldNames[name] = struct{}{} } mf.codec = newFieldCodec(mf.Fields) + s.measurementFields[string(k)] = mf } // load series metadata diff --git a/tsdb/store.go b/tsdb/store.go index e45de41da5..43df666542 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -57,6 +57,9 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er shardPath := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10)) shard := NewShard(db, shardPath) + if err := shard.Open(); err != nil { + return err + } s.shards[shardID] = shard @@ -111,7 +114,6 @@ func (s *Store) loadIndexes() error { func (s *Store) loadShards() error { // loop through the current database indexes for db := range s.databaseIndexes { - rps, err := ioutil.ReadDir(filepath.Join(s.path, db)) if err != nil { return err @@ -139,6 +141,7 @@ func (s *Store) loadShards() error { } shard := NewShard(s.databaseIndexes[db], path) + shard.Open() s.shards[shardID] = shard } } @@ -176,19 +179,8 @@ func (s *Store) WriteToShard(shardID uint64, points []Point) error { if !ok { return ErrShardNotFound } - fmt.Printf("> WriteShard %d, %d points\n", shardID, len(points)) - - // Lazily open shards when written. If the shard is already open, - // this will do nothing. - if err := sh.Open(); err != nil { - return err - } - - if err := sh.WritePoints(points); err != nil { - return err - } - return nil + return sh.WritePoints(points) } func (s *Store) Close() error {