Merge pull request #3815 from benbjohnson/query-shards

SHOW SHARDS
pull/3981/head
Ben Johnson 2015-09-03 15:53:18 -06:00
commit 5f314d80e7
7 changed files with 136 additions and 0 deletions

View File

@ -105,6 +105,7 @@ func (*ShowFieldKeysStatement) node() {}
func (*ShowRetentionPoliciesStatement) node() {} func (*ShowRetentionPoliciesStatement) node() {}
func (*ShowMeasurementsStatement) node() {} func (*ShowMeasurementsStatement) node() {}
func (*ShowSeriesStatement) node() {} func (*ShowSeriesStatement) node() {}
func (*ShowShardsStatement) node() {}
func (*ShowStatsStatement) node() {} func (*ShowStatsStatement) node() {}
func (*ShowDiagnosticsStatement) node() {} func (*ShowDiagnosticsStatement) node() {}
func (*ShowTagKeysStatement) node() {} func (*ShowTagKeysStatement) node() {}
@ -206,6 +207,7 @@ func (*ShowFieldKeysStatement) stmt() {}
func (*ShowMeasurementsStatement) stmt() {} func (*ShowMeasurementsStatement) stmt() {}
func (*ShowRetentionPoliciesStatement) stmt() {} func (*ShowRetentionPoliciesStatement) stmt() {}
func (*ShowSeriesStatement) stmt() {} func (*ShowSeriesStatement) stmt() {}
func (*ShowShardsStatement) stmt() {}
func (*ShowStatsStatement) stmt() {} func (*ShowStatsStatement) stmt() {}
func (*ShowDiagnosticsStatement) stmt() {} func (*ShowDiagnosticsStatement) stmt() {}
func (*ShowTagKeysStatement) stmt() {} func (*ShowTagKeysStatement) stmt() {}
@ -1841,6 +1843,17 @@ func (s *ShowStatsStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}} return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
} }
// ShowShardsStatement represents a command for displaying shards in the cluster.
type ShowShardsStatement struct{}
// String returns a string representation.
func (s *ShowShardsStatement) String() string { return "SHOW SHARDS" }
// RequiredPrivileges returns the privileges required to execute the statement.
func (s *ShowShardsStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}
// ShowDiagnosticsStatement represents a command for show node diagnostics. // ShowDiagnosticsStatement represents a command for show node diagnostics.
type ShowDiagnosticsStatement struct{} type ShowDiagnosticsStatement struct{}

View File

@ -129,6 +129,8 @@ func (p *Parser) parseShowStatement() (Statement, error) {
return nil, newParseError(tokstr(tok, lit), []string{"POLICIES"}, pos) return nil, newParseError(tokstr(tok, lit), []string{"POLICIES"}, pos)
case SERIES: case SERIES:
return p.parseShowSeriesStatement() return p.parseShowSeriesStatement()
case SHARDS:
return p.parseShowShardsStatement()
case STATS: case STATS:
return p.parseShowStatsStatement() return p.parseShowStatsStatement()
case DIAGNOSTICS: case DIAGNOSTICS:
@ -1409,6 +1411,12 @@ func (p *Parser) parseRetentionPolicy() (name string, dfault bool, err error) {
return return
} }
// parseShowShardsStatement parses a string for "SHOW SHARDS" statement.
// This function assumes the "SHOW SHARDS" tokens have already been consumed.
func (p *Parser) parseShowShardsStatement() (*ShowShardsStatement, error) {
return &ShowShardsStatement{}, nil
}
// parseShowStatsStatement parses a string and returns a ShowStatsStatement. // parseShowStatsStatement parses a string and returns a ShowStatsStatement.
// This function assumes the "SHOW STATS" tokens have already been consumed. // This function assumes the "SHOW STATS" tokens have already been consumed.
func (p *Parser) parseShowStatsStatement() (*ShowStatsStatement, error) { func (p *Parser) parseShowStatsStatement() (*ShowStatsStatement, error) {

View File

@ -1231,6 +1231,12 @@ func TestParser_ParseStatement(t *testing.T) {
}, },
}, },
// SHOW SHARDS
{
s: `SHOW SHARDS`,
stmt: &influxql.ShowShardsStatement{},
},
// SHOW DIAGNOSTICS // SHOW DIAGNOSTICS
{ {
s: `SHOW DIAGNOSTICS`, s: `SHOW DIAGNOSTICS`,

View File

@ -136,6 +136,7 @@ func TestScanner_Scan(t *testing.T) {
{s: `KEYS`, tok: influxql.KEYS}, {s: `KEYS`, tok: influxql.KEYS},
{s: `LIMIT`, tok: influxql.LIMIT}, {s: `LIMIT`, tok: influxql.LIMIT},
{s: `SHOW`, tok: influxql.SHOW}, {s: `SHOW`, tok: influxql.SHOW},
{s: `SHARDS`, tok: influxql.SHARDS},
{s: `MEASUREMENT`, tok: influxql.MEASUREMENT}, {s: `MEASUREMENT`, tok: influxql.MEASUREMENT},
{s: `MEASUREMENTS`, tok: influxql.MEASUREMENTS}, {s: `MEASUREMENTS`, tok: influxql.MEASUREMENTS},
{s: `NOT`, tok: influxql.NOT}, {s: `NOT`, tok: influxql.NOT},

View File

@ -111,6 +111,7 @@ const (
SERVERS SERVERS
SET SET
SHOW SHOW
SHARDS
SLIMIT SLIMIT
STATS STATS
DIAGNOSTICS DIAGNOSTICS
@ -220,6 +221,7 @@ var tokens = [...]string{
SERVERS: "SERVERS", SERVERS: "SERVERS",
SET: "SET", SET: "SET",
SHOW: "SHOW", SHOW: "SHOW",
SHARDS: "SHARDS",
SLIMIT: "SLIMIT", SLIMIT: "SLIMIT",
SOFFSET: "SOFFSET", SOFFSET: "SOFFSET",
STATS: "STATS", STATS: "STATS",

View File

@ -1,7 +1,10 @@
package meta package meta
import ( import (
"bytes"
"fmt" "fmt"
"strconv"
"time"
"github.com/influxdb/influxdb/influxql" "github.com/influxdb/influxdb/influxql"
) )
@ -80,6 +83,10 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement) *influxql.
return e.executeDropContinuousQueryStatement(stmt) return e.executeDropContinuousQueryStatement(stmt)
case *influxql.ShowContinuousQueriesStatement: case *influxql.ShowContinuousQueriesStatement:
return e.executeShowContinuousQueriesStatement(stmt) return e.executeShowContinuousQueriesStatement(stmt)
case *influxql.ShowShardsStatement:
return e.executeShowShardsStatement(stmt)
case *influxql.ShowStatsStatement:
return e.executeShowStatsStatement(stmt)
default: default:
panic(fmt.Sprintf("unsupported statement type: %T", stmt)) panic(fmt.Sprintf("unsupported statement type: %T", stmt))
} }
@ -281,3 +288,51 @@ func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql
} }
return &influxql.Result{Series: rows} return &influxql.Result{Series: rows}
} }
func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShardsStatement) *influxql.Result {
dis, err := e.Store.Databases()
if err != nil {
return &influxql.Result{Err: err}
}
rows := []*influxql.Row{}
for _, di := range dis {
row := &influxql.Row{Columns: []string{"id", "start_time", "end_time", "expiry_time", "owners"}, Name: di.Name}
for _, rpi := range di.RetentionPolicies {
for _, sgi := range rpi.ShardGroups {
for _, si := range sgi.Shards {
ownerIDs := make([]uint64, len(si.Owners))
for i, owner := range si.Owners {
ownerIDs[i] = owner.NodeID
}
row.Values = append(row.Values, []interface{}{
si.ID,
sgi.StartTime.UTC().Format(time.RFC3339),
sgi.EndTime.UTC().Format(time.RFC3339),
sgi.EndTime.Add(rpi.Duration).UTC().Format(time.RFC3339),
joinUint64(ownerIDs),
})
}
}
}
rows = append(rows, row)
}
return &influxql.Result{Series: rows}
}
func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsStatement) *influxql.Result {
return &influxql.Result{Err: fmt.Errorf("SHOW STATS is not implemented yet")}
}
// joinUint64 returns a comma-delimited string of uint64 numbers.
func joinUint64(a []uint64) string {
var buf bytes.Buffer
for i, x := range a {
buf.WriteString(strconv.FormatUint(x, 10))
if i < len(a)-1 {
buf.WriteRune(',')
}
}
return buf.String()
}

View File

@ -765,6 +765,57 @@ func TestStatementExecutor_ExecuteStatement_Unsupported(t *testing.T) {
} }
} }
// Ensure a SHOW SHARDS statement can be executed.
func TestStatementExecutor_ExecuteStatement_ShowShards(t *testing.T) {
e := NewStatementExecutor()
e.Store.DatabasesFn = func() ([]meta.DatabaseInfo, error) {
return []meta.DatabaseInfo{
{
Name: "foo",
RetentionPolicies: []meta.RetentionPolicyInfo{
{
Duration: time.Second,
ShardGroups: []meta.ShardGroupInfo{
{
StartTime: time.Unix(0, 0),
EndTime: time.Unix(1, 0),
Shards: []meta.ShardInfo{
{
ID: 1,
Owners: []meta.ShardOwner{
{NodeID: 1},
{NodeID: 2},
{NodeID: 3},
},
},
{
ID: 2,
},
},
},
},
},
},
},
}, nil
}
if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW SHARDS`)); res.Err != nil {
t.Fatal(res.Err)
} else if !reflect.DeepEqual(res.Series, influxql.Rows{
{
Name: "foo",
Columns: []string{"id", "start_time", "end_time", "expiry_time", "owners"},
Values: [][]interface{}{
{uint64(1), "1970-01-01T00:00:00Z", "1970-01-01T00:00:01Z", "1970-01-01T00:00:02Z", "1,2,3"},
{uint64(2), "1970-01-01T00:00:00Z", "1970-01-01T00:00:01Z", "1970-01-01T00:00:02Z", ""},
},
},
}) {
t.Fatalf("unexpected rows: %s", spew.Sdump(res.Series))
}
}
// StatementExecutor represents a test wrapper for meta.StatementExecutor. // StatementExecutor represents a test wrapper for meta.StatementExecutor.
type StatementExecutor struct { type StatementExecutor struct {
*meta.StatementExecutor *meta.StatementExecutor