diff --git a/CHANGELOG.md b/CHANGELOG.md index 07c3ce234b..02ae283036 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,16 @@ ## v0.9.0-rc11 [unreleased] +### Bugfixes +- [#1917](https://github.com/influxdb/influxdb/pull/1917): Creating Infinite Retention Policy Failed. +- [#1758](https://github.com/influxdb/influxdb/pull/1758): Add Graphite Integration Test. +- [#1929](https://github.com/influxdb/influxdb/pull/1929): Default Retention Policy incorrectly auto created. +- [#1930](https://github.com/influxdb/influxdb/pull/1930): Auto create database for graphite if not specified. +- [#1931](https://github.com/influxdb/influxdb/pull/1931): Add default column to SHOW RETENTION POLICIES. + ### Features -- [#1902](https://github.com/influxdb/influxdb/pull/1902): Enforce retention policies to have a minimum duraton. +- [#1902](https://github.com/influxdb/influxdb/pull/1902): Enforce retention policies to have a minimum duration. +- [#1906](https://github.com/influxdb/influxdb/pull/1906): Add show servers to query language. +- [#1925](https://github.com/influxdb/influxdb/pull/1925): Add `fill(none)`, `fill(previous)`, and `fill()` to queries. ## v0.9.0-rc10 [2015-03-09] diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 22981bd125..e0928f9fb7 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -67,9 +67,14 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B // We want to make sure we are spun up before we exit this function, so we manually listen and serve listener, err := net.Listen("tcp", config.BrokerAddr()) if err != nil { - log.Fatal(err) + log.Fatalf("Broker failed to listen on %s. 
%s ", config.BrokerAddr(), err) } - go func() { log.Fatal(http.Serve(listener, h)) }() + go func() { + err := http.Serve(listener, h) + if err != nil { + log.Fatalf("Broker failed to serve on %s: %s", config.BrokerAddr(), err) + } + }() log.Printf("broker listening on %s", config.BrokerAddr()) // have it occasionally tell a data node in the cluster to run continuous queries @@ -162,6 +167,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B if strings.ToLower(c.Protocol) == "tcp" { g := graphite.NewTCPServer(parser, s) g.Database = c.Database + g.SetLogOutput(logWriter) err := g.ListenAndServe(c.ConnectionString(config.BindAddress)) if err != nil { log.Printf("failed to start TCP Graphite Server: %v\n", err.Error()) @@ -169,12 +175,13 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B } else if strings.ToLower(c.Protocol) == "udp" { g := graphite.NewUDPServer(parser, s) g.Database = c.Database + g.SetLogOutput(logWriter) err := g.ListenAndServe(c.ConnectionString(config.BindAddress)) if err != nil { log.Printf("failed to start UDP Graphite Server: %v\n", err.Error()) } } else { - log.Fatalf("unrecognized Graphite Server prototcol %s", c.Protocol) + log.Fatalf("unrecognized Graphite Server protocol %s", c.Protocol) } } } diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index eac242cf63..632b7a749b 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -5,12 +5,15 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math" + "net" "net/http" "net/url" "os" "path/filepath" "strconv" "strings" + "sync/atomic" "testing" "time" @@ -66,7 +69,7 @@ type Cluster []*Node // the testing is marked as failed. // // This function returns a slice of nodes, the first of which will be the leader. 
-func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, basePort int) Cluster { +func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, basePort int, baseConfig *main.Config) Cluster { t.Logf("Creating cluster of %d nodes for test %s", nNodes, testName) if nNodes < 1 { t.Fatalf("Test %s: asked to create nonsense cluster", testName) @@ -85,7 +88,10 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, ba _ = os.RemoveAll(tmpDataDir) // Create the first node, special case. - c := main.NewConfig() + c := baseConfig + if c == nil { + c = main.NewConfig() + } c.Broker.Dir = filepath.Join(tmpBrokerDir, strconv.Itoa(basePort)) c.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(basePort)) c.Broker.Port = basePort @@ -167,9 +173,6 @@ func write(t *testing.T, node *Node, data string) { body, _ := ioutil.ReadAll(resp.Body) t.Fatalf("Write to database failed. Unexpected status code. expected: %d, actual %d, %s", http.StatusOK, resp.StatusCode, string(body)) } - - // Until races are solved. 
- time.Sleep(3 * time.Second) } // query executes the given query against all nodes in the cluster, and verifies no errors occured, and @@ -202,6 +205,38 @@ func query(t *testing.T, nodes Cluster, urlDb, query, expected string) (string, return "", true } +// queryAndWait executes the given query against all nodes in the cluster, and verifies no errors occurred, and +// ensures the returned data is as expected until the timeout occurs +func queryAndWait(t *testing.T, nodes Cluster, urlDb, q, expected string, timeout time.Duration) (string, bool) { + v := url.Values{"q": []string{q}} + if urlDb != "" { + v.Set("db", urlDb) + } + + var ( + timedOut int32 + timer = time.NewTimer(time.Duration(math.MaxInt64)) + ) + defer timer.Stop() + if timeout > 0 { + timer.Reset(time.Duration(timeout)) + go func() { + <-timer.C + atomic.StoreInt32(&timedOut, 1) + }() + } + + for { + if got, ok := query(t, nodes, urlDb, q, expected); ok { + return got, ok + } else if atomic.LoadInt32(&timedOut) == 1 { + return fmt.Sprintf("timed out before expected result was found: got: %s", got), false + } else { + time.Sleep(10 * time.Millisecond) + } + } +} + // runTests_Errors tests some basic error cases. 
func runTests_Errors(t *testing.T, nodes Cluster) { t.Logf("Running tests against %d-node cluster", len(nodes)) @@ -260,6 +295,11 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent query: `SELECT * FROM "%DB%"."%RP%".cpu`, expected: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2015-02-28T01:03:36.703820946Z",100]]}]}]}`, }, + { + name: "single point count query with timestamp", + query: `SELECT count(value) FROM "%DB%"."%RP%".cpu`, + expected: `{"results":[{"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}]}]}`, + }, { name: "single string point with timestamp", write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [{"name": "logs", "timestamp": "2015-02-28T01:03:36.703820946Z", "tags": {"host": "server01"}, "fields": {"value": "disk full"}}]}`, @@ -290,6 +330,30 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent expected: `{"results":[{"error":"unknown field or tag name in select clause: abc"}]}`, }, + { + name: "single string point with second precision timestamp", + write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [{"name": "cpu_s_precision", "timestamp": 1, "precision": "s", "fields": {"value": 100}}]}`, + query: `SELECT * FROM "%DB%"."%RP%".cpu_s_precision`, + expected: `{"results":[{"series":[{"name":"cpu_s_precision","columns":["time","value"],"values":[["1970-01-01T00:00:01Z",100]]}]}]}`, + }, + { + name: "single string point with millisecond precision timestamp", + write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [{"name": "cpu_ms_precision", "timestamp": 1000, "precision": "ms", "fields": {"value": 100}}]}`, + query: `SELECT * FROM "%DB%"."%RP%".cpu_ms_precision`, + expected: `{"results":[{"series":[{"name":"cpu_ms_precision","columns":["time","value"],"values":[["1970-01-01T00:00:01Z",100]]}]}]}`, + }, + { + name: "single string point with nanosecond precision 
timestamp", + write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [{"name": "cpu_n_precision", "timestamp": 2000000000, "precision": "n", "fields": {"value": 100}}]}`, + query: `SELECT * FROM "%DB%"."%RP%".cpu_n_precision`, + expected: `{"results":[{"series":[{"name":"cpu_n_precision","columns":["time","value"],"values":[["1970-01-01T00:00:02Z",100]]}]}]}`, + }, + { + name: "single point count query with nanosecond precision timestamp", + query: `SELECT count(value) FROM "%DB%"."%RP%".cpu_n_precision`, + expected: `{"results":[{"series":[{"name":"cpu_n_precision","columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}]}]}`, + }, + // WHERE fields queries { reset: true, @@ -431,6 +495,34 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"]}]}]}`, }, + // Fill tests + { + name: "fill with value", + write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [ + {"name": "fills", "timestamp": "2009-11-10T23:00:02Z","fields": {"val": 3}}, + {"name": "fills", "timestamp": "2009-11-10T23:00:03Z","fields": {"val": 5}}, + {"name": "fills", "timestamp": "2009-11-10T23:00:06Z","fields": {"val": 4}}, + {"name": "fills", "timestamp": "2009-11-10T23:00:16Z","fields": {"val": 10}} + ]}`, + query: `select mean(val) from "%DB%"."%RP%".fills where time >= '2009-11-10T23:00:00Z' and time < '2009-11-10T23:00:20Z' group by time(5s) fill(1)`, + expected: `{"results":[{"series":[{"name":"fills","columns":["time","mean"],"values":[["2009-11-10T23:00:00Z",4],["2009-11-10T23:00:05Z",4],["2009-11-10T23:00:10Z",1],["2009-11-10T23:00:15Z",10]]}]}]}`, + }, + { + name: "fill with previous", + query: `select mean(val) from "%DB%"."%RP%".fills where time >= '2009-11-10T23:00:00Z' and time < '2009-11-10T23:00:20Z' group by time(5s) fill(previous)`, + expected: 
`{"results":[{"series":[{"name":"fills","columns":["time","mean"],"values":[["2009-11-10T23:00:00Z",4],["2009-11-10T23:00:05Z",4],["2009-11-10T23:00:10Z",4],["2009-11-10T23:00:15Z",10]]}]}]}`, + }, + { + name: "fill with none, i.e. clear out nulls", + query: `select mean(val) from "%DB%"."%RP%".fills where time >= '2009-11-10T23:00:00Z' and time < '2009-11-10T23:00:20Z' group by time(5s) fill(none)`, + expected: `{"results":[{"series":[{"name":"fills","columns":["time","mean"],"values":[["2009-11-10T23:00:00Z",4],["2009-11-10T23:00:05Z",4],["2009-11-10T23:00:15Z",10]]}]}]}`, + }, + { + name: "fill defaults to null", + query: `select mean(val) from "%DB%"."%RP%".fills where time >= '2009-11-10T23:00:00Z' and time < '2009-11-10T23:00:20Z' group by time(5s)`, + expected: `{"results":[{"series":[{"name":"fills","columns":["time","mean"],"values":[["2009-11-10T23:00:00Z",4],["2009-11-10T23:00:05Z",4],["2009-11-10T23:00:10Z",null],["2009-11-10T23:00:15Z",10]]}]}]}`, + }, + // Metadata display tests { @@ -594,7 +686,22 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent { name: "Check for default retention policy", query: `SHOW RETENTION POLICIES mydatabase`, - expected: `{"results":[{"series":[{"columns":["name","duration","replicaN"],"values":[["default","0",1]]}]}]}`, + expected: `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["default","0",1,true]]}]}]}`, + }, + { + name: "Ensure retention policy with infinite retention can be created", + query: `CREATE RETENTION POLICY rp1 ON mydatabase DURATION INF REPLICATION 1`, + expected: `{"results":[{}]}`, + }, + { + name: "Ensure retention policy with acceptable retention can be created", + query: `CREATE RETENTION POLICY rp2 ON mydatabase DURATION 30d REPLICATION 1`, + expected: `{"results":[{}]}`, + }, + { + name: "Ensure retention policy with unacceptable retention cannot be created", + query: `CREATE RETENTION POLICY rp3 ON mydatabase DURATION 1s 
REPLICATION 1`, + expected: `{"results":[{"error":"retention policy duration needs to be at least 1h0m0s"}]}`, }, { name: "Ensure database with default retention policy can be deleted", @@ -710,7 +817,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent if tt.queryDb != "" { urlDb = tt.queryDb } - got, ok := query(t, nodes, rewriteDbRp(urlDb, database, retention), rewriteDbRp(tt.query, database, retention), rewriteDbRp(tt.expected, database, retention)) + got, ok := queryAndWait(t, nodes, rewriteDbRp(urlDb, database, retention), rewriteDbRp(tt.query, database, retention), rewriteDbRp(tt.expected, database, retention), 3*time.Second) if !ok { t.Errorf("Test \"%s\" failed\n exp: %s\n got: %s\n", name, rewriteDbRp(tt.expected, database, retention), got) } @@ -728,7 +835,7 @@ func TestSingleServer(t *testing.T) { os.RemoveAll(dir) }() - nodes := createCombinedNodeCluster(t, testName, dir, 1, 8090) + nodes := createCombinedNodeCluster(t, testName, dir, 1, 8090, nil) runTestsData(t, testName, nodes, "mydb", "myrp") } @@ -744,7 +851,7 @@ func Test3NodeServer(t *testing.T) { os.RemoveAll(dir) }() - nodes := createCombinedNodeCluster(t, testName, dir, 3, 8190) + nodes := createCombinedNodeCluster(t, testName, dir, 3, 8190, nil) runTestsData(t, testName, nodes, "mydb", "myrp") } @@ -763,7 +870,7 @@ func TestClientLibrary(t *testing.T) { retentionPolicy := "myrp" now := time.Now().UTC() - nodes := createCombinedNodeCluster(t, testName, dir, 1, 8290) + nodes := createCombinedNodeCluster(t, testName, dir, 1, 8290, nil) createDatabase(t, testName, nodes, database) createRetentionPolicy(t, testName, nodes, database, retentionPolicy) @@ -834,6 +941,117 @@ func TestClientLibrary(t *testing.T) { } } +func Test_ServerSingleGraphiteIntegration(t *testing.T) { + if testing.Short() { + t.Skip() + } + nNodes := 1 + basePort := 8390 + testName := "graphite integration" + dir := tempfile() + now := time.Now().UTC().Round(time.Millisecond) + c := 
main.NewConfig() + g := main.Graphite{ + Enabled: true, + Database: "graphite", + Protocol: "TCP", + } + c.Graphites = append(c.Graphites, g) + + t.Logf("Graphite Connection String: %s\n", g.ConnectionString(c.BindAddress)) + nodes := createCombinedNodeCluster(t, testName, dir, nNodes, basePort, c) + + createDatabase(t, testName, nodes, "graphite") + createRetentionPolicy(t, testName, nodes, "graphite", "raw") + + // Connect to the graphite endpoint we just spun up + conn, err := net.Dial("tcp", g.ConnectionString(c.BindAddress)) + if err != nil { + t.Fatal(err) + return + } + + t.Log("Writing data") + data := []byte(`cpu 23.456 `) + data = append(data, []byte(fmt.Sprintf("%d", now.UnixNano()/1000000))...) + data = append(data, '\n') + _, err = conn.Write(data) + conn.Close() + if err != nil { + t.Fatal(err) + return + } + + expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",23.456]]}]}]}`, now.Format(time.RFC3339Nano)) + + // query and wait for results + got, ok := queryAndWait(t, nodes, "graphite", `select * from "graphite"."raw".cpu`, expected, 2*time.Second) + if !ok { + t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got) + } +} + +func Test_ServerSingleGraphiteIntegration_NoDatabase(t *testing.T) { + if testing.Short() { + t.Skip() + } + nNodes := 1 + basePort := 8490 + testName := "graphite integration" + dir := tempfile() + now := time.Now().UTC().Round(time.Millisecond) + c := main.NewConfig() + g := main.Graphite{ + Enabled: true, + Port: 2103, + Protocol: "TCP", + } + c.Graphites = append(c.Graphites, g) + c.Logging.WriteTracing = true + + t.Logf("Graphite Connection String: %s\n", g.ConnectionString(c.BindAddress)) + nodes := createCombinedNodeCluster(t, testName, dir, nNodes, basePort, c) + + // Connect to the graphite endpoint we just spun up + conn, err := net.Dial("tcp", g.ConnectionString(c.BindAddress)) + if err != nil { + t.Fatal(err) + return + } + + // Need to wait for 
the database to be created + expected := `{"results":[{"series":[{"columns":["name"],"values":[["graphite"]]}]}]}` + got, ok := queryAndWait(t, nodes, "graphite", `show databases`, expected, 2*time.Second) + if !ok { + t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got) + } + + // Need to wait for the database to get a default retention policy + expected = `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["default","0",1,true]]}]}]}` + got, ok = queryAndWait(t, nodes, "graphite", `show retention policies graphite`, expected, 2*time.Second) + if !ok { + t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got) + } + + t.Log("Writing data") + data := []byte(`cpu 23.456 `) + data = append(data, []byte(fmt.Sprintf("%d", now.UnixNano()/1000000))...) + data = append(data, '\n') + _, err = conn.Write(data) + conn.Close() + if err != nil { + t.Fatal(err) + return + } + + // Wait for data to show up + expected = fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",23.456]]}]}]}`, now.Format(time.RFC3339Nano)) + got, ok = queryAndWait(t, nodes, "graphite", `select * from "graphite"."default".cpu`, expected, 2*time.Second) + if !ok { + t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got) + } +} + // helper funcs func errToString(err error) string { @@ -849,4 +1067,5 @@ func mustMarshalJSON(v interface{}) string { panic(e) } return string(b) + } diff --git a/graphite/graphite.go b/graphite/graphite.go index a34d7771ea..8d84d0c487 100644 --- a/graphite/graphite.go +++ b/graphite/graphite.go @@ -16,6 +16,9 @@ const ( // DefaultGraphiteNameSeparator represents the default Graphite field separator. DefaultGraphiteNameSeparator = "." 
+ + // DefaultDatabaseName is the default database that is created if none is specified + DefaultDatabaseName = "graphite" ) var ( @@ -26,16 +29,16 @@ // ErrServerClosed return when closing an already closed graphite server. ErrServerClosed = errors.New("server already closed") - // ErrDatabaseNotSpecified retuned when no database was specified in the config file - ErrDatabaseNotSpecified = errors.New("database was not specified in config") - // ErrServerNotSpecified returned when Server is not specified. ErrServerNotSpecified = errors.New("server not present") ) -// SeriesWriter defines the interface for the destination of the data. -type SeriesWriter interface { - WriteSeries(database, retentionPolicy string, points []influxdb.Point) (uint64, error) +// Server defines the interface for the destination of the data. +type Server interface { + WriteSeries(string, string, []influxdb.Point) (uint64, error) + CreateDatabase(string) error + CreateRetentionPolicy(string, *influxdb.RetentionPolicy) error + DatabaseExists(string) bool } // Parser encapulates a Graphite Parser. diff --git a/graphite/graphite_tcp.go b/graphite/graphite_tcp.go index 8bbac054b0..93df49834b 100644 --- a/graphite/graphite_tcp.go +++ b/graphite/graphite_tcp.go @@ -2,6 +2,7 @@ package graphite import ( "bufio" + "io" "log" "net" "strings" @@ -11,27 +12,40 @@ // TCPServer processes Graphite data received over TCP connections. type TCPServer struct { - writer SeriesWriter + server Server parser *Parser Database string + Logger *log.Logger } // NewTCPServer returns a new instance of a TCPServer. -func NewTCPServer(p *Parser, w SeriesWriter) *TCPServer { +func NewTCPServer(p *Parser, s Server) *TCPServer { return &TCPServer{ parser: p, - writer: w, + server: s, } } +// SetLogOutput sets writer for all Graphite log output. +func (s *TCPServer) SetLogOutput(w io.Writer) { + s.Logger = log.New(w, "[graphite] ", log.LstdFlags) +} + // ListenAndServe instructs the TCPServer to start processing Graphite data // on the given interface. 
iface must be in the form host:port func (t *TCPServer) ListenAndServe(iface string) error { if iface == "" { // Make sure we have an address return ErrBindAddressRequired - } else if t.Database == "" { // Make sure they have a database - return ErrDatabaseNotSpecified + } else if t.Database == "" { + // If they didn't specify a database, create one and set a default retention policy. + if !t.server.DatabaseExists(DefaultDatabaseName) { + t.Logger.Printf("default database %q does not exist. creating.\n", DefaultDatabaseName) + if e := t.server.CreateDatabase(DefaultDatabaseName); e != nil { + return e + } + t.Database = DefaultDatabaseName + } } ln, err := net.Listen("tcp", iface) @@ -42,7 +56,7 @@ func (t *TCPServer) ListenAndServe(iface string) error { for { conn, err := ln.Accept() if err != nil { - log.Println("error accepting TCP connection", err.Error()) + t.Logger.Println("error accepting TCP connection", err.Error()) continue } go t.handleConnection(conn) @@ -69,11 +83,14 @@ func (t *TCPServer) handleConnection(conn net.Conn) { // Parse it. point, err := t.parser.Parse(line) if err != nil { - log.Printf("unable to parse data: %s", err) + t.Logger.Printf("unable to parse data: %s", err) continue } // Send the data to database - t.writer.WriteSeries(t.Database, "", []influxdb.Point{point}) + _, e := t.server.WriteSeries(t.Database, "", []influxdb.Point{point}) + if e != nil { + t.Logger.Printf("failed to write data point to database %q: %s\n", t.Database, e) + } } } diff --git a/graphite/graphite_udp.go b/graphite/graphite_udp.go index 6e4656a0b9..9e4f6785dc 100644 --- a/graphite/graphite_udp.go +++ b/graphite/graphite_udp.go @@ -1,6 +1,8 @@ package graphite import ( + "io" + "log" "net" "strings" @@ -13,28 +15,41 @@ const ( // UDPerver processes Graphite data received via UDP. 
type UDPServer struct { - writer SeriesWriter + server Server parser *Parser Database string + Logger *log.Logger } // NewUDPServer returns a new instance of a UDPServer -func NewUDPServer(p *Parser, w SeriesWriter) *UDPServer { +func NewUDPServer(p *Parser, s Server) *UDPServer { u := UDPServer{ parser: p, - writer: w, + server: s, } return &u } +// SetLogOutput sets writer for all Graphite log output. +func (s *UDPServer) SetLogOutput(w io.Writer) { + s.Logger = log.New(w, "[graphite] ", log.LstdFlags) +} + // ListenAndServer instructs the UDPServer to start processing Graphite data // on the given interface. iface must be in the form host:port. func (u *UDPServer) ListenAndServe(iface string) error { if iface == "" { // Make sure we have an address return ErrBindAddressRequired } else if u.Database == "" { // Make sure they have a database - return ErrDatabaseNotSpecified + // If they didn't specify a database, create one and set a default retention policy. + if !u.server.DatabaseExists(DefaultDatabaseName) { + u.Logger.Printf("default database %q does not exist. 
creating.\n", DefaultDatabaseName) + if e := u.server.CreateDatabase(DefaultDatabaseName); e != nil { + return e + } + u.Database = DefaultDatabaseName + } } addr, err := net.ResolveUDPAddr("udp", iface) @@ -61,7 +76,10 @@ func (u *UDPServer) ListenAndServe(iface string) error { } // Send the data to database - u.writer.WriteSeries(u.Database, "", []influxdb.Point{point}) + _, e := u.server.WriteSeries(u.Database, "", []influxdb.Point{point}) + if e != nil { + u.Logger.Printf("failed to write data point: %s\n", e) + } } } }() diff --git a/httpd/handler_test.go b/httpd/handler_test.go index 2c6eb74896..84fbd14ee4 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -278,7 +278,7 @@ func TestHandler_RetentionPolicies(t *testing.T) { if status != http.StatusOK { t.Fatalf("unexpected status: %d", status) - } else if body != `{"results":[{"series":[{"columns":["name","duration","replicaN"],"values":[["bar","168h0m0s",1]]}]}]}` { + } else if body != `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["bar","168h0m0s",1,false]]}]}]}` { t.Fatalf("unexpected body: %s", body) } } diff --git a/influxql/INFLUXQL.md b/influxql/INFLUXQL.md index d305df7b3a..a0b5b8746e 100644 --- a/influxql/INFLUXQL.md +++ b/influxql/INFLUXQL.md @@ -576,7 +576,7 @@ select_stmt = fields from_clause [ into_clause ] [ where_clause ] ```sql -- select mean value from the cpu measurement where region = 'uswest' grouped by 10 minute intervals -SELECT mean(value) FROM cpu WHERE region = 'uswest' GROUP BY time(10m); +SELECT mean(value) FROM cpu WHERE region = 'uswest' GROUP BY time(10m) fill(0); ``` ## Clauses @@ -584,7 +584,7 @@ SELECT mean(value) FROM cpu WHERE region = 'uswest' GROUP BY time(10m); ``` from_clause = "FROM" measurements . -group_by_clause = "GROUP BY" dimensions . +group_by_clause = "GROUP BY" dimensions fill(