Merge branch 'master' of https://github.com/influxdb/influxdb into join

Conflicts:
	cmd/influxd/config.go
	cmd/influxd/run.go

commit d9411c3d84

@@ -8,9 +8,11 @@ import (
	"os/user"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/BurntSushi/toml"
	"github.com/influxdb/influxdb/graphite"
)

const (

@@ -51,13 +53,9 @@ type Config struct {
		Assets string `toml:"assets"`
	} `toml:"admin"`

	Graphites []Graphite `toml:"graphite"`

	InputPlugins struct {
		Graphite struct {
			Enabled    bool   `toml:"enabled"`
			Port       int    `toml:"port"`
			Database   string `toml:"database"`
			UDPEnabled bool   `toml:"udp_enabled"`
		} `toml:"graphite"`
		UDPInput struct {
			Enabled bool `toml:"enabled"`
			Port    int  `toml:"port"`

@@ -105,6 +103,16 @@ type Config struct {
	} `toml:"logging"`
}

type Graphite struct {
	Addr          string `toml:"address"`
	Database      string `toml:"database"`
	Enabled       bool   `toml:"enabled"`
	Port          uint16 `toml:"port"`
	Protocol      string `toml:"protocol"`
	NamePosition  string `toml:"name-position"`
	NameSeparator string `toml:"name-separator"`
}

// NewConfig returns an instance of Config with reasonable defaults.
func NewConfig() *Config {
	u, _ := user.Current()

@@ -261,5 +269,37 @@ func ParseConfig(s string) (*Config, error) {
	return c, nil
}

// ConnectionString returns the connection string for this Graphite config in the form host:port.
func (g *Graphite) ConnectionString(defaultBindAddr string) string {

	addr := g.Addr
	// If no address specified, use default.
	if addr == "" {
		addr = defaultBindAddr
	}

	port := g.Port
	// If no port specified, use default.
	if port == 0 {
		port = graphite.DefaultGraphitePort
	}

	return fmt.Sprintf("%s:%d", addr, port)
}

// NameSeparatorString returns the character separating fields for Graphite data, or the default
// if no separator is set.
func (g *Graphite) NameSeparatorString() string {
	if g.NameSeparator == "" {
		return graphite.DefaultGraphiteNameSeparator
	}
	return g.NameSeparator
}

// LastEnabled returns whether the Graphite server should interpret the last field as "name".
func (g *Graphite) LastEnabled() bool {
	return strings.ToLower(g.NamePosition) == "last"
}

// maxInt is the largest integer representable by a word (architecture dependent).
const maxInt = int64(^uint(0) >> 1)
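As an aside (not part of the diff), a minimal test-style sketch of how zero-valued fields in a `Graphite` section fall back to their defaults; the `"0.0.0.0"` bind address is an assumed stand-in for `config.BindAddress`:

```go
package main_test

import (
	"testing"

	main "github.com/influxdb/influxdb/cmd/influxd"
)

// Sketch: unset address/port/separator resolve to the default bind address,
// graphite.DefaultGraphitePort (2003), and the "." separator.
func TestGraphiteConfig_Defaults_Sketch(t *testing.T) {
	g := main.Graphite{Protocol: "tcp", Database: "graphite_tcp"}

	// No address or port set: expect the default bind address and port 2003.
	if got := g.ConnectionString("0.0.0.0"); got != "0.0.0.0:2003" {
		t.Fatalf("connection string mismatch: %v", got)
	}

	// No separator set: expect the default "." separator.
	if got := g.NameSeparatorString(); got != "." {
		t.Fatalf("name separator mismatch: %v", got)
	}
}
```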
|
||||
|
|
|
@ -2,10 +2,11 @@ package main_test
|
|||
|
||||
import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/cmd/influxd"
|
||||
main "github.com/influxdb/influxdb/cmd/influxd"
|
||||
)
|
||||
|
||||
// Ensure that megabyte sizes can be parsed.
|
||||
|
@ -61,12 +62,40 @@ func TestParseConfig(t *testing.T) {
|
|||
t.Fatalf("data port mismatch: %v", c.Data.Port)
|
||||
}
|
||||
|
||||
if c.InputPlugins.Graphite.Enabled != false {
|
||||
t.Fatalf("graphite enabled mismatch: %v", c.InputPlugins.Graphite.Enabled)
|
||||
} else if c.InputPlugins.Graphite.Port != 2003 {
|
||||
t.Fatalf("graphite port mismatch: %v", c.InputPlugins.Graphite.Enabled)
|
||||
} else if c.InputPlugins.Graphite.Database != "" {
|
||||
t.Fatalf("graphite database mismatch: %v", c.InputPlugins.Graphite.Database)
|
||||
if len(c.Graphites) != 2 {
|
||||
t.Fatalf("graphites mismatch. expected %v, got: %v", 2, len(c.Graphites))
|
||||
}
|
||||
|
||||
tcpGraphite := c.Graphites[0]
|
||||
switch {
|
||||
case tcpGraphite.Enabled != true:
|
||||
t.Fatalf("graphite tcp enabled mismatch: expected: %v, got %v", true, tcpGraphite.Enabled)
|
||||
case tcpGraphite.Addr != "192.168.0.1":
|
||||
t.Fatalf("graphite tcp address mismatch: expected %v, got %v", "192.168.0.1", tcpGraphite.Addr)
|
||||
case tcpGraphite.Port != 2003:
|
||||
t.Fatalf("graphite tcp port mismatch: expected %v, got %v", 2003, tcpGraphite.Port)
|
||||
case tcpGraphite.Database != "graphite_tcp":
|
||||
t.Fatalf("graphite tcp database mismatch: expected %v, got %v", "graphite_tcp", tcpGraphite.Database)
|
||||
case strings.ToLower(tcpGraphite.Protocol) != "tcp":
|
||||
t.Fatalf("graphite tcp protocol mismatch: expected %v, got %v", "tcp", strings.ToLower(tcpGraphite.Protocol))
|
||||
case tcpGraphite.LastEnabled() != true:
|
||||
t.Fatalf("graphite tcp name-position mismatch: expected %v, got %v", "last", tcpGraphite.NamePosition)
|
||||
case tcpGraphite.NameSeparatorString() != "-":
|
||||
t.Fatalf("graphite tcp name-separator mismatch: expected %v, got %v", "-", tcpGraphite.NameSeparatorString())
|
||||
}
|
||||
|
||||
udpGraphite := c.Graphites[1]
|
||||
switch {
|
||||
case udpGraphite.Enabled != true:
|
||||
t.Fatalf("graphite udp enabled mismatch: expected: %v, got %v", true, udpGraphite.Enabled)
|
||||
case udpGraphite.Addr != "192.168.0.2":
|
||||
t.Fatalf("graphite udp address mismatch: expected %v, got %v", "192.168.0.2", udpGraphite.Addr)
|
||||
case udpGraphite.Port != 2005:
|
||||
t.Fatalf("graphite udp port mismatch: expected %v, got %v", 2005, udpGraphite.Port)
|
||||
case udpGraphite.Database != "graphite_udp":
|
||||
t.Fatalf("graphite database mismatch: expected %v, got %v", "graphite_udp", udpGraphite.Database)
|
||||
case strings.ToLower(udpGraphite.Protocol) != "udp":
|
||||
t.Fatalf("graphite udp protocol mismatch: expected %v, got %v", "udp", strings.ToLower(udpGraphite.Protocol))
|
||||
}
|
||||
|
||||
if c.Broker.Port != 8090 {
|
||||
|
@ -139,17 +168,31 @@ read-timeout = "5s"
|
|||
|
||||
[input_plugins]
|
||||
|
||||
# Configure the graphite api
|
||||
[input_plugins.graphite]
|
||||
enabled = false
|
||||
port = 2003
|
||||
database = "" # store graphite data in this database
|
||||
|
||||
[input_plugins.udp]
|
||||
enabled = true
|
||||
port = 4444
|
||||
database = "test"
|
||||
|
||||
# Configure the Graphite servers
|
||||
[[graphite]]
|
||||
protocol = "TCP"
|
||||
enabled = true
|
||||
address = "192.168.0.1"
|
||||
port = 2003
|
||||
database = "graphite_tcp" # store graphite data in this database
|
||||
name-position = "last"
|
||||
name-separator = "-"
|
||||
|
||||
[[graphite]]
|
||||
protocol = "udP"
|
||||
enabled = true
|
||||
address = "192.168.0.2"
|
||||
port = 2005
|
||||
database = "graphite_udp" # store graphite data in this database
|
||||
|
||||
# Raft configuration
|
||||
[raft]
|
||||
# The raft port should be open between all servers in a cluster.
|
||||
# Broker configuration
|
||||
[broker]
|
||||
# The broker port should be open between all servers in a cluster.
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"strings"
|
||||
|
||||
"github.com/influxdb/influxdb"
|
||||
"github.com/influxdb/influxdb/graphite"
|
||||
"github.com/influxdb/influxdb/messaging"
|
||||
)
|
||||
|
||||
|
@ -69,6 +70,37 @@ func execRun(args []string) {
|
|||
go func() { log.Fatal(http.ListenAndServe(config.DataAddr(), sh)) }()
|
||||
}
|
||||
log.Printf("data node #%d listening on %s", s.ID(), config.DataAddr())
|
||||
|
||||
// Spin up any Graphite servers
|
||||
for _, c := range config.Graphites {
|
||||
if !c.Enabled {
|
||||
continue
|
||||
}
|
||||
|
||||
// Configure Graphite parsing.
|
||||
parser := graphite.NewParser()
|
||||
parser.Separator = c.NameSeparatorString()
|
||||
parser.LastEnabled = c.LastEnabled()
|
||||
|
||||
// Start the relevant server.
|
||||
if strings.ToLower(c.Protocol) == "tcp" {
|
||||
g := graphite.NewTCPServer(parser, s)
|
||||
g.Database = c.Database
|
||||
err := g.ListenAndServe(c.ConnectionString(config.BindAddress))
|
||||
if err != nil {
|
||||
log.Println("failed to start TCP Graphite Server", err.Error())
|
||||
}
|
||||
} else if strings.ToLower(c.Protocol) == "udp" {
|
||||
g := graphite.NewUDPServer(parser, s)
|
||||
g.Database = c.Database
|
||||
err := g.ListenAndServe(c.ConnectionString(config.BindAddress))
|
||||
if err != nil {
|
||||
log.Println("failed to start UDP Graphite Server", err.Error())
|
||||
}
|
||||
} else {
|
||||
log.Fatalf("unrecognized Graphite Server prototcol", c.Protocol)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait indefinitely.
|
||||
|
@ -164,7 +196,7 @@ func openServer(path string, u *url.URL, b *messaging.Broker, initializing bool,
|
|||
// Create and open the server.
|
||||
s := influxdb.NewServer()
|
||||
if err := s.Open(path); err != nil {
|
||||
log.Fatalf("failed to open data server", err.Error())
|
||||
log.Fatalf("failed to open data server: %v", err.Error())
|
||||
}
|
||||
|
||||
// If the server is uninitialized then initialize or join it.
|
||||
|
@ -188,7 +220,7 @@ func initializeServer(s *influxdb.Server, b *messaging.Broker) {
|
|||
|
||||
// Create replica on broker.
|
||||
if err := b.CreateReplica(1); err != nil {
|
||||
log.Fatalf("replica creation error: %d", err)
|
||||
log.Fatalf("replica creation error: %s", err)
|
||||
}
|
||||
|
||||
// Create messaging client.
|
||||
|
|
|
@ -507,7 +507,7 @@ func (d *database) createMeasurementIfNotExists(name string) *Measurement {
|
|||
|
||||
// AddField adds a field to the measurement name. Returns false if already present
|
||||
func (d *database) AddField(name string, f *Field) bool {
|
||||
panic("not implemented")
|
||||
if true { panic("not implemented") }
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ func TestDatabase_MeasurementBySeriesID(t *testing.T) {
|
|||
idx.addSeriesToIndex(m.Name, s)
|
||||
mm := idx.MeasurementBySeriesID(uint32(1))
|
||||
if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
|
||||
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
|
||||
t.Fatalf("mesurement not equal:\n%v\n%v", m, mm)
|
||||
}
|
||||
|
||||
// now test that we can add another
|
||||
|
@ -44,12 +44,12 @@ func TestDatabase_MeasurementBySeriesID(t *testing.T) {
|
|||
idx.addSeriesToIndex(m.Name, s)
|
||||
mm = idx.MeasurementBySeriesID(uint32(2))
|
||||
if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
|
||||
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
|
||||
t.Fatalf("mesurement not equal:\n%v\n%v", m, mm)
|
||||
}
|
||||
|
||||
mm = idx.MeasurementBySeriesID(uint32(1))
|
||||
if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
|
||||
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
|
||||
t.Fatalf("mesurement not equal:\n%v\n%v", m, mm)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -81,7 +81,7 @@ func TestDatabase_SeriesBySeriesID(t *testing.T) {
|
|||
idx.addSeriesToIndex("foo", s)
|
||||
ss := idx.SeriesByID(uint32(2))
|
||||
if string(mustMarshalJSON(s)) != string(mustMarshalJSON(ss)) {
|
||||
t.Fatalf("series not equal:\n%s\n%s", s, ss)
|
||||
t.Fatalf("series not equal:\n%v\n%v", s, ss)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -100,9 +100,9 @@ func TestDatabase_MeasurementAndSeries(t *testing.T) {
|
|||
idx.addSeriesToIndex(m.Name, s)
|
||||
mm, ss := idx.MeasurementAndSeries(m.Name, s.Tags)
|
||||
if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
|
||||
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
|
||||
t.Fatalf("mesurement not equal:\n%v\n%v", m, mm)
|
||||
} else if string(mustMarshalJSON(s)) != string(mustMarshalJSON(ss)) {
|
||||
t.Fatalf("series not equal:\n%s\n%s", s, ss)
|
||||
t.Fatalf("series not equal:\n%v\n%v", s, ss)
|
||||
}
|
||||
|
||||
// now test that we can add another
|
||||
|
@ -113,9 +113,9 @@ func TestDatabase_MeasurementAndSeries(t *testing.T) {
|
|||
idx.addSeriesToIndex(m.Name, s)
|
||||
mm, ss = idx.MeasurementAndSeries(m.Name, s.Tags)
|
||||
if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
|
||||
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
|
||||
t.Fatalf("mesurement not equal:\n%v\n%v", m, mm)
|
||||
} else if string(mustMarshalJSON(s)) != string(mustMarshalJSON(ss)) {
|
||||
t.Fatalf("series not equal:\n%s\n%s", s, ss)
|
||||
t.Fatalf("series not equal:\n%v\n%v", s, ss)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -150,7 +150,7 @@ func TestDatabase_SeriesIDs(t *testing.T) {
|
|||
l := idx.SeriesIDs([]string{"cpu_load"}, nil)
|
||||
r := []uint32{1, 2}
|
||||
if !l.Equals(r) {
|
||||
t.Fatalf("series IDs not the same:\n%s\n%s", l, r)
|
||||
t.Fatalf("series IDs not the same:\n%d\n%d", l, r)
|
||||
}
|
||||
|
||||
// now add another in a different measurement
|
||||
|
@ -165,7 +165,7 @@ func TestDatabase_SeriesIDs(t *testing.T) {
|
|||
l = idx.SeriesIDs([]string{"cpu_load"}, nil)
|
||||
r = []uint32{1, 2, 3}
|
||||
if !l.Equals(r) {
|
||||
t.Fatalf("series IDs not the same:\n%s\n%s", l, r)
|
||||
t.Fatalf("series IDs not the same:\n%d\n%d", l, r)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -563,7 +563,7 @@ func TestDatabase_SeriesIDsIntersect(t *testing.T) {
|
|||
for i, tt := range tests {
|
||||
a := SeriesIDs(tt.left).Intersect(tt.right)
|
||||
if !a.Equals(tt.expected) {
|
||||
t.Fatalf("%d: %s intersect %s: result mismatch:\n exp=%s\n got=%s", i, SeriesIDs(tt.left), SeriesIDs(tt.right), SeriesIDs(tt.expected), SeriesIDs(a))
|
||||
t.Fatalf("%d: %v intersect %v: result mismatch:\n exp=%v\n got=%v", i, SeriesIDs(tt.left), SeriesIDs(tt.right), SeriesIDs(tt.expected), SeriesIDs(a))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -620,7 +620,7 @@ func TestDatabase_SeriesIDsUnion(t *testing.T) {
|
|||
for i, tt := range tests {
|
||||
a := SeriesIDs(tt.left).Union(tt.right)
|
||||
if !a.Equals(tt.expected) {
|
||||
t.Fatalf("%d: %s union %s: result mismatch:\n exp=%s\n got=%s", i, SeriesIDs(tt.left), SeriesIDs(tt.right), SeriesIDs(tt.expected), SeriesIDs(a))
|
||||
t.Fatalf("%d: %v union %v: result mismatch:\n exp=%v\n got=%v", i, SeriesIDs(tt.left), SeriesIDs(tt.right), SeriesIDs(tt.expected), SeriesIDs(a))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -677,7 +677,7 @@ func TestDatabase_SeriesIDsReject(t *testing.T) {
|
|||
for i, tt := range tests {
|
||||
a := SeriesIDs(tt.left).Reject(tt.right)
|
||||
if !a.Equals(tt.expected) {
|
||||
t.Fatalf("%d: %s reject %s: result mismatch:\n exp=%s\n got=%s", i, SeriesIDs(tt.left), SeriesIDs(tt.right), SeriesIDs(tt.expected), SeriesIDs(a))
|
||||
t.Fatalf("%d: %v reject %v: result mismatch:\n exp=%v\n got=%v", i, SeriesIDs(tt.left), SeriesIDs(tt.right), SeriesIDs(tt.expected), SeriesIDs(a))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,8 +35,8 @@ port = 8083 # binding is disabled if the port isn't set
|
|||
# Configure the http api
|
||||
[api]
|
||||
port = 8086 # binding is disabled if the node is not a Data node.
|
||||
# ssl-port = 8084 # Ssl support is enabled if you set a port and cert
|
||||
# ssl-cert = /path/to/cert.pem
|
||||
# ssl-port = 8084 # SSL support is enabled if you set a port and cert
|
||||
# ssl-cert = "/path/to/cert.pem"
|
||||
|
||||
# connections will timeout after this amount of time. Ensures that clients that misbehave
|
||||
# and keep alive connections they don't use won't end up connection a million times.
|
||||
|
@ -45,14 +45,6 @@ read-timeout = "5s"
|
|||
|
||||
[input_plugins]
|
||||
|
||||
# Configure the graphite api
|
||||
[input_plugins.graphite]
|
||||
enabled = false
|
||||
# address = "0.0.0.0" # If not set, is actually set to bind-address.
|
||||
# port = 2003
|
||||
# database = "" # store graphite data in this database
|
||||
# udp_enabled = true # enable udp interface on the same port as the tcp interface
|
||||
|
||||
# Configure the collectd api
|
||||
[input_plugins.collectd]
|
||||
enabled = false
|
||||
|
@ -77,6 +69,14 @@ read-timeout = "5s"
|
|||
# port = 5551
|
||||
# database = "db1"
|
||||
|
||||
# Configure the Graphite plugins.
|
||||
[[graphite]] # 1 or more of these sections may be present.
|
||||
enabled = false
|
||||
# protocol = "" # Set to "tcp" or "udp"
|
||||
# address = "0.0.0.0" # If not set, is actually set to bind-address.
|
||||
# port = 2003
|
||||
# database = "" # store graphite data in this database
|
||||
|
||||
# Raft configuration
|
||||
[raft]
|
||||
# The raft port should be open between all servers in a cluster.
|
||||
|
|
graphite.go

@@ -1,300 +0,0 @@
|
|||
package influxdb
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
log "code.google.com/p/log4go"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrBindAddressRequired is returned when starting the GraphiteServer
|
||||
// without a TCP or UDP listening address.
|
||||
ErrBindAddressRequired = errors.New("bind address required")
|
||||
|
||||
// ErrGraphiteServerClosed return when closing an already closed graphite server.
|
||||
ErrGraphiteServerClosed = errors.New("graphite server already closed")
|
||||
)
|
||||
|
||||
// GraphiteListener provides a tcp and/or udp listener that you can
|
||||
// use to ingest metrics into influxdb via the graphite protocol. it
|
||||
// behaves as a carbon daemon, except:
|
||||
//
|
||||
// no rounding of timestamps to the nearest interval. Upon ingestion
|
||||
// of multiple datapoints for a given key within the same interval
|
||||
// (possibly but not necessarily the same timestamp), graphite would
|
||||
// use one (the latest received) value with a rounded timestamp
|
||||
// representing that interval. We store values for every timestamp we
|
||||
// receive (only the latest value for a given metric-timestamp pair)
|
||||
// so it's up to the user to feed the data in proper intervals (and
|
||||
// use round intervals if you plan to rely on that)
|
||||
type GraphiteServer struct {
|
||||
server *Server
|
||||
|
||||
mu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
done chan struct{} // close notification
|
||||
|
||||
// The TCP address to listen on.
|
||||
TCPAddr *net.TCPAddr
|
||||
|
||||
// The UDP address to listen on.
|
||||
UDPAddr *net.UDPAddr
|
||||
|
||||
// The name of the database to insert data into.
|
||||
Database string
|
||||
|
||||
// The cluster admin authorized to insert the data.
|
||||
User *User
|
||||
}
|
||||
|
||||
// NewGraphiteServer returns an instance of GraphiteServer attached to a Server.
|
||||
func NewGraphiteServer(server *Server) *GraphiteServer {
|
||||
return &GraphiteServer{server: server}
|
||||
}
|
||||
|
||||
// ListenAndServe opens TCP (and optionally a UDP) socket to listen for messages.
|
||||
func (s *GraphiteServer) ListenAndServe() error {
|
||||
// Make sure we have either a TCP address or a UDP address.
|
||||
// Also validate that there is an admin user to insert data as.
|
||||
if s.TCPAddr == nil && s.UDPAddr == nil {
|
||||
return ErrBindAddressRequired
|
||||
} else if s.User != nil {
|
||||
return ErrUserNotFound
|
||||
}
|
||||
|
||||
// Create a new close notification channel.
|
||||
done := make(chan struct{}, 0)
|
||||
s.done = done
|
||||
|
||||
// Open the TCP connection.
|
||||
if s.TCPAddr != nil {
|
||||
l, err := net.ListenTCP("tcp", s.TCPAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() { _ = l.Close() }()
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.serveTCP(l, done)
|
||||
}
|
||||
|
||||
// Open the UDP connection.
|
||||
if s.UDPAddr != nil {
|
||||
l, err := net.ListenUDP("udp", s.UDPAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() { _ = l.Close() }()
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.serveUDP(l, done)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// serveTCP handles incoming TCP connection requests.
|
||||
func (s *GraphiteServer) serveTCP(l *net.TCPListener, done chan struct{}) {
|
||||
defer s.wg.Done()
|
||||
|
||||
// Listen for server close.
|
||||
go func() {
|
||||
<-done
|
||||
l.Close()
|
||||
}()
|
||||
|
||||
// Listen for new TCP connections.
|
||||
for {
|
||||
c, err := l.Accept()
|
||||
if err != nil {
|
||||
// TODO(benbjohnson): Check for connection closed.
|
||||
log.Error("GraphiteServer: Accept: ", err)
|
||||
continue
|
||||
}
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.handleTCPConn(c)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *GraphiteServer) handleTCPConn(conn net.Conn) {
|
||||
defer conn.Close()
|
||||
defer s.wg.Done()
|
||||
|
||||
reader := bufio.NewReader(conn)
|
||||
for {
|
||||
err := s.handleMessage(reader)
|
||||
if err != nil {
|
||||
if io.EOF == err {
|
||||
log.Debug("GraphiteServer: Client closed graphite connection")
|
||||
return
|
||||
}
|
||||
log.Error("GraphiteServer:", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// serveUDP handles incoming UDP messages.
|
||||
func (s *GraphiteServer) serveUDP(conn *net.UDPConn, done chan struct{}) {
|
||||
defer s.wg.Done()
|
||||
|
||||
// Listen for server close.
|
||||
go func() {
|
||||
<-done
|
||||
conn.Close()
|
||||
}()
|
||||
|
||||
buf := make([]byte, 65536)
|
||||
for {
|
||||
// Read from connection.
|
||||
n, _, err := conn.ReadFromUDP(buf)
|
||||
if err == io.EOF {
|
||||
return
|
||||
} else if err != nil {
|
||||
log.Warn("GraphiteServer: Error when reading from UDP connection %s", err.Error())
|
||||
}
|
||||
|
||||
// Read in data in a separate goroutine.
|
||||
s.wg.Add(1)
|
||||
go s.handleUDPMessage(string(buf[:n]))
|
||||
}
|
||||
}
|
||||
|
||||
// handleUDPMessage splits a UDP packet by newlines and processes each message.
|
||||
func (s *GraphiteServer) handleUDPMessage(msg string) {
|
||||
defer s.wg.Done()
|
||||
for _, metric := range strings.Split(msg, "\n") {
|
||||
s.handleMessage(bufio.NewReader(strings.NewReader(metric + "\n")))
|
||||
}
|
||||
}
|
||||
|
||||
// Close shuts down the server's listeners.
|
||||
func (s *GraphiteServer) Close() error {
|
||||
// Notify other goroutines of shutdown.
|
||||
s.mu.Lock()
|
||||
if s.done == nil {
|
||||
s.mu.Unlock()
|
||||
return ErrGraphiteServerClosed
|
||||
}
|
||||
close(s.done)
|
||||
s.done = nil
|
||||
s.mu.Unlock()
|
||||
|
||||
// Wait for all goroutines to shutdown.
|
||||
s.wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleMessage decodes a graphite message from the reader and sends it to the
|
||||
// committer goroutine.
|
||||
func (s *GraphiteServer) handleMessage(r *bufio.Reader) error {
|
||||
panic("not yet implemented: GraphiteServer.handleMessage()")
|
||||
/* TEMPORARILY REMOVED FOR PROTOBUFS.
|
||||
// Decode graphic metric.
|
||||
m, err := decodeGraphiteMetric(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
// Convert metric to a field value.
|
||||
v := &protocol.FieldValue{}
|
||||
if m.isInt {
|
||||
v.Int64Value = &m.integerValue
|
||||
} else {
|
||||
v.DoubleValue = &m.floatValue
|
||||
}
|
||||
|
||||
// Use a single sequence number to make sure last write wins.
|
||||
sn := uint64(1)
|
||||
|
||||
// Send data point to committer.
|
||||
p := &protocol.Point{
|
||||
Timestamp: &m.timestamp,
|
||||
Values: []*protocol.FieldValue{v},
|
||||
SequenceNumber: &sn,
|
||||
}
|
||||
|
||||
// Write data to server.
|
||||
series := &protocol.Series{
|
||||
Name: &m.name,
|
||||
Fields: []string{"value"},
|
||||
Points: []*protocol.Point{p},
|
||||
}
|
||||
|
||||
// TODO: Validate user.
|
||||
|
||||
// Look up database.
|
||||
db := s.server.Database(s.Database)
|
||||
if db == nil {
|
||||
return ErrDatabaseNotFound
|
||||
}
|
||||
|
||||
// Write series data to database.
|
||||
if err := db.WriteSeries(series); err != nil {
|
||||
return fmt.Errorf("write series data: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type graphiteMetric struct {
|
||||
name string
|
||||
isInt bool
|
||||
integerValue int64
|
||||
floatValue float64
|
||||
timestamp int64
|
||||
}
|
||||
|
||||
// returns err == io.EOF when we hit EOF without any further data
|
||||
func decodeGraphiteMetric(r *bufio.Reader) (*graphiteMetric, error) {
|
||||
// Read up to the next newline.
|
||||
buf, err := r.ReadBytes('\n')
|
||||
str := strings.TrimSpace(string(buf))
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
return nil, fmt.Errorf("connection closed uncleanly/broken: %s\n", err.Error())
|
||||
}
|
||||
if str == "" {
|
||||
return nil, err
|
||||
}
|
||||
// else we got EOF but also data, so just try to process it as valid data
|
||||
}
|
||||
|
||||
// Break into 3 fields (name, value, timestamp).
|
||||
fields := strings.Fields(str)
|
||||
if len(fields) != 3 {
|
||||
return nil, fmt.Errorf("received '%s' which doesn't have three fields", str)
|
||||
}
|
||||
|
||||
// Create a metric.
|
||||
m := &graphiteMetric{name: fields[0]}
|
||||
|
||||
// Parse value.
|
||||
v, err := strconv.ParseFloat(fields[1], 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Determine if value is a float or an int.
|
||||
if i := int64(v); float64(i) == v {
|
||||
m.integerValue, m.isInt = int64(v), true
|
||||
} else {
|
||||
m.floatValue = v
|
||||
}
|
||||
|
||||
// Parse timestamp.
|
||||
timestamp, err := strconv.ParseUint(fields[2], 10, 32)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m.timestamp = int64(timestamp) * int64(time.Millisecond)
|
||||
|
||||
return m, nil
|
||||
*/
|
||||
}
|
|
@ -0,0 +1,134 @@
|
|||
package graphite
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultGraphitePort represents the default Graphite (Carbon) plaintext port.
|
||||
DefaultGraphitePort = 2003
|
||||
|
||||
// DefaultGraphiteNameSeparator represents the default Graphite field separator.
|
||||
DefaultGraphiteNameSeparator = "."
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrBindAddressRequired is returned when starting the Server
|
||||
// without a TCP or UDP listening address.
|
||||
ErrBindAddressRequired = errors.New("bind address required")
|
||||
|
||||
// ErrServerClosed return when closing an already closed graphite server.
|
||||
ErrServerClosed = errors.New("server already closed")
|
||||
|
||||
// ErrDatabaseNotSpecified is returned when no database was specified in the config file.
|
||||
ErrDatabaseNotSpecified = errors.New("database was not specified in config")
|
||||
|
||||
// ErrServerNotSpecified returned when Server is not specified.
|
||||
ErrServerNotSpecified = errors.New("server not present")
|
||||
)
|
||||
|
||||
// SeriesWriter defines the interface for the destination of the data.
|
||||
type SeriesWriter interface {
|
||||
WriteSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) error
|
||||
}
|
||||
|
||||
// Metric represents a metric as processed by the Graphite parser.
|
||||
type Metric struct {
|
||||
Name string
|
||||
Tags map[string]string
|
||||
Value interface{}
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
// Parser encapsulates a Graphite Parser.
|
||||
type Parser struct {
|
||||
Separator string
|
||||
LastEnabled bool
|
||||
}
|
||||
|
||||
// NewParser returns a GraphiteParser instance.
|
||||
func NewParser() *Parser {
|
||||
return &Parser{Separator: DefaultGraphiteNameSeparator}
|
||||
}
|
||||
|
||||
// Parse performs Graphite parsing of a single line.
|
||||
func (p *Parser) Parse(line string) (*Metric, error) {
|
||||
// Break into 3 fields (name, value, timestamp).
|
||||
fields := strings.Fields(line)
|
||||
if len(fields) != 3 {
|
||||
return nil, fmt.Errorf("received %q which doesn't have three fields", line)
|
||||
}
|
||||
|
||||
m := new(Metric)
|
||||
// decode the name and tags
|
||||
name, tags, err := p.DecodeNameAndTags(fields[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m.Name = name
|
||||
m.Tags = tags
|
||||
|
||||
// Parse value.
|
||||
v, err := strconv.ParseFloat(fields[1], 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Determine if value is a float or an int.
|
||||
if i := int64(v); float64(i) == v {
|
||||
m.Value = int64(v)
|
||||
} else {
|
||||
m.Value = v
|
||||
}
|
||||
|
||||
// Parse timestamp.
|
||||
unixTime, err := strconv.ParseInt(fields[2], 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m.Timestamp = time.Unix(0, unixTime*int64(time.Millisecond))
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// DecodeNameAndTags parses the name and tags of a single field of a Graphite datum.
|
||||
func (p *Parser) DecodeNameAndTags(field string) (string, map[string]string, error) {
|
||||
var (
|
||||
name string
|
||||
tags = make(map[string]string)
|
||||
)
|
||||
|
||||
// decode the name and tags
|
||||
values := strings.Split(field, p.Separator)
|
||||
if len(values)%2 != 1 {
|
||||
// There should always be an odd number of fields to map a metric name and tags
|
||||
// ex: region.us-west.hostname.server01.cpu -> tags -> region: us-west, hostname: server01, metric name -> cpu
|
||||
return name, tags, fmt.Errorf("received %q which doesn't conform to format of key.value.key.value.metric or metric", field)
|
||||
}
|
||||
|
||||
if p.LastEnabled {
|
||||
name = values[len(values)-1]
|
||||
values = values[0 : len(values)-1]
|
||||
} else {
|
||||
name = values[0]
|
||||
values = values[1:len(values)]
|
||||
}
|
||||
|
||||
if name == "" {
|
||||
return name, tags, fmt.Errorf("no name specified for metric. %q", field)
|
||||
}
|
||||
|
||||
// Grab the pairs and throw them in the map
|
||||
for i := 0; i < len(values); i += 2 {
|
||||
k := values[i]
|
||||
v := values[i+1]
|
||||
tags[k] = v
|
||||
}
|
||||
|
||||
return name, tags, nil
|
||||
}
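As a quick illustration (not part of the diff) of how the parser above behaves with the default "." separator and the default name position of "first":

```go
package graphite_test

import (
	"testing"

	"github.com/influxdb/influxdb/graphite"
)

// Sketch: a plaintext Graphite line is split into name, value, and timestamp,
// and the dotted name is decoded into a metric name plus tag pairs.
func TestParser_Sketch(t *testing.T) {
	p := graphite.NewParser()

	// "cpu.hostname.server01" decodes to name "cpu" with tag hostname=server01;
	// 42 is stored as an int64 and the timestamp is interpreted as milliseconds.
	m, err := p.Parse("cpu.hostname.server01 42 1419972457825")
	if err != nil {
		t.Fatal(err)
	}
	if m.Name != "cpu" || m.Tags["hostname"] != "server01" {
		t.Fatalf("unexpected name/tags: %v %v", m.Name, m.Tags)
	}
	if v, ok := m.Value.(int64); !ok || v != 42 {
		t.Fatalf("unexpected value: %v", m.Value)
	}
}
```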
|
|
@ -0,0 +1,80 @@
|
|||
package graphite
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"log"
|
||||
"net"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// TCPServer processes Graphite data received over TCP connections.
|
||||
type TCPServer struct {
|
||||
writer SeriesWriter
|
||||
parser *Parser
|
||||
|
||||
Database string
|
||||
}
|
||||
|
||||
// NewTCPServer returns a new instance of a TCPServer.
|
||||
func NewTCPServer(p *Parser, w SeriesWriter) *TCPServer {
|
||||
return &TCPServer{
|
||||
parser: p,
|
||||
writer: w,
|
||||
}
|
||||
}
|
||||
|
||||
// ListenAndServe instructs the TCPServer to start processing Graphite data
|
||||
// on the given interface. iface must be in the form host:port
|
||||
func (t *TCPServer) ListenAndServe(iface string) error {
|
||||
if iface == "" { // Make sure we have an address
|
||||
return ErrBindAddressRequired
|
||||
} else if t.Database == "" { // Make sure they have a database
|
||||
return ErrDatabaseNotSpecified
|
||||
}
|
||||
|
||||
ln, err := net.Listen("tcp", iface)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
conn, err := ln.Accept()
|
||||
if err != nil {
|
||||
log.Println("error accepting TCP connection", err.Error())
|
||||
continue
|
||||
}
|
||||
go t.handleConnection(conn)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleConnection services an individual TCP connection.
|
||||
func (t *TCPServer) handleConnection(conn net.Conn) {
|
||||
defer conn.Close()
|
||||
|
||||
reader := bufio.NewReader(conn)
|
||||
for {
|
||||
// Read up to the next newline.
|
||||
buf, err := reader.ReadBytes('\n')
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Trim the buffer, even though there should be no padding
|
||||
line := strings.TrimSpace(string(buf))
|
||||
|
||||
// Parse it.
|
||||
metric, err := t.parser.Parse(line)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Convert metric to a field value.
|
||||
var values = make(map[string]interface{})
|
||||
values[metric.Name] = metric.Value
|
||||
|
||||
// Send the data to database
|
||||
t.writer.WriteSeries(t.Database, "", metric.Name, metric.Tags, metric.Timestamp, values)
|
||||
}
|
||||
}
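Because TCPServer only depends on the SeriesWriter interface, any type implementing WriteSeries can act as the destination. A hedged sketch — the logWriter stub and the listen address below are illustrative, not part of the repository:

```go
package main

import (
	"log"
	"time"

	"github.com/influxdb/influxdb/graphite"
)

// logWriter is a hypothetical SeriesWriter that just logs incoming points.
type logWriter struct{}

func (logWriter) WriteSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) error {
	log.Printf("db=%s name=%s tags=%v values=%v", database, name, tags, values)
	return nil
}

func main() {
	g := graphite.NewTCPServer(graphite.NewParser(), logWriter{})
	g.Database = "graphite_tcp"

	// ListenAndServe returns immediately; accepted connections are handled in goroutines.
	if err := g.ListenAndServe("127.0.0.1:2003"); err != nil {
		log.Fatal(err)
	}
	select {} // block forever so the listener keeps running
}
```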
|
|
@ -0,0 +1,250 @@
|
|||
package graphite_test
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/graphite"
|
||||
)
|
||||
|
||||
func Test_DecodeNameAndTags(t *testing.T) {
|
||||
var tests = []struct {
|
||||
test string
|
||||
str string
|
||||
name string
|
||||
tags map[string]string
|
||||
position string
|
||||
separator string
|
||||
err string
|
||||
}{
|
||||
{test: "metric only", str: "cpu", name: "cpu"},
|
||||
{test: "metric with single series", str: "cpu.hostname.server01", name: "cpu", tags: map[string]string{"hostname": "server01"}},
|
||||
{test: "metric with multiple series", str: "cpu.region.us-west.hostname.server01", name: "cpu", tags: map[string]string{"hostname": "server01", "region": "us-west"}},
|
||||
{test: "no metric", tags: make(map[string]string), err: `no name specified for metric. ""`},
|
||||
{test: "wrong metric format", str: "foo.cpu", tags: make(map[string]string), err: `received "foo.cpu" which doesn't conform to format of key.value.key.value.metric or metric`},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Logf("testing %q...", test.test)
|
||||
|
||||
p := graphite.NewParser()
|
||||
if test.separator != "" {
|
||||
p.Separator = test.separator
|
||||
}
|
||||
|
||||
name, tags, err := p.DecodeNameAndTags(test.str)
|
||||
if errstr(err) != test.err {
|
||||
t.Fatalf("err does not match. expected %v, got %v", test.err, err)
|
||||
}
|
||||
if name != test.name {
|
||||
t.Fatalf("name parse failer. expected %v, got %v", test.name, name)
|
||||
}
|
||||
if len(tags) != len(test.tags) {
|
||||
t.Fatalf("unexpected number of tags. expected %d, got %d", len(test.tags), len(tags))
|
||||
}
|
||||
for k, v := range test.tags {
|
||||
if tags[k] != v {
|
||||
t.Fatalf("unexpected tag value for tags[%s]. expected %q, got %q", k, v, tags[k])
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Test_DecodeMetric(t *testing.T) {
|
||||
testTime := time.Now()
|
||||
epochTime := testTime.UnixNano() / 1000000 // nanos to milliseconds
|
||||
strTime := strconv.FormatInt(epochTime, 10)
|
||||
|
||||
var tests = []struct {
|
||||
test string
|
||||
line string
|
||||
name string
|
||||
tags map[string]string
|
||||
isInt bool
|
||||
iv int64
|
||||
fv float64
|
||||
timestamp time.Time
|
||||
position, separator string
|
||||
err string
|
||||
}{
|
||||
{
|
||||
test: "position first by default",
|
||||
line: `cpu.foo.bar 50 ` + strTime,
|
||||
name: "cpu",
|
||||
tags: map[string]string{"foo": "bar"},
|
||||
isInt: true,
|
||||
iv: 50,
|
||||
timestamp: testTime,
|
||||
},
|
||||
{
|
||||
test: "position first if unable to determine",
|
||||
position: "foo",
|
||||
line: `cpu.foo.bar 50 ` + strTime,
|
||||
name: "cpu",
|
||||
tags: map[string]string{"foo": "bar"},
|
||||
isInt: true,
|
||||
iv: 50,
|
||||
timestamp: testTime,
|
||||
},
|
||||
{
|
||||
test: "position last if specified",
|
||||
position: "last",
|
||||
line: `foo.bar.cpu 50 ` + strTime,
|
||||
name: "cpu",
|
||||
tags: map[string]string{"foo": "bar"},
|
||||
isInt: true,
|
||||
iv: 50,
|
||||
timestamp: testTime,
|
||||
},
|
||||
{
|
||||
test: "position first if specified with no series",
|
||||
position: "first",
|
||||
line: `cpu 50 ` + strTime,
|
||||
name: "cpu",
|
||||
tags: map[string]string{},
|
||||
isInt: true,
|
||||
iv: 50,
|
||||
timestamp: testTime,
|
||||
},
|
||||
{
|
||||
test: "position last if specified with no series",
|
||||
position: "last",
|
||||
line: `cpu 50 ` + strTime,
|
||||
name: "cpu",
|
||||
tags: map[string]string{},
|
||||
isInt: true,
|
||||
iv: 50,
|
||||
timestamp: testTime,
|
||||
},
|
||||
{
|
||||
test: "sepeartor is . by default",
|
||||
line: `cpu.foo.bar 50 ` + strTime,
|
||||
name: "cpu",
|
||||
tags: map[string]string{"foo": "bar"},
|
||||
isInt: true,
|
||||
iv: 50,
|
||||
timestamp: testTime,
|
||||
},
|
||||
{
|
||||
test: "sepeartor is . if specified",
|
||||
separator: ".",
|
||||
line: `cpu.foo.bar 50 ` + strTime,
|
||||
name: "cpu",
|
||||
tags: map[string]string{"foo": "bar"},
|
||||
isInt: true,
|
||||
iv: 50,
|
||||
timestamp: testTime,
|
||||
},
|
||||
{
|
||||
test: "sepeartor is - if specified",
|
||||
separator: "-",
|
||||
line: `cpu-foo-bar 50 ` + strTime,
|
||||
name: "cpu",
|
||||
tags: map[string]string{"foo": "bar"},
|
||||
isInt: true,
|
||||
iv: 50,
|
||||
timestamp: testTime,
|
||||
},
|
||||
{
|
||||
test: "sepeartor is boo if specified",
|
||||
separator: "boo",
|
||||
line: `cpuboofooboobar 50 ` + strTime,
|
||||
name: "cpu",
|
||||
tags: map[string]string{"foo": "bar"},
|
||||
isInt: true,
|
||||
iv: 50,
|
||||
timestamp: testTime,
|
||||
},
|
||||
|
||||
{
|
||||
test: "series + metric + integer value",
|
||||
line: `cpu.foo.bar 50 ` + strTime,
|
||||
name: "cpu",
|
||||
tags: map[string]string{"foo": "bar"},
|
||||
isInt: true,
|
||||
iv: 50,
|
||||
timestamp: testTime,
|
||||
},
|
||||
{
|
||||
test: "metric only with float value",
|
||||
line: `cpu 50.554 ` + strTime,
|
||||
name: "cpu",
|
||||
isInt: false,
|
||||
fv: 50.554,
|
||||
timestamp: testTime,
|
||||
},
|
||||
{
|
||||
test: "missing metric",
|
||||
line: `50.554 1419972457825`,
|
||||
err: `received "50.554 1419972457825" which doesn't have three fields`,
|
||||
},
|
||||
{
|
||||
test: "should fail on invalid key",
|
||||
line: `foo.cpu 50.554 1419972457825`,
|
||||
err: `received "foo.cpu" which doesn't conform to format of key.value.key.value.metric or metric`,
|
||||
},
|
||||
{
|
||||
test: "should fail parsing invalid float",
|
||||
line: `cpu 50.554z 1419972457825`,
|
||||
err: `strconv.ParseFloat: parsing "50.554z": invalid syntax`,
|
||||
},
|
||||
{
|
||||
test: "should fail parsing invalid int",
|
||||
line: `cpu 50z 1419972457825`,
|
||||
err: `strconv.ParseFloat: parsing "50z": invalid syntax`,
|
||||
},
|
||||
{
|
||||
test: "should fail parsing invalid time",
|
||||
line: `cpu 50.554 14199724z57825`,
|
||||
err: `strconv.ParseInt: parsing "14199724z57825": invalid syntax`,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Logf("testing %q...", test.test)
|
||||
|
||||
p := graphite.NewParser()
|
||||
if test.separator != "" {
|
||||
p.Separator = test.separator
|
||||
}
|
||||
p.LastEnabled = (test.position == "last")
|
||||
|
||||
m, err := p.Parse(test.line)
|
||||
if errstr(err) != test.err {
|
||||
t.Fatalf("err does not match. expected %v, got %v", test.err, err)
|
||||
}
|
||||
if err != nil {
|
||||
// If we erred out, it was intended and the following tests won't work
|
||||
continue
|
||||
}
|
||||
if m.Name != test.name {
|
||||
t.Fatalf("name parse failer. expected %v, got %v", test.name, m.Name)
|
||||
}
|
||||
if len(m.Tags) != len(test.tags) {
|
||||
t.Fatalf("tags len mismatch. expected %d, got %d", len(test.tags), len(m.Tags))
|
||||
}
|
||||
if test.isInt {
|
||||
i := m.Value.(int64)
|
||||
if i != test.iv {
|
||||
t.Fatalf("integerValue value mismatch. expected %v, got %v", test.iv, m.Value)
|
||||
}
|
||||
} else {
|
||||
f := m.Value.(float64)
|
||||
if f != test.fv {
|
||||
t.Fatalf("floatValue value mismatch. expected %v, got %v", test.fv, f)
|
||||
}
|
||||
}
|
||||
if m.Timestamp.UnixNano()/1000000 != test.timestamp.UnixNano()/1000000 {
|
||||
t.Fatalf("timestamp value mismatch. expected %v, got %v", test.timestamp.UnixNano(), m.Timestamp.UnixNano())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Test Helpers
|
||||
func errstr(err error) string {
|
||||
if err != nil {
|
||||
return err.Error()
|
||||
}
|
||||
return ""
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
package graphite
|
||||
|
||||
import (
|
||||
"net"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
udpBufferSize = 65536
|
||||
)
|
||||
|
||||
// UDPServer processes Graphite data received via UDP.
|
||||
type UDPServer struct {
|
||||
writer SeriesWriter
|
||||
parser *Parser
|
||||
|
||||
Database string
|
||||
}
|
||||
|
||||
// NewUDPServer returns a new instance of a UDPServer
|
||||
func NewUDPServer(p *Parser, w SeriesWriter) *UDPServer {
|
||||
u := UDPServer{
|
||||
parser: p,
|
||||
writer: w,
|
||||
}
|
||||
return &u
|
||||
}
|
||||
|
||||
// ListenAndServe instructs the UDPServer to start processing Graphite data
|
||||
// on the given interface. iface must be in the form host:port.
|
||||
func (u *UDPServer) ListenAndServe(iface string) error {
|
||||
if iface == "" { // Make sure we have an address
|
||||
return ErrBindAddressRequired
|
||||
} else if u.Database == "" { // Make sure they have a database
|
||||
return ErrDatabaseNotSpecified
|
||||
}
|
||||
|
||||
addr, err := net.ResolveUDPAddr("udp", iface)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn, err := net.ListenUDP("udp", addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
buf := make([]byte, udpBufferSize)
|
||||
go func() {
|
||||
for {
|
||||
n, _, err := conn.ReadFromUDP(buf)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for _, line := range strings.Split(string(buf[:n]), "\n") {
|
||||
m, err := u.parser.Parse(line)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Convert metric to a field value.
|
||||
var values = make(map[string]interface{})
|
||||
values[m.Name] = m.Value
|
||||
|
||||
// Send the data to database
|
||||
u.writer.WriteSeries(u.Database, "", m.Name, m.Tags, m.Timestamp, values)
|
||||
}
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
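For completeness, a small client-side sketch (not from the diff) of feeding the UDP server a plaintext line in the expected "name value timestamp-in-ms" format; the address and port are assumptions taken from the sample config above:

```go
package main

import (
	"fmt"
	"log"
	"net"
	"time"
)

func main() {
	conn, err := net.Dial("udp", "192.168.0.2:2005") // assumed address/port from the sample config
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// One metric per line: <name> <value> <timestamp-in-ms>, newline-terminated.
	ts := time.Now().UnixNano() / int64(time.Millisecond)
	fmt.Fprintf(conn, "cpu.hostname.server01 42 %d\n", ts)
}
```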
|
|
@ -568,7 +568,7 @@ func TestHandler_CreateDataNode_InternalServerError(t *testing.T) {
|
|||
|
||||
status, body := MustHTTP("POST", s.URL+`/data_nodes`, `{"url":""}`)
|
||||
if status != http.StatusInternalServerError {
|
||||
t.Fatalf("unexpected status: %d", status, body)
|
||||
t.Fatalf("unexpected status: %d, %s", status, body)
|
||||
} else if body != `data node url required` {
|
||||
t.Fatalf("unexpected body: %s", body)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,291 @@
|
|||
# The Influx Query Language Specification
|
||||
|
||||
## Introduction
|
||||
|
||||
This is a reference for the Influx Query Language ("InfluxQL").
|
||||
|
||||
InfluxQL is a SQL-like query language for interacting with InfluxDB. It was lovingly crafted to feel familiar to those coming from other
|
||||
SQL or SQL-like environments while providing features specific to storing
|
||||
and analyzing time series data.
|
||||
|
||||
## Notation
|
||||
|
||||
This specification uses the same notation used by Google's Go programming language, which can be found at http://golang.org. The syntax is specified in Extended Backus-Naur Form ("EBNF"):
|
||||
|
||||
```
|
||||
Production = production_name "=" [ Expression ] "." .
|
||||
Expression = Alternative { "|" Alternative } .
|
||||
Alternative = Term { Term } .
|
||||
Term = production_name | token [ "…" token ] | Group | Option | Repetition .
|
||||
Group = "(" Expression ")" .
|
||||
Option = "[" Expression "]" .
|
||||
Repetition = "{" Expression "}" .
|
||||
```
|
||||
|
||||
Notation operators in order of increasing precedence:
|
||||
|
||||
```
|
||||
| alternation
|
||||
() grouping
|
||||
[] option (0 or 1 times)
|
||||
{} repetition (0 to n times)
|
||||
```
|
||||
|
||||
## Characters & Digits
|
||||
|
||||
```
|
||||
newline = /* the Unicode code point U+000A */ .
|
||||
unicode_char = /* an arbitrary Unicode code point except newline */ .
|
||||
ascii_letter = "A" .. "Z" | "a" .. "z" .
|
||||
decimal_digit = "0" .. "9" .
|
||||
```
|
||||
|
||||
## Database name
|
||||
|
||||
Database names are more limited than other identifiers because they appear in URLs.
|
||||
|
||||
```
|
||||
db_name = ascii_letter { ascii_letter | decimal_digit | "_" | "-" } .
|
||||
```
|
||||
|
||||
## Identifiers
|
||||
|
||||
```
|
||||
identifier = unquoted_identifier | quoted_identifier .
|
||||
unquoted_identifier = ascii_letter { ascii_letter | decimal_digit | "_" | "." } .
|
||||
quoted_identifier = `"` unicode_char { unicode_char } `"` .
|
||||
```
|
||||
|
||||
## Keywords
|
||||
|
||||
```
|
||||
ALL ALTER AS ASC BEGIN
|
||||
BY CREATE CONTINUOUS DATABASE DEFAULT
|
||||
DELETE DESC DROP DURATION END
|
||||
EXISTS EXPLAIN FIELD FROM GRANT
|
||||
GROUP IF INNER INSERT INTO
|
||||
KEYS LIMIT LIST MEASUREMENT MEASUREMENTS
|
||||
ON ORDER PASSWORD POLICY PRIVILEGES
|
||||
QUERIES QUERY READ REPLICATION RETENTION
|
||||
REVOKE SELECT SERIES TAG TO
|
||||
USER VALUES WHERE WITH WRITE
|
||||
```
|
||||
|
||||
## Literals
|
||||
|
||||
### Numbers
|
||||
|
||||
```
|
||||
int_lit = decimal_lit .
|
||||
decimal_lit = ( "1" .. "9" ) { decimal_digit } .
|
||||
float_lit = decimals "." decimals .
|
||||
decimals = decimal_digit { decimal_digit } .
|
||||
```
|
||||
|
||||
### Strings
|
||||
|
||||
```
|
||||
string_lit = '"' { unicode_char } '"' .
|
||||
```
|
||||
|
||||
### Durations
|
||||
|
||||
```
|
||||
duration_lit = decimals duration_unit .
|
||||
duration_unit = "u" | "µ" | "s" | "h" | "d" | "w" | "ms" .
|
||||
```
|
||||
|
||||
## Queries
|
||||
|
||||
A query is composed of one or more statements separated by a semicolon.
|
||||
|
||||
```
|
||||
query = statement { ";" statement } .
|
||||
|
||||
statement = alter_retention_policy_stmt |
|
||||
create_continuous_query_stmt |
|
||||
create_database_stmt |
|
||||
create_retention_policy_stmt |
|
||||
create_user_stmt |
|
||||
delete_stmt |
|
||||
drop_continuous_query_stmt |
|
||||
drop_database_stmt |
|
||||
drop_series_stmt |
|
||||
drop_user_stmt |
|
||||
grant_stmt |
|
||||
list_continuous_queries_stmt |
|
||||
list_databases_stmt |
|
||||
list_field_key_stmt |
|
||||
list_field_value_stmt |
|
||||
list_measurements_stmt |
|
||||
list_series_stmt |
|
||||
list_tag_key_stmt |
|
||||
list_tag_value_stmt |
|
||||
revoke_stmt |
|
||||
select_stmt .
|
||||
```
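For example, two of the statements defined below can be submitted as a single query:

```sql
-- Two statements submitted as one query, separated by a semicolon.
CREATE DATABASE foo;
GRANT ALL TO jdoe
```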
|
||||
|
||||
## Statements
|
||||
|
||||
### ALTER RETENTION POLICY
|
||||
|
||||
```
|
||||
alter_retention_policy_stmt = "ALTER RETENTION POLICY" policy_name "ON"
|
||||
db_name retention_policy_option
|
||||
[ retention_policy_option ]
|
||||
[ retention_policy_option ] .
|
||||
|
||||
policy_name = identifier .
|
||||
|
||||
retention_policy_option = retention_policy_duration |
|
||||
retention_policy_replication |
|
||||
"DEFAULT" .
|
||||
|
||||
retention_policy_duration = "DURATION" duration_lit .
|
||||
retention_policy_replication = "REPLICATION" int_lit .
|
||||
```
|
||||
|
||||
#### Examples:
|
||||
|
||||
```sql
|
||||
-- Set default retention policy for mydb to 1h.cpu.
|
||||
ALTER RETENTION POLICY "1h.cpu" ON mydb DEFAULT;
|
||||
|
||||
-- Change duration and replication factor.
|
||||
ALTER RETENTION POLICY policy1 ON somedb DURATION 1h REPLICATION 4
|
||||
```
|
||||
|
||||
### CREATE CONTINUOUS QUERY
|
||||
|
||||
```
|
||||
create_continuous_query_stmt = "CREATE CONTINUOUS QUERY" query_name "ON" db_name
|
||||
"BEGIN" select_stmt "END" .
|
||||
|
||||
query_name = identifier .
|
||||
```
|
||||
|
||||
#### Examples:
|
||||
|
||||
```sql
|
||||
CREATE CONTINUOUS QUERY 10m_event_count
|
||||
ON db_name
|
||||
BEGIN
|
||||
SELECT count(value)
|
||||
INTO 10m.events
|
||||
FROM events
|
||||
GROUP BY time(10m)
|
||||
END;
|
||||
|
||||
-- this selects from the output of one continuous query and outputs to another series
|
||||
CREATE CONTINUOUS QUERY 1h_event_count
|
||||
ON db_name
|
||||
BEGIN
|
||||
SELECT sum(count) as count
|
||||
INTO 1h.events
|
||||
FROM events
|
||||
GROUP BY time(1h)
|
||||
END;
|
||||
```
|
||||
|
||||
### CREATE DATABASE
|
||||
|
||||
```
|
||||
create_database_stmt = "CREATE DATABASE" db_name .
|
||||
```
|
||||
|
||||
#### Example:
|
||||
|
||||
```sql
|
||||
CREATE DATABASE foo
|
||||
```
|
||||
|
||||
### CREATE RETENTION POLICY
|
||||
|
||||
```
|
||||
create_retention_policy_stmt = "CREATE RETENTION POLICY" policy_name "ON"
|
||||
db_name retention_policy_duration
|
||||
retention_policy_replication
|
||||
[ "DEFAULT" ] .
|
||||
```
|
||||
|
||||
#### Examples
|
||||
|
||||
```sql
|
||||
-- Create a retention policy.
|
||||
CREATE RETENTION POLICY "10m.events" ON somedb DURATION 10m REPLICATION 2;
|
||||
|
||||
-- Create a retention policy and set it as the default.
|
||||
CREATE RETENTION POLICY "10m.events" ON somedb DURATION 10m REPLICATION 2 DEFAULT;
|
||||
```
|
||||
|
||||
### CREATE USER
|
||||
|
||||
```
|
||||
create_user_stmt = "CREATE USER" user_name "WITH PASSWORD" password
|
||||
[ "WITH ALL PRIVILEGES" ] .
|
||||
```
|
||||
|
||||
#### Examples:
|
||||
|
||||
```sql
|
||||
-- Create a normal database user.
|
||||
CREATE USER jdoe WITH PASSWORD "1337password";
|
||||
|
||||
-- Create a cluster admin.
|
||||
-- Note: Unlike the GRANT statement, the "PRIVILEGES" keyword is required here.
|
||||
CREATE USER jdoe WITH PASSWORD "1337password" WITH ALL PRIVILEGES;
|
||||
```
|
||||
|
||||
### DELETE
|
||||
|
||||
```
|
||||
delete_stmt = "DELETE" from_clause where_clause .
|
||||
```
|
||||
|
||||
#### Example:
|
||||
|
||||
```sql
|
||||
DELETE FROM cpu WHERE region = 'uswest'
|
||||
```
|
||||
|
||||
### GRANT
|
||||
|
||||
```
|
||||
grant_stmt = "GRANT" privilege [ on_clause ] to_clause
|
||||
```
|
||||
|
||||
#### Examples:
|
||||
|
||||
```sql
|
||||
-- grant cluster admin privileges
|
||||
GRANT ALL TO jdoe;
|
||||
|
||||
-- grant read access to a database
|
||||
GRANT READ ON mydb TO jdoe;
|
||||
```
|
||||
|
||||
## Clauses
|
||||
|
||||
```
|
||||
from_clause = "FROM" measurements .
|
||||
|
||||
where_clause = "WHERE" expr .
|
||||
|
||||
on_clause = "ON" db_name .

to_clause = "TO" user_name .
|
||||
```
|
||||
|
||||
## Other
|
||||
|
||||
```
|
||||
expr =
|
||||
|
||||
measurements =
|
||||
|
||||
user_name = identifier .
|
||||
|
||||
password = identifier .
|
||||
|
||||
privilege = "ALL" [ "PRIVILEGES" ] | "READ" | "WRITE" .
|
||||
```
|
influxql/ast.go
|
@ -47,47 +47,49 @@ type Node interface {
|
|||
func (_ *Query) node() {}
|
||||
func (_ Statements) node() {}
|
||||
|
||||
func (_ *SelectStatement) node() {}
|
||||
func (_ *DeleteStatement) node() {}
|
||||
func (_ *ListSeriesStatement) node() {}
|
||||
func (_ *ListMeasurementsStatement) node() {}
|
||||
func (_ *ListTagKeysStatement) node() {}
|
||||
func (_ *ListTagValuesStatement) node() {}
|
||||
func (_ *ListFieldKeysStatement) node() {}
|
||||
func (_ *ListFieldValuesStatement) node() {}
|
||||
func (_ *ListContinuousQueriesStatement) node() {}
|
||||
func (_ *DropSeriesStatement) node() {}
|
||||
func (_ *DropContinuousQueryStatement) node() {}
|
||||
func (_ *DropDatabaseStatement) node() {}
|
||||
func (_ *DropUserStatement) node() {}
|
||||
func (_ *AlterRetentionPolicyStatement) node() {}
|
||||
func (_ *CreateContinuousQueryStatement) node() {}
|
||||
func (_ *CreateDatabaseStatement) node() {}
|
||||
func (_ *CreateUserStatement) node() {}
|
||||
func (_ *CreateRetentionPolicyStatement) node() {}
|
||||
func (_ *CreateUserStatement) node() {}
|
||||
func (_ *DeleteStatement) node() {}
|
||||
func (_ *DropContinuousQueryStatement) node() {}
|
||||
func (_ *DropDatabaseStatement) node() {}
|
||||
func (_ *DropSeriesStatement) node() {}
|
||||
func (_ *DropUserStatement) node() {}
|
||||
func (_ *GrantStatement) node() {}
|
||||
func (_ *ListContinuousQueriesStatement) node() {}
|
||||
func (_ *ListDatabasesStatement) node() {}
|
||||
func (_ *ListFieldKeysStatement) node() {}
|
||||
func (_ *ListFieldValuesStatement) node() {}
|
||||
func (_ *ListMeasurementsStatement) node() {}
|
||||
func (_ *ListSeriesStatement) node() {}
|
||||
func (_ *ListTagKeysStatement) node() {}
|
||||
func (_ *ListTagValuesStatement) node() {}
|
||||
func (_ *RevokeStatement) node() {}
|
||||
func (_ *AlterRetentionPolicyStatement) node() {}
|
||||
func (_ *SelectStatement) node() {}
|
||||
|
||||
func (_ Fields) node() {}
|
||||
func (_ *Field) node() {}
|
||||
func (_ Dimensions) node() {}
|
||||
func (_ *BinaryExpr) node() {}
|
||||
func (_ *BooleanLiteral) node() {}
|
||||
func (_ *Call) node() {}
|
||||
func (_ *Dimension) node() {}
|
||||
func (_ Dimensions) node() {}
|
||||
func (_ *DurationLiteral) node() {}
|
||||
func (_ *Field) node() {}
|
||||
func (_ Fields) node() {}
|
||||
func (_ *Join) node() {}
|
||||
func (_ *Measurement) node() {}
|
||||
func (_ Measurements) node() {}
|
||||
func (_ *Join) node() {}
|
||||
func (_ *Merge) node() {}
|
||||
func (_ *VarRef) node() {}
|
||||
func (_ *Call) node() {}
|
||||
func (_ *NumberLiteral) node() {}
|
||||
func (_ *StringLiteral) node() {}
|
||||
func (_ *BooleanLiteral) node() {}
|
||||
func (_ *TimeLiteral) node() {}
|
||||
func (_ *DurationLiteral) node() {}
|
||||
func (_ *BinaryExpr) node() {}
|
||||
func (_ *ParenExpr) node() {}
|
||||
func (_ *Wildcard) node() {}
|
||||
func (_ SortFields) node() {}
|
||||
func (_ *SortField) node() {}
|
||||
func (_ SortFields) node() {}
|
||||
func (_ *StringLiteral) node() {}
|
||||
func (_ *Target) node() {}
|
||||
func (_ *TimeLiteral) node() {}
|
||||
func (_ *VarRef) node() {}
|
||||
func (_ *Wildcard) node() {}
|
||||
|
||||
// Query represents a collection of ordered statements.
|
||||
type Query struct {
|
||||
|
@ -115,26 +117,27 @@ type Statement interface {
|
|||
stmt()
|
||||
}
|
||||
|
||||
func (_ *SelectStatement) stmt() {}
|
||||
func (_ *DeleteStatement) stmt() {}
|
||||
func (_ *ListSeriesStatement) stmt() {}
|
||||
func (_ *DropSeriesStatement) stmt() {}
|
||||
func (_ *ListContinuousQueriesStatement) stmt() {}
|
||||
func (_ *AlterRetentionPolicyStatement) stmt() {}
|
||||
func (_ *CreateContinuousQueryStatement) stmt() {}
|
||||
func (_ *CreateDatabaseStatement) stmt() {}
|
||||
func (_ *CreateRetentionPolicyStatement) stmt() {}
|
||||
func (_ *CreateUserStatement) stmt() {}
|
||||
func (_ *DeleteStatement) stmt() {}
|
||||
func (_ *DropContinuousQueryStatement) stmt() {}
|
||||
func (_ *ListMeasurementsStatement) stmt() {}
|
||||
func (_ *ListTagKeysStatement) stmt() {}
|
||||
func (_ *ListTagValuesStatement) stmt() {}
|
||||
func (_ *DropDatabaseStatement) stmt() {}
|
||||
func (_ *DropSeriesStatement) stmt() {}
|
||||
func (_ *DropUserStatement) stmt() {}
|
||||
func (_ *GrantStatement) stmt() {}
|
||||
func (_ *ListContinuousQueriesStatement) stmt() {}
|
||||
func (_ *ListDatabasesStatement) stmt() {}
|
||||
func (_ *ListFieldKeysStatement) stmt() {}
|
||||
func (_ *ListFieldValuesStatement) stmt() {}
|
||||
func (_ *CreateDatabaseStatement) stmt() {}
|
||||
func (_ *CreateUserStatement) stmt() {}
|
||||
func (_ *GrantStatement) stmt() {}
|
||||
func (_ *ListMeasurementsStatement) stmt() {}
|
||||
func (_ *ListSeriesStatement) stmt() {}
|
||||
func (_ *ListTagKeysStatement) stmt() {}
|
||||
func (_ *ListTagValuesStatement) stmt() {}
|
||||
func (_ *RevokeStatement) stmt() {}
|
||||
func (_ *CreateRetentionPolicyStatement) stmt() {}
|
||||
func (_ *DropDatabaseStatement) stmt() {}
|
||||
func (_ *DropUserStatement) stmt() {}
|
||||
func (_ *AlterRetentionPolicyStatement) stmt() {}
|
||||
func (_ *SelectStatement) stmt() {}
|
||||
|
||||
// Expr represents an expression that can be evaluated to a value.
|
||||
type Expr interface {
|
||||
|
@ -142,15 +145,15 @@ type Expr interface {
|
|||
expr()
|
||||
}
|
||||
|
||||
func (_ *VarRef) expr() {}
|
||||
func (_ *Call) expr() {}
|
||||
func (_ *NumberLiteral) expr() {}
|
||||
func (_ *StringLiteral) expr() {}
|
||||
func (_ *BooleanLiteral) expr() {}
|
||||
func (_ *TimeLiteral) expr() {}
|
||||
func (_ *DurationLiteral) expr() {}
|
||||
func (_ *BinaryExpr) expr() {}
|
||||
func (_ *BooleanLiteral) expr() {}
|
||||
func (_ *Call) expr() {}
|
||||
func (_ *DurationLiteral) expr() {}
|
||||
func (_ *NumberLiteral) expr() {}
|
||||
func (_ *ParenExpr) expr() {}
|
||||
func (_ *StringLiteral) expr() {}
|
||||
func (_ *TimeLiteral) expr() {}
|
||||
func (_ *VarRef) expr() {}
|
||||
func (_ *Wildcard) expr() {}
|
||||
|
||||
// Source represents a source of data for a statement.
|
||||
|
@ -159,8 +162,8 @@ type Source interface {
|
|||
source()
|
||||
}
|
||||
|
||||
func (_ *Measurement) source() {}
|
||||
func (_ *Join) source() {}
|
||||
func (_ *Measurement) source() {}
|
||||
func (_ *Merge) source() {}
|
||||
|
||||
// SortField represents a field to sort results by.
|
||||
|
@ -228,6 +231,9 @@ type CreateUserStatement struct {
|
|||
|
||||
// User's password
|
||||
Password string
|
||||
|
||||
// User's privilege level.
|
||||
Privilege *Privilege
|
||||
}
|
||||
|
||||
// String returns a string representation of the create user statement.
|
||||
|
@ -237,6 +243,12 @@ func (s *CreateUserStatement) String() string {
|
|||
_, _ = buf.WriteString(s.Name)
|
||||
_, _ = buf.WriteString(" WITH PASSWORD ")
|
||||
_, _ = buf.WriteString(s.Password)
|
||||
|
||||
if s.Privilege != nil {
|
||||
_, _ = buf.WriteString(" WITH ")
|
||||
_, _ = buf.WriteString(s.Privilege.String())
|
||||
}
|
||||
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
|
@ -263,6 +275,9 @@ const (
|
|||
AllPrivileges
|
||||
)
|
||||
|
||||
// NewPrivilege returns an initialized *Privilege.
|
||||
func NewPrivilege(p Privilege) *Privilege { return &p }
|
||||
|
||||
// String returns a string representation of a Privilege.
|
||||
func (p Privilege) String() string {
|
||||
switch p {
|
||||
|
@ -334,7 +349,7 @@ type CreateRetentionPolicyStatement struct {
Name string

// Name of database this policy belongs to.
DB string
Database string

// Duration data written to this policy will be retained.
Duration time.Duration

@ -352,7 +367,7 @@ func (s *CreateRetentionPolicyStatement) String() string {
_, _ = buf.WriteString("CREATE RETENTION POLICY ")
_, _ = buf.WriteString(s.Name)
_, _ = buf.WriteString(" ON ")
_, _ = buf.WriteString(s.DB)
_, _ = buf.WriteString(s.Database)
_, _ = buf.WriteString(" DURATION ")
_, _ = buf.WriteString(FormatDuration(s.Duration))
_, _ = buf.WriteString(" REPLICATION ")

@ -369,7 +384,7 @@ type AlterRetentionPolicyStatement struct {
Name string

// Name of the database this policy belongs to.
DB string
Database string

// Duration data written to this policy will be retained.
Duration *time.Duration

@ -387,7 +402,7 @@ func (s *AlterRetentionPolicyStatement) String() string {
_, _ = buf.WriteString("ALTER RETENTION POLICY ")
_, _ = buf.WriteString(s.Name)
_, _ = buf.WriteString(" ON ")
_, _ = buf.WriteString(s.DB)
_, _ = buf.WriteString(s.Database)

if s.Duration != nil {
_, _ = buf.WriteString(" DURATION ")
@ -411,6 +426,9 @@ type SelectStatement struct {
// Expressions returned from the selection.
Fields Fields

// Target (destination) for the result of the select.
Target *Target

// Expressions used for grouping the selection.
Dimensions Dimensions

@ -433,6 +451,11 @@ func (s *SelectStatement) String() string {
var buf bytes.Buffer
_, _ = buf.WriteString("SELECT ")
_, _ = buf.WriteString(s.Fields.String())

if s.Target != nil {
_, _ = buf.WriteString(" ")
_, _ = buf.WriteString(s.Target.String())
}
_, _ = buf.WriteString(" FROM ")
_, _ = buf.WriteString(s.Source.String())
if s.Condition != nil {
@ -591,6 +614,38 @@ func MatchSource(src Source, name string) string {
return ""
}

// Target represents a target (destination) policy, measurement, and DB.
type Target struct {
// Retention policy to write into.
RetentionPolicy string

// Measurement to write into.
Measurement string

// Database to write into.
Database string
}

// String returns a string representation of the Target.
func (t *Target) String() string {
var buf bytes.Buffer
_, _ = buf.WriteString("INTO ")

if t.RetentionPolicy != "" {
_, _ = buf.WriteString(t.RetentionPolicy)
_, _ = buf.WriteString(".")
}

_, _ = buf.WriteString(t.Measurement)

if t.Database != "" {
_, _ = buf.WriteString(" ON ")
_, _ = buf.WriteString(t.Database)
}

return buf.String()
}
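Reviewer note: for reference, Target.String() builds the INTO clause from whichever fields are set. A short sketch of the three shapes it can produce (the values are made up, import path assumed):

package main

import (
	"fmt"

	"github.com/influxdb/influxdb/influxql"
)

func main() {
	// Measurement only.
	fmt.Println((&influxql.Target{Measurement: "cpu_load"}).String()) // INTO cpu_load

	// Retention policy plus measurement.
	t := &influxql.Target{RetentionPolicy: "policy1", Measurement: "cpu_load"}
	fmt.Println(t.String()) // INTO policy1.cpu_load

	// Measurement plus database.
	t = &influxql.Target{Measurement: "cpu_load", Database: "mydb"}
	fmt.Println(t.String()) // INTO cpu_load ON mydb
}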
// DeleteStatement represents a command for removing data from the database.
type DeleteStatement struct {
// Data source that values are removed from.

@ -659,16 +714,27 @@ type ListContinuousQueriesStatement struct{}
// String returns a string representation of the list continuous queries statement.
func (s *ListContinuousQueriesStatement) String() string { return "LIST CONTINUOUS QUERIES" }

// ListDatabasesStatement represents a command for listing all databases in the cluster.
type ListDatabasesStatement struct{}

// String returns a string representation of the list databases command.
func (s *ListDatabasesStatement) String() string { return "LIST DATABASES" }

// CreateContinuousQueryStatement represents a command for creating a continuous query.
type CreateContinuousQueryStatement struct {
// Name of the continuous query to be created.
Name string

// Name of the database to create the continuous query on.
Database string

// Source of data (SELECT statement).
Source *SelectStatement
Target string
}

// String returns a string representation of the statement.
func (s *CreateContinuousQueryStatement) String() string {
return fmt.Sprintf("CREATE CONTINUOUS QUERY %s AS %s INTO %s", s.Name, s.Source.String(), s.Target)
return fmt.Sprintf("CREATE CONTINUOUS QUERY %s ON %s BEGIN %s END", s.Name, s.Database, s.Source.String())
}

// DropContinuousQueriesStatement represents a command for removing a continuous query.
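Reviewer note: the String() change means a continuous query now renders in the new ON ... BEGIN ... END form instead of AS ... INTO. A hedged sketch built from the same values the parser tests further down expect (names are invented, import path assumed):

package main

import (
	"fmt"

	"github.com/influxdb/influxdb/influxql"
)

func main() {
	cq := &influxql.CreateContinuousQueryStatement{
		Name:     "myquery",
		Database: "testdb",
		Source: &influxql.SelectStatement{
			Fields: influxql.Fields{&influxql.Field{Expr: &influxql.Call{Name: "count"}}},
			Target: &influxql.Target{Measurement: "measure1"},
			Source: &influxql.Measurement{Name: "myseries"},
		},
	}
	// Expected shape: CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT count() ... END
	fmt.Println(cq.String())
}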
@ -65,7 +65,7 @@ func (p *Parser) ParseStatement() (Statement, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
switch tok {
case SELECT:
return p.parseSelectStatement()
return p.parseSelectStatement(targetNotRequired)
case DELETE:
return p.parseDeleteStatement()
case LIST:

@ -93,6 +93,8 @@ func (p *Parser) parseListStatement() (Statement, error) {
return p.parseListSeriesStatement()
} else if tok == CONTINUOUS {
return p.parseListContinuousQueriesStatement()
} else if tok == DATABASES {
return p.parseListDatabasesStatement()
} else if tok == MEASUREMENTS {
return p.parseListMeasurementsStatement()
} else if tok == TAG {
@ -190,7 +192,7 @@ func (p *Parser) parseCreateRetentionPolicyStatement() (*CreateRetentionPolicySt
if err != nil {
return nil, err
}
stmt.DB = ident
stmt.Database = ident

// Parse required DURATION token.
tok, pos, lit := p.scanIgnoreWhitespace()

@ -249,9 +251,9 @@ func (p *Parser) parseAlterRetentionPolicyStatement() (*AlterRetentionPolicyStat
if err != nil {
return nil, err
}
stmt.DB = ident
stmt.Database = ident

// Loop through option tokens (DURATION, RETENTION, DEFAULT, etc.).
// Loop through option tokens (DURATION, REPLICATION, DEFAULT, etc.).
maxNumOptions := 3
Loop:
for i := 0; i < maxNumOptions; i++ {
@ -442,7 +444,7 @@ func (p *Parser) parsePrivilege() (Privilege, error) {

// parseSelectStatement parses a select string and returns a Statement AST object.
// This function assumes the SELECT token has already been consumed.
func (p *Parser) parseSelectStatement() (*SelectStatement, error) {
func (p *Parser) parseSelectStatement(tr targetRequirement) (*SelectStatement, error) {
stmt := &SelectStatement{}

// Parse fields: "SELECT FIELD+".

@ -452,6 +454,14 @@ func (p *Parser) parseSelectStatement() (*SelectStatement, error) {
}
stmt.Fields = fields

// Parse target: "INTO"
target, err := p.parseTarget(tr)
if err != nil {
return nil, err
} else if target != nil {
stmt.Target = target
}

// Parse source.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != FROM {
return nil, newParseError(tokstr(tok, lit), []string{"FROM"}, pos)
@ -493,6 +503,63 @@ func (p *Parser) parseSelectStatement() (*SelectStatement, error) {
return stmt, nil
}

// targetRequirement specifies whether or not a target clause is required.
type targetRequirement int

const (
targetRequired targetRequirement = iota
targetNotRequired
)

// parseTarget parses a string and returns a Target.
func (p *Parser) parseTarget(tr targetRequirement) (*Target, error) {
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != INTO {
if tr == targetRequired {
return nil, newParseError(tokstr(tok, lit), []string{"INTO"}, pos)
}
p.unscan()
return nil, nil
}

// Parse identifier. Could be policy or measurement name.
ident, err := p.parseIdentifier()
if err != nil {
return nil, err
}

target := &Target{}

tok, _, _ := p.scanIgnoreWhitespace()
if tok == DOT {
// Previous identifier was retention policy name.
target.RetentionPolicy = ident

// Parse required measurement.
ident, err = p.parseIdentifier()
if err != nil {
return nil, err
}
} else {
p.unscan()
}

target.Measurement = ident

// Parse optional ON.
if tok, _, _ := p.scanIgnoreWhitespace(); tok != ON {
p.unscan()
return target, nil
}

// Found an ON token so parse required identifier.
if ident, err = p.parseIdentifier(); err != nil {
return nil, err
}
target.Database = ident

return target, nil
}
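Reviewer note: parseTarget effectively accepts INTO [<retention-policy> "."] <measurement>, with an optional trailing ON <database>. Since the function is unexported, the sketch below goes through the public parser instead, reusing the quoted form from the tests further down (assuming influxql.NewParser is the exported entry point those tests use):

package main

import (
	"fmt"
	"strings"

	"github.com/influxdb/influxdb/influxql"
)

func main() {
	q := `SELECT count() INTO "1h.policy1"."cpu.load" FROM myseries`
	stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	sel := stmt.(*influxql.SelectStatement)
	// Should print: 1h.policy1 cpu.load
	fmt.Println(sel.Target.RetentionPolicy, sel.Target.Measurement)
}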
// parseDeleteStatement parses a delete string and returns a DeleteStatement.
// This function assumes the DELETE token has already been consumed.
func (p *Parser) parseDeleteStatement() (*DeleteStatement, error) {

@ -760,6 +827,13 @@ func (p *Parser) parseListContinuousQueriesStatement() (*ListContinuousQueriesSt
return stmt, nil
}

// parseListDatabasesStatement parses a string and returns a ListDatabasesStatement.
// This function assumes the "LIST DATABASES" tokens have already been consumed.
func (p *Parser) parseListDatabasesStatement() (*ListDatabasesStatement, error) {
stmt := &ListDatabasesStatement{}
return stmt, nil
}

// parseCreateContinuousQueryStatement parses a string and returns a CreateContinuousQueryStatement.
// This function assumes the "CREATE CONTINUOUS" tokens have already been consumed.
func (p *Parser) parseCreateContinuousQueryStatement() (*CreateContinuousQueryStatement, error) {
@ -771,39 +845,40 @@ func (p *Parser) parseCreateContinuousQueryStatement() (*CreateContinuousQuerySt
}

// Read the id of the query to create.
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string"}, pos)
ident, err := p.parseIdentifier()
if err != nil {
return nil, err
}
stmt.Name = lit
stmt.Name = ident

// Expect an "AS SELECT" keyword.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != AS {
return nil, newParseError(tokstr(tok, lit), []string{"AS"}, pos)
// Expect an "ON" keyword.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != ON {
return nil, newParseError(tokstr(tok, lit), []string{"ON"}, pos)
}
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != SELECT {
return nil, newParseError(tokstr(tok, lit), []string{"SELECT"}, pos)

// Read the name of the database to create the query on.
if ident, err = p.parseIdentifier(); err != nil {
return nil, err
}
stmt.Database = ident

// Expect the "BEGIN SELECT" tokens.
if err := p.parseTokens([]Token{BEGIN, SELECT}); err != nil {
return nil, err
}

// Read the select statement to be used as the source.
source, err := p.parseSelectStatement()
source, err := p.parseSelectStatement(targetRequired)
if err != nil {
return nil, err
}
stmt.Source = source

// Expect an INTO keyword.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != INTO {
return nil, newParseError(tokstr(tok, lit), []string{"INTO"}, pos)
// Expect an "END" keyword.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != END {
return nil, newParseError(tokstr(tok, lit), []string{"END"}, pos)
}

// Read the target of the query.
tok, pos, lit = p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string"}, pos)
}
stmt.Target = lit

return stmt, nil
}
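Reviewer note: end to end, the grammar this rewrite accepts is CREATE CONTINUOUS QUERY <name> ON <database> BEGIN <select with a required INTO target> END. A small parse sketch mirroring the first continuous-query test case below (same assumptions about NewParser/ParseStatement and the import path):

package main

import (
	"fmt"
	"strings"

	"github.com/influxdb/influxdb/influxql"
)

func main() {
	q := `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT count() INTO measure1 FROM myseries END`
	stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	cq := stmt.(*influxql.CreateContinuousQueryStatement)
	// Should print: myquery testdb measure1
	fmt.Println(cq.Name, cq.Database, cq.Source.Target.Measurement)
}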
@ -843,11 +918,11 @@ func (p *Parser) parseCreateUserStatement() (*CreateUserStatement, error) {
stmt := &CreateUserStatement{}

// Parse name of the user to be created.
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string"}, pos)
ident, err := p.parseIdentifier()
if err != nil {
return nil, err
}
stmt.Name = lit
stmt.Name = ident

// Consume "WITH PASSWORD" tokens
if err := p.parseTokens([]Token{WITH, PASSWORD}); err != nil {

@ -855,11 +930,23 @@ func (p *Parser) parseCreateUserStatement() (*CreateUserStatement, error) {
}

// Parse new user's password
tok, pos, lit = p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string"}, pos)
if ident, err = p.parseIdentifier(); err != nil {
return nil, err
}
stmt.Password = lit
stmt.Password = ident

// Check for optional WITH clause.
if tok, _, _ := p.scanIgnoreWhitespace(); tok != WITH {
p.unscan()
return stmt, nil
}

// We only allow granting of "ALL PRIVILEGES" during CREATE USER.
// All other privileges must be granted using a GRANT statement.
if err := p.parseTokens([]Token{ALL, PRIVILEGES}); err != nil {
return nil, err
}
stmt.Privilege = NewPrivilege(AllPrivileges)

return stmt, nil
}
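Reviewer note: because parseCreateUserStatement now reads both the user name and the password through parseIdentifier, bare identifiers and quoted strings should both be accepted, and the ALL PRIVILEGES clause stays optional. A short sketch under the same NewParser assumption as above:

package main

import (
	"fmt"
	"strings"

	"github.com/influxdb/influxdb/influxql"
)

func main() {
	for _, q := range []string{
		`CREATE USER testuser WITH PASSWORD pwd1337`,
		`CREATE USER testuser WITH PASSWORD "pwd1337" WITH ALL PRIVILEGES`,
	} {
		stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
		if err != nil {
			fmt.Println("parse error:", err)
			continue
		}
		u := stmt.(*influxql.CreateUserStatement)
		fmt.Println(u.Name, u.Password, u.Privilege != nil)
	}
}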
@ -147,6 +147,12 @@ func TestParser_ParseStatement(t *testing.T) {
},
},

// LIST DATABASES
{
s: `LIST DATABASES`,
stmt: &influxql.ListDatabasesStatement{},
},

// LIST SERIES statement
{
s: `LIST SERIES`,

@ -277,16 +283,34 @@ func TestParser_ParseStatement(t *testing.T) {
stmt: &influxql.ListContinuousQueriesStatement{},
},

// CREATE CONTINUOUS QUERY statement
// CREATE CONTINUOUS QUERY ... INTO <measurement>
{
s: `CREATE CONTINUOUS QUERY myquery AS SELECT count() FROM myseries INTO foo`,
s: `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT count() INTO measure1 FROM myseries END`,
stmt: &influxql.CreateContinuousQueryStatement{
Name: "myquery",
Database: "testdb",
Source: &influxql.SelectStatement{
Fields: influxql.Fields{&influxql.Field{Expr: &influxql.Call{Name: "count"}}},
Target: &influxql.Target{Measurement: "measure1"},
Source: &influxql.Measurement{Name: "myseries"},
},
},
},

// CREATE CONTINUOUS QUERY ... INTO <retention-policy>.<measurement>
{
s: `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT count() INTO "1h.policy1"."cpu.load" FROM myseries END`,
stmt: &influxql.CreateContinuousQueryStatement{
Name: "myquery",
Database: "testdb",
Source: &influxql.SelectStatement{
Fields: influxql.Fields{&influxql.Field{Expr: &influxql.Call{Name: "count"}}},
Target: &influxql.Target{
RetentionPolicy: "1h.policy1",
Measurement: "cpu.load",
},
Source: &influxql.Measurement{Name: "myseries"},
},
Target: "foo",
},
},

@ -307,6 +331,16 @@ func TestParser_ParseStatement(t *testing.T) {
},
},

// CREATE USER ... WITH ALL PRIVILEGES
{
s: `CREATE USER testuser WITH PASSWORD pwd1337 WITH ALL PRIVILEGES`,
stmt: &influxql.CreateUserStatement{
Name: "testuser",
Password: "pwd1337",
Privilege: influxql.NewPrivilege(influxql.AllPrivileges),
},
},

// DROP CONTINUOUS QUERY statement
{
s: `DROP CONTINUOUS QUERY myquery`,

@ -428,7 +462,7 @@ func TestParser_ParseStatement(t *testing.T) {
s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 2`,
stmt: &influxql.CreateRetentionPolicyStatement{
Name: "policy1",
DB: "testdb",
Database: "testdb",
Duration: time.Hour,
Replication: 2,
},

@ -439,7 +473,7 @@ func TestParser_ParseStatement(t *testing.T) {
s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 2m REPLICATION 4 DEFAULT`,
stmt: &influxql.CreateRetentionPolicyStatement{
Name: "policy1",
DB: "testdb",
Database: "testdb",
Duration: 2 * time.Minute,
Replication: 4,
Default: true,

@ -506,6 +540,10 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `DROP DATABASE`, err: `found EOF, expected identifier at line 1, char 15`},
{s: `DROP USER`, err: `found EOF, expected identifier at line 1, char 11`},
{s: `CREATE USER testuser`, err: `found EOF, expected WITH at line 1, char 22`},
{s: `CREATE USER testuser WITH`, err: `found EOF, expected PASSWORD at line 1, char 27`},
{s: `CREATE USER testuser WITH PASSWORD`, err: `found EOF, expected identifier at line 1, char 36`},
{s: `CREATE USER testuser WITH PASSWORD "pwd" WITH`, err: `found EOF, expected ALL at line 1, char 47`},
{s: `CREATE USER testuser WITH PASSWORD "pwd" WITH ALL`, err: `found EOF, expected PRIVILEGES at line 1, char 51`},
{s: `GRANT`, err: `found EOF, expected READ, WRITE, ALL [PRIVILEGES] at line 1, char 7`},
{s: `GRANT BOGUS`, err: `found BOGUS, expected READ, WRITE, ALL [PRIVILEGES] at line 1, char 7`},
{s: `GRANT READ`, err: `found EOF, expected ON at line 1, char 12`},

@ -545,6 +583,10 @@ func TestParser_ParseStatement(t *testing.T) {
t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
} else if tt.err == "" && !reflect.DeepEqual(tt.stmt, stmt) {
t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.s, tt.stmt, stmt)
exp := tt.stmt.(*influxql.CreateContinuousQueryStatement).Source.Target
got := stmt.(*influxql.CreateContinuousQueryStatement).Source.Target
t.Errorf("exp.String() = %#v\n", *exp)
t.Errorf("got.String() = %#v\n", *got)
}
}
}

@ -834,7 +876,7 @@ func errstring(err error) string {
func newAlterRetentionPolicyStatement(name string, DB string, d time.Duration, replication int, dfault bool) *influxql.AlterRetentionPolicyStatement {
stmt := &influxql.AlterRetentionPolicyStatement{
Name: name,
DB: DB,
Database: DB,
Default: dfault,
}
@ -42,7 +42,14 @@ func (s *Scanner) Scan() (tok Token, pos Pos, lit string) {
return EOF, pos, ""
case '"', '\'':
return s.scanString()
case '.', '+', '-':
case '.':
ch1, _ := s.r.read()
s.r.unread()
if isDigit(ch1) {
return s.scanNumber()
}
return DOT, pos, ""
case '+', '-':
return s.scanNumber()
case '*':
return MUL, pos, ""

@ -233,7 +240,6 @@ func (s *Scanner) scanNumber() (tok Token, pos Pos, lit string) {
}
s.r.unread()
}

return NUMBER, pos, buf.String()
}
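Reviewer note: the scanner now peeks one rune after '.', so '.23' still scans as a NUMBER while a bare '.' becomes the new DOT token (it used to be ILLEGAL, hence the commented-out test case below). A sketch, assuming the exported NewScanner constructor these scanner tests use:

package main

import (
	"fmt"
	"strings"

	"github.com/influxdb/influxdb/influxql"
)

func main() {
	for _, s := range []string{`.`, `.23`, `policy1.cpu`} {
		sc := influxql.NewScanner(strings.NewReader(s))
		for {
			tok, _, lit := sc.Scan()
			if tok == influxql.EOF {
				break
			}
			// For `policy1.cpu` this should yield IDENT, DOT, IDENT.
			fmt.Printf("%q -> %v %q\n", s, tok, lit)
		}
	}
}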
@ -54,6 +54,7 @@ func TestScanner_Scan(t *testing.T) {
{s: `)`, tok: influxql.RPAREN},
{s: `,`, tok: influxql.COMMA},
{s: `;`, tok: influxql.SEMICOLON},
{s: `.`, tok: influxql.DOT},

// Identifiers
{s: `foo`, tok: influxql.IDENT, lit: `foo`},

@ -79,7 +80,7 @@ func TestScanner_Scan(t *testing.T) {
{s: `.23`, tok: influxql.NUMBER, lit: `.23`},
{s: `+.23`, tok: influxql.NUMBER, lit: `+.23`},
{s: `-.23`, tok: influxql.NUMBER, lit: `-.23`},
{s: `.`, tok: influxql.ILLEGAL, lit: `.`},
//{s: `.`, tok: influxql.ILLEGAL, lit: `.`},
{s: `-.`, tok: influxql.SUB, lit: ``},
{s: `+.`, tok: influxql.ADD, lit: ``},
{s: `10.3s`, tok: influxql.NUMBER, lit: `10.3`},

@ -100,15 +101,18 @@ func TestScanner_Scan(t *testing.T) {
{s: `ALTER`, tok: influxql.ALTER},
{s: `AS`, tok: influxql.AS},
{s: `ASC`, tok: influxql.ASC},
{s: `BEGIN`, tok: influxql.BEGIN},
{s: `BY`, tok: influxql.BY},
{s: `CREATE`, tok: influxql.CREATE},
{s: `CONTINUOUS`, tok: influxql.CONTINUOUS},
{s: `DATABASE`, tok: influxql.DATABASE},
{s: `DATABASES`, tok: influxql.DATABASES},
{s: `DEFAULT`, tok: influxql.DEFAULT},
{s: `DELETE`, tok: influxql.DELETE},
{s: `DESC`, tok: influxql.DESC},
{s: `DROP`, tok: influxql.DROP},
{s: `DURATION`, tok: influxql.DURATION},
{s: `END`, tok: influxql.END},
{s: `EXISTS`, tok: influxql.EXISTS},
{s: `EXPLAIN`, tok: influxql.EXPLAIN},
{s: `FIELD`, tok: influxql.FIELD},
@ -47,6 +47,7 @@ const (
RPAREN // )
COMMA // ,
SEMICOLON // ;
DOT // .

keyword_beg
// Keywords

@ -54,15 +55,18 @@ const (
ALTER
AS
ASC
BEGIN
BY
CREATE
CONTINUOUS
DATABASE
DATABASES
DEFAULT
DELETE
DESC
DROP
DURATION
END
EXISTS
EXPLAIN
FIELD

@ -132,20 +136,24 @@ var tokens = [...]string{
RPAREN: ")",
COMMA: ",",
SEMICOLON: ";",
DOT: ".",

ALL: "ALL",
ALTER: "ALTER",
AS: "AS",
ASC: "ASC",
BEGIN: "BEGIN",
BY: "BY",
CREATE: "CREATE",
CONTINUOUS: "CONTINUOUS",
DATABASE: "DATABASE",
DATABASES: "DATABASES",
DEFAULT: "DEFAULT",
DELETE: "DELETE",
DESC: "DESC",
DROP: "DROP",
DURATION: "DURATION",
END: "END",
EXISTS: "EXISTS",
EXPLAIN: "EXPLAIN",
FIELD: "FIELD",
server.go
@ -1161,10 +1161,19 @@ func (s *Server) WriteSeries(database, retentionPolicy, name string, tags map[st
return err
}

// If the retention policy is not set, use the default for this database.
if retentionPolicy == "" {
rp, err := s.DefaultRetentionPolicy(database)
if err != nil {
return fmt.Errorf("failed to determine default retention policy: %s", err.Error())
}
retentionPolicy = rp.Name
}

// Now write it into the shard.
sh, err := s.createShardIfNotExists(database, retentionPolicy, id, timestamp)
if err != nil {
return fmt.Errorf("create shard(%s/%d): %s", retentionPolicy, timestamp.Format(time.RFC3339Nano), err)
return fmt.Errorf("create shard(%s/%s): %s", retentionPolicy, timestamp.Format(time.RFC3339Nano), err)
}

// Encode point to a byte slice.
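Reviewer note: the verb change in the error message matters because timestamp.Format returns a string; %d applied to a string makes fmt emit a %!d(...) artifact instead of the timestamp. A standalone illustration of just the fmt behaviour (the "raw" policy name is a made-up sample):

package main

import (
	"fmt"
	"time"
)

func main() {
	ts := time.Date(2014, 12, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano)

	// Old verb: %d applied to a string value.
	fmt.Println(fmt.Errorf("create shard(%s/%d): %s", "raw", ts, "boom"))
	// create shard(raw/%!d(string=2014-12-01T00:00:00Z)): boom

	// Fixed verb.
	fmt.Println(fmt.Errorf("create shard(%s/%s): %s", "raw", ts, "boom"))
	// create shard(raw/2014-12-01T00:00:00Z): boom
}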