diff --git a/influxql/ast.go b/influxql/ast.go index f2803c49fa..696fdfc384 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -105,6 +105,7 @@ func (*ShowFieldKeysStatement) node() {} func (*ShowRetentionPoliciesStatement) node() {} func (*ShowMeasurementsStatement) node() {} func (*ShowSeriesStatement) node() {} +func (*ShowShardsStatement) node() {} func (*ShowStatsStatement) node() {} func (*ShowDiagnosticsStatement) node() {} func (*ShowTagKeysStatement) node() {} @@ -206,6 +207,7 @@ func (*ShowFieldKeysStatement) stmt() {} func (*ShowMeasurementsStatement) stmt() {} func (*ShowRetentionPoliciesStatement) stmt() {} func (*ShowSeriesStatement) stmt() {} +func (*ShowShardsStatement) stmt() {} func (*ShowStatsStatement) stmt() {} func (*ShowDiagnosticsStatement) stmt() {} func (*ShowTagKeysStatement) stmt() {} @@ -1841,6 +1843,17 @@ func (s *ShowStatsStatement) RequiredPrivileges() ExecutionPrivileges { return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}} } +// ShowShardsStatement represents a command for displaying shards in the cluster. +type ShowShardsStatement struct{} + +// String returns a string representation. +func (s *ShowShardsStatement) String() string { return "SHOW SHARDS" } + +// RequiredPrivileges returns the privileges required to execute the statement. +func (s *ShowShardsStatement) RequiredPrivileges() ExecutionPrivileges { + return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}} +} + // ShowDiagnosticsStatement represents a command for show node diagnostics. type ShowDiagnosticsStatement struct{} diff --git a/influxql/parser.go b/influxql/parser.go index e0a76a50b7..0f356d044b 100644 --- a/influxql/parser.go +++ b/influxql/parser.go @@ -129,6 +129,8 @@ func (p *Parser) parseShowStatement() (Statement, error) { return nil, newParseError(tokstr(tok, lit), []string{"POLICIES"}, pos) case SERIES: return p.parseShowSeriesStatement() + case SHARDS: + return p.parseShowShardsStatement() case STATS: return p.parseShowStatsStatement() case DIAGNOSTICS: @@ -1409,6 +1411,12 @@ func (p *Parser) parseRetentionPolicy() (name string, dfault bool, err error) { return } +// parseShowShardsStatement parses a string for "SHOW SHARDS" statement. +// This function assumes the "SHOW SHARDS" tokens have already been consumed. +func (p *Parser) parseShowShardsStatement() (*ShowShardsStatement, error) { + return &ShowShardsStatement{}, nil +} + // parseShowStatsStatement parses a string and returns a ShowStatsStatement. // This function assumes the "SHOW STATS" tokens have already been consumed. func (p *Parser) parseShowStatsStatement() (*ShowStatsStatement, error) { diff --git a/influxql/parser_test.go b/influxql/parser_test.go index fee4f791b0..97382e3660 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -1231,6 +1231,12 @@ func TestParser_ParseStatement(t *testing.T) { }, }, + // SHOW SHARDS + { + s: `SHOW SHARDS`, + stmt: &influxql.ShowShardsStatement{}, + }, + // SHOW DIAGNOSTICS { s: `SHOW DIAGNOSTICS`, diff --git a/influxql/scanner_test.go b/influxql/scanner_test.go index eb50fd09c2..b365a1b559 100644 --- a/influxql/scanner_test.go +++ b/influxql/scanner_test.go @@ -136,6 +136,7 @@ func TestScanner_Scan(t *testing.T) { {s: `KEYS`, tok: influxql.KEYS}, {s: `LIMIT`, tok: influxql.LIMIT}, {s: `SHOW`, tok: influxql.SHOW}, + {s: `SHARDS`, tok: influxql.SHARDS}, {s: `MEASUREMENT`, tok: influxql.MEASUREMENT}, {s: `MEASUREMENTS`, tok: influxql.MEASUREMENTS}, {s: `NOT`, tok: influxql.NOT}, diff --git a/influxql/token.go b/influxql/token.go index ae8e7b3e46..795c7b169e 100644 --- a/influxql/token.go +++ b/influxql/token.go @@ -111,6 +111,7 @@ const ( SERVERS SET SHOW + SHARDS SLIMIT STATS DIAGNOSTICS @@ -220,6 +221,7 @@ var tokens = [...]string{ SERVERS: "SERVERS", SET: "SET", SHOW: "SHOW", + SHARDS: "SHARDS", SLIMIT: "SLIMIT", SOFFSET: "SOFFSET", STATS: "STATS", diff --git a/meta/statement_executor.go b/meta/statement_executor.go index 291010ab31..a5e5655c70 100644 --- a/meta/statement_executor.go +++ b/meta/statement_executor.go @@ -1,7 +1,10 @@ package meta import ( + "bytes" "fmt" + "strconv" + "time" "github.com/influxdb/influxdb/influxql" ) @@ -80,6 +83,10 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement) *influxql. return e.executeDropContinuousQueryStatement(stmt) case *influxql.ShowContinuousQueriesStatement: return e.executeShowContinuousQueriesStatement(stmt) + case *influxql.ShowShardsStatement: + return e.executeShowShardsStatement(stmt) + case *influxql.ShowStatsStatement: + return e.executeShowStatsStatement(stmt) default: panic(fmt.Sprintf("unsupported statement type: %T", stmt)) } @@ -281,3 +288,51 @@ func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql } return &influxql.Result{Series: rows} } + +func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShardsStatement) *influxql.Result { + dis, err := e.Store.Databases() + if err != nil { + return &influxql.Result{Err: err} + } + + rows := []*influxql.Row{} + for _, di := range dis { + row := &influxql.Row{Columns: []string{"id", "start_time", "end_time", "expiry_time", "owners"}, Name: di.Name} + for _, rpi := range di.RetentionPolicies { + for _, sgi := range rpi.ShardGroups { + for _, si := range sgi.Shards { + ownerIDs := make([]uint64, len(si.Owners)) + for i, owner := range si.Owners { + ownerIDs[i] = owner.NodeID + } + + row.Values = append(row.Values, []interface{}{ + si.ID, + sgi.StartTime.UTC().Format(time.RFC3339), + sgi.EndTime.UTC().Format(time.RFC3339), + sgi.EndTime.Add(rpi.Duration).UTC().Format(time.RFC3339), + joinUint64(ownerIDs), + }) + } + } + } + rows = append(rows, row) + } + return &influxql.Result{Series: rows} +} + +func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsStatement) *influxql.Result { + return &influxql.Result{Err: fmt.Errorf("SHOW STATS is not implemented yet")} +} + +// joinUint64 returns a comma-delimited string of uint64 numbers. +func joinUint64(a []uint64) string { + var buf bytes.Buffer + for i, x := range a { + buf.WriteString(strconv.FormatUint(x, 10)) + if i < len(a)-1 { + buf.WriteRune(',') + } + } + return buf.String() +} diff --git a/meta/statement_executor_test.go b/meta/statement_executor_test.go index 64894aaead..a22610ea4a 100644 --- a/meta/statement_executor_test.go +++ b/meta/statement_executor_test.go @@ -765,6 +765,57 @@ func TestStatementExecutor_ExecuteStatement_Unsupported(t *testing.T) { } } +// Ensure a SHOW SHARDS statement can be executed. +func TestStatementExecutor_ExecuteStatement_ShowShards(t *testing.T) { + e := NewStatementExecutor() + e.Store.DatabasesFn = func() ([]meta.DatabaseInfo, error) { + return []meta.DatabaseInfo{ + { + Name: "foo", + RetentionPolicies: []meta.RetentionPolicyInfo{ + { + Duration: time.Second, + ShardGroups: []meta.ShardGroupInfo{ + { + StartTime: time.Unix(0, 0), + EndTime: time.Unix(1, 0), + Shards: []meta.ShardInfo{ + { + ID: 1, + Owners: []meta.ShardOwner{ + {NodeID: 1}, + {NodeID: 2}, + {NodeID: 3}, + }, + }, + { + ID: 2, + }, + }, + }, + }, + }, + }, + }, + }, nil + } + + if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW SHARDS`)); res.Err != nil { + t.Fatal(res.Err) + } else if !reflect.DeepEqual(res.Series, influxql.Rows{ + { + Name: "foo", + Columns: []string{"id", "start_time", "end_time", "expiry_time", "owners"}, + Values: [][]interface{}{ + {uint64(1), "1970-01-01T00:00:00Z", "1970-01-01T00:00:01Z", "1970-01-01T00:00:02Z", "1,2,3"}, + {uint64(2), "1970-01-01T00:00:00Z", "1970-01-01T00:00:01Z", "1970-01-01T00:00:02Z", ""}, + }, + }, + }) { + t.Fatalf("unexpected rows: %s", spew.Sdump(res.Series)) + } +} + // StatementExecutor represents a test wrapper for meta.StatementExecutor. type StatementExecutor struct { *meta.StatementExecutor