Merge pull request #2897 from influxdb/graphite_db

Ensure target Graphite database exists
pull/2898/head
Philip O'Toole 2015-06-10 20:05:13 -07:00
commit f60c816acb
4 changed files with 34 additions and 2 deletions

View File

@ -5,7 +5,7 @@
- [#2869](https://github.com/influxdb/influxdb/issues/2869): Adding field to existing measurement causes panic
- [#2849](https://github.com/influxdb/influxdb/issues/2849): RC32: Frequent write errors
- [#2700](https://github.com/influxdb/influxdb/issues/2700): Incorrect error message in database EncodeFields
- [#2897](https://github.com/influxdb/influxdb/pull/2897): Ensure target Graphite database exists
## v0.9.0-rc33 [2015-06-09]

View File

@ -172,6 +172,7 @@ func (s *Server) appendGraphiteService(c graphite.Config) error {
}
srv.PointsWriter = s.PointsWriter
srv.MetaStore = s.MetaStore
s.Services = append(s.Services, srv)
return nil
}

View File

@ -13,6 +13,7 @@ import (
"time"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/tsdb"
)
@ -41,6 +42,9 @@ type Service struct {
PointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
}
MetaStore interface {
CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error)
}
}
// NewService returns an instance of the Graphite service.
@ -73,6 +77,12 @@ func NewService(c Config) (*Service, error) {
func (s *Service) Open() error {
var err error
if _, err := s.MetaStore.CreateDatabaseIfNotExists(s.database); err != nil {
s.logger.Printf("failed to ensure target database %s exists: %s", s.database, err.Error())
return err
}
s.logger.Printf("ensured target database %s exists", s.database)
if strings.ToLower(s.protocol) == "tcp" {
s.addr, err = s.openTCPServer()
} else if strings.ToLower(s.protocol) == "udp" {
@ -80,7 +90,6 @@ func (s *Service) Open() error {
} else {
return fmt.Errorf("unrecognized Graphite input protocol %s", s.protocol)
}
if err != nil {
return err
}

View File

@ -11,6 +11,7 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/services/graphite"
"github.com/influxdb/influxdb/toml"
"github.com/influxdb/influxdb/tsdb"
@ -272,11 +273,17 @@ func Test_ServerGraphiteTCP(t *testing.T) {
},
}
service.PointsWriter = &pointsWriter
dbCreator := DatabaseCreator{}
service.MetaStore = &dbCreator
if err := service.Open(); err != nil {
t.Fatalf("failed to open Graphite service: %s", err.Error())
}
if !dbCreator.Created {
t.Fatalf("failed to create target database")
}
// Connect to the graphite endpoint we just spun up
_, port, _ := net.SplitHostPort(service.Addr().String())
conn, err := net.Dial("tcp", "127.0.0.1:"+port)
@ -339,11 +346,17 @@ func Test_ServerGraphiteUDP(t *testing.T) {
},
}
service.PointsWriter = &pointsWriter
dbCreator := DatabaseCreator{}
service.MetaStore = &dbCreator
if err := service.Open(); err != nil {
t.Fatalf("failed to open Graphite service: %s", err.Error())
}
if !dbCreator.Created {
t.Fatalf("failed to create target database")
}
// Connect to the graphite endpoint we just spun up
_, port, _ := net.SplitHostPort(service.Addr().String())
conn, err := net.Dial("udp", "127.0.0.1:"+port)
@ -371,6 +384,15 @@ func (w *PointsWriter) WritePoints(p *cluster.WritePointsRequest) error {
return w.WritePointsFn(p)
}
type DatabaseCreator struct {
Created bool
}
func (d *DatabaseCreator) CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error) {
d.Created = true
return nil, nil
}
// Test Helpers
func errstr(err error) string {
if err != nil {