Make store open every shard on load. Fix shard to set measurementFields on load.

Fixes issue where queries wouldn't be able to hit anything because the index doesn't load until the shard is open.

Fix an issue where field codecs weren't populated in the shard when loading.
pull/2710/head
Paul Dix 2015-05-29 17:15:05 -04:00
parent 2fec857988
commit 15d37fd388
3 changed files with 78 additions and 34 deletions

View File

@ -11,19 +11,11 @@ import (
"github.com/influxdb/influxdb/meta" "github.com/influxdb/influxdb/meta"
) )
func TestWritePointsAndExecuteQuery(t *testing.T) { var shardID = uint64(1)
path, _ := ioutil.TempDir("", "")
defer os.RemoveAll(path)
store := NewStore(path) func TestWritePointsAndExecuteQuery(t *testing.T) {
err := store.Open() store, executor := testStoreAndExecutor()
if err != nil { defer os.RemoveAll(store.path)
t.Fatalf("error opening store: %s", err.Error())
}
database := "foo"
retentionPolicy := "bar"
shardID := uint64(1)
store.CreateShard(database, retentionPolicy, shardID)
pt := NewPoint( pt := NewPoint(
"cpu", "cpu",
@ -32,7 +24,7 @@ func TestWritePointsAndExecuteQuery(t *testing.T) {
time.Unix(1, 2), time.Unix(1, 2),
) )
err = store.WriteToShard(shardID, []Point{pt}) err := store.WriteToShard(shardID, []Point{pt})
if err != nil { if err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
@ -43,25 +35,84 @@ func TestWritePointsAndExecuteQuery(t *testing.T) {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
got := executeAndGetJSON("select * from cpu", executor)
exepected := `[{"series":[{"name":"cpu","columns":["time","value"],"values":[["1970-01-01T00:00:01.000000002Z",1],["1970-01-01T00:00:02.000000003Z",1]]}]}]`
if exepected != got {
t.Fatalf("exp: %s\ngot: %s", exepected, got)
}
store.Close()
store = NewStore(store.path)
err = store.Open()
if err != nil {
t.Fatalf(err.Error())
}
executor.store = store
got = executeAndGetJSON("select * from cpu", executor)
if exepected != got {
t.Fatalf("exp: %s\ngot: %s", exepected, got)
}
}
// TestDropSeriesStatement writes a single point, verifies it can be queried
// back as the expected JSON, then closes and reopens the store from the same
// path to confirm shards and indexes load correctly from disk.
func TestDropSeriesStatement(t *testing.T) {
	store, executor := testStoreAndExecutor()
	defer os.RemoveAll(store.path)

	pt := NewPoint(
		"cpu",
		map[string]string{"host": "server"},
		map[string]interface{}{"value": 1.0},
		time.Unix(1, 2),
	)

	err := store.WriteToShard(shardID, []Point{pt})
	if err != nil {
		t.Fatalf(err.Error())
	}

	got := executeAndGetJSON("select * from cpu", executor)
	expected := `[{"series":[{"name":"cpu","columns":["time","value"],"values":[["1970-01-01T00:00:01.000000002Z",1]]}]}]`
	if expected != got {
		t.Fatalf("exp: %s\ngot: %s", expected, got)
	}

	store.Close()
	store = NewStore(store.path)
	// Reopening must succeed; a failure here means on-disk shard state did
	// not load. The original test ignored this error, hiding load bugs.
	if err := store.Open(); err != nil {
		t.Fatalf("error reopening store: %s", err.Error())
	}
}
func testStoreAndExecutor() (*Store, *QueryExecutor) {
path, _ := ioutil.TempDir("", "")
store := NewStore(path)
err := store.Open()
if err != nil {
panic(err)
}
database := "foo"
retentionPolicy := "bar"
shardID := uint64(1)
store.CreateShard(database, retentionPolicy, shardID)
executor := NewQueryExecutor(store) executor := NewQueryExecutor(store)
executor.MetaStore = &testMetastore{} executor.MetaStore = &testMetastore{}
executor.Stats = &fakeStats{} executor.Stats = &fakeStats{}
ch, err := executor.ExecuteQuery(mustParseQuery("select * from cpu"), "foo", 20) return store, executor
}
func executeAndGetJSON(query string, executor *QueryExecutor) string {
ch, err := executor.ExecuteQuery(mustParseQuery(query), "foo", 20)
if err != nil { if err != nil {
t.Fatalf(err.Error()) panic(err.Error())
} }
var results []*influxql.Result var results []*influxql.Result
for r := range ch { for r := range ch {
results = append(results, r) results = append(results, r)
} }
return string(mustMarshalJSON(results))
exepected := `[{"series":[{"name":"cpu","columns":["time","value"],"values":[["1970-01-01T00:00:01.000000002Z",1],["1970-01-01T00:00:02.000000003Z",1]]}]}]`
got := string(mustMarshalJSON(results))
if exepected != got {
t.Fatalf("exp: %s\ngot: %s", exepected, got)
}
} }
type testMetastore struct{} type testMetastore struct{}

View File

@ -339,6 +339,7 @@ func (s *Shard) loadMetadataIndex() error {
m.FieldNames[name] = struct{}{} m.FieldNames[name] = struct{}{}
} }
mf.codec = newFieldCodec(mf.Fields) mf.codec = newFieldCodec(mf.Fields)
s.measurementFields[string(k)] = mf
} }
// load series metadata // load series metadata

View File

@ -57,6 +57,9 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
shardPath := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10)) shardPath := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
shard := NewShard(db, shardPath) shard := NewShard(db, shardPath)
if err := shard.Open(); err != nil {
return err
}
s.shards[shardID] = shard s.shards[shardID] = shard
@ -111,7 +114,6 @@ func (s *Store) loadIndexes() error {
func (s *Store) loadShards() error { func (s *Store) loadShards() error {
// loop through the current database indexes // loop through the current database indexes
for db := range s.databaseIndexes { for db := range s.databaseIndexes {
rps, err := ioutil.ReadDir(filepath.Join(s.path, db)) rps, err := ioutil.ReadDir(filepath.Join(s.path, db))
if err != nil { if err != nil {
return err return err
@ -139,6 +141,7 @@ func (s *Store) loadShards() error {
} }
shard := NewShard(s.databaseIndexes[db], path) shard := NewShard(s.databaseIndexes[db], path)
shard.Open()
s.shards[shardID] = shard s.shards[shardID] = shard
} }
} }
@ -176,19 +179,8 @@ func (s *Store) WriteToShard(shardID uint64, points []Point) error {
if !ok { if !ok {
return ErrShardNotFound return ErrShardNotFound
} }
fmt.Printf("> WriteShard %d, %d points\n", shardID, len(points))
// Lazily open shards when written. If the shard is already open,
// this will do nothing.
if err := sh.Open(); err != nil {
return err
}
if err := sh.WritePoints(points); err != nil {
return err
}
return nil
return sh.WritePoints(points)
} }
func (s *Store) Close() error { func (s *Store) Close() error {