parent
12a9d59f8f
commit
e6471f8e66
|
@ -116,6 +116,7 @@
|
|||
## Features
|
||||
|
||||
- [Issue #51](https://github.com/influxdb/influxdb/issues/51). Implement first and last aggregates
|
||||
- [Issue #35](https://github.com/influxdb/influxdb/issues/35). Support table aliases in Join Queries
|
||||
|
||||
## Bugfixes
|
||||
|
||||
|
|
|
@ -2,11 +2,13 @@ package common
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"protocol"
|
||||
"time"
|
||||
)
|
||||
|
||||
func StringToSeriesArray(seriesString string) ([]*protocol.Series, error) {
|
||||
func StringToSeriesArray(seriesString string, args ...interface{}) ([]*protocol.Series, error) {
|
||||
seriesString = fmt.Sprintf(seriesString, args...)
|
||||
series := []*protocol.Series{}
|
||||
err := json.Unmarshal([]byte(seriesString), &series)
|
||||
return series, err
|
||||
|
|
|
@ -137,6 +137,58 @@ func (self *DatastoreSuite) TestDeletingData(c *C) {
|
|||
c.Assert(err, ErrorMatches, ".*Field value doesn't exist.*")
|
||||
}
|
||||
|
||||
func (self *DatastoreSuite) TestCanWriteAndRetrievePointsWithAlias(c *C) {
|
||||
cleanup(nil)
|
||||
db := newDatastore(c)
|
||||
defer cleanup(db)
|
||||
mock := `
|
||||
{
|
||||
"points": [
|
||||
{
|
||||
"values": [
|
||||
{
|
||||
"int64_value": 3
|
||||
}
|
||||
],
|
||||
"sequence_number": 1
|
||||
},
|
||||
{
|
||||
"values": [
|
||||
{
|
||||
"int64_value": 2
|
||||
}
|
||||
],
|
||||
"sequence_number": 2
|
||||
}
|
||||
],
|
||||
"name": "foo",
|
||||
"fields": ["value"]
|
||||
}`
|
||||
pointTime := time.Now().Unix()
|
||||
series := stringToSeries(mock, pointTime, c)
|
||||
err := db.WriteSeriesData("test", series)
|
||||
c.Assert(err, IsNil)
|
||||
q, errQ := parser.ParseQuery("select * from foo as f1 inner join foo as f2;")
|
||||
c.Assert(errQ, IsNil)
|
||||
resultSeries := map[string][]*protocol.Series{}
|
||||
yield := func(series *protocol.Series) error {
|
||||
resultSeries[*series.Name] = append(resultSeries[*series.Name], series)
|
||||
return nil
|
||||
}
|
||||
user := &MockUser{}
|
||||
err = db.ExecuteQuery(user, "test", q, yield)
|
||||
c.Assert(err, IsNil)
|
||||
// we should get the actual data and the end of series data
|
||||
// indicator , i.e. a series with no points
|
||||
c.Assert(resultSeries, HasLen, 2)
|
||||
c.Assert(resultSeries["f1"], HasLen, 2)
|
||||
c.Assert(resultSeries["f1"][0].Points, HasLen, 2)
|
||||
c.Assert(resultSeries["f1"][1].Points, HasLen, 0)
|
||||
c.Assert(resultSeries["f2"], HasLen, 2)
|
||||
c.Assert(resultSeries["f2"][0].Points, HasLen, 2)
|
||||
c.Assert(resultSeries["f2"][1].Points, HasLen, 0)
|
||||
}
|
||||
|
||||
func (self *DatastoreSuite) TestCanWriteAndRetrievePoints(c *C) {
|
||||
cleanup(nil)
|
||||
db := newDatastore(c)
|
||||
|
|
|
@ -21,7 +21,8 @@ func getExpressionValue(expr *parser.Expression, fields []string, point *protoco
|
|||
return &protocol.FieldValue{Int64Value: &value}, nil
|
||||
case parser.ValueString, parser.ValueRegex:
|
||||
return &protocol.FieldValue{StringValue: &value.Name}, nil
|
||||
case parser.ValueSimpleName:
|
||||
case parser.ValueTableName, parser.ValueSimpleName:
|
||||
|
||||
// TODO: optimize this so we don't have to lookup the column everytime
|
||||
fieldIdx := -1
|
||||
for idx, field := range fields {
|
||||
|
@ -51,6 +52,7 @@ func matchesExpression(expr *parser.BoolExpression, fields []string, point *prot
|
|||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
operator := registeredOperators[expr.Operation]
|
||||
return operator(leftValue, rightValue)
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ func (self *FilteringSuite) TestEqualityFiltering(c *C) {
|
|||
queryStr := "select * from t where column_one == 100 and column_two != 6;"
|
||||
query, err := parser.ParseQuery(queryStr)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
series, err := common.StringToSeriesArray(`
|
||||
[
|
||||
{
|
||||
|
@ -36,6 +37,53 @@ func (self *FilteringSuite) TestEqualityFiltering(c *C) {
|
|||
c.Assert(*result.Points[0].Values[1].Int64Value, Equals, int64(5))
|
||||
}
|
||||
|
||||
func (self *FilteringSuite) TestFilteringNonExistentColumn(c *C) {
|
||||
queryStr := "select * from t where column_one == 100 and column_two != 6"
|
||||
query, err := parser.ParseQuery(queryStr)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
series, err := common.StringToSeriesArray(`
|
||||
[
|
||||
{
|
||||
"points": [
|
||||
{"values": [{"int64_value": 100}], "timestamp": 1381346631, "sequence_number": 1},
|
||||
{"values": [{"int64_value": 90 }], "timestamp": 1381346632, "sequence_number": 1}
|
||||
],
|
||||
"name": "t",
|
||||
"fields": ["column_one"]
|
||||
}
|
||||
]
|
||||
`)
|
||||
c.Assert(err, IsNil)
|
||||
_, err = Filter(query, series[0])
|
||||
c.Assert(err, NotNil)
|
||||
}
|
||||
|
||||
func (self *FilteringSuite) TestFilteringWithJoin(c *C) {
|
||||
queryStr := "select * from t as bar inner join t as foo where bar.column_one == 100 and foo.column_two != 6;"
|
||||
query, err := parser.ParseQuery(queryStr)
|
||||
c.Assert(err, IsNil)
|
||||
series, err := common.StringToSeriesArray(`
|
||||
[
|
||||
{
|
||||
"points": [
|
||||
{"values": [{"int64_value": 100},{"int64_value": 5 }], "timestamp": 1381346631, "sequence_number": 1},
|
||||
{"values": [{"int64_value": 100},{"int64_value": 6 }], "timestamp": 1381346631, "sequence_number": 1},
|
||||
{"values": [{"int64_value": 90 },{"int64_value": 15}], "timestamp": 1381346632, "sequence_number": 1}
|
||||
],
|
||||
"name": "foo_join_bar",
|
||||
"fields": ["bar.column_one", "foo.column_two"]
|
||||
}
|
||||
]
|
||||
`)
|
||||
c.Assert(err, IsNil)
|
||||
result, err := Filter(query, series[0])
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(result, NotNil)
|
||||
// no filtering should happen for join queries
|
||||
c.Assert(result.Points, HasLen, 1)
|
||||
}
|
||||
|
||||
func (self *FilteringSuite) TestReturnAllColumnsIfAskedForWildcard(c *C) {
|
||||
queryStr := "select * from t where column_one == 100 and column_two != 6;"
|
||||
query, err := parser.ParseQuery(queryStr)
|
||||
|
|
|
@ -459,12 +459,11 @@ func (self *LevelDbDatastore) executeQueryForSeries(database, series string, col
|
|||
|
||||
// check if we should send the batch along
|
||||
if resultByteCount > MAX_SERIES_SIZE || limit < 1 {
|
||||
lengthBeforeFiltering := len(result.Points)
|
||||
filteredResult, _ := Filter(query, result)
|
||||
limit += lengthBeforeFiltering - len(filteredResult.Points)
|
||||
if err := yield(filteredResult); err != nil {
|
||||
dropped, err := self.sendBatch(query, result, yield)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
limit += dropped
|
||||
resultByteCount = 0
|
||||
result = &protocol.Series{Name: &series, Fields: fieldNames, Points: make([]*protocol.Point, 0)}
|
||||
}
|
||||
|
@ -472,12 +471,40 @@ func (self *LevelDbDatastore) executeQueryForSeries(database, series string, col
|
|||
break
|
||||
}
|
||||
}
|
||||
filteredResult, _ := Filter(query, result)
|
||||
if err := yield(filteredResult); err != nil {
|
||||
if _, err := self.sendBatch(query, result, yield); err != nil {
|
||||
return err
|
||||
}
|
||||
emptyResult := &protocol.Series{Name: &series, Fields: fieldNames, Points: nil}
|
||||
return yield(emptyResult)
|
||||
_, err = self.sendBatch(query, emptyResult, yield)
|
||||
return err
|
||||
}
|
||||
|
||||
// Return the number of dropped ticks from filtering. if the series
|
||||
// had more than one alias, returns the min of all dropped ticks
|
||||
func (self *LevelDbDatastore) sendBatch(query *parser.Query, series *protocol.Series, yield func(series *protocol.Series) error) (int, error) {
|
||||
dropped := int(math.MaxInt32)
|
||||
|
||||
for _, alias := range query.GetTableAliases(*series.Name) {
|
||||
_alias := alias
|
||||
newSeries := &protocol.Series{Name: &_alias, Points: series.Points, Fields: series.Fields}
|
||||
|
||||
lengthBeforeFiltering := len(newSeries.Points)
|
||||
var filteredResult *protocol.Series
|
||||
if query.GetFromClause().Type == parser.FromClauseInnerJoin {
|
||||
filteredResult = newSeries
|
||||
} else {
|
||||
filteredResult, _ = Filter(query, newSeries)
|
||||
}
|
||||
_dropped := lengthBeforeFiltering - len(filteredResult.Points)
|
||||
if _dropped < dropped {
|
||||
dropped = _dropped
|
||||
}
|
||||
if err := yield(filteredResult); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
return dropped, nil
|
||||
}
|
||||
|
||||
func (self *LevelDbDatastore) getSeriesForDb(database string, yield func(string) error) error {
|
||||
|
|
|
@ -51,11 +51,11 @@ func (self *QueryEngine) distributeQuery(user common.User, database string, quer
|
|||
// see if this is a merge query
|
||||
fromClause := query.GetFromClause()
|
||||
if fromClause.Type == parser.FromClauseMerge {
|
||||
yield = getMergeYield(fromClause.Names[0].Name, fromClause.Names[1].Name, query.Ascending, yield)
|
||||
yield = getMergeYield(fromClause.Names[0].Name.Name, fromClause.Names[1].Name.Name, query.Ascending, yield)
|
||||
}
|
||||
|
||||
if fromClause.Type == parser.FromClauseInnerJoin {
|
||||
yield = getJoinYield(fromClause.Names[0].Name, fromClause.Names[1].Name, query.Ascending, yield)
|
||||
yield = getJoinYield(query, yield)
|
||||
}
|
||||
|
||||
return self.coordinator.DistributeQuery(user, database, query, yield)
|
||||
|
|
|
@ -1393,6 +1393,54 @@ func (self *EngineSuite) TestQueryWithJoinedTablesDescendingOrder(c *C) {
|
|||
]`)
|
||||
}
|
||||
|
||||
func (self *EngineSuite) TestJoiningWithSelf(c *C) {
|
||||
engine := createEngine(c, `[
|
||||
{
|
||||
"points": [
|
||||
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346706000000, "sequence_number": 1 },
|
||||
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000, "sequence_number": 1 }
|
||||
],
|
||||
"name": "foo",
|
||||
"fields": ["value"]
|
||||
},
|
||||
{
|
||||
"points": [],
|
||||
"name": "foo",
|
||||
"fields": ["value"]
|
||||
},
|
||||
{
|
||||
"points": [
|
||||
{ "values": [{ "int64_value": 3 }], "timestamp": 1381346706000000, "sequence_number": 1 },
|
||||
{ "values": [{ "int64_value": 1 }], "timestamp": 1381346701000000, "sequence_number": 1 }
|
||||
],
|
||||
"name": "bar",
|
||||
"fields": ["value"]
|
||||
},
|
||||
{
|
||||
"points": [],
|
||||
"name": "bar",
|
||||
"fields": ["value"]
|
||||
}
|
||||
]`)
|
||||
|
||||
runQuery(engine, "select * from t as foo inner join t as bar", c, `[
|
||||
{
|
||||
"points": [
|
||||
{ "values": [{ "int64_value": 3 }, { "int64_value": 3 }], "timestamp": 1381346706000000, "sequence_number": 1 }
|
||||
],
|
||||
"name": "foo_join_bar",
|
||||
"fields": ["foo.value", "bar.value"]
|
||||
},
|
||||
{
|
||||
"points": [
|
||||
{ "values": [{ "int64_value": 1 }, { "int64_value": 1 }], "timestamp": 1381346701000000, "sequence_number": 1 }
|
||||
],
|
||||
"name": "foo_join_bar",
|
||||
"fields": ["foo.value", "bar.value"]
|
||||
}
|
||||
]`)
|
||||
}
|
||||
|
||||
func (self *EngineSuite) TestQueryWithMergedTablesWithPointsAppend(c *C) {
|
||||
engine := createEngine(c, `[
|
||||
{
|
||||
|
|
|
@ -1,18 +1,22 @@
|
|||
package engine
|
||||
|
||||
import (
|
||||
"datastore"
|
||||
"parser"
|
||||
"protocol"
|
||||
)
|
||||
|
||||
func getJoinYield(table1, table2 string, ascending bool, yield func(*protocol.Series) error) func(*protocol.Series) error {
|
||||
func getJoinYield(query *parser.Query, yield func(*protocol.Series) error) func(*protocol.Series) error {
|
||||
var lastPoint1 *protocol.Point
|
||||
var lastFields1 []string
|
||||
var lastPoint2 *protocol.Point
|
||||
var lastFields2 []string
|
||||
|
||||
table1 := query.GetFromClause().Names[0].GetAlias()
|
||||
table2 := query.GetFromClause().Names[1].GetAlias()
|
||||
name := table1 + "_join_" + table2
|
||||
|
||||
return mergeYield(table1, table2, false, ascending, func(s *protocol.Series) error {
|
||||
return mergeYield(table1, table2, false, query.Ascending, func(s *protocol.Series) error {
|
||||
if *s.Name == table1 {
|
||||
lastPoint1 = s.Points[len(s.Points)-1]
|
||||
if lastFields1 == nil {
|
||||
|
@ -50,7 +54,11 @@ func getJoinYield(table1, table2 string, ascending bool, yield func(*protocol.Se
|
|||
lastPoint1 = nil
|
||||
lastPoint2 = nil
|
||||
|
||||
return yield(newSeries)
|
||||
filteredSeries, _ := datastore.Filter(query, newSeries)
|
||||
if len(filteredSeries.Points) > 0 {
|
||||
return yield(newSeries)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -305,6 +305,36 @@ func (self *IntegrationSuite) TestFilterWithLimit(c *C) {
|
|||
c.Assert(data[0].Points, HasLen, 1)
|
||||
}
|
||||
|
||||
// issue #36
|
||||
func (self *IntegrationSuite) TestInnerJoin(c *C) {
|
||||
for i := 0; i < 3; i++ {
|
||||
host := "hosta"
|
||||
if i%2 == 0 {
|
||||
host = "hostb"
|
||||
}
|
||||
|
||||
err := self.server.WriteData(fmt.Sprintf(`
|
||||
[
|
||||
{
|
||||
"name": "test_join",
|
||||
"columns": ["cpu", "host"],
|
||||
"points": [[%d, "%s"]]
|
||||
}
|
||||
]
|
||||
`, 60+i*10, host))
|
||||
c.Assert(err, IsNil)
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
bs, err := self.server.RunQuery("select * from test_join as f1 inner join test_join as f2 where f1.host == 'hostb'")
|
||||
c.Assert(err, IsNil)
|
||||
data := []*h.SerializedSeries{}
|
||||
err = json.Unmarshal(bs, &data)
|
||||
c.Assert(data, HasLen, 1)
|
||||
c.Assert(data[0].Name, Equals, "f1_join_f2")
|
||||
c.Assert(data[0].Columns, HasLen, 6)
|
||||
c.Assert(data[0].Points, HasLen, 2)
|
||||
}
|
||||
|
||||
func (self *IntegrationSuite) TestCountWithGroupBy(c *C) {
|
||||
for i := 0; i < 20; i++ {
|
||||
err := self.server.WriteData(fmt.Sprintf(`
|
||||
|
|
|
@ -10,10 +10,26 @@ free_array(array *array)
|
|||
free(array->elems);
|
||||
free(array);
|
||||
}
|
||||
void free_table_name(table_name *name)
|
||||
{
|
||||
free_value(name->name);
|
||||
free(name->alias);
|
||||
free(name);
|
||||
}
|
||||
void
|
||||
free_table_name_array(table_name_array *array)
|
||||
{
|
||||
int i;
|
||||
for (i = 0; i < array->size; i++)
|
||||
free_table_name(array->elems[i]);
|
||||
free(array->elems);
|
||||
free(array);
|
||||
}
|
||||
|
||||
void
|
||||
free_from_clause(from_clause *f)
|
||||
{
|
||||
free_value_array(f->names);
|
||||
free_table_name_array(f->names);
|
||||
free(f);
|
||||
}
|
||||
|
||||
|
|
|
@ -57,9 +57,21 @@ const (
|
|||
FromClauseInnerJoin FromClauseType = C.FROM_INNER_JOIN
|
||||
)
|
||||
|
||||
type TableName struct {
|
||||
Name *Value
|
||||
Alias string
|
||||
}
|
||||
|
||||
func (self *TableName) GetAlias() string {
|
||||
if self.Alias != "" {
|
||||
return self.Alias
|
||||
}
|
||||
return self.Name.Name
|
||||
}
|
||||
|
||||
type FromClause struct {
|
||||
Type FromClauseType
|
||||
Names []*Value
|
||||
Names []*TableName
|
||||
}
|
||||
|
||||
type Expression struct {
|
||||
|
@ -213,8 +225,37 @@ func GetValue(value *C.value) (*Value, error) {
|
|||
return v, err
|
||||
}
|
||||
|
||||
func GetTableName(name *C.table_name) (*TableName, error) {
|
||||
value, err := GetValue(name.name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
table := &TableName{Name: value}
|
||||
if name.alias != nil {
|
||||
table.Alias = C.GoString(name.alias)
|
||||
}
|
||||
|
||||
return table, nil
|
||||
}
|
||||
|
||||
func GetTableNameArray(array *C.table_name_array) ([]*TableName, error) {
|
||||
var names []*C.table_name
|
||||
setupSlice((*reflect.SliceHeader)((unsafe.Pointer(&names))), unsafe.Pointer(array.elems), array.size)
|
||||
|
||||
tableNamesSlice := make([]*TableName, 0, array.size)
|
||||
for _, name := range names {
|
||||
tableName, err := GetTableName(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tableNamesSlice = append(tableNamesSlice, tableName)
|
||||
}
|
||||
return tableNamesSlice, nil
|
||||
}
|
||||
|
||||
func GetFromClause(fromClause *C.from_clause) (*FromClause, error) {
|
||||
arr, err := GetValueArray(fromClause.names)
|
||||
arr, err := GetTableNameArray(fromClause.names)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -60,7 +60,7 @@ func (self *QueryParserSuite) TestSimpleFromClause(c *C) {
|
|||
fromClause := q.GetFromClause()
|
||||
c.Assert(fromClause.Type, Equals, FromClauseArray)
|
||||
c.Assert(fromClause.Names, HasLen, 1)
|
||||
c.Assert(fromClause.Names[0].Name, Equals, "t")
|
||||
c.Assert(fromClause.Names[0].Name.Name, Equals, "t")
|
||||
}
|
||||
|
||||
func (self *QueryParserSuite) TestParseFromWithMergedTable(c *C) {
|
||||
|
@ -69,8 +69,8 @@ func (self *QueryParserSuite) TestParseFromWithMergedTable(c *C) {
|
|||
fromClause := q.GetFromClause()
|
||||
c.Assert(fromClause.Type, Equals, FromClauseMerge)
|
||||
c.Assert(fromClause.Names, HasLen, 2)
|
||||
c.Assert(fromClause.Names[0].Name, Equals, "newsletter.signups")
|
||||
c.Assert(fromClause.Names[1].Name, Equals, "user.signups")
|
||||
c.Assert(fromClause.Names[0].Name.Name, Equals, "newsletter.signups")
|
||||
c.Assert(fromClause.Names[1].Name.Name, Equals, "user.signups")
|
||||
}
|
||||
|
||||
func (self *QueryParserSuite) TestMultipleAggregateFunctions(c *C) {
|
||||
|
@ -88,8 +88,8 @@ func (self *QueryParserSuite) TestParseFromWithJoinedTable(c *C) {
|
|||
fromClause := q.GetFromClause()
|
||||
c.Assert(fromClause.Type, Equals, FromClauseInnerJoin)
|
||||
c.Assert(fromClause.Names, HasLen, 2)
|
||||
c.Assert(fromClause.Names[0].Name, Equals, "newsletter.signups")
|
||||
c.Assert(fromClause.Names[1].Name, Equals, "user.signups")
|
||||
c.Assert(fromClause.Names[0].Name.Name, Equals, "newsletter.signups")
|
||||
c.Assert(fromClause.Names[1].Name.Name, Equals, "user.signups")
|
||||
}
|
||||
|
||||
func (self *QueryParserSuite) TestParseSelectWithInsensitiveRegexTables(c *C) {
|
||||
|
@ -99,9 +99,9 @@ func (self *QueryParserSuite) TestParseSelectWithInsensitiveRegexTables(c *C) {
|
|||
fromClause := q.GetFromClause()
|
||||
c.Assert(fromClause.Type, Equals, FromClauseArray)
|
||||
c.Assert(fromClause.Names, HasLen, 1)
|
||||
c.Assert(fromClause.Names[0].Name, Equals, "users.*")
|
||||
c.Assert(fromClause.Names[0].Type, Equals, ValueRegex)
|
||||
c.Assert(fromClause.Names[0].IsCaseInsensitive, Equals, true)
|
||||
c.Assert(fromClause.Names[0].Name.Name, Equals, "users.*")
|
||||
c.Assert(fromClause.Names[0].Name.Type, Equals, ValueRegex)
|
||||
c.Assert(fromClause.Names[0].Name.IsCaseInsensitive, Equals, true)
|
||||
}
|
||||
|
||||
func (self *QueryParserSuite) TestParseSelectWithRegexTables(c *C) {
|
||||
|
@ -111,9 +111,9 @@ func (self *QueryParserSuite) TestParseSelectWithRegexTables(c *C) {
|
|||
fromClause := q.GetFromClause()
|
||||
c.Assert(fromClause.Type, Equals, FromClauseArray)
|
||||
c.Assert(fromClause.Names, HasLen, 1)
|
||||
c.Assert(fromClause.Names[0].Name, Equals, "users.*")
|
||||
c.Assert(fromClause.Names[0].Type, Equals, ValueRegex)
|
||||
c.Assert(fromClause.Names[0].IsCaseInsensitive, Equals, false)
|
||||
c.Assert(fromClause.Names[0].Name.Name, Equals, "users.*")
|
||||
c.Assert(fromClause.Names[0].Name.Type, Equals, ValueRegex)
|
||||
c.Assert(fromClause.Names[0].Name.IsCaseInsensitive, Equals, false)
|
||||
}
|
||||
|
||||
func (self *QueryParserSuite) TestMergeFromClause(c *C) {
|
||||
|
|
|
@ -27,6 +27,7 @@ static int yycolumn = 1;
|
|||
"join" { return JOIN; }
|
||||
"from" { return FROM; }
|
||||
"where" { return WHERE; }
|
||||
"as" { return AS; }
|
||||
"select" { return SELECT; }
|
||||
"limit" { return LIMIT; }
|
||||
"order" { return ORDER; }
|
||||
|
|
|
@ -60,8 +60,9 @@ value *create_value(char *name, int type, char is_case_insensitive, value_array
|
|||
%lex-param {void *scanner}
|
||||
|
||||
// define types of tokens (terminals)
|
||||
%token SELECT FROM WHERE EQUAL GROUP BY LIMIT ORDER ASC DESC MERGE INNER JOIN
|
||||
%token <string> STRING_VALUE INT_VALUE FLOAT_VALUE TABLE_NAME SIMPLE_NAME REGEX_OP NEGATION_REGEX_OP REGEX_STRING INSENSITIVE_REGEX_STRING DURATION
|
||||
%token SELECT FROM WHERE EQUAL GROUP BY LIMIT ORDER ASC DESC MERGE INNER JOIN AS
|
||||
%token <string> STRING_VALUE INT_VALUE FLOAT_VALUE TABLE_NAME SIMPLE_NAME REGEX_OP
|
||||
%token <string> NEGATION_REGEX_OP REGEX_STRING INSENSITIVE_REGEX_STRING DURATION
|
||||
|
||||
// define the precedence of these operators
|
||||
%left OR
|
||||
|
@ -74,7 +75,7 @@ value *create_value(char *name, int type, char is_case_insensitive, value_array
|
|||
%type <from_clause> FROM_CLAUSE
|
||||
%type <condition> WHERE_CLAUSE
|
||||
%type <value_array> COLUMN_NAMES
|
||||
%type <string> BOOL_OPERATION
|
||||
%type <string> BOOL_OPERATION ALIAS_CLAUSE
|
||||
%type <condition> CONDITION
|
||||
%type <bool_expression> BOOL_EXPRESSION
|
||||
%type <value_array> VALUES
|
||||
|
@ -213,36 +214,69 @@ GROUP_BY_CLAUSE:
|
|||
COLUMN_NAMES:
|
||||
VALUES
|
||||
|
||||
ALIAS_CLAUSE:
|
||||
AS SIMPLE_TABLE_VALUE
|
||||
{
|
||||
$$ = $2->name;
|
||||
free($2);
|
||||
}
|
||||
|
|
||||
{
|
||||
$$ = NULL;
|
||||
}
|
||||
|
||||
FROM_CLAUSE:
|
||||
FROM TABLE_VALUE
|
||||
{
|
||||
$$ = malloc(sizeof(from_clause));
|
||||
$$->names = malloc(sizeof(value_array));
|
||||
$$->names->elems = malloc(sizeof(value*));
|
||||
$$->names = malloc(sizeof(table_name_array));
|
||||
$$->names->elems = malloc(sizeof(table_name*));
|
||||
$$->names->size = 1;
|
||||
$$->names->elems[0] = $2;
|
||||
$$->names->elems[0] = malloc(sizeof(table_name));
|
||||
$$->names->elems[0]->name = $2;
|
||||
$$->names->elems[0]->alias = NULL;
|
||||
$$->from_clause_type = FROM_ARRAY;
|
||||
}
|
||||
|
|
||||
FROM SIMPLE_TABLE_VALUE
|
||||
{
|
||||
$$ = malloc(sizeof(from_clause));
|
||||
$$->names = malloc(sizeof(table_name_array));
|
||||
$$->names->elems = malloc(sizeof(table_name*));
|
||||
$$->names->size = 1;
|
||||
$$->names->elems[0] = malloc(sizeof(table_name));
|
||||
$$->names->elems[0]->name = $2;
|
||||
$$->names->elems[0]->alias = NULL;
|
||||
$$->from_clause_type = FROM_ARRAY;
|
||||
}
|
||||
|
|
||||
FROM SIMPLE_TABLE_VALUE MERGE SIMPLE_TABLE_VALUE
|
||||
{
|
||||
$$ = malloc(sizeof(from_clause));
|
||||
$$->names = malloc(sizeof(value_array));
|
||||
$$->names->elems = malloc(2 * sizeof(value*));
|
||||
$$->names = malloc(sizeof(table_name_array));
|
||||
$$->names->elems = malloc(2 * sizeof(table_name*));
|
||||
$$->names->size = 2;
|
||||
$$->names->elems[0] = $2;
|
||||
$$->names->elems[1] = $4;
|
||||
$$->names->elems[0] = malloc(sizeof(table_name));
|
||||
$$->names->elems[0]->name = $2;
|
||||
$$->names->elems[0]->alias = NULL;
|
||||
$$->names->elems[1] = malloc(sizeof(table_name));
|
||||
$$->names->elems[1]->name = $4;
|
||||
$$->names->elems[1]->alias = NULL;
|
||||
$$->from_clause_type = FROM_MERGE;
|
||||
}
|
||||
|
|
||||
FROM SIMPLE_TABLE_VALUE INNER JOIN SIMPLE_TABLE_VALUE
|
||||
FROM SIMPLE_TABLE_VALUE ALIAS_CLAUSE INNER JOIN SIMPLE_TABLE_VALUE ALIAS_CLAUSE
|
||||
{
|
||||
$$ = malloc(sizeof(from_clause));
|
||||
$$->names = malloc(sizeof(value_array));
|
||||
$$->names = malloc(sizeof(table_name_array));
|
||||
$$->names->elems = malloc(2 * sizeof(value*));
|
||||
$$->names->size = 2;
|
||||
$$->names->elems[0] = $2;
|
||||
$$->names->elems[1] = $5;
|
||||
$$->names->elems[0] = malloc(sizeof(table_name));
|
||||
$$->names->elems[0]->name = $2;
|
||||
$$->names->elems[0]->alias = $3;
|
||||
$$->names->elems[1] = malloc(sizeof(table_name));
|
||||
$$->names->elems[1]->name = $6;
|
||||
$$->names->elems[1]->alias = $7;
|
||||
$$->from_clause_type = FROM_INNER_JOIN;
|
||||
}
|
||||
|
||||
|
|
|
@ -29,6 +29,64 @@ func uniq(slice []string) []string {
|
|||
return slice
|
||||
}
|
||||
|
||||
func (self *Query) GetTableAliases(name string) []string {
|
||||
names := self.GetFromClause().Names
|
||||
if len(names) == 1 && names[0].Name.Type == ValueRegex {
|
||||
return []string{name}
|
||||
}
|
||||
|
||||
aliases := []string{}
|
||||
|
||||
for _, fromName := range names {
|
||||
if fromName.Name.Name != name {
|
||||
continue
|
||||
}
|
||||
|
||||
if fromName.Alias == "" {
|
||||
aliases = append(aliases, name)
|
||||
continue
|
||||
}
|
||||
|
||||
aliases = append(aliases, fromName.Alias)
|
||||
}
|
||||
return aliases
|
||||
}
|
||||
|
||||
func (self *Query) revertAlias(mapping map[string][]string) {
|
||||
fromClause := self.GetFromClause()
|
||||
if fromClause.Type != FromClauseInnerJoin {
|
||||
return
|
||||
}
|
||||
|
||||
columns := make(map[string]map[string]bool)
|
||||
|
||||
for _, table := range fromClause.Names {
|
||||
name := table.Name.Name
|
||||
alias := name
|
||||
if table.Alias != "" {
|
||||
alias = table.Alias
|
||||
}
|
||||
|
||||
for _, column := range mapping[alias] {
|
||||
tableColumns := columns[name]
|
||||
if tableColumns == nil {
|
||||
tableColumns = make(map[string]bool)
|
||||
columns[name] = tableColumns
|
||||
}
|
||||
tableColumns[column] = true
|
||||
}
|
||||
|
||||
delete(mapping, alias)
|
||||
}
|
||||
|
||||
for table, tableColumns := range columns {
|
||||
mapping[table] = []string{}
|
||||
for column, _ := range tableColumns {
|
||||
mapping[table] = append(mapping[table], column)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Returns a mapping from the time series names (or regex) to the
|
||||
// column names that are references
|
||||
func (self *Query) GetReferencedColumns() map[*Value][]string {
|
||||
|
@ -49,8 +107,13 @@ func (self *Query) GetReferencedColumns() map[*Value][]string {
|
|||
|
||||
notPrefixedColumns = uniq(notPrefixedColumns)
|
||||
|
||||
self.revertAlias(mapping)
|
||||
|
||||
addedTables := make(map[string]bool)
|
||||
|
||||
returnedMapping := make(map[*Value][]string)
|
||||
for _, value := range self.GetFromClause().Names {
|
||||
for _, tableName := range self.GetFromClause().Names {
|
||||
value := tableName.Name
|
||||
if _, ok := value.GetCompiledRegex(); ok {
|
||||
// this is a regex table, cannot be referenced, only unreferenced
|
||||
// columns will be attached to regex table names
|
||||
|
@ -59,6 +122,10 @@ func (self *Query) GetReferencedColumns() map[*Value][]string {
|
|||
}
|
||||
|
||||
name := value.Name
|
||||
if addedTables[name] {
|
||||
continue
|
||||
}
|
||||
addedTables[name] = true
|
||||
returnedMapping[value] = uniq(append(mapping[name], notPrefixedColumns...))
|
||||
if len(returnedMapping[value]) > 1 && returnedMapping[value][0] == "*" {
|
||||
returnedMapping[value] = returnedMapping[value][:1]
|
||||
|
@ -146,7 +213,7 @@ func parseTime(expr *Expression) (int64, error) {
|
|||
|
||||
func getReferencedColumnsFromValue(v *Value, mapping map[string][]string) (notAssigned []string) {
|
||||
switch v.Type {
|
||||
case ValueSimpleName:
|
||||
case ValueSimpleName, ValueTableName:
|
||||
if idx := strings.LastIndex(v.Name, "."); idx != -1 {
|
||||
tableName := v.Name[:idx]
|
||||
columnName := v.Name[idx+1:]
|
||||
|
|
|
@ -115,6 +115,38 @@ func (self *QueryApiSuite) TestGetReferencedColumnsWithWhereClause(c *C) {
|
|||
}
|
||||
}
|
||||
|
||||
func (self *QueryApiSuite) TestGetReferencedColumnsWithInnerJoin(c *C) {
|
||||
queryStr := "select f2.b from foo as f1 inner join foo as f2 where f1.a == 5 and f2.a == 6;"
|
||||
query, err := ParseQuery(queryStr)
|
||||
c.Assert(err, IsNil)
|
||||
columns := query.GetReferencedColumns()
|
||||
c.Assert(columns, HasLen, 1)
|
||||
aliases := make(map[string][]string)
|
||||
for v, columns := range columns {
|
||||
if _, ok := aliases[v.Name]; ok {
|
||||
c.Fail()
|
||||
}
|
||||
aliases[v.Name] = columns
|
||||
}
|
||||
c.Assert(aliases["foo"], DeepEquals, []string{"a", "b"})
|
||||
}
|
||||
|
||||
func (self *QueryApiSuite) TestGetReferencedColumnsWithInnerJoinAndWildcard(c *C) {
|
||||
queryStr := "select * from foo as f1 inner join foo as f2"
|
||||
query, err := ParseQuery(queryStr)
|
||||
c.Assert(err, IsNil)
|
||||
columns := query.GetReferencedColumns()
|
||||
c.Assert(columns, HasLen, 1)
|
||||
aliases := make(map[string][]string)
|
||||
for v, columns := range columns {
|
||||
if _, ok := aliases[v.Name]; ok {
|
||||
c.Fail()
|
||||
}
|
||||
aliases[v.Name] = columns
|
||||
}
|
||||
c.Assert(aliases["foo"], DeepEquals, []string{"*"})
|
||||
}
|
||||
|
||||
func (self *QueryApiSuite) TestDefaultStartTime(c *C) {
|
||||
for queryStr, t := range map[string]time.Time{
|
||||
"select * from t where time < now() - 1d;": time.Now().Add(-24 * time.Hour).Add(-1 * time.Hour).Round(time.Minute),
|
||||
|
@ -168,3 +200,18 @@ func (self *QueryApiSuite) TestErrorInStartTime(c *C) {
|
|||
c.Assert(err, ErrorMatches, error)
|
||||
}
|
||||
}
|
||||
|
||||
func (self *QueryApiSuite) TestAliasing(c *C) {
|
||||
query, err := ParseQuery("select * from user.events")
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(query.GetTableAliases("user.events"), DeepEquals, []string{"user.events"})
|
||||
|
||||
query, err = ParseQuery("select * from user.events as events inner join user.events as clicks")
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(query.GetTableAliases("user.events"), DeepEquals, []string{"events", "clicks"})
|
||||
|
||||
// aliasing is ignored in case of a regex
|
||||
query, err = ParseQuery("select * from /.*events.*/i")
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(query.GetTableAliases("user.events"), DeepEquals, []string{"user.events"})
|
||||
}
|
||||
|
|
|
@ -57,16 +57,25 @@ typedef struct {
|
|||
char *err;
|
||||
} error;
|
||||
|
||||
typedef struct {
|
||||
value *name;
|
||||
char *alias;
|
||||
} table_name;
|
||||
|
||||
typedef struct {
|
||||
size_t size;
|
||||
table_name **elems;
|
||||
} table_name_array;
|
||||
|
||||
typedef struct {
|
||||
enum {
|
||||
FROM_ARRAY,
|
||||
FROM_MERGE,
|
||||
FROM_INNER_JOIN
|
||||
} from_clause_type;
|
||||
// in case of merge or join, it's guaranteed
|
||||
// that the names array will have two table
|
||||
// names only and they aren't regex.
|
||||
value_array *names;
|
||||
// in case of merge or join, it's guaranteed that the names array
|
||||
// will have two table names only and they aren't regex.
|
||||
table_name_array *names;
|
||||
} from_clause;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -33,6 +33,9 @@ int main(int argc, char **argv) {
|
|||
q = parse_query("select email from users.events where email =~ /gmail\\\\.com/i and time>now()-2d;");
|
||||
close_query(&q);
|
||||
|
||||
q = parse_query("select email from users.events as events where email === /gmail\\\\.com/i and time>now()-2d;");
|
||||
close_query(&q);
|
||||
|
||||
return 0;
|
||||
}
|
||||
EOF
|
||||
|
|
Loading…
Reference in New Issue