fix #99. support list series

pull/150/merge
John Shahid 2013-12-18 12:28:38 -05:00
parent 31bd297c18
commit 9ff14b0e8e
18 changed files with 269 additions and 14 deletions

View File

@ -156,6 +156,7 @@
- [Issue #94](https://github.com/influxdb/influxdb/issues/94). delete queries
- [Issue #116](https://github.com/influxdb/influxdb/issues/116). Use proper logging
- [Issue #40](https://github.com/influxdb/influxdb/issues/40). Use TOML instead of JSON in the config file
- [Issue #99](https://github.com/influxdb/influxdb/issues/99). Support list series in the query language
## Bugfixes

View File

@ -110,6 +110,10 @@ func (self *MockCoordinator) ReplicateDelete(request *protocol.Request) error {
return nil
}
func (self *MockCoordinator) ListSeries(_ common.User, _ string) ([]*string, error) {
return nil, nil
}
func (self *MockCoordinator) WriteSeriesData(_ common.User, db string, series *protocol.Series) error {
self.series = append(self.series, series)
return nil

View File

@ -37,12 +37,14 @@ var (
// shorter constants for readability
var (
proxyWrite = protocol.Request_PROXY_WRITE
proxyDelete = protocol.Request_PROXY_DELETE
queryRequest = protocol.Request_QUERY
endStreamResponse = protocol.Response_END_STREAM
queryResponse = protocol.Response_QUERY
replayReplication = protocol.Request_REPLICATION_REPLAY
proxyWrite = protocol.Request_PROXY_WRITE
proxyDelete = protocol.Request_PROXY_DELETE
queryRequest = protocol.Request_QUERY
listSeriesRequest = protocol.Request_LIST_SERIES
listSeriesResponse = protocol.Response_LIST_SERIES
endStreamResponse = protocol.Response_END_STREAM
queryResponse = protocol.Response_QUERY
replayReplication = protocol.Request_REPLICATION_REPLAY
)
func NewCoordinatorImpl(datastore datastore.Datastore, raftServer ClusterConsensus, clusterConfiguration *ClusterConfiguration) *CoordinatorImpl {
@ -154,8 +156,10 @@ func (self *CoordinatorImpl) streamResultsFromChannels(isSingleSeriesQuery, isAs
}
}
}
leftovers = self.yieldResults(isSingleSeriesQuery, isAscending, leftovers, responses, yield)
responses = make([]*protocol.Response, 0)
if len(responses) > 0 {
leftovers = self.yieldResults(isSingleSeriesQuery, isAscending, leftovers, responses, yield)
responses = make([]*protocol.Response, 0)
}
}
for _, leftover := range leftovers {
if len(leftover.Points) > 0 {
@ -663,6 +667,96 @@ func (self *CoordinatorImpl) ListDatabases(user common.User) ([]*Database, error
return dbs, nil
}
func seriesFromListSeries(series []string) *protocol.Series {
name := "series"
now := common.CurrentTime()
points := make([]*protocol.Point, 0, len(series))
for _, s := range series {
_s := s
points = append(points, &protocol.Point{
Timestamp: &now,
Values: []*protocol.FieldValue{
&protocol.FieldValue{StringValue: &_s},
},
})
}
return &protocol.Series{
Name: &name,
Fields: []string{"name"},
Points: points,
}
}
func (self *CoordinatorImpl) ListSeries(user common.User, database string) ([]*string, error) {
if self.clusterConfiguration.IsSingleServer() {
dbs := []*string{}
self.datastore.GetSeriesForDatabase(database, func(db string) error {
_db := db
dbs = append(dbs, &_db)
return nil
})
return dbs, nil
}
servers, replicationFactor := self.clusterConfiguration.GetServersToMakeQueryTo(self.localHostId, &database)
id := atomic.AddUint32(&self.requestId, uint32(1))
userName := user.GetName()
responseChannels := make([]chan *protocol.Response, 0, len(servers)+1)
for _, server := range servers {
if server.server.Id == self.localHostId {
continue
}
request := &protocol.Request{Type: &listSeriesRequest, Id: &id, Database: &database, UserName: &userName}
if server.ringLocationsToQuery != replicationFactor {
r := server.ringLocationsToQuery
request.RingLocationsToQuery = &r
}
responseChan := make(chan *protocol.Response, 3)
server.server.protobufClient.MakeRequest(request, responseChan)
responseChannels = append(responseChannels, responseChan)
}
local := make(chan *protocol.Response)
responseChannels = append(responseChannels, local)
go func() {
dbs := []string{}
self.datastore.GetSeriesForDatabase(database, func(db string) error {
dbs = append(dbs, db)
return nil
})
local <- &protocol.Response{Type: &listSeriesResponse, Series: seriesFromListSeries(dbs)}
local <- &protocol.Response{Type: &endStreamResponse}
close(local)
}()
names := map[string]bool{}
self.streamResultsFromChannels(true, true, responseChannels, func(series *protocol.Series) error {
if *series.Name != "series" {
return fmt.Errorf("received an unexpected series with name '%s'", *series.Name)
}
if len(series.Fields) != 1 || series.Fields[0] != "name" {
return fmt.Errorf("expected a series with one column called 'name' but received %v", series.Fields)
}
for _, p := range series.Points {
if v := p.Values[0].StringValue; v != nil {
names[*v] = true
continue
}
return fmt.Errorf("First column should be a string value but wasn't: %v", p.Values[0])
}
return nil
})
returnedNames := make([]*string, 0, len(names))
for name, _ := range names {
_name := name
returnedNames = append(returnedNames, &_name)
}
return returnedNames, nil
}
func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error {
if !user.IsClusterAdmin() {
return common.NewAuthorizationError("Insufficient permission to drop database")
@ -676,13 +770,14 @@ func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error {
}
func (self *CoordinatorImpl) AuthenticateDbUser(db, username, password string) (common.User, error) {
log.Debug("(raft:%s) Authenticating password for %s;%s", self.raftServer.(*RaftServer).raftServer.Name(), db, username)
log.Debug("(raft:%s) Authenticating password for %s:%s", self.raftServer.(*RaftServer).raftServer.Name(), db, username)
dbUsers := self.clusterConfiguration.dbUsers[db]
if dbUsers == nil || dbUsers[username] == nil {
return nil, common.NewAuthorizationError("Invalid username/password")
}
user := dbUsers[username]
if user.isValidPwd(password) {
log.Debug("(raft:%s) User %s authenticated succesfuly", self.raftServer.(*RaftServer).raftServer.Name(), username)
return user, nil
}
return nil, common.NewAuthorizationError("Invalid username/password")

View File

@ -22,6 +22,7 @@ type Coordinator interface {
DropDatabase(user common.User, db string) error
CreateDatabase(user common.User, db string, replicationFactor uint8) error
ListDatabases(user common.User) ([]*Database, error)
ListSeries(user common.User, database string) ([]*string, error)
ReplicateWrite(request *protocol.Request) error
ReplicateDelete(request *protocol.Request) error
ReplayReplication(request *protocol.Request, replicationFactor *uint8, owningServerId *uint32, lastSeenSequenceNumber *uint64)

View File

@ -98,6 +98,8 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con
return self.db.DeleteSeriesData(*request.Database, query[0].DeleteQuery)
} else if *request.Type == protocol.Request_QUERY {
go self.handleQuery(request, conn)
} else if *request.Type == protocol.Request_LIST_SERIES {
go self.handleListSeries(request, conn)
} else if *request.Type == protocol.Request_REPLICATION_REPLAY {
self.handleReplay(request, conn)
} else {
@ -179,6 +181,19 @@ func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn
self.WriteResponse(conn, response)
}
func (self *ProtobufRequestHandler) handleListSeries(request *protocol.Request, conn net.Conn) {
dbs := []string{}
self.db.GetSeriesForDatabase(*request.Database, func(db string) error {
dbs = append(dbs, db)
return nil
})
response := &protocol.Response{RequestId: request.Id, Type: &listSeriesResponse, Series: seriesFromListSeries(dbs)}
self.WriteResponse(conn, response)
response = &protocol.Response{RequestId: request.Id, Type: &endStreamResponse}
self.WriteResponse(conn, response)
}
func (self *ProtobufRequestHandler) WriteResponse(conn net.Conn, response *protocol.Response) error {
data, err := response.Encode()
if err != nil {

View File

@ -19,6 +19,7 @@ type Datastore interface {
AtomicIncrement(name string, val int) (uint64, error)
WriteSeriesData(database string, series *protocol.Series) error
DeleteSeriesData(database string, query *parser.DeleteQuery) error
GetSeriesForDatabase(database string, yield func(string) error) error
DropDatabase(database string) error
Close()
}

View File

@ -342,7 +342,7 @@ func (self *LevelDbDatastore) DropDatabase(database string) error {
wb := levigo.NewWriteBatch()
defer wb.Close()
err := self.getSeriesForDb(database, func(name string) error {
err := self.GetSeriesForDatabase(database, func(name string) error {
if err := self.dropSeries(database, name); err != nil {
return err
}
@ -897,7 +897,7 @@ func (self *LevelDbDatastore) sendBatch(query *parser.SelectQuery, series *proto
return dropped, nil
}
func (self *LevelDbDatastore) getSeriesForDb(database string, yield func(string) error) error {
func (self *LevelDbDatastore) GetSeriesForDatabase(database string, yield func(string) error) error {
it := self.db.NewIterator(self.readOptions)
defer it.Close()
@ -926,7 +926,7 @@ func (self *LevelDbDatastore) getSeriesForDb(database string, yield func(string)
func (self *LevelDbDatastore) getSeriesForDbAndRegex(database string, regex *regexp.Regexp) []string {
names := []string{}
self.getSeriesForDb(database, func(name string) error {
self.GetSeriesForDatabase(database, func(name string) error {
if regex.MatchString(name) {
names = append(names, name)
}

View File

@ -36,6 +36,37 @@ func (self *QueryEngine) RunQuery(user common.User, database string, queryString
continue
}
if query.IsListQuery() {
series, err := self.coordinator.ListSeries(user, database)
if err != nil {
return err
}
seriesName := "series"
points := make([]*protocol.Point, 0, len(series))
timestamp := time.Now().UnixNano() / 1000
sequenceNumber := uint64(1)
for _, s := range series {
points = append(points, &protocol.Point{
Timestamp: &timestamp,
SequenceNumber: &sequenceNumber,
Values: []*protocol.FieldValue{
&protocol.FieldValue{
StringValue: s,
},
},
})
}
returnedSeries := &protocol.Series{
Name: &seriesName,
Fields: []string{"name"},
Points: points,
}
if err := yield(returnedSeries); err != nil {
return err
}
continue
}
selectQuery := query.SelectQuery
if isAggregateQuery(selectQuery) {
return self.executeCountQueryWithGroupBy(user, database, selectQuery, yield)

View File

@ -50,6 +50,10 @@ func (self *MockCoordinator) WriteSeriesData(user common.User, database string,
return nil
}
func (self *MockCoordinator) ListSeries(_ common.User, _ string) ([]*string, error) {
return nil, nil
}
func (self *MockCoordinator) DeleteSeriesData(user common.User, database string, query *parser.DeleteQuery) error {
return nil
}

View File

@ -261,6 +261,34 @@ func (self *IntegrationSuite) TestDbUserAuthentication(c *C) {
c.Assert(resp.StatusCode, Equals, http.StatusUnauthorized)
}
func (self *IntegrationSuite) TestSeriesListing(c *C) {
err := self.server.WriteData(`
[
{
"name": "test_series_listing",
"columns": ["cpu", "host"],
"points": [[99.2, "hosta"], [55.6, "hostb"]]
}
]
`)
c.Assert(err, IsNil)
bs, err := self.server.RunQuery("list series")
c.Assert(err, IsNil)
data := []*h.SerializedSeries{}
err = json.Unmarshal(bs, &data)
c.Assert(data, HasLen, 1)
series := data[0]
c.Assert(series.Columns, HasLen, 3)
points := toMap(series)
for _, p := range points {
if p["name"] == "test_series_listing" {
return
}
}
c.Fail()
}
func (self *IntegrationSuite) TestArithmeticOperations(c *C) {
queries := map[string][9]float64{
"select input + output from test_arithmetic_3.0;": [9]float64{1, 2, 3, 4, 5, 9, 6, 7, 13},

View File

@ -143,6 +143,8 @@ type SelectQuery struct {
Ascending bool
}
type ListQuery struct{}
type DeleteQuery struct {
SelectDeleteCommonQuery
}
@ -150,6 +152,7 @@ type DeleteQuery struct {
type Query struct {
SelectQuery *SelectQuery
DeleteQuery *DeleteQuery
ListQuery *ListQuery
}
func (self *Query) GetQueryString() string {
@ -159,6 +162,10 @@ func (self *Query) GetQueryString() string {
return self.DeleteQuery.GetQueryString()
}
func (self *Query) IsListQuery() bool {
return self.ListQuery != nil
}
func (self *BasicQuery) GetQueryString() string {
return self.queryString
}
@ -416,6 +423,10 @@ func ParseQuery(query string) ([]*Query, error) {
return nil, err
}
if q.list_series_query != 0 {
return []*Query{&Query{ListQuery: &ListQuery{}}}, nil
}
if q.select_query != nil {
selectQuery, err := parseSelectQuery(query, q.select_query)
if err != nil {

View File

@ -127,6 +127,13 @@ func (self *QueryParserSuite) TestParseFromWithJoinedTable(c *C) {
c.Assert(fromClause.Names[1].Name.Name, Equals, "user.signups")
}
func (self *QueryParserSuite) TestParseListSeries(c *C) {
queries, err := ParseQuery("list series")
c.Assert(err, IsNil)
c.Assert(queries, HasLen, 1)
c.Assert(queries[0].IsListQuery(), Equals, true)
}
func (self *QueryParserSuite) TestParseSelectWithInsensitiveRegexTables(c *C) {
q, err := ParseSelectQuery("select email from /users.*/i where time>now()-2d;")
c.Assert(err, IsNil)

View File

@ -24,6 +24,8 @@ static int yycolumn = 1;
; { return *yytext; }
, { return *yytext; }
"merge" { return MERGE; }
"list" { return LIST; }
"series" { return SERIES; }
"inner" { return INNER; }
"join" { return JOIN; }
"from" { return FROM; }

View File

@ -69,7 +69,7 @@ value *create_expression_value(char *operator, size_t size, ...) {
%lex-param {void *scanner}
// define types of tokens (terminals)
%token SELECT DELETE FROM WHERE EQUAL GROUP BY LIMIT ORDER ASC DESC MERGE INNER JOIN AS
%token SELECT DELETE FROM WHERE EQUAL GROUP BY LIMIT ORDER ASC DESC MERGE INNER JOIN AS LIST SERIES
%token <string> STRING_VALUE INT_VALUE FLOAT_VALUE TABLE_NAME SIMPLE_NAME REGEX_OP
%token <string> NEGATION_REGEX_OP REGEX_STRING INSENSITIVE_REGEX_STRING DURATION
@ -144,6 +144,12 @@ QUERY:
$$ = calloc(1, sizeof(query));
$$->delete_query = $1;
}
|
LIST SERIES
{
$$ = calloc(1, sizeof(query));
$$->list_series_query = TRUE;
}
DELETE_QUERY:
DELETE FROM_CLAUSE WHERE_CLAUSE
@ -505,7 +511,7 @@ void yy_delete_buffer(void *, void *);
query
parse_query(char *const query_s)
{
query q = {NULL, NULL, NULL};
query q = {NULL, NULL, FALSE, NULL};
void *scanner;
yylex_init(&scanner);
#ifdef DEBUG

View File

@ -92,6 +92,7 @@ typedef struct {
typedef struct {
select_query *select_query;
delete_query *delete_query;
char list_series_query;
error *error;
} query;

View File

@ -22,6 +22,7 @@ const (
Request_REPLICATION_DELETE Request_Type = 4
Request_PROXY_DELETE Request_Type = 5
Request_REPLICATION_REPLAY Request_Type = 6
Request_LIST_SERIES Request_Type = 7
)
var Request_Type_name = map[int32]string{
@ -31,6 +32,7 @@ var Request_Type_name = map[int32]string{
4: "REPLICATION_DELETE",
5: "PROXY_DELETE",
6: "REPLICATION_REPLAY",
7: "LIST_SERIES",
}
var Request_Type_value = map[string]int32{
"QUERY": 1,
@ -39,6 +41,7 @@ var Request_Type_value = map[string]int32{
"REPLICATION_DELETE": 4,
"PROXY_DELETE": 5,
"REPLICATION_REPLAY": 6,
"LIST_SERIES": 7,
}
func (x Request_Type) Enum() *Request_Type {
@ -66,6 +69,7 @@ const (
Response_END_STREAM Response_Type = 3
Response_REPLICATION_REPLAY Response_Type = 4
Response_REPLICATION_REPLAY_END Response_Type = 5
Response_LIST_SERIES Response_Type = 6
)
var Response_Type_name = map[int32]string{
@ -74,6 +78,7 @@ var Response_Type_name = map[int32]string{
3: "END_STREAM",
4: "REPLICATION_REPLAY",
5: "REPLICATION_REPLAY_END",
6: "LIST_SERIES",
}
var Response_Type_value = map[string]int32{
"QUERY": 1,
@ -81,6 +86,7 @@ var Response_Type_value = map[string]int32{
"END_STREAM": 3,
"REPLICATION_REPLAY": 4,
"REPLICATION_REPLAY_END": 5,
"LIST_SERIES": 6,
}
func (x Response_Type) Enum() *Response_Type {

View File

@ -32,6 +32,7 @@ message Request {
REPLICATION_DELETE = 4;
PROXY_DELETE = 5;
REPLICATION_REPLAY = 6;
LIST_SERIES = 7;
}
required uint32 id = 1;
required Type type = 2;
@ -67,6 +68,7 @@ message Response {
END_STREAM = 3;
REPLICATION_REPLAY = 4;
REPLICATION_REPLAY_END = 5;
LIST_SERIES = 6;
}
enum ErrorCode {
REQUEST_TOO_LARGE = 1;

View File

@ -256,6 +256,46 @@ func (self *ServerSuite) TestDeleteReplication(c *C) {
}
}
func (self *ServerSuite) TestListSeries(c *C) {
// create a new db
resp, err := self.postToServer(self.servers[0], "/db?u=root&p=root", `{"name": "list_series", "replicationFactor": 2}`, c)
c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, http.StatusCreated)
resp, err = self.postToServer(self.servers[0], "/db/list_series/users?u=root&p=root", `{"name": "paul", "password": "pass"}`, c)
c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, http.StatusOK)
time.Sleep(time.Millisecond * 50)
data := `[{
"name": "cluster_query",
"columns": ["val1"],
"points": [[1]]
}]`
resp, err = self.postToServer(self.servers[0], "/db/list_series/series?u=paul&p=pass", data, c)
c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, http.StatusOK)
time.Sleep(50 * time.Millisecond)
for _, s := range self.servers {
query := "list series"
encodedQuery := url.QueryEscape(query)
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/db/list_series/series?u=paul&p=pass&q=%s", s.Config.ApiHttpPort, encodedQuery))
c.Assert(err, IsNil)
body, err := ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, http.StatusOK)
resp.Body.Close()
var results []map[string]interface{}
err = json.Unmarshal(body, &results)
c.Assert(err, IsNil)
c.Assert(results, HasLen, 1)
points := results[0]["points"].([]interface{})
c.Assert(points, HasLen, 1)
point := points[0].([]interface{})
c.Assert(point[len(point)-1].(string), Equals, "cluster_query")
}
}
func (self *ServerSuite) TestCrossClusterQueries(c *C) {
data := `[{
"name": "cluster_query",