From 9ff14b0e8ef9c6f6172c1aebe39dffd35f7b73cc Mon Sep 17 00:00:00 2001 From: John Shahid Date: Wed, 18 Dec 2013 12:28:38 -0500 Subject: [PATCH] fix #99. support list series --- CHANGELOG.md | 1 + src/api/http/api_test.go | 4 + src/coordinator/coordinator.go | 113 ++++++++++++++++++-- src/coordinator/interface.go | 1 + src/coordinator/protobuf_request_handler.go | 15 +++ src/datastore/interface.go | 1 + src/datastore/leveldb_datastore.go | 6 +- src/engine/engine.go | 31 ++++++ src/engine/engine_test.go | 4 + src/integration/benchmark_test.go | 28 +++++ src/parser/parser.go | 11 ++ src/parser/parser_test.go | 7 ++ src/parser/query.lex | 2 + src/parser/query.yacc | 10 +- src/parser/query_types.h | 1 + src/protocol/protocol.pb.go | 6 ++ src/protocol/protocol.proto | 2 + src/server/server_test.go | 40 +++++++ 18 files changed, 269 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 89875295e3..d35affff5d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -156,6 +156,7 @@ - [Issue #94](https://github.com/influxdb/influxdb/issues/94). delete queries - [Issue #116](https://github.com/influxdb/influxdb/issues/116). Use proper logging - [Issue #40](https://github.com/influxdb/influxdb/issues/40). Use TOML instead of JSON in the config file +- [Issue #99](https://github.com/influxdb/influxdb/issues/99). Support list series in the query language ## Bugfixes diff --git a/src/api/http/api_test.go b/src/api/http/api_test.go index 2a1ea80a29..c1caca8d9d 100644 --- a/src/api/http/api_test.go +++ b/src/api/http/api_test.go @@ -110,6 +110,10 @@ func (self *MockCoordinator) ReplicateDelete(request *protocol.Request) error { return nil } +func (self *MockCoordinator) ListSeries(_ common.User, _ string) ([]*string, error) { + return nil, nil +} + func (self *MockCoordinator) WriteSeriesData(_ common.User, db string, series *protocol.Series) error { self.series = append(self.series, series) return nil diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index dca29286a3..1baa0a3735 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -37,12 +37,14 @@ var ( // shorter constants for readability var ( - proxyWrite = protocol.Request_PROXY_WRITE - proxyDelete = protocol.Request_PROXY_DELETE - queryRequest = protocol.Request_QUERY - endStreamResponse = protocol.Response_END_STREAM - queryResponse = protocol.Response_QUERY - replayReplication = protocol.Request_REPLICATION_REPLAY + proxyWrite = protocol.Request_PROXY_WRITE + proxyDelete = protocol.Request_PROXY_DELETE + queryRequest = protocol.Request_QUERY + listSeriesRequest = protocol.Request_LIST_SERIES + listSeriesResponse = protocol.Response_LIST_SERIES + endStreamResponse = protocol.Response_END_STREAM + queryResponse = protocol.Response_QUERY + replayReplication = protocol.Request_REPLICATION_REPLAY ) func NewCoordinatorImpl(datastore datastore.Datastore, raftServer ClusterConsensus, clusterConfiguration *ClusterConfiguration) *CoordinatorImpl { @@ -154,8 +156,10 @@ func (self *CoordinatorImpl) streamResultsFromChannels(isSingleSeriesQuery, isAs } } } - leftovers = self.yieldResults(isSingleSeriesQuery, isAscending, leftovers, responses, yield) - responses = make([]*protocol.Response, 0) + if len(responses) > 0 { + leftovers = self.yieldResults(isSingleSeriesQuery, isAscending, leftovers, responses, yield) + responses = make([]*protocol.Response, 0) + } } for _, leftover := range leftovers { if len(leftover.Points) > 0 { @@ -663,6 +667,96 @@ func (self *CoordinatorImpl) ListDatabases(user common.User) ([]*Database, error return dbs, nil } +func seriesFromListSeries(series []string) *protocol.Series { + name := "series" + now := common.CurrentTime() + points := make([]*protocol.Point, 0, len(series)) + for _, s := range series { + _s := s + points = append(points, &protocol.Point{ + Timestamp: &now, + Values: []*protocol.FieldValue{ + &protocol.FieldValue{StringValue: &_s}, + }, + }) + } + + return &protocol.Series{ + Name: &name, + Fields: []string{"name"}, + Points: points, + } +} + +func (self *CoordinatorImpl) ListSeries(user common.User, database string) ([]*string, error) { + if self.clusterConfiguration.IsSingleServer() { + dbs := []*string{} + self.datastore.GetSeriesForDatabase(database, func(db string) error { + _db := db + dbs = append(dbs, &_db) + return nil + }) + return dbs, nil + } + servers, replicationFactor := self.clusterConfiguration.GetServersToMakeQueryTo(self.localHostId, &database) + id := atomic.AddUint32(&self.requestId, uint32(1)) + userName := user.GetName() + responseChannels := make([]chan *protocol.Response, 0, len(servers)+1) + for _, server := range servers { + if server.server.Id == self.localHostId { + continue + } + request := &protocol.Request{Type: &listSeriesRequest, Id: &id, Database: &database, UserName: &userName} + if server.ringLocationsToQuery != replicationFactor { + r := server.ringLocationsToQuery + request.RingLocationsToQuery = &r + } + responseChan := make(chan *protocol.Response, 3) + server.server.protobufClient.MakeRequest(request, responseChan) + responseChannels = append(responseChannels, responseChan) + } + + local := make(chan *protocol.Response) + + responseChannels = append(responseChannels, local) + + go func() { + dbs := []string{} + self.datastore.GetSeriesForDatabase(database, func(db string) error { + dbs = append(dbs, db) + return nil + }) + local <- &protocol.Response{Type: &listSeriesResponse, Series: seriesFromListSeries(dbs)} + local <- &protocol.Response{Type: &endStreamResponse} + close(local) + }() + names := map[string]bool{} + self.streamResultsFromChannels(true, true, responseChannels, func(series *protocol.Series) error { + if *series.Name != "series" { + return fmt.Errorf("received an unexpected series with name '%s'", *series.Name) + } + + if len(series.Fields) != 1 || series.Fields[0] != "name" { + return fmt.Errorf("expected a series with one column called 'name' but received %v", series.Fields) + } + + for _, p := range series.Points { + if v := p.Values[0].StringValue; v != nil { + names[*v] = true + continue + } + return fmt.Errorf("First column should be a string value but wasn't: %v", p.Values[0]) + } + return nil + }) + returnedNames := make([]*string, 0, len(names)) + for name, _ := range names { + _name := name + returnedNames = append(returnedNames, &_name) + } + return returnedNames, nil +} + func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error { if !user.IsClusterAdmin() { return common.NewAuthorizationError("Insufficient permission to drop database") @@ -676,13 +770,14 @@ func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error { } func (self *CoordinatorImpl) AuthenticateDbUser(db, username, password string) (common.User, error) { - log.Debug("(raft:%s) Authenticating password for %s;%s", self.raftServer.(*RaftServer).raftServer.Name(), db, username) + log.Debug("(raft:%s) Authenticating password for %s:%s", self.raftServer.(*RaftServer).raftServer.Name(), db, username) dbUsers := self.clusterConfiguration.dbUsers[db] if dbUsers == nil || dbUsers[username] == nil { return nil, common.NewAuthorizationError("Invalid username/password") } user := dbUsers[username] if user.isValidPwd(password) { + log.Debug("(raft:%s) User %s authenticated succesfuly", self.raftServer.(*RaftServer).raftServer.Name(), username) return user, nil } return nil, common.NewAuthorizationError("Invalid username/password") diff --git a/src/coordinator/interface.go b/src/coordinator/interface.go index adfa4bc0ec..c01389eee0 100644 --- a/src/coordinator/interface.go +++ b/src/coordinator/interface.go @@ -22,6 +22,7 @@ type Coordinator interface { DropDatabase(user common.User, db string) error CreateDatabase(user common.User, db string, replicationFactor uint8) error ListDatabases(user common.User) ([]*Database, error) + ListSeries(user common.User, database string) ([]*string, error) ReplicateWrite(request *protocol.Request) error ReplicateDelete(request *protocol.Request) error ReplayReplication(request *protocol.Request, replicationFactor *uint8, owningServerId *uint32, lastSeenSequenceNumber *uint64) diff --git a/src/coordinator/protobuf_request_handler.go b/src/coordinator/protobuf_request_handler.go index fd34590400..7510e951f8 100644 --- a/src/coordinator/protobuf_request_handler.go +++ b/src/coordinator/protobuf_request_handler.go @@ -98,6 +98,8 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con return self.db.DeleteSeriesData(*request.Database, query[0].DeleteQuery) } else if *request.Type == protocol.Request_QUERY { go self.handleQuery(request, conn) + } else if *request.Type == protocol.Request_LIST_SERIES { + go self.handleListSeries(request, conn) } else if *request.Type == protocol.Request_REPLICATION_REPLAY { self.handleReplay(request, conn) } else { @@ -179,6 +181,19 @@ func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn self.WriteResponse(conn, response) } +func (self *ProtobufRequestHandler) handleListSeries(request *protocol.Request, conn net.Conn) { + dbs := []string{} + self.db.GetSeriesForDatabase(*request.Database, func(db string) error { + dbs = append(dbs, db) + return nil + }) + + response := &protocol.Response{RequestId: request.Id, Type: &listSeriesResponse, Series: seriesFromListSeries(dbs)} + self.WriteResponse(conn, response) + response = &protocol.Response{RequestId: request.Id, Type: &endStreamResponse} + self.WriteResponse(conn, response) +} + func (self *ProtobufRequestHandler) WriteResponse(conn net.Conn, response *protocol.Response) error { data, err := response.Encode() if err != nil { diff --git a/src/datastore/interface.go b/src/datastore/interface.go index 8b4ef325cb..d23e755608 100644 --- a/src/datastore/interface.go +++ b/src/datastore/interface.go @@ -19,6 +19,7 @@ type Datastore interface { AtomicIncrement(name string, val int) (uint64, error) WriteSeriesData(database string, series *protocol.Series) error DeleteSeriesData(database string, query *parser.DeleteQuery) error + GetSeriesForDatabase(database string, yield func(string) error) error DropDatabase(database string) error Close() } diff --git a/src/datastore/leveldb_datastore.go b/src/datastore/leveldb_datastore.go index e93ea98ae4..ae45d4eb68 100644 --- a/src/datastore/leveldb_datastore.go +++ b/src/datastore/leveldb_datastore.go @@ -342,7 +342,7 @@ func (self *LevelDbDatastore) DropDatabase(database string) error { wb := levigo.NewWriteBatch() defer wb.Close() - err := self.getSeriesForDb(database, func(name string) error { + err := self.GetSeriesForDatabase(database, func(name string) error { if err := self.dropSeries(database, name); err != nil { return err } @@ -897,7 +897,7 @@ func (self *LevelDbDatastore) sendBatch(query *parser.SelectQuery, series *proto return dropped, nil } -func (self *LevelDbDatastore) getSeriesForDb(database string, yield func(string) error) error { +func (self *LevelDbDatastore) GetSeriesForDatabase(database string, yield func(string) error) error { it := self.db.NewIterator(self.readOptions) defer it.Close() @@ -926,7 +926,7 @@ func (self *LevelDbDatastore) getSeriesForDb(database string, yield func(string) func (self *LevelDbDatastore) getSeriesForDbAndRegex(database string, regex *regexp.Regexp) []string { names := []string{} - self.getSeriesForDb(database, func(name string) error { + self.GetSeriesForDatabase(database, func(name string) error { if regex.MatchString(name) { names = append(names, name) } diff --git a/src/engine/engine.go b/src/engine/engine.go index 000cd335e8..2ae02bc53c 100644 --- a/src/engine/engine.go +++ b/src/engine/engine.go @@ -36,6 +36,37 @@ func (self *QueryEngine) RunQuery(user common.User, database string, queryString continue } + if query.IsListQuery() { + series, err := self.coordinator.ListSeries(user, database) + if err != nil { + return err + } + seriesName := "series" + points := make([]*protocol.Point, 0, len(series)) + timestamp := time.Now().UnixNano() / 1000 + sequenceNumber := uint64(1) + for _, s := range series { + points = append(points, &protocol.Point{ + Timestamp: ×tamp, + SequenceNumber: &sequenceNumber, + Values: []*protocol.FieldValue{ + &protocol.FieldValue{ + StringValue: s, + }, + }, + }) + } + returnedSeries := &protocol.Series{ + Name: &seriesName, + Fields: []string{"name"}, + Points: points, + } + if err := yield(returnedSeries); err != nil { + return err + } + continue + } + selectQuery := query.SelectQuery if isAggregateQuery(selectQuery) { return self.executeCountQueryWithGroupBy(user, database, selectQuery, yield) diff --git a/src/engine/engine_test.go b/src/engine/engine_test.go index 3dfaf334fe..7b31c9f008 100644 --- a/src/engine/engine_test.go +++ b/src/engine/engine_test.go @@ -50,6 +50,10 @@ func (self *MockCoordinator) WriteSeriesData(user common.User, database string, return nil } +func (self *MockCoordinator) ListSeries(_ common.User, _ string) ([]*string, error) { + return nil, nil +} + func (self *MockCoordinator) DeleteSeriesData(user common.User, database string, query *parser.DeleteQuery) error { return nil } diff --git a/src/integration/benchmark_test.go b/src/integration/benchmark_test.go index 4130b950cf..87deb4d8a3 100644 --- a/src/integration/benchmark_test.go +++ b/src/integration/benchmark_test.go @@ -261,6 +261,34 @@ func (self *IntegrationSuite) TestDbUserAuthentication(c *C) { c.Assert(resp.StatusCode, Equals, http.StatusUnauthorized) } +func (self *IntegrationSuite) TestSeriesListing(c *C) { + err := self.server.WriteData(` +[ + { + "name": "test_series_listing", + "columns": ["cpu", "host"], + "points": [[99.2, "hosta"], [55.6, "hostb"]] + } +] +`) + c.Assert(err, IsNil) + + bs, err := self.server.RunQuery("list series") + c.Assert(err, IsNil) + data := []*h.SerializedSeries{} + err = json.Unmarshal(bs, &data) + c.Assert(data, HasLen, 1) + series := data[0] + c.Assert(series.Columns, HasLen, 3) + points := toMap(series) + for _, p := range points { + if p["name"] == "test_series_listing" { + return + } + } + c.Fail() +} + func (self *IntegrationSuite) TestArithmeticOperations(c *C) { queries := map[string][9]float64{ "select input + output from test_arithmetic_3.0;": [9]float64{1, 2, 3, 4, 5, 9, 6, 7, 13}, diff --git a/src/parser/parser.go b/src/parser/parser.go index 0a3954c36f..605f5db9e0 100644 --- a/src/parser/parser.go +++ b/src/parser/parser.go @@ -143,6 +143,8 @@ type SelectQuery struct { Ascending bool } +type ListQuery struct{} + type DeleteQuery struct { SelectDeleteCommonQuery } @@ -150,6 +152,7 @@ type DeleteQuery struct { type Query struct { SelectQuery *SelectQuery DeleteQuery *DeleteQuery + ListQuery *ListQuery } func (self *Query) GetQueryString() string { @@ -159,6 +162,10 @@ func (self *Query) GetQueryString() string { return self.DeleteQuery.GetQueryString() } +func (self *Query) IsListQuery() bool { + return self.ListQuery != nil +} + func (self *BasicQuery) GetQueryString() string { return self.queryString } @@ -416,6 +423,10 @@ func ParseQuery(query string) ([]*Query, error) { return nil, err } + if q.list_series_query != 0 { + return []*Query{&Query{ListQuery: &ListQuery{}}}, nil + } + if q.select_query != nil { selectQuery, err := parseSelectQuery(query, q.select_query) if err != nil { diff --git a/src/parser/parser_test.go b/src/parser/parser_test.go index 160d28616e..8c0d7163c0 100644 --- a/src/parser/parser_test.go +++ b/src/parser/parser_test.go @@ -127,6 +127,13 @@ func (self *QueryParserSuite) TestParseFromWithJoinedTable(c *C) { c.Assert(fromClause.Names[1].Name.Name, Equals, "user.signups") } +func (self *QueryParserSuite) TestParseListSeries(c *C) { + queries, err := ParseQuery("list series") + c.Assert(err, IsNil) + c.Assert(queries, HasLen, 1) + c.Assert(queries[0].IsListQuery(), Equals, true) +} + func (self *QueryParserSuite) TestParseSelectWithInsensitiveRegexTables(c *C) { q, err := ParseSelectQuery("select email from /users.*/i where time>now()-2d;") c.Assert(err, IsNil) diff --git a/src/parser/query.lex b/src/parser/query.lex index 605f7b9a39..74110ff579 100644 --- a/src/parser/query.lex +++ b/src/parser/query.lex @@ -24,6 +24,8 @@ static int yycolumn = 1; ; { return *yytext; } , { return *yytext; } "merge" { return MERGE; } +"list" { return LIST; } +"series" { return SERIES; } "inner" { return INNER; } "join" { return JOIN; } "from" { return FROM; } diff --git a/src/parser/query.yacc b/src/parser/query.yacc index 5f34c06b9e..dd31b6b978 100644 --- a/src/parser/query.yacc +++ b/src/parser/query.yacc @@ -69,7 +69,7 @@ value *create_expression_value(char *operator, size_t size, ...) { %lex-param {void *scanner} // define types of tokens (terminals) -%token SELECT DELETE FROM WHERE EQUAL GROUP BY LIMIT ORDER ASC DESC MERGE INNER JOIN AS +%token SELECT DELETE FROM WHERE EQUAL GROUP BY LIMIT ORDER ASC DESC MERGE INNER JOIN AS LIST SERIES %token STRING_VALUE INT_VALUE FLOAT_VALUE TABLE_NAME SIMPLE_NAME REGEX_OP %token NEGATION_REGEX_OP REGEX_STRING INSENSITIVE_REGEX_STRING DURATION @@ -144,6 +144,12 @@ QUERY: $$ = calloc(1, sizeof(query)); $$->delete_query = $1; } + | + LIST SERIES + { + $$ = calloc(1, sizeof(query)); + $$->list_series_query = TRUE; + } DELETE_QUERY: DELETE FROM_CLAUSE WHERE_CLAUSE @@ -505,7 +511,7 @@ void yy_delete_buffer(void *, void *); query parse_query(char *const query_s) { - query q = {NULL, NULL, NULL}; + query q = {NULL, NULL, FALSE, NULL}; void *scanner; yylex_init(&scanner); #ifdef DEBUG diff --git a/src/parser/query_types.h b/src/parser/query_types.h index c49957e06c..338779fcb0 100644 --- a/src/parser/query_types.h +++ b/src/parser/query_types.h @@ -92,6 +92,7 @@ typedef struct { typedef struct { select_query *select_query; delete_query *delete_query; + char list_series_query; error *error; } query; diff --git a/src/protocol/protocol.pb.go b/src/protocol/protocol.pb.go index 4a03c1bca1..b516f709bc 100644 --- a/src/protocol/protocol.pb.go +++ b/src/protocol/protocol.pb.go @@ -22,6 +22,7 @@ const ( Request_REPLICATION_DELETE Request_Type = 4 Request_PROXY_DELETE Request_Type = 5 Request_REPLICATION_REPLAY Request_Type = 6 + Request_LIST_SERIES Request_Type = 7 ) var Request_Type_name = map[int32]string{ @@ -31,6 +32,7 @@ var Request_Type_name = map[int32]string{ 4: "REPLICATION_DELETE", 5: "PROXY_DELETE", 6: "REPLICATION_REPLAY", + 7: "LIST_SERIES", } var Request_Type_value = map[string]int32{ "QUERY": 1, @@ -39,6 +41,7 @@ var Request_Type_value = map[string]int32{ "REPLICATION_DELETE": 4, "PROXY_DELETE": 5, "REPLICATION_REPLAY": 6, + "LIST_SERIES": 7, } func (x Request_Type) Enum() *Request_Type { @@ -66,6 +69,7 @@ const ( Response_END_STREAM Response_Type = 3 Response_REPLICATION_REPLAY Response_Type = 4 Response_REPLICATION_REPLAY_END Response_Type = 5 + Response_LIST_SERIES Response_Type = 6 ) var Response_Type_name = map[int32]string{ @@ -74,6 +78,7 @@ var Response_Type_name = map[int32]string{ 3: "END_STREAM", 4: "REPLICATION_REPLAY", 5: "REPLICATION_REPLAY_END", + 6: "LIST_SERIES", } var Response_Type_value = map[string]int32{ "QUERY": 1, @@ -81,6 +86,7 @@ var Response_Type_value = map[string]int32{ "END_STREAM": 3, "REPLICATION_REPLAY": 4, "REPLICATION_REPLAY_END": 5, + "LIST_SERIES": 6, } func (x Response_Type) Enum() *Response_Type { diff --git a/src/protocol/protocol.proto b/src/protocol/protocol.proto index b1a4120f74..37e41faed9 100644 --- a/src/protocol/protocol.proto +++ b/src/protocol/protocol.proto @@ -32,6 +32,7 @@ message Request { REPLICATION_DELETE = 4; PROXY_DELETE = 5; REPLICATION_REPLAY = 6; + LIST_SERIES = 7; } required uint32 id = 1; required Type type = 2; @@ -67,6 +68,7 @@ message Response { END_STREAM = 3; REPLICATION_REPLAY = 4; REPLICATION_REPLAY_END = 5; + LIST_SERIES = 6; } enum ErrorCode { REQUEST_TOO_LARGE = 1; diff --git a/src/server/server_test.go b/src/server/server_test.go index ace56b4011..183c74d977 100644 --- a/src/server/server_test.go +++ b/src/server/server_test.go @@ -256,6 +256,46 @@ func (self *ServerSuite) TestDeleteReplication(c *C) { } } +func (self *ServerSuite) TestListSeries(c *C) { + // create a new db + resp, err := self.postToServer(self.servers[0], "/db?u=root&p=root", `{"name": "list_series", "replicationFactor": 2}`, c) + c.Assert(err, IsNil) + c.Assert(resp.StatusCode, Equals, http.StatusCreated) + resp, err = self.postToServer(self.servers[0], "/db/list_series/users?u=root&p=root", `{"name": "paul", "password": "pass"}`, c) + c.Assert(err, IsNil) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + time.Sleep(time.Millisecond * 50) + + data := `[{ + "name": "cluster_query", + "columns": ["val1"], + "points": [[1]] + }]` + resp, err = self.postToServer(self.servers[0], "/db/list_series/series?u=paul&p=pass", data, c) + c.Assert(err, IsNil) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + time.Sleep(50 * time.Millisecond) + + for _, s := range self.servers { + query := "list series" + encodedQuery := url.QueryEscape(query) + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/db/list_series/series?u=paul&p=pass&q=%s", s.Config.ApiHttpPort, encodedQuery)) + c.Assert(err, IsNil) + body, err := ioutil.ReadAll(resp.Body) + c.Assert(err, IsNil) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + resp.Body.Close() + var results []map[string]interface{} + err = json.Unmarshal(body, &results) + c.Assert(err, IsNil) + c.Assert(results, HasLen, 1) + points := results[0]["points"].([]interface{}) + c.Assert(points, HasLen, 1) + point := points[0].([]interface{}) + c.Assert(point[len(point)-1].(string), Equals, "cluster_query") + } +} + func (self *ServerSuite) TestCrossClusterQueries(c *C) { data := `[{ "name": "cluster_query",