Merge pull request #5524 from seiflotfy/fix5516
Remove redundant error return of (Client) Database(name string)pull/6539/head
commit
0ad2d6fd2c
|
@ -15,8 +15,8 @@ type MetaClient interface {
|
|||
CreateRetentionPolicy(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
|
||||
CreateSubscription(database, rp, name, mode string, destinations []string) error
|
||||
CreateUser(name, password string, admin bool) (*meta.UserInfo, error)
|
||||
Database(name string) (*meta.DatabaseInfo, error)
|
||||
Databases() ([]meta.DatabaseInfo, error)
|
||||
Database(name string) *meta.DatabaseInfo
|
||||
Databases() []meta.DatabaseInfo
|
||||
DropShard(id uint64) error
|
||||
DropContinuousQuery(database, name string) error
|
||||
DropDatabase(name string) error
|
||||
|
|
|
@ -15,8 +15,8 @@ type MetaClient struct {
|
|||
CreateRetentionPolicyFn func(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
|
||||
CreateSubscriptionFn func(database, rp, name, mode string, destinations []string) error
|
||||
CreateUserFn func(name, password string, admin bool) (*meta.UserInfo, error)
|
||||
DatabaseFn func(name string) (*meta.DatabaseInfo, error)
|
||||
DatabasesFn func() ([]meta.DatabaseInfo, error)
|
||||
DatabaseFn func(name string) *meta.DatabaseInfo
|
||||
DatabasesFn func() []meta.DatabaseInfo
|
||||
DataNodeFn func(id uint64) (*meta.NodeInfo, error)
|
||||
DataNodesFn func() ([]meta.NodeInfo, error)
|
||||
DeleteDataNodeFn func(id uint64) error
|
||||
|
@ -67,11 +67,11 @@ func (c *MetaClient) CreateUser(name, password string, admin bool) (*meta.UserIn
|
|||
return c.CreateUserFn(name, password, admin)
|
||||
}
|
||||
|
||||
func (c *MetaClient) Database(name string) (*meta.DatabaseInfo, error) {
|
||||
func (c *MetaClient) Database(name string) *meta.DatabaseInfo {
|
||||
return c.DatabaseFn(name)
|
||||
}
|
||||
|
||||
func (c *MetaClient) Databases() ([]meta.DatabaseInfo, error) {
|
||||
func (c *MetaClient) Databases() []meta.DatabaseInfo {
|
||||
return c.DatabasesFn()
|
||||
}
|
||||
|
||||
|
@ -156,9 +156,9 @@ func (c *MetaClient) Users() []meta.UserInfo {
|
|||
}
|
||||
|
||||
// DefaultMetaClientDatabaseFn returns a single database (db0) with a retention policy.
|
||||
func DefaultMetaClientDatabaseFn(name string) (*meta.DatabaseInfo, error) {
|
||||
func DefaultMetaClientDatabaseFn(name string) *meta.DatabaseInfo {
|
||||
return &meta.DatabaseInfo{
|
||||
Name: DefaultDatabase,
|
||||
DefaultRetentionPolicy: DefaultRetentionPolicy,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -52,7 +52,7 @@ type PointsWriter struct {
|
|||
Node *influxdb.Node
|
||||
|
||||
MetaClient interface {
|
||||
Database(name string) (di *meta.DatabaseInfo, err error)
|
||||
Database(name string) (di *meta.DatabaseInfo)
|
||||
RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
|
||||
CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
|
||||
ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo)
|
||||
|
@ -197,10 +197,8 @@ func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistency
|
|||
w.statMap.Add(statPointWriteReq, int64(len(points)))
|
||||
|
||||
if retentionPolicy == "" {
|
||||
db, err := w.MetaClient.Database(database)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if db == nil {
|
||||
db := w.MetaClient.Database(database)
|
||||
if db == nil {
|
||||
return influxdb.ErrDatabaseNotFound(database)
|
||||
}
|
||||
retentionPolicy = db.DefaultRetentionPolicy
|
||||
|
|
|
@ -212,8 +212,8 @@ func TestPointsWriter_WritePoints(t *testing.T) {
|
|||
}
|
||||
|
||||
ms := NewPointsWriterMetaClient()
|
||||
ms.DatabaseFn = func(database string) (*meta.DatabaseInfo, error) {
|
||||
return nil, nil
|
||||
ms.DatabaseFn = func(database string) *meta.DatabaseInfo {
|
||||
return nil
|
||||
}
|
||||
ms.NodeIDFn = func() uint64 { return 1 }
|
||||
|
||||
|
@ -314,7 +314,7 @@ type PointsWriterMetaClient struct {
|
|||
NodeIDFn func() uint64
|
||||
RetentionPolicyFn func(database, name string) (*meta.RetentionPolicyInfo, error)
|
||||
CreateShardGroupIfNotExistsFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
|
||||
DatabaseFn func(database string) (*meta.DatabaseInfo, error)
|
||||
DatabaseFn func(database string) *meta.DatabaseInfo
|
||||
ShardOwnerFn func(shardID uint64) (string, string, *meta.ShardGroupInfo)
|
||||
}
|
||||
|
||||
|
@ -328,7 +328,7 @@ func (m PointsWriterMetaClient) CreateShardGroup(database, policy string, timest
|
|||
return m.CreateShardGroupIfNotExistsFn(database, policy, timestamp)
|
||||
}
|
||||
|
||||
func (m PointsWriterMetaClient) Database(database string) (*meta.DatabaseInfo, error) {
|
||||
func (m PointsWriterMetaClient) Database(database string) *meta.DatabaseInfo {
|
||||
return m.DatabaseFn(database)
|
||||
}
|
||||
|
||||
|
|
|
@ -257,9 +257,7 @@ func (e *StatementExecutor) executeCreateUserStatement(q *influxql.CreateUserSta
|
|||
}
|
||||
|
||||
func (e *StatementExecutor) executeDeleteSeriesStatement(stmt *influxql.DeleteSeriesStatement, database string) error {
|
||||
if dbi, err := e.MetaClient.Database(database); err != nil {
|
||||
return err
|
||||
} else if dbi == nil {
|
||||
if dbi := e.MetaClient.Database(database); dbi == nil {
|
||||
return influxql.ErrDatabaseNotFound(database)
|
||||
}
|
||||
|
||||
|
@ -288,9 +286,7 @@ func (e *StatementExecutor) executeDropDatabaseStatement(stmt *influxql.DropData
|
|||
}
|
||||
|
||||
func (e *StatementExecutor) executeDropMeasurementStatement(stmt *influxql.DropMeasurementStatement, database string) error {
|
||||
if dbi, err := e.MetaClient.Database(database); err != nil {
|
||||
return err
|
||||
} else if dbi == nil {
|
||||
if dbi := e.MetaClient.Database(database); dbi == nil {
|
||||
return influxql.ErrDatabaseNotFound(database)
|
||||
}
|
||||
|
||||
|
@ -299,9 +295,7 @@ func (e *StatementExecutor) executeDropMeasurementStatement(stmt *influxql.DropM
|
|||
}
|
||||
|
||||
func (e *StatementExecutor) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, database string) error {
|
||||
if dbi, err := e.MetaClient.Database(database); err != nil {
|
||||
return err
|
||||
} else if dbi == nil {
|
||||
if dbi := e.MetaClient.Database(database); dbi == nil {
|
||||
return influxql.ErrDatabaseNotFound(database)
|
||||
}
|
||||
|
||||
|
@ -545,10 +539,7 @@ func (e *StatementExecutor) iteratorCreator(stmt *influxql.SelectStatement, opt
|
|||
}
|
||||
|
||||
func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement) (models.Rows, error) {
|
||||
dis, err := e.MetaClient.Databases()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dis := e.MetaClient.Databases()
|
||||
|
||||
rows := []*models.Row{}
|
||||
for _, di := range dis {
|
||||
|
@ -562,10 +553,7 @@ func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql
|
|||
}
|
||||
|
||||
func (e *StatementExecutor) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement) (models.Rows, error) {
|
||||
dis, err := e.MetaClient.Databases()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dis := e.MetaClient.Databases()
|
||||
|
||||
row := &models.Row{Name: "databases", Columns: []string{"name"}}
|
||||
for _, di := range dis {
|
||||
|
@ -616,10 +604,8 @@ func (e *StatementExecutor) executeShowGrantsForUserStatement(q *influxql.ShowGr
|
|||
}
|
||||
|
||||
func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement) (models.Rows, error) {
|
||||
di, err := e.MetaClient.Database(q.Database)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if di == nil {
|
||||
di := e.MetaClient.Database(q.Database)
|
||||
if di == nil {
|
||||
return nil, influxdb.ErrDatabaseNotFound(q.Database)
|
||||
}
|
||||
|
||||
|
@ -631,10 +617,7 @@ func (e *StatementExecutor) executeShowRetentionPoliciesStatement(q *influxql.Sh
|
|||
}
|
||||
|
||||
func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShardsStatement) (models.Rows, error) {
|
||||
dis, err := e.MetaClient.Databases()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dis := e.MetaClient.Databases()
|
||||
|
||||
rows := []*models.Row{}
|
||||
for _, di := range dis {
|
||||
|
@ -672,10 +655,7 @@ func (e *StatementExecutor) executeShowShardsStatement(stmt *influxql.ShowShards
|
|||
}
|
||||
|
||||
func (e *StatementExecutor) executeShowShardGroupsStatement(stmt *influxql.ShowShardGroupsStatement) (models.Rows, error) {
|
||||
dis, err := e.MetaClient.Databases()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dis := e.MetaClient.Databases()
|
||||
|
||||
row := &models.Row{Columns: []string{"id", "database", "retention_policy", "start_time", "end_time", "expiry_time"}, Name: "shard groups"}
|
||||
for _, di := range dis {
|
||||
|
@ -727,10 +707,7 @@ func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsSt
|
|||
}
|
||||
|
||||
func (e *StatementExecutor) executeShowSubscriptionsStatement(stmt *influxql.ShowSubscriptionsStatement) (models.Rows, error) {
|
||||
dis, err := e.MetaClient.Databases()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dis := e.MetaClient.Databases()
|
||||
|
||||
rows := []*models.Row{}
|
||||
for _, di := range dis {
|
||||
|
@ -861,10 +838,8 @@ func (e *StatementExecutor) normalizeMeasurement(m *influxql.Measurement, defaul
|
|||
}
|
||||
|
||||
// Find database.
|
||||
di, err := e.MetaClient.Database(m.Database)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if di == nil {
|
||||
di := e.MetaClient.Database(m.Database)
|
||||
if di == nil {
|
||||
return influxdb.ErrDatabaseNotFound(m.Database)
|
||||
}
|
||||
|
||||
|
|
|
@ -390,11 +390,7 @@ func (s *Server) startServerReporting() {
|
|||
|
||||
// reportServer reports anonymous statistics about the system.
|
||||
func (s *Server) reportServer() {
|
||||
dis, err := s.MetaClient.Databases()
|
||||
if err != nil {
|
||||
s.Logger.Printf("failed to retrieve databases for reporting: %s", err.Error())
|
||||
return
|
||||
}
|
||||
dis := s.MetaClient.Databases()
|
||||
numDatabases := len(dis)
|
||||
|
||||
numMeasurements := 0
|
||||
|
@ -415,11 +411,6 @@ func (s *Server) reportServer() {
|
|||
}
|
||||
|
||||
clusterID := s.MetaClient.ClusterID()
|
||||
if err != nil {
|
||||
s.Logger.Printf("failed to retrieve cluster ID for reporting: %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
cl := client.New("")
|
||||
usage := client.Usage{
|
||||
Product: "influxdb",
|
||||
|
|
|
@ -39,8 +39,8 @@ type ContinuousQuerier interface {
|
|||
// metaClient is an internal interface to make testing easier.
|
||||
type metaClient interface {
|
||||
AcquireLease(name string) (l *meta.Lease, err error)
|
||||
Databases() ([]meta.DatabaseInfo, error)
|
||||
Database(name string) (*meta.DatabaseInfo, error)
|
||||
Databases() []meta.DatabaseInfo
|
||||
Database(name string) *meta.DatabaseInfo
|
||||
}
|
||||
|
||||
// RunRequest is a request to run one or more CQs.
|
||||
|
@ -140,20 +140,14 @@ func (s *Service) Run(database, name string, t time.Time) error {
|
|||
|
||||
if database != "" {
|
||||
// Find the requested database.
|
||||
db, err := s.MetaClient.Database(database)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if db == nil {
|
||||
db := s.MetaClient.Database(database)
|
||||
if db == nil {
|
||||
return influxql.ErrDatabaseNotFound(database)
|
||||
}
|
||||
dbs = append(dbs, *db)
|
||||
} else {
|
||||
// Get all databases.
|
||||
var err error
|
||||
dbs, err = s.MetaClient.Databases()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dbs = s.MetaClient.Databases()
|
||||
}
|
||||
|
||||
// Loop through databases.
|
||||
|
@ -209,11 +203,7 @@ func (s *Service) backgroundLoop() {
|
|||
// hasContinuousQueries returns true if any CQs exist.
|
||||
func (s *Service) hasContinuousQueries() bool {
|
||||
// Get list of all databases.
|
||||
dbs, err := s.MetaClient.Databases()
|
||||
if err != nil {
|
||||
s.Logger.Println("error getting databases")
|
||||
return false
|
||||
}
|
||||
dbs := s.MetaClient.Databases()
|
||||
// Loop through all databases executing CQs.
|
||||
for _, db := range dbs {
|
||||
if len(db.ContinuousQueries) > 0 {
|
||||
|
@ -226,11 +216,7 @@ func (s *Service) hasContinuousQueries() bool {
|
|||
// runContinuousQueries gets CQs from the meta store and runs them.
|
||||
func (s *Service) runContinuousQueries(req *RunRequest) {
|
||||
// Get list of all databases.
|
||||
dbs, err := s.MetaClient.Databases()
|
||||
if err != nil {
|
||||
s.Logger.Println("error getting databases")
|
||||
return
|
||||
}
|
||||
dbs := s.MetaClient.Databases()
|
||||
// Loop through all databases executing CQs.
|
||||
for _, db := range dbs {
|
||||
// TODO: distribute across nodes
|
||||
|
|
|
@ -100,10 +100,7 @@ func TestContinuousQueryService_ResampleOptions(t *testing.T) {
|
|||
mc.CreateContinuousQuery("db", "cq", `CREATE CONTINUOUS QUERY cq ON db RESAMPLE EVERY 10s FOR 2m BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(1m) END`)
|
||||
s.MetaClient = mc
|
||||
|
||||
db, err := s.MetaClient.Database("db")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
db := s.MetaClient.Database("db")
|
||||
|
||||
cq, err := NewContinuousQuery(db.Name, &db.ContinuousQueries[0])
|
||||
if err != nil {
|
||||
|
@ -294,7 +291,7 @@ func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) {
|
|||
return errUnexpected
|
||||
},
|
||||
}
|
||||
dbis, _ := s.MetaClient.Databases()
|
||||
dbis := s.MetaClient.Databases()
|
||||
dbi := dbis[0]
|
||||
cqi := dbi.ContinuousQueries[0]
|
||||
|
||||
|
@ -328,7 +325,7 @@ func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
dbis, _ := s.MetaClient.Databases()
|
||||
dbis := s.MetaClient.Databases()
|
||||
dbi := dbis[0]
|
||||
cqi := dbi.ContinuousQueries[0]
|
||||
|
||||
|
@ -399,29 +396,29 @@ func (ms *MetaClient) AcquireLease(name string) (l *meta.Lease, err error) {
|
|||
}
|
||||
|
||||
// Databases returns a list of database info about each database in the cluster.
|
||||
func (ms *MetaClient) Databases() ([]meta.DatabaseInfo, error) {
|
||||
func (ms *MetaClient) Databases() []meta.DatabaseInfo {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
return ms.DatabaseInfos, ms.Err
|
||||
return ms.DatabaseInfos
|
||||
}
|
||||
|
||||
// Database returns a single database by name.
|
||||
func (ms *MetaClient) Database(name string) (*meta.DatabaseInfo, error) {
|
||||
func (ms *MetaClient) Database(name string) *meta.DatabaseInfo {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
return ms.database(name)
|
||||
}
|
||||
|
||||
func (ms *MetaClient) database(name string) (*meta.DatabaseInfo, error) {
|
||||
func (ms *MetaClient) database(name string) *meta.DatabaseInfo {
|
||||
if ms.Err != nil {
|
||||
return nil, ms.Err
|
||||
return nil
|
||||
}
|
||||
for i := range ms.DatabaseInfos {
|
||||
if ms.DatabaseInfos[i].Name == name {
|
||||
return &ms.DatabaseInfos[i], nil
|
||||
return &ms.DatabaseInfos[i]
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("database not found: %s", name)
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateDatabase adds a new database to the meta store.
|
||||
|
@ -456,10 +453,8 @@ func (ms *MetaClient) CreateContinuousQuery(database, name, query string) error
|
|||
return ms.Err
|
||||
}
|
||||
|
||||
dbi, err := ms.database(database)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if dbi == nil {
|
||||
dbi := ms.database(database)
|
||||
if dbi == nil {
|
||||
return fmt.Errorf("database not found: %s", database)
|
||||
}
|
||||
|
||||
|
|
|
@ -54,7 +54,7 @@ type Handler struct {
|
|||
Version string
|
||||
|
||||
MetaClient interface {
|
||||
Database(name string) (*meta.DatabaseInfo, error)
|
||||
Database(name string) *meta.DatabaseInfo
|
||||
Authenticate(username, password string) (ui *meta.UserInfo, err error)
|
||||
Users() []meta.UserInfo
|
||||
}
|
||||
|
@ -419,10 +419,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta.
|
|||
return
|
||||
}
|
||||
|
||||
if di, err := h.MetaClient.Database(database); err != nil {
|
||||
resultError(w, influxql.Result{Err: fmt.Errorf("metastore database error: %s", err)}, http.StatusInternalServerError)
|
||||
return
|
||||
} else if di == nil {
|
||||
if di := h.MetaClient.Database(database); di == nil {
|
||||
resultError(w, influxql.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -300,7 +300,7 @@ func NewHandler(requireAuthentication bool) *Handler {
|
|||
// HandlerMetaStore is a mock implementation of Handler.MetaClient.
|
||||
type HandlerMetaStore struct {
|
||||
PingFn func(d time.Duration) error
|
||||
DatabaseFn func(name string) (*meta.DatabaseInfo, error)
|
||||
DatabaseFn func(name string) *meta.DatabaseInfo
|
||||
AuthenticateFn func(username, password string) (ui *meta.UserInfo, err error)
|
||||
UsersFn func() []meta.UserInfo
|
||||
}
|
||||
|
@ -313,7 +313,7 @@ func (s *HandlerMetaStore) Ping(b bool) error {
|
|||
return s.Ping(b)
|
||||
}
|
||||
|
||||
func (s *HandlerMetaStore) Database(name string) (*meta.DatabaseInfo, error) {
|
||||
func (s *HandlerMetaStore) Database(name string) *meta.DatabaseInfo {
|
||||
return s.DatabaseFn(name)
|
||||
}
|
||||
|
||||
|
|
|
@ -151,31 +151,31 @@ func (c *Client) ClusterID() uint64 {
|
|||
}
|
||||
|
||||
// Database returns info for the requested database.
|
||||
func (c *Client) Database(name string) (*DatabaseInfo, error) {
|
||||
func (c *Client) Database(name string) *DatabaseInfo {
|
||||
c.mu.RLock()
|
||||
data := c.cacheData.Clone()
|
||||
c.mu.RUnlock()
|
||||
|
||||
for _, d := range data.Databases {
|
||||
if d.Name == name {
|
||||
return &d, nil
|
||||
return &d
|
||||
}
|
||||
}
|
||||
|
||||
return nil, influxdb.ErrDatabaseNotFound(name)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Databases returns a list of all database infos.
|
||||
func (c *Client) Databases() ([]DatabaseInfo, error) {
|
||||
func (c *Client) Databases() []DatabaseInfo {
|
||||
c.mu.RLock()
|
||||
data := c.cacheData.Clone()
|
||||
c.mu.RUnlock()
|
||||
|
||||
dbs := data.Databases
|
||||
if dbs == nil {
|
||||
return []DatabaseInfo{}, nil
|
||||
return []DatabaseInfo{}
|
||||
}
|
||||
return dbs, nil
|
||||
return dbs
|
||||
}
|
||||
|
||||
// CreateDatabase creates a database or returns it if it already exists
|
||||
|
|
|
@ -30,10 +30,8 @@ func TestMetaClient_CreateDatabaseOnly(t *testing.T) {
|
|||
t.Fatalf("database name mismatch. exp: db0, got %s", db.Name)
|
||||
}
|
||||
|
||||
db, err := c.Database("db0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if db == nil {
|
||||
db := c.Database("db0")
|
||||
if db == nil {
|
||||
t.Fatal("database not found")
|
||||
} else if db.Name != "db0" {
|
||||
t.Fatalf("db name wrong: %s", db.Name)
|
||||
|
@ -61,10 +59,8 @@ func TestMetaClient_CreateDatabaseIfNotExists(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
db, err := c.Database("db0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if db == nil {
|
||||
db := c.Database("db0")
|
||||
if db == nil {
|
||||
t.Fatal("database not found")
|
||||
} else if db.Name != "db0" {
|
||||
t.Fatalf("db name wrong: %s", db.Name)
|
||||
|
@ -92,19 +88,15 @@ func TestMetaClient_CreateDatabaseWithRetentionPolicy(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
db, err := c.Database("db0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if db == nil {
|
||||
db := c.Database("db0")
|
||||
if db == nil {
|
||||
t.Fatal("database not found")
|
||||
} else if db.Name != "db0" {
|
||||
t.Fatalf("db name wrong: %s", db.Name)
|
||||
}
|
||||
|
||||
rp := db.RetentionPolicy("rp0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if rp.Name != "rp0" {
|
||||
if rp.Name != "rp0" {
|
||||
t.Fatalf("rp name wrong: %s", rp.Name)
|
||||
} else if rp.Duration != time.Hour {
|
||||
t.Fatalf("rp duration wrong: %v", rp.Duration)
|
||||
|
@ -166,7 +158,7 @@ func TestMetaClient_Databases(t *testing.T) {
|
|||
t.Fatalf("db name wrong: %s", db.Name)
|
||||
}
|
||||
|
||||
dbs, err := c.Databases()
|
||||
dbs := c.Databases()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -190,10 +182,8 @@ func TestMetaClient_DropDatabase(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
db, err := c.Database("db0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if db == nil {
|
||||
db := c.Database("db0")
|
||||
if db == nil {
|
||||
t.Fatalf("database not found")
|
||||
} else if db.Name != "db0" {
|
||||
t.Fatalf("db name wrong: %s", db.Name)
|
||||
|
@ -203,7 +193,7 @@ func TestMetaClient_DropDatabase(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if db, _ = c.Database("db0"); db != nil {
|
||||
if db = c.Database("db0"); db != nil {
|
||||
t.Fatalf("expected database to not return: %v", db)
|
||||
}
|
||||
|
||||
|
@ -224,10 +214,8 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
db, err := c.Database("db0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if db == nil {
|
||||
db := c.Database("db0")
|
||||
if db == nil {
|
||||
t.Fatal("database not found")
|
||||
} else if db.Name != "db0" {
|
||||
t.Fatalf("db name wrong: %s", db.Name)
|
||||
|
@ -306,10 +294,8 @@ func TestMetaClient_SetDefaultRetentionPolicy(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
db, err := c.Database("db0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if db == nil {
|
||||
db := c.Database("db0")
|
||||
if db == nil {
|
||||
t.Fatal("datbase not found")
|
||||
} else if db.Name != "db0" {
|
||||
t.Fatalf("db name wrong: %s", db.Name)
|
||||
|
@ -343,10 +329,8 @@ func TestMetaClient_DropRetentionPolicy(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
db, err := c.Database("db0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if db == nil {
|
||||
db := c.Database("db0")
|
||||
if db == nil {
|
||||
t.Fatal("database not found")
|
||||
} else if db.Name != "db0" {
|
||||
t.Fatalf("db name wrong: %s", db.Name)
|
||||
|
@ -503,10 +487,8 @@ func TestMetaClient_CreateUser(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
db, err := c.Database("db0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if db.Name != "db0" {
|
||||
db := c.Database("db0")
|
||||
if db.Name != "db0" {
|
||||
t.Fatalf("db name wrong: %s", db.Name)
|
||||
}
|
||||
|
||||
|
@ -567,10 +549,8 @@ func TestMetaClient_ContinuousQueries(t *testing.T) {
|
|||
if _, err := c.CreateDatabase("db0"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
db, err := c.Database("db0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if db == nil {
|
||||
db := c.Database("db0")
|
||||
if db == nil {
|
||||
t.Fatalf("database not found")
|
||||
} else if db.Name != "db0" {
|
||||
t.Fatalf("db name wrong: %s", db.Name)
|
||||
|
@ -620,10 +600,8 @@ func TestMetaClient_Subscriptions_Create(t *testing.T) {
|
|||
if _, err := c.CreateDatabase("db0"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
db, err := c.Database("db0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if db == nil {
|
||||
db := c.Database("db0")
|
||||
if db == nil {
|
||||
t.Fatal("database not found")
|
||||
} else if db.Name != "db0" {
|
||||
t.Fatalf("db name wrong: %s", db.Name)
|
||||
|
|
|
@ -13,7 +13,7 @@ import (
|
|||
// Service represents the retention policy enforcement service.
|
||||
type Service struct {
|
||||
MetaClient interface {
|
||||
Databases() ([]meta.DatabaseInfo, error)
|
||||
Databases() []meta.DatabaseInfo
|
||||
DeleteShardGroup(database, policy string, id uint64) error
|
||||
}
|
||||
TSDBStore interface {
|
||||
|
@ -72,12 +72,7 @@ func (s *Service) deleteShardGroups() {
|
|||
return
|
||||
|
||||
case <-ticker.C:
|
||||
dbs, err := s.MetaClient.Databases()
|
||||
if err != nil {
|
||||
s.logger.Printf("error getting databases: %s", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
dbs := s.MetaClient.Databases()
|
||||
for _, d := range dbs {
|
||||
for _, r := range d.RetentionPolicies {
|
||||
for _, g := range r.ExpiredShardGroups(time.Now().UTC()) {
|
||||
|
@ -113,10 +108,7 @@ func (s *Service) deleteShards() {
|
|||
rp string
|
||||
}
|
||||
deletedShardIDs := make(map[uint64]deletionInfo, 0)
|
||||
dbs, err := s.MetaClient.Databases()
|
||||
if err != nil {
|
||||
s.logger.Printf("error getting databases: %s", err.Error())
|
||||
}
|
||||
dbs := s.MetaClient.Databases()
|
||||
for _, d := range dbs {
|
||||
for _, r := range d.RetentionPolicies {
|
||||
for _, g := range r.DeletedShardGroups() {
|
||||
|
|
|
@ -37,7 +37,7 @@ type Service struct {
|
|||
|
||||
MetaClient interface {
|
||||
encoding.BinaryMarshaler
|
||||
Database(name string) (*meta.DatabaseInfo, error)
|
||||
Database(name string) *meta.DatabaseInfo
|
||||
}
|
||||
|
||||
TSDBStore *tsdb.Store
|
||||
|
@ -176,10 +176,7 @@ func (s *Service) writeMetaStore(conn net.Conn) error {
|
|||
// this server into the connection
|
||||
func (s *Service) writeDatabaseInfo(conn net.Conn, database string) error {
|
||||
res := Response{}
|
||||
db, err := s.MetaClient.Database(database)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
db := s.MetaClient.Database(database)
|
||||
if db == nil {
|
||||
return influxdb.ErrDatabaseNotFound(database)
|
||||
}
|
||||
|
@ -213,10 +210,7 @@ func (s *Service) writeDatabaseInfo(conn net.Conn, database string) error {
|
|||
// this server into the connection
|
||||
func (s *Service) writeRetentionPolicyInfo(conn net.Conn, database, retentionPolicy string) error {
|
||||
res := Response{}
|
||||
db, err := s.MetaClient.Database(database)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
db := s.MetaClient.Database(database)
|
||||
if db == nil {
|
||||
return influxdb.ErrDatabaseNotFound(database)
|
||||
}
|
||||
|
|
|
@ -40,7 +40,7 @@ type subEntry struct {
|
|||
type Service struct {
|
||||
subs map[subEntry]PointsWriter
|
||||
MetaClient interface {
|
||||
Databases() ([]meta.DatabaseInfo, error)
|
||||
Databases() []meta.DatabaseInfo
|
||||
WaitForDataChanged() chan struct{}
|
||||
}
|
||||
NewPointsWriter func(u url.URL) (PointsWriter, error)
|
||||
|
@ -138,10 +138,7 @@ func (s *Service) waitForMetaUpdates() {
|
|||
|
||||
// Update will start new and stop deleted subscriptions.
|
||||
func (s *Service) Update() error {
|
||||
dbis, err := s.MetaClient.Databases()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dbis := s.MetaClient.Databases()
|
||||
allEntries := make(map[subEntry]bool, 0)
|
||||
// Add in new subscriptions
|
||||
for _, dbi := range dbis {
|
||||
|
|
|
@ -11,11 +11,11 @@ import (
|
|||
)
|
||||
|
||||
type MetaClient struct {
|
||||
DatabasesFn func() ([]meta.DatabaseInfo, error)
|
||||
DatabasesFn func() []meta.DatabaseInfo
|
||||
WaitForDataChangedFn func() chan struct{}
|
||||
}
|
||||
|
||||
func (m MetaClient) Databases() ([]meta.DatabaseInfo, error) {
|
||||
func (m MetaClient) Databases() []meta.DatabaseInfo {
|
||||
return m.DatabasesFn()
|
||||
}
|
||||
|
||||
|
@ -37,7 +37,7 @@ func TestService_IgnoreNonMatch(t *testing.T) {
|
|||
ms.WaitForDataChangedFn = func() chan struct{} {
|
||||
return dataChanged
|
||||
}
|
||||
ms.DatabasesFn = func() ([]meta.DatabaseInfo, error) {
|
||||
ms.DatabasesFn = func() []meta.DatabaseInfo {
|
||||
return []meta.DatabaseInfo{
|
||||
{
|
||||
Name: "db0",
|
||||
|
@ -50,7 +50,7 @@ func TestService_IgnoreNonMatch(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
prs := make(chan *cluster.WritePointsRequest, 2)
|
||||
|
@ -112,7 +112,7 @@ func TestService_ModeALL(t *testing.T) {
|
|||
ms.WaitForDataChangedFn = func() chan struct{} {
|
||||
return dataChanged
|
||||
}
|
||||
ms.DatabasesFn = func() ([]meta.DatabaseInfo, error) {
|
||||
ms.DatabasesFn = func() []meta.DatabaseInfo {
|
||||
return []meta.DatabaseInfo{
|
||||
{
|
||||
Name: "db0",
|
||||
|
@ -125,7 +125,7 @@ func TestService_ModeALL(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
prs := make(chan *cluster.WritePointsRequest, 2)
|
||||
|
@ -190,7 +190,7 @@ func TestService_ModeANY(t *testing.T) {
|
|||
ms.WaitForDataChangedFn = func() chan struct{} {
|
||||
return dataChanged
|
||||
}
|
||||
ms.DatabasesFn = func() ([]meta.DatabaseInfo, error) {
|
||||
ms.DatabasesFn = func() []meta.DatabaseInfo {
|
||||
return []meta.DatabaseInfo{
|
||||
{
|
||||
Name: "db0",
|
||||
|
@ -203,7 +203,7 @@ func TestService_ModeANY(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
prs := make(chan *cluster.WritePointsRequest, 2)
|
||||
|
@ -272,7 +272,7 @@ func TestService_Multiple(t *testing.T) {
|
|||
ms.WaitForDataChangedFn = func() chan struct{} {
|
||||
return dataChanged
|
||||
}
|
||||
ms.DatabasesFn = func() ([]meta.DatabaseInfo, error) {
|
||||
ms.DatabasesFn = func() []meta.DatabaseInfo {
|
||||
return []meta.DatabaseInfo{
|
||||
{
|
||||
Name: "db0",
|
||||
|
@ -291,7 +291,7 @@ func TestService_Multiple(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
prs := make(chan *cluster.WritePointsRequest, 4)
|
||||
|
@ -391,9 +391,9 @@ func TestService_WaitForDataChanged(t *testing.T) {
|
|||
return dataChanged
|
||||
}
|
||||
calls := make(chan bool, 2)
|
||||
ms.DatabasesFn = func() ([]meta.DatabaseInfo, error) {
|
||||
ms.DatabasesFn = func() []meta.DatabaseInfo {
|
||||
calls <- true
|
||||
return nil, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
s := subscriber.NewService(subscriber.NewConfig())
|
||||
|
|
Loading…
Reference in New Issue