Merge branch 'master' of https://github.com/influxdb/influxdb into write-series

Conflicts:
	server.go
pull/1308/head
Ben Johnson 2015-01-10 10:03:37 -07:00
commit 6912c1415d
21 changed files with 1345 additions and 483 deletions

View File

@ -8,9 +8,11 @@ import (
"os/user"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/graphite"
)
const (
@ -51,13 +53,9 @@ type Config struct {
Assets string `toml:"assets"`
} `toml:"admin"`
Graphites []Graphite `toml:"graphite"`
InputPlugins struct {
Graphite struct {
Enabled bool `toml:"enabled"`
Port int `toml:"port"`
Database string `toml:"database"`
UDPEnabled bool `toml:"udp_enabled"`
} `toml:"graphite"`
UDPInput struct {
Enabled bool `toml:"enabled"`
Port int `toml:"port"`
@ -105,6 +103,16 @@ type Config struct {
} `toml:"logging"`
}
type Graphite struct {
Addr string `toml:"address"`
Database string `toml:"database"`
Enabled bool `toml:"enabled"`
Port uint16 `toml:"port"`
Protocol string `toml:"protocol"`
NamePosition string `toml:"name-position"`
NameSeparator string `toml:"name-separator"`
}
// NewConfig returns an instance of Config with reasonable defaults.
func NewConfig() *Config {
u, _ := user.Current()
@ -166,7 +174,7 @@ func (c *Config) MaxOpenShards() int {
// DataAddr returns the binding address the data server
func (c *Config) DataAddr() string {
return fmt.Sprintf("%s:%d", c.BindAddress, c.Data.Port)
return net.JoinHostPort(c.BindAddress, strconv.Itoa(c.Data.Port))
}
// DataURL returns the URL required to contact the data server.
@ -261,5 +269,37 @@ func ParseConfig(s string) (*Config, error) {
return c, nil
}
// ConnnectionString returns the connection string for this Graphite config in the form host:port.
func (g *Graphite) ConnectionString(defaultBindAddr string) string {
addr := g.Addr
// If no address specified, use default.
if addr == "" {
addr = defaultBindAddr
}
port := g.Port
// If no port specified, use default.
if port == 0 {
port = graphite.DefaultGraphitePort
}
return fmt.Sprintf("%s:%d", addr, port)
}
// NameSeparatorString returns the character separating fields for Graphite data, or the default
// if no separator is set.
func (g *Graphite) NameSeparatorString() string {
if g.NameSeparator == "" {
return graphite.DefaultGraphiteNameSeparator
}
return g.NameSeparator
}
// LastEnabled returns whether the Graphite Server shoudl intepret the last field as "name".
func (g *Graphite) LastEnabled() bool {
return g.NamePosition == strings.ToLower("last")
}
// maxInt is the largest integer representable by a word (architeture dependent).
const maxInt = int64(^uint(0) >> 1)

View File

@ -2,10 +2,11 @@ package main_test
import (
"reflect"
"strings"
"testing"
"time"
"github.com/influxdb/influxdb/cmd/influxd"
main "github.com/influxdb/influxdb/cmd/influxd"
)
// Ensure that megabyte sizes can be parsed.
@ -61,12 +62,40 @@ func TestParseConfig(t *testing.T) {
t.Fatalf("data port mismatch: %v", c.Data.Port)
}
if c.InputPlugins.Graphite.Enabled != false {
t.Fatalf("graphite enabled mismatch: %v", c.InputPlugins.Graphite.Enabled)
} else if c.InputPlugins.Graphite.Port != 2003 {
t.Fatalf("graphite port mismatch: %v", c.InputPlugins.Graphite.Enabled)
} else if c.InputPlugins.Graphite.Database != "" {
t.Fatalf("graphite database mismatch: %v", c.InputPlugins.Graphite.Database)
if len(c.Graphites) != 2 {
t.Fatalf("graphites mismatch. expected %v, got: %v", 2, len(c.Graphites))
}
tcpGraphite := c.Graphites[0]
switch {
case tcpGraphite.Enabled != true:
t.Fatalf("graphite tcp enabled mismatch: expected: %v, got %v", true, tcpGraphite.Enabled)
case tcpGraphite.Addr != "192.168.0.1":
t.Fatalf("graphite tcp address mismatch: expected %v, got %v", "192.168.0.1", tcpGraphite.Addr)
case tcpGraphite.Port != 2003:
t.Fatalf("graphite tcp port mismatch: expected %v, got %v", 2003, tcpGraphite.Port)
case tcpGraphite.Database != "graphite_tcp":
t.Fatalf("graphite tcp database mismatch: expected %v, got %v", "graphite_tcp", tcpGraphite.Database)
case strings.ToLower(tcpGraphite.Protocol) != "tcp":
t.Fatalf("graphite tcp protocol mismatch: expected %v, got %v", "tcp", strings.ToLower(tcpGraphite.Protocol))
case tcpGraphite.LastEnabled() != true:
t.Fatalf("graphite tcp name-position mismatch: expected %v, got %v", "last", tcpGraphite.NamePosition)
case tcpGraphite.NameSeparatorString() != "-":
t.Fatalf("graphite tcp name-separator mismatch: expected %v, got %v", "-", tcpGraphite.NameSeparatorString())
}
udpGraphite := c.Graphites[1]
switch {
case udpGraphite.Enabled != true:
t.Fatalf("graphite udp enabled mismatch: expected: %v, got %v", true, udpGraphite.Enabled)
case udpGraphite.Addr != "192.168.0.2":
t.Fatalf("graphite udp address mismatch: expected %v, got %v", "192.168.0.2", udpGraphite.Addr)
case udpGraphite.Port != 2005:
t.Fatalf("graphite udp port mismatch: expected %v, got %v", 2005, udpGraphite.Port)
case udpGraphite.Database != "graphite_udp":
t.Fatalf("graphite database mismatch: expected %v, got %v", "graphite_udp", udpGraphite.Database)
case strings.ToLower(udpGraphite.Protocol) != "udp":
t.Fatalf("graphite udp protocol mismatch: expected %v, got %v", "udp", strings.ToLower(udpGraphite.Protocol))
}
if c.Broker.Port != 8090 {
@ -139,17 +168,31 @@ read-timeout = "5s"
[input_plugins]
# Configure the graphite api
[input_plugins.graphite]
enabled = false
port = 2003
database = "" # store graphite data in this database
[input_plugins.udp]
enabled = true
port = 4444
database = "test"
# Configure the Graphite servers
[[graphite]]
protocol = "TCP"
enabled = true
address = "192.168.0.1"
port = 2003
database = "graphite_tcp" # store graphite data in this database
name-position = "last"
name-separator = "-"
[[graphite]]
protocol = "udP"
enabled = true
address = "192.168.0.2"
port = 2005
database = "graphite_udp" # store graphite data in this database
# Raft configuration
[raft]
# The raft port should be open between all servers in a cluster.
# Broker configuration
[broker]
# The broker port should be open between all servers in a cluster.

View File

@ -12,6 +12,7 @@ import (
"strings"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/graphite"
"github.com/influxdb/influxdb/messaging"
)
@ -22,7 +23,7 @@ func execRun(args []string) {
var (
configPath = fs.String("config", configDefaultPath, "")
pidPath = fs.String("pidfile", "", "")
role = fs.String("role", "combined", "")
role = fs.String("role", "", "")
hostname = fs.String("hostname", "", "")
join = fs.String("join", "", "")
)
@ -30,8 +31,8 @@ func execRun(args []string) {
fs.Parse(args)
// Validate CLI flags.
if *role != "combined" && *role != "broker" && *role != "data" {
log.Fatalf("role must be 'combined', 'broker', or 'data'")
if *role != "" && *role != "broker" && *role != "data" {
log.Fatalf("role must be '', 'broker', or 'data'")
}
// Parse join urls from the --join flag.
@ -53,13 +54,13 @@ func execRun(args []string) {
if b != nil {
h = &Handler{brokerHandler: messaging.NewHandler(b)}
go func() { log.Fatal(http.ListenAndServe(config.BrokerAddr(), h)) }()
log.Printf("broker running on %s", config.BrokerAddr())
log.Printf("broker listening on %s", config.BrokerAddr())
}
// Open server, initialize or join as necessary.
s := openServer(config.Data.Dir, config.DataURL(), b, initializing, joinURLs)
// Start the server handler. Attach to broker if running on the same port.
// Start the server handler. Attach to broker if listening on the same port.
if s != nil {
sh := influxdb.NewHandler(s)
sh.AuthenticationEnabled = config.Authentication.Enabled
@ -68,7 +69,38 @@ func execRun(args []string) {
} else {
go func() { log.Fatal(http.ListenAndServe(config.DataAddr(), sh)) }()
}
log.Printf("data node #%d running on %s", s.ID(), config.DataAddr())
log.Printf("data node #%d listening on %s", s.ID(), config.DataAddr())
// Spin up any Graphite servers
for _, c := range config.Graphites {
if !c.Enabled {
continue
}
// Configure Graphite parsing.
parser := graphite.NewParser()
parser.Separator = c.NameSeparatorString()
parser.LastEnabled = c.LastEnabled()
// Start the relevant server.
if strings.ToLower(c.Protocol) == "tcp" {
g := graphite.NewTCPServer(parser, s)
g.Database = c.Database
err := g.ListenAndServe(c.ConnectionString(config.BindAddress))
if err != nil {
log.Println("failed to start TCP Graphite Server", err.Error())
}
} else if strings.ToLower(c.Protocol) == "udp" {
g := graphite.NewUDPServer(parser, s)
g.Database = c.Database
err := g.ListenAndServe(c.ConnectionString(config.BindAddress))
if err != nil {
log.Println("failed to start UDP Graphite Server", err.Error())
}
} else {
log.Fatalf("unrecognized Graphite Server prototcol", c.Protocol)
}
}
}
// Wait indefinitely.
@ -164,7 +196,7 @@ func openServer(path string, u *url.URL, b *messaging.Broker, initializing bool,
// Create and open the server.
s := influxdb.NewServer()
if err := s.Open(path); err != nil {
log.Fatalf("failed to open data server", err.Error())
log.Fatalf("failed to open data server: %v", err.Error())
}
// If the server is uninitialized then initialize or join it.
@ -188,7 +220,7 @@ func initializeServer(s *influxdb.Server, b *messaging.Broker) {
// Create replica on broker.
if err := b.CreateReplica(1); err != nil {
log.Fatalf("replica creation error: %d", err)
log.Fatalf("replica creation error: %s", err)
}
// Create messaging client.
@ -260,27 +292,26 @@ func fileExists(path string) bool {
func printRunUsage() {
log.Printf(`usage: run [flags]
run starts the node with any existing cluster configuration. If no cluster
configuration is found, then the node runs in "local" mode. "Local" mode is a
single-node mode that does not use Distributed Consensus, but is otherwise
fully-functional.
run starts the broker and data node server. If this is the first time running
the command then a new cluster will be initialized unless the -join argument
is used.
-config <path>
Set the path to the configuration file.
Defaults to %s.
-role <role>
Set the role to be 'combined', 'broker' or 'data'.
'broker' means it will take part in Raft distributed
consensus. 'data' means it will store time-series
data. 'combined' means it will do both. The default is
'combined'. A role other than these three is invalid.
Set the role to 'broker' or 'data'. 'broker' means
it will take part in Raft distributed consensus.
'data' means it will store time-series data.
If neither 'broker' or 'data' is specified then
the server will run as both a broker and data node.
-hostname <name>
Override the hostname, the 'hostname' configuration
option will be overridden.
-join <servers>
-join <url>
Joins the server to an existing cluster.
-pidfile <path>

View File

@ -479,7 +479,7 @@ func (d *database) createMeasurementIfNotExists(name string) *Measurement {
// AddField adds a field to the measurement name. Returns false if already present
func (d *database) AddField(name string, f *Field) bool {
panic("not implemented")
if true { panic("not implemented") }
return false
}

View File

@ -33,7 +33,7 @@ func TestDatabase_MeasurementBySeriesID(t *testing.T) {
idx.addSeriesToIndex(m.Name, s)
mm := idx.MeasurementBySeriesID(uint32(1))
if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
t.Fatalf("mesurement not equal:\n%v\n%v", m, mm)
}
// now test that we can add another
@ -44,12 +44,12 @@ func TestDatabase_MeasurementBySeriesID(t *testing.T) {
idx.addSeriesToIndex(m.Name, s)
mm = idx.MeasurementBySeriesID(uint32(2))
if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
t.Fatalf("mesurement not equal:\n%v\n%v", m, mm)
}
mm = idx.MeasurementBySeriesID(uint32(1))
if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
t.Fatalf("mesurement not equal:\n%v\n%v", m, mm)
}
}
@ -81,7 +81,7 @@ func TestDatabase_SeriesBySeriesID(t *testing.T) {
idx.addSeriesToIndex("foo", s)
ss := idx.SeriesByID(uint32(2))
if string(mustMarshalJSON(s)) != string(mustMarshalJSON(ss)) {
t.Fatalf("series not equal:\n%s\n%s", s, ss)
t.Fatalf("series not equal:\n%v\n%v", s, ss)
}
}
@ -100,9 +100,9 @@ func TestDatabase_MeasurementAndSeries(t *testing.T) {
idx.addSeriesToIndex(m.Name, s)
mm, ss := idx.MeasurementAndSeries(m.Name, s.Tags)
if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
t.Fatalf("mesurement not equal:\n%v\n%v", m, mm)
} else if string(mustMarshalJSON(s)) != string(mustMarshalJSON(ss)) {
t.Fatalf("series not equal:\n%s\n%s", s, ss)
t.Fatalf("series not equal:\n%v\n%v", s, ss)
}
// now test that we can add another
@ -113,9 +113,9 @@ func TestDatabase_MeasurementAndSeries(t *testing.T) {
idx.addSeriesToIndex(m.Name, s)
mm, ss = idx.MeasurementAndSeries(m.Name, s.Tags)
if string(mustMarshalJSON(m)) != string(mustMarshalJSON(mm)) {
t.Fatalf("mesurement not equal:\n%s\n%s", m, mm)
t.Fatalf("mesurement not equal:\n%v\n%v", m, mm)
} else if string(mustMarshalJSON(s)) != string(mustMarshalJSON(ss)) {
t.Fatalf("series not equal:\n%s\n%s", s, ss)
t.Fatalf("series not equal:\n%v\n%v", s, ss)
}
}
@ -150,7 +150,7 @@ func TestDatabase_SeriesIDs(t *testing.T) {
l := idx.SeriesIDs([]string{"cpu_load"}, nil)
r := []uint32{1, 2}
if !l.Equals(r) {
t.Fatalf("series IDs not the same:\n%s\n%s", l, r)
t.Fatalf("series IDs not the same:\n%d\n%d", l, r)
}
// now add another in a different measurement
@ -165,7 +165,7 @@ func TestDatabase_SeriesIDs(t *testing.T) {
l = idx.SeriesIDs([]string{"cpu_load"}, nil)
r = []uint32{1, 2, 3}
if !l.Equals(r) {
t.Fatalf("series IDs not the same:\n%s\n%s", l, r)
t.Fatalf("series IDs not the same:\n%d\n%d", l, r)
}
}
@ -563,7 +563,7 @@ func TestDatabase_SeriesIDsIntersect(t *testing.T) {
for i, tt := range tests {
a := SeriesIDs(tt.left).Intersect(tt.right)
if !a.Equals(tt.expected) {
t.Fatalf("%d: %s intersect %s: result mismatch:\n exp=%s\n got=%s", i, SeriesIDs(tt.left), SeriesIDs(tt.right), SeriesIDs(tt.expected), SeriesIDs(a))
t.Fatalf("%d: %v intersect %v: result mismatch:\n exp=%v\n got=%v", i, SeriesIDs(tt.left), SeriesIDs(tt.right), SeriesIDs(tt.expected), SeriesIDs(a))
}
}
}
@ -620,7 +620,7 @@ func TestDatabase_SeriesIDsUnion(t *testing.T) {
for i, tt := range tests {
a := SeriesIDs(tt.left).Union(tt.right)
if !a.Equals(tt.expected) {
t.Fatalf("%d: %s union %s: result mismatch:\n exp=%s\n got=%s", i, SeriesIDs(tt.left), SeriesIDs(tt.right), SeriesIDs(tt.expected), SeriesIDs(a))
t.Fatalf("%d: %v union %v: result mismatch:\n exp=%v\n got=%v", i, SeriesIDs(tt.left), SeriesIDs(tt.right), SeriesIDs(tt.expected), SeriesIDs(a))
}
}
}
@ -677,7 +677,7 @@ func TestDatabase_SeriesIDsReject(t *testing.T) {
for i, tt := range tests {
a := SeriesIDs(tt.left).Reject(tt.right)
if !a.Equals(tt.expected) {
t.Fatalf("%d: %s reject %s: result mismatch:\n exp=%s\n got=%s", i, SeriesIDs(tt.left), SeriesIDs(tt.right), SeriesIDs(tt.expected), SeriesIDs(a))
t.Fatalf("%d: %v reject %v: result mismatch:\n exp=%v\n got=%v", i, SeriesIDs(tt.left), SeriesIDs(tt.right), SeriesIDs(tt.expected), SeriesIDs(a))
}
}
}

View File

@ -35,8 +35,8 @@ port = 8083 # binding is disabled if the port isn't set
# Configure the http api
[api]
port = 8086 # binding is disabled if the node is not a Data node.
# ssl-port = 8084 # Ssl support is enabled if you set a port and cert
# ssl-cert = /path/to/cert.pem
# ssl-port = 8084 # SSL support is enabled if you set a port and cert
# ssl-cert = "/path/to/cert.pem"
# connections will timeout after this amount of time. Ensures that clients that misbehave
# and keep alive connections they don't use won't end up connection a million times.
@ -45,14 +45,6 @@ read-timeout = "5s"
[input_plugins]
# Configure the graphite api
[input_plugins.graphite]
enabled = false
# address = "0.0.0.0" # If not set, is actually set to bind-address.
# port = 2003
# database = "" # store graphite data in this database
# udp_enabled = true # enable udp interface on the same port as the tcp interface
# Configure the collectd api
[input_plugins.collectd]
enabled = false
@ -77,6 +69,14 @@ read-timeout = "5s"
# port = 5551
# database = "db1"
# Configure the Graphite plugins.
[[graphite]] # 1 or more of these sections may be present.
enabled = false
# protocol = "" # Set to "tcp" or "udp"
# address = "0.0.0.0" # If not set, is actually set to bind-address.
# port = 2003
# database = "" # store graphite data in this database
# Raft configuration
[raft]
# The raft port should be open between all servers in a cluster.

View File

@ -1,300 +0,0 @@
package influxdb
import (
"bufio"
"errors"
"io"
"net"
"strings"
"sync"
log "code.google.com/p/log4go"
)
var (
// ErrBindAddressRequired is returned when starting the GraphiteServer
// without a TCP or UDP listening address.
ErrBindAddressRequired = errors.New("bind address required")
// ErrGraphiteServerClosed return when closing an already closed graphite server.
ErrGraphiteServerClosed = errors.New("graphite server already closed")
)
// GraphiteListener provides a tcp and/or udp listener that you can
// use to ingest metrics into influxdb via the graphite protocol. it
// behaves as a carbon daemon, except:
//
// no rounding of timestamps to the nearest interval. Upon ingestion
// of multiple datapoints for a given key within the same interval
// (possibly but not necessarily the same timestamp), graphite would
// use one (the latest received) value with a rounded timestamp
// representing that interval. We store values for every timestamp we
// receive (only the latest value for a given metric-timestamp pair)
// so it's up to the user to feed the data in proper intervals (and
// use round intervals if you plan to rely on that)
type GraphiteServer struct {
server *Server
mu sync.Mutex
wg sync.WaitGroup
done chan struct{} // close notification
// The TCP address to listen on.
TCPAddr *net.TCPAddr
// The UDP address to listen on.
UDPAddr *net.UDPAddr
// The name of the database to insert data into.
Database string
// The cluster admin authorized to insert the data.
User *User
}
// NewGraphiteServer returns an instance of GraphiteServer attached to a Server.
func NewGraphiteServer(server *Server) *GraphiteServer {
return &GraphiteServer{server: server}
}
// ListenAndServe opens TCP (and optionally a UDP) socket to listen for messages.
func (s *GraphiteServer) ListenAndServe() error {
// Make sure we have either a TCP address or a UDP address.
// Also validate that there is an admin user to insert data as.
if s.TCPAddr == nil && s.UDPAddr == nil {
return ErrBindAddressRequired
} else if s.User != nil {
return ErrUserNotFound
}
// Create a new close notification channel.
done := make(chan struct{}, 0)
s.done = done
// Open the TCP connection.
if s.TCPAddr != nil {
l, err := net.ListenTCP("tcp", s.TCPAddr)
if err != nil {
return err
}
defer func() { _ = l.Close() }()
s.wg.Add(1)
go s.serveTCP(l, done)
}
// Open the UDP connection.
if s.UDPAddr != nil {
l, err := net.ListenUDP("udp", s.UDPAddr)
if err != nil {
return err
}
defer func() { _ = l.Close() }()
s.wg.Add(1)
go s.serveUDP(l, done)
}
return nil
}
// serveTCP handles incoming TCP connection requests.
func (s *GraphiteServer) serveTCP(l *net.TCPListener, done chan struct{}) {
defer s.wg.Done()
// Listen for server close.
go func() {
<-done
l.Close()
}()
// Listen for new TCP connections.
for {
c, err := l.Accept()
if err != nil {
// TODO(benbjohnson): Check for connection closed.
log.Error("GraphiteServer: Accept: ", err)
continue
}
s.wg.Add(1)
go s.handleTCPConn(c)
}
}
func (s *GraphiteServer) handleTCPConn(conn net.Conn) {
defer conn.Close()
defer s.wg.Done()
reader := bufio.NewReader(conn)
for {
err := s.handleMessage(reader)
if err != nil {
if io.EOF == err {
log.Debug("GraphiteServer: Client closed graphite connection")
return
}
log.Error("GraphiteServer:", err)
}
}
}
// serveUDP handles incoming UDP messages.
func (s *GraphiteServer) serveUDP(conn *net.UDPConn, done chan struct{}) {
defer s.wg.Done()
// Listen for server close.
go func() {
<-done
conn.Close()
}()
buf := make([]byte, 65536)
for {
// Read from connection.
n, _, err := conn.ReadFromUDP(buf)
if err == io.EOF {
return
} else if err != nil {
log.Warn("GraphiteServer: Error when reading from UDP connection %s", err.Error())
}
// Read in data in a separate goroutine.
s.wg.Add(1)
go s.handleUDPMessage(string(buf[:n]))
}
}
// handleUDPMessage splits a UDP packet by newlines and processes each message.
func (s *GraphiteServer) handleUDPMessage(msg string) {
defer s.wg.Done()
for _, metric := range strings.Split(msg, "\n") {
s.handleMessage(bufio.NewReader(strings.NewReader(metric + "\n")))
}
}
// Close shuts down the server's listeners.
func (s *GraphiteServer) Close() error {
// Notify other goroutines of shutdown.
s.mu.Lock()
if s.done == nil {
s.mu.Unlock()
return ErrGraphiteServerClosed
}
close(s.done)
s.done = nil
s.mu.Unlock()
// Wait for all goroutines to shutdown.
s.wg.Wait()
return nil
}
// handleMessage decodes a graphite message from the reader and sends it to the
// committer goroutine.
func (s *GraphiteServer) handleMessage(r *bufio.Reader) error {
panic("not yet implemented: GraphiteServer.handleMessage()")
/* TEMPORARILY REMOVED FOR PROTOBUFS.
// Decode graphic metric.
m, err := decodeGraphiteMetric(r)
if err != nil {
return err
}
// Convert metric to a field value.
v := &protocol.FieldValue{}
if m.isInt {
v.Int64Value = &m.integerValue
} else {
v.DoubleValue = &m.floatValue
}
// Use a single sequence number to make sure last write wins.
sn := uint64(1)
// Send data point to committer.
p := &protocol.Point{
Timestamp: &m.timestamp,
Values: []*protocol.FieldValue{v},
SequenceNumber: &sn,
}
// Write data to server.
series := &protocol.Series{
Name: &m.name,
Fields: []string{"value"},
Points: []*protocol.Point{p},
}
// TODO: Validate user.
// Look up database.
db := s.server.Database(s.Database)
if db == nil {
return ErrDatabaseNotFound
}
// Write series data to database.
if err := db.WriteSeries(series); err != nil {
return fmt.Errorf("write series data: %s", err)
}
return nil
}
type graphiteMetric struct {
name string
isInt bool
integerValue int64
floatValue float64
timestamp int64
}
// returns err == io.EOF when we hit EOF without any further data
func decodeGraphiteMetric(r *bufio.Reader) (*graphiteMetric, error) {
// Read up to the next newline.
buf, err := r.ReadBytes('\n')
str := strings.TrimSpace(string(buf))
if err != nil {
if err != io.EOF {
return nil, fmt.Errorf("connection closed uncleanly/broken: %s\n", err.Error())
}
if str == "" {
return nil, err
}
// else we got EOF but also data, so just try to process it as valid data
}
// Break into 3 fields (name, value, timestamp).
fields := strings.Fields(str)
if len(fields) != 3 {
return nil, fmt.Errorf("received '%s' which doesn't have three fields", str)
}
// Create a metric.
m := &graphiteMetric{name: fields[0]}
// Parse value.
v, err := strconv.ParseFloat(fields[1], 64)
if err != nil {
return nil, err
}
// Determine if value is a float or an int.
if i := int64(v); float64(i) == v {
m.integerValue, m.isInt = int64(v), true
} else {
m.floatValue = v
}
// Parse timestamp.
timestamp, err := strconv.ParseUint(fields[2], 10, 32)
if err != nil {
return nil, err
}
m.timestamp = int64(timestamp) * int64(time.Millisecond)
return m, nil
*/
}

134
graphite/graphite.go Normal file
View File

@ -0,0 +1,134 @@
package graphite
import (
"errors"
"fmt"
"strconv"
"strings"
"time"
)
const (
// DefaultGraphitePort represents the default Graphite (Carbon) plaintext port.
DefaultGraphitePort = 2003
// DefaultGraphiteNameSeparator represents the default Graphite field separator.
DefaultGraphiteNameSeparator = "."
)
var (
// ErrBindAddressRequired is returned when starting the Server
// without a TCP or UDP listening address.
ErrBindAddressRequired = errors.New("bind address required")
// ErrServerClosed return when closing an already closed graphite server.
ErrServerClosed = errors.New("server already closed")
// ErrDatabaseNotSpecified retuned when no database was specified in the config file
ErrDatabaseNotSpecified = errors.New("database was not specified in config")
// ErrServerNotSpecified returned when Server is not specified.
ErrServerNotSpecified = errors.New("server not present")
)
// SeriesWriter defines the interface for the destination of the data.
type SeriesWriter interface {
WriteSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) error
}
// Metric represents a metric as processed by the Graphite parser.
type Metric struct {
Name string
Tags map[string]string
Value interface{}
Timestamp time.Time
}
// Parser encapulates a Graphite Parser.
type Parser struct {
Separator string
LastEnabled bool
}
// NewParser returns a GraphiteParser instance.
func NewParser() *Parser {
return &Parser{Separator: DefaultGraphiteNameSeparator}
}
// Parse performs Graphite parsing of a single line.
func (p *Parser) Parse(line string) (*Metric, error) {
// Break into 3 fields (name, value, timestamp).
fields := strings.Fields(line)
if len(fields) != 3 {
return nil, fmt.Errorf("received %q which doesn't have three fields", line)
}
m := new(Metric)
// decode the name and tags
name, tags, err := p.DecodeNameAndTags(fields[0])
if err != nil {
return nil, err
}
m.Name = name
m.Tags = tags
// Parse value.
v, err := strconv.ParseFloat(fields[1], 64)
if err != nil {
return nil, err
}
// Determine if value is a float or an int.
if i := int64(v); float64(i) == v {
m.Value = int64(v)
} else {
m.Value = v
}
// Parse timestamp.
unixTime, err := strconv.ParseInt(fields[2], 10, 64)
if err != nil {
return nil, err
}
m.Timestamp = time.Unix(0, unixTime*int64(time.Millisecond))
return m, nil
}
// DecodeNameAndTags parses the name and tags of a single field of a Graphite datum.
func (p *Parser) DecodeNameAndTags(field string) (string, map[string]string, error) {
var (
name string
tags = make(map[string]string)
)
// decode the name and tags
values := strings.Split(field, p.Separator)
if len(values)%2 != 1 {
// There should always be an odd number of fields to map a metric name and tags
// ex: region.us-west.hostname.server01.cpu -> tags -> region: us-west, hostname: server01, metric name -> cpu
return name, tags, fmt.Errorf("received %q which doesn't conform to format of key.value.key.value.metric or metric", field)
}
if p.LastEnabled {
name = values[len(values)-1]
values = values[0 : len(values)-1]
} else {
name = values[0]
values = values[1:len(values)]
}
if name == "" {
return name, tags, fmt.Errorf("no name specified for metric. %q", field)
}
// Grab the pairs and throw them in the map
for i := 0; i < len(values); i += 2 {
k := values[i]
v := values[i+1]
tags[k] = v
}
return name, tags, nil
}

80
graphite/graphite_tcp.go Normal file
View File

@ -0,0 +1,80 @@
package graphite
import (
"bufio"
"log"
"net"
"strings"
)
// TCPServer processes Graphite data received over TCP connections.
type TCPServer struct {
writer SeriesWriter
parser *Parser
Database string
}
// NewTCPServer returns a new instance of a TCPServer.
func NewTCPServer(p *Parser, w SeriesWriter) *TCPServer {
return &TCPServer{
parser: p,
writer: w,
}
}
// ListenAndServe instructs the TCPServer to start processing Graphite data
// on the given interface. iface must be in the form host:port
func (t *TCPServer) ListenAndServe(iface string) error {
if iface == "" { // Make sure we have an address
return ErrBindAddressRequired
} else if t.Database == "" { // Make sure they have a database
return ErrDatabaseNotSpecified
}
ln, err := net.Listen("tcp", iface)
if err != nil {
return err
}
go func() {
for {
conn, err := ln.Accept()
if err != nil {
log.Println("error accepting TCP connection", err.Error())
continue
}
go t.handleConnection(conn)
}
}()
return nil
}
// handleConnection services an individual TCP connection.
func (t *TCPServer) handleConnection(conn net.Conn) {
defer conn.Close()
reader := bufio.NewReader(conn)
for {
// Read up to the next newline.
buf, err := reader.ReadBytes('\n')
if err != nil {
return
}
// Trim the buffer, even though there should be no padding
line := strings.TrimSpace(string(buf))
// Parse it.
metric, err := t.parser.Parse(line)
if err != nil {
continue
}
// Convert metric to a field value.
var values = make(map[string]interface{})
values[metric.Name] = metric.Value
// Send the data to database
t.writer.WriteSeries(t.Database, "", metric.Name, metric.Tags, metric.Timestamp, values)
}
}

250
graphite/graphite_test.go Normal file
View File

@ -0,0 +1,250 @@
package graphite_test
import (
"strconv"
"testing"
"time"
"github.com/influxdb/influxdb/graphite"
)
func Test_DecodeNameAndTags(t *testing.T) {
var tests = []struct {
test string
str string
name string
tags map[string]string
position string
separator string
err string
}{
{test: "metric only", str: "cpu", name: "cpu"},
{test: "metric with single series", str: "cpu.hostname.server01", name: "cpu", tags: map[string]string{"hostname": "server01"}},
{test: "metric with multiple series", str: "cpu.region.us-west.hostname.server01", name: "cpu", tags: map[string]string{"hostname": "server01", "region": "us-west"}},
{test: "no metric", tags: make(map[string]string), err: `no name specified for metric. ""`},
{test: "wrong metric format", str: "foo.cpu", tags: make(map[string]string), err: `received "foo.cpu" which doesn't conform to format of key.value.key.value.metric or metric`},
}
for _, test := range tests {
t.Logf("testing %q...", test.test)
p := graphite.NewParser()
if test.separator != "" {
p.Separator = test.separator
}
name, tags, err := p.DecodeNameAndTags(test.str)
if errstr(err) != test.err {
t.Fatalf("err does not match. expected %v, got %v", test.err, err)
}
if name != test.name {
t.Fatalf("name parse failer. expected %v, got %v", test.name, name)
}
if len(tags) != len(test.tags) {
t.Fatalf("unexpected number of tags. expected %d, got %d", len(test.tags), len(tags))
}
for k, v := range test.tags {
if tags[k] != v {
t.Fatalf("unexpected tag value for tags[%s]. expected %q, got %q", k, v, tags[k])
}
}
}
}
func Test_DecodeMetric(t *testing.T) {
testTime := time.Now()
epochTime := testTime.UnixNano() / 1000000 // nanos to milliseconds
strTime := strconv.FormatInt(epochTime, 10)
var tests = []struct {
test string
line string
name string
tags map[string]string
isInt bool
iv int64
fv float64
timestamp time.Time
position, separator string
err string
}{
{
test: "position first by default",
line: `cpu.foo.bar 50 ` + strTime,
name: "cpu",
tags: map[string]string{"foo": "bar"},
isInt: true,
iv: 50,
timestamp: testTime,
},
{
test: "position first if unable to determine",
position: "foo",
line: `cpu.foo.bar 50 ` + strTime,
name: "cpu",
tags: map[string]string{"foo": "bar"},
isInt: true,
iv: 50,
timestamp: testTime,
},
{
test: "position last if specified",
position: "last",
line: `foo.bar.cpu 50 ` + strTime,
name: "cpu",
tags: map[string]string{"foo": "bar"},
isInt: true,
iv: 50,
timestamp: testTime,
},
{
test: "position first if specified with no series",
position: "first",
line: `cpu 50 ` + strTime,
name: "cpu",
tags: map[string]string{},
isInt: true,
iv: 50,
timestamp: testTime,
},
{
test: "position last if specified with no series",
position: "last",
line: `cpu 50 ` + strTime,
name: "cpu",
tags: map[string]string{},
isInt: true,
iv: 50,
timestamp: testTime,
},
{
test: "sepeartor is . by default",
line: `cpu.foo.bar 50 ` + strTime,
name: "cpu",
tags: map[string]string{"foo": "bar"},
isInt: true,
iv: 50,
timestamp: testTime,
},
{
test: "sepeartor is . if specified",
separator: ".",
line: `cpu.foo.bar 50 ` + strTime,
name: "cpu",
tags: map[string]string{"foo": "bar"},
isInt: true,
iv: 50,
timestamp: testTime,
},
{
test: "sepeartor is - if specified",
separator: "-",
line: `cpu-foo-bar 50 ` + strTime,
name: "cpu",
tags: map[string]string{"foo": "bar"},
isInt: true,
iv: 50,
timestamp: testTime,
},
{
test: "sepeartor is boo if specified",
separator: "boo",
line: `cpuboofooboobar 50 ` + strTime,
name: "cpu",
tags: map[string]string{"foo": "bar"},
isInt: true,
iv: 50,
timestamp: testTime,
},
{
test: "series + metric + integer value",
line: `cpu.foo.bar 50 ` + strTime,
name: "cpu",
tags: map[string]string{"foo": "bar"},
isInt: true,
iv: 50,
timestamp: testTime,
},
{
test: "metric only with float value",
line: `cpu 50.554 ` + strTime,
name: "cpu",
isInt: false,
fv: 50.554,
timestamp: testTime,
},
{
test: "missing metric",
line: `50.554 1419972457825`,
err: `received "50.554 1419972457825" which doesn't have three fields`,
},
{
test: "should fail on invalid key",
line: `foo.cpu 50.554 1419972457825`,
err: `received "foo.cpu" which doesn't conform to format of key.value.key.value.metric or metric`,
},
{
test: "should fail parsing invalid float",
line: `cpu 50.554z 1419972457825`,
err: `strconv.ParseFloat: parsing "50.554z": invalid syntax`,
},
{
test: "should fail parsing invalid int",
line: `cpu 50z 1419972457825`,
err: `strconv.ParseFloat: parsing "50z": invalid syntax`,
},
{
test: "should fail parsing invalid time",
line: `cpu 50.554 14199724z57825`,
err: `strconv.ParseInt: parsing "14199724z57825": invalid syntax`,
},
}
for _, test := range tests {
t.Logf("testing %q...", test.test)
p := graphite.NewParser()
if test.separator != "" {
p.Separator = test.separator
}
p.LastEnabled = (test.position == "last")
m, err := p.Parse(test.line)
if errstr(err) != test.err {
t.Fatalf("err does not match. expected %v, got %v", test.err, err)
}
if err != nil {
// If we erred out,it was intended and the following tests won't work
continue
}
if m.Name != test.name {
t.Fatalf("name parse failer. expected %v, got %v", test.name, m.Name)
}
if len(m.Tags) != len(test.tags) {
t.Fatalf("tags len mismatch. expected %d, got %d", len(test.tags), len(m.Tags))
}
if test.isInt {
i := m.Value.(int64)
if i != test.iv {
t.Fatalf("integerValue value mismatch. expected %v, got %v", test.iv, m.Value)
}
} else {
f := m.Value.(float64)
if m.Value != f {
t.Fatalf("floatValue value mismatch. expected %v, got %v", test.fv, f)
}
}
if m.Timestamp.UnixNano()/1000000 != test.timestamp.UnixNano()/1000000 {
t.Fatalf("timestamp value mismatch. expected %v, got %v", test.timestamp.UnixNano(), m.Timestamp.UnixNano())
}
}
}
// Test Helpers
func errstr(err error) string {
if err != nil {
return err.Error()
}
return ""
}

71
graphite/graphite_udp.go Normal file
View File

@ -0,0 +1,71 @@
package graphite
import (
"net"
"strings"
)
const (
udpBufferSize = 65536
)
// UDPerver processes Graphite data received via UDP.
type UDPServer struct {
writer SeriesWriter
parser *Parser
Database string
}
// NewUDPServer returns a new instance of a UDPServer
func NewUDPServer(p *Parser, w SeriesWriter) *UDPServer {
u := UDPServer{
parser: p,
writer: w,
}
return &u
}
// ListenAndServer instructs the UDPServer to start processing Graphite data
// on the given interface. iface must be in the form host:port.
func (u *UDPServer) ListenAndServe(iface string) error {
if iface == "" { // Make sure we have an address
return ErrBindAddressRequired
} else if u.Database == "" { // Make sure they have a database
return ErrDatabaseNotSpecified
}
addr, err := net.ResolveUDPAddr("udp", iface)
if err != nil {
return nil
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return err
}
buf := make([]byte, udpBufferSize)
go func() {
for {
n, _, err := conn.ReadFromUDP(buf)
if err != nil {
return
}
for _, line := range strings.Split(string(buf[:n]), "\n") {
m, err := u.parser.Parse(line)
if err != nil {
continue
}
// Convert metric to a field value.
var values = make(map[string]interface{})
values[m.Name] = m.Value
// Send the data to database
u.writer.WriteSeries(u.Database, "", m.Name, m.Tags, m.Timestamp, values)
}
}
}()
return nil
}

View File

@ -537,7 +537,7 @@ func TestHandler_CreateDataNode_InternalServerError(t *testing.T) {
status, body := MustHTTP("POST", s.URL+`/data_nodes`, `{"url":""}`)
if status != http.StatusInternalServerError {
t.Fatalf("unexpected status: %d", status, body)
t.Fatalf("unexpected status: %d, %s", status, body)
} else if body != `data node url required` {
t.Fatalf("unexpected body: %s", body)
}

291
influxql/INFLUXQL.md Normal file
View File

@ -0,0 +1,291 @@
# The Influx Query Language Specification
## Introduction
This is a reference for the Influx Query Language ("InfluxQL").
InfluxQL is a SQL-like query language for interacting with InfluxDB. It was lovingly crafted to feel familiar to those coming from other
SQL or SQL-like environments while providing features specific to storing
and analyzing time series data.
## Notation
This specification uses the same notation used by Google's Go programming language, which can be found at http://golang.org. The syntax is specified in Extended Backus-Naur Form ("EBNF"):
```
Production = production_name "=" [ Expression ] "." .
Expression = Alternative { "|" Alternative } .
Alternative = Term { Term } .
Term = production_name | token [ "…" token ] | Group | Option | Repetition .
Group = "(" Expression ")" .
Option = "[" Expression "]" .
Repetition = "{" Expression "}" .
```
Notation operators in order of increasing precedence:
```
| alternation
() grouping
[] option (0 or 1 times)
{} repetition (0 to n times)
```
## Characters & Digits
```
newline = /* the Unicode code point U+000A */ .
unicode_char = /* an arbitrary Unicode code point except newline */ .
ascii_letter = "A" .. "Z" | "a" .. "z" .
decimal_digit = "0" .. "9" .
```
## Database name
Database names are more limited than other identifiers because they appear in URLs.
```
db_name = ascii_letter { ascii_letter | decimal_digit | "_" | "-" } .
```
## Identifiers
```
identifier = unquoted_identifier | quoted_identifier .
unquoted_identifier = ascii_letter { ascii_letter | decimal_digit | "_" | "." } .
quoted_identifier = `"` unicode_char { unicode_char } `"` .
```
## Keywords
```
ALL ALTER AS ASC BEGIN
BY CREATE CONTINUOUS DATABASE DEFAULT
DELETE DESC DROP DURATION END
EXISTS EXPLAIN FIELD FROM GRANT
GROUP IF INNER INSERT INTO
KEYS LIMIT LIST MEASUREMENT MEASUREMENTS
ON ORDER PASSWORD POLICY PRIVILEGES
QUERIES QUERY READ REPLICATION RETENTION
EVOKE SELECT SERIES TAG TO
USER VALUES WHERE WITH WRITE
```
## Literals
### Numbers
```
int_lit = decimal_lit .
decimal_lit = ( "1" .. "9" ) { decimal_digit } .
float_lit = decimals "." decimals .
decimals = decimal_digit { decimal_digit } .
```
### Strings
```
string_lit = '"' { unicode_char } '"' .
```
### Durations
```
duration_lit = decimals duration_unit .
duration_unit = "u" | "µ" | "s" | "h" | "d" | "w" | "ms" .
```
## Queries
A query is composed of one or more statements separated by a semicolon.
```
query = statement { ; statement } .
statement = alter_retention_policy_stmt |
create_continuous_query_stmt |
create_database_stmt |
create_retention_policy_stmt |
create_user_stmt |
delete_stmt |
drop_continuous_query_stmt |
drop_database_stmt |
drop_series_stmt |
drop_user_stmt |
grant_stmt |
list_continuous_queries_stmt |
list_databases_stmt |
list_field_key_stmt |
list_field_value_stmt |
list_measurements_stmt |
list_series_stmt |
list_tag_key_stmt |
list_tag_value_stmt |
revoke_stmt |
select_stmt .
```
## Statements
### ALTER RETENTION POLICY
```
alter_retention_policy_stmt = "ALTER RETENTION POLICY" policy_name "ON"
db_name retention_policy_option
[ retention_policy_option ]
[ retention_policy_option ] .
policy_name = identifier .
retention_policy_option = retention_policy_duration |
retention_policy_replication |
"DEFAULT" .
retention_policy_duration = "DURATION" duration_lit .
retention_policy_replication = "REPLICATION" int_lit
```
#### Examples:
```sql
-- Set default retention policy for mydb to 1h.cpu.
ALTER RETENTION POLICY "1h.cpu" ON mydb DEFAULT;
-- Change duration and replication factor.
ALTER RETENTION POLICY policy1 ON somedb DURATION 1h REPLICATION 4
```
### CREATE CONTINUOUS QUERY
```
create_continuous_query_stmt = "CREATE CONTINUOUS QUERY" query_name "ON" db_name
"BEGIN" select_stmt "END" .
query_name = identifier .
```
#### Examples:
```sql
CREATE CONTINUOUS QUERY 10m_event_count
ON db_name
BEGIN
SELECT count(value)
INTO 10m.events
FROM events
GROUP BY time(10m)
END;
-- this selects from the output of one continuous query and outputs to another series
CREATE CONTINUOUS QUERY 1h_event_count
ON db_name
BEGIN
SELECT sum(count) as count
INTO 1h.events
FROM events
GROUP BY time(1h)
END;
```
### CREATE DATABASE
```
create_database_stmt = "CREATE DATABASE" db_name
```
#### Example:
```sql
CREATE DATABASE foo
```
### CREATE RETENTION POLICY
```
create_retention_policy_stmt = "CREATE RETENTION POLICY" policy_name "ON"
db_name retention_policy_duration
retention_policy_replication
[ "DEFAULT" ] .
```
#### Examples
```sql
-- Create a retention policy.
CREATE RETENTION POLICY "10m.events" ON somedb DURATION 10m REPLICATION 2;
-- Create a retention policy and set it as the default.
CREATE RETENTION POLICY "10m.events" ON somedb DURATION 10m REPLICATION 2 DEFAULT;
```
### CREATE USER
```
create_user_stmt = "CREATE USER" user_name "WITH PASSWORD" password
[ "WITH ALL PRIVILEGES" ] .
```
#### Examples:
```sql
-- Create a normal database user.
CREATE USER jdoe WITH PASSWORD "1337password";
-- Create a cluster admin.
-- Note: Unlike the GRANT statement, the "PRIVILEGES" keyword is required here.
CREATE USER jdoe WITH PASSWORD "1337password" WITH ALL PRIVILEGES;
```
### DELETE
```
delete_stmt = "DELETE" from_clause where_clause .
```
#### Example:
```sql
DELETE FROM cpu WHERE region = 'uswest'
```
### GRANT
```
grant_stmt = "GRANT" privilege [ on_clause ] to_clause
```
#### Examples:
```sql
-- grant cluster admin privileges
GRANT ALL TO jdoe;
-- grant read access to a database
GRANT READ ON mydb TO jdoe;
```
## Clauses
```
from_clause = "FROM" measurements .
where_clause = "WHERE" expr .
on_clause = db_name .
to_clause = user_name .
```
## Other
```
expr =
measurements =
user_name = identifier .
password = identifier .
privilege = "ALL" [ "PRIVILEGES" ] | "READ" | "WRITE" .
```

View File

@ -47,47 +47,49 @@ type Node interface {
func (_ *Query) node() {}
func (_ Statements) node() {}
func (_ *SelectStatement) node() {}
func (_ *DeleteStatement) node() {}
func (_ *ListSeriesStatement) node() {}
func (_ *ListMeasurementsStatement) node() {}
func (_ *ListTagKeysStatement) node() {}
func (_ *ListTagValuesStatement) node() {}
func (_ *ListFieldKeysStatement) node() {}
func (_ *ListFieldValuesStatement) node() {}
func (_ *ListContinuousQueriesStatement) node() {}
func (_ *DropSeriesStatement) node() {}
func (_ *DropContinuousQueryStatement) node() {}
func (_ *DropDatabaseStatement) node() {}
func (_ *DropUserStatement) node() {}
func (_ *AlterRetentionPolicyStatement) node() {}
func (_ *CreateContinuousQueryStatement) node() {}
func (_ *CreateDatabaseStatement) node() {}
func (_ *CreateUserStatement) node() {}
func (_ *CreateRetentionPolicyStatement) node() {}
func (_ *CreateUserStatement) node() {}
func (_ *DeleteStatement) node() {}
func (_ *DropContinuousQueryStatement) node() {}
func (_ *DropDatabaseStatement) node() {}
func (_ *DropSeriesStatement) node() {}
func (_ *DropUserStatement) node() {}
func (_ *GrantStatement) node() {}
func (_ *ListContinuousQueriesStatement) node() {}
func (_ *ListDatabasesStatement) node() {}
func (_ *ListFieldKeysStatement) node() {}
func (_ *ListFieldValuesStatement) node() {}
func (_ *ListMeasurementsStatement) node() {}
func (_ *ListSeriesStatement) node() {}
func (_ *ListTagKeysStatement) node() {}
func (_ *ListTagValuesStatement) node() {}
func (_ *RevokeStatement) node() {}
func (_ *AlterRetentionPolicyStatement) node() {}
func (_ *SelectStatement) node() {}
func (_ Fields) node() {}
func (_ *Field) node() {}
func (_ Dimensions) node() {}
func (_ *BinaryExpr) node() {}
func (_ *BooleanLiteral) node() {}
func (_ *Call) node() {}
func (_ *Dimension) node() {}
func (_ Dimensions) node() {}
func (_ *DurationLiteral) node() {}
func (_ *Field) node() {}
func (_ Fields) node() {}
func (_ *Join) node() {}
func (_ *Measurement) node() {}
func (_ Measurements) node() {}
func (_ *Join) node() {}
func (_ *Merge) node() {}
func (_ *VarRef) node() {}
func (_ *Call) node() {}
func (_ *NumberLiteral) node() {}
func (_ *StringLiteral) node() {}
func (_ *BooleanLiteral) node() {}
func (_ *TimeLiteral) node() {}
func (_ *DurationLiteral) node() {}
func (_ *BinaryExpr) node() {}
func (_ *ParenExpr) node() {}
func (_ *Wildcard) node() {}
func (_ SortFields) node() {}
func (_ *SortField) node() {}
func (_ SortFields) node() {}
func (_ *StringLiteral) node() {}
func (_ *Target) node() {}
func (_ *TimeLiteral) node() {}
func (_ *VarRef) node() {}
func (_ *Wildcard) node() {}
// Query represents a collection of ordered statements.
type Query struct {
@ -115,26 +117,27 @@ type Statement interface {
stmt()
}
func (_ *SelectStatement) stmt() {}
func (_ *DeleteStatement) stmt() {}
func (_ *ListSeriesStatement) stmt() {}
func (_ *DropSeriesStatement) stmt() {}
func (_ *ListContinuousQueriesStatement) stmt() {}
func (_ *AlterRetentionPolicyStatement) stmt() {}
func (_ *CreateContinuousQueryStatement) stmt() {}
func (_ *CreateDatabaseStatement) stmt() {}
func (_ *CreateRetentionPolicyStatement) stmt() {}
func (_ *CreateUserStatement) stmt() {}
func (_ *DeleteStatement) stmt() {}
func (_ *DropContinuousQueryStatement) stmt() {}
func (_ *ListMeasurementsStatement) stmt() {}
func (_ *ListTagKeysStatement) stmt() {}
func (_ *ListTagValuesStatement) stmt() {}
func (_ *DropDatabaseStatement) stmt() {}
func (_ *DropSeriesStatement) stmt() {}
func (_ *DropUserStatement) stmt() {}
func (_ *GrantStatement) stmt() {}
func (_ *ListContinuousQueriesStatement) stmt() {}
func (_ *ListDatabasesStatement) stmt() {}
func (_ *ListFieldKeysStatement) stmt() {}
func (_ *ListFieldValuesStatement) stmt() {}
func (_ *CreateDatabaseStatement) stmt() {}
func (_ *CreateUserStatement) stmt() {}
func (_ *GrantStatement) stmt() {}
func (_ *ListMeasurementsStatement) stmt() {}
func (_ *ListSeriesStatement) stmt() {}
func (_ *ListTagKeysStatement) stmt() {}
func (_ *ListTagValuesStatement) stmt() {}
func (_ *RevokeStatement) stmt() {}
func (_ *CreateRetentionPolicyStatement) stmt() {}
func (_ *DropDatabaseStatement) stmt() {}
func (_ *DropUserStatement) stmt() {}
func (_ *AlterRetentionPolicyStatement) stmt() {}
func (_ *SelectStatement) stmt() {}
// Expr represents an expression that can be evaluated to a value.
type Expr interface {
@ -142,15 +145,15 @@ type Expr interface {
expr()
}
func (_ *VarRef) expr() {}
func (_ *Call) expr() {}
func (_ *NumberLiteral) expr() {}
func (_ *StringLiteral) expr() {}
func (_ *BooleanLiteral) expr() {}
func (_ *TimeLiteral) expr() {}
func (_ *DurationLiteral) expr() {}
func (_ *BinaryExpr) expr() {}
func (_ *BooleanLiteral) expr() {}
func (_ *Call) expr() {}
func (_ *DurationLiteral) expr() {}
func (_ *NumberLiteral) expr() {}
func (_ *ParenExpr) expr() {}
func (_ *StringLiteral) expr() {}
func (_ *TimeLiteral) expr() {}
func (_ *VarRef) expr() {}
func (_ *Wildcard) expr() {}
// Source represents a source of data for a statement.
@ -159,8 +162,8 @@ type Source interface {
source()
}
func (_ *Measurement) source() {}
func (_ *Join) source() {}
func (_ *Measurement) source() {}
func (_ *Merge) source() {}
// SortField represens a field to sort results by.
@ -228,6 +231,9 @@ type CreateUserStatement struct {
// User's password
Password string
// User's privilege level.
Privilege *Privilege
}
// String returns a string representation of the create user statement.
@ -237,6 +243,12 @@ func (s *CreateUserStatement) String() string {
_, _ = buf.WriteString(s.Name)
_, _ = buf.WriteString(" WITH PASSWORD ")
_, _ = buf.WriteString(s.Password)
if s.Privilege != nil {
_, _ = buf.WriteString(" WITH ")
_, _ = buf.WriteString(s.Privilege.String())
}
return buf.String()
}
@ -263,6 +275,9 @@ const (
AllPrivileges
)
// NewPrivilege returns an initialized *Privilege.
func NewPrivilege(p Privilege) *Privilege { return &p }
// String returns a string representation of a Privilege.
func (p Privilege) String() string {
switch p {
@ -334,7 +349,7 @@ type CreateRetentionPolicyStatement struct {
Name string
// Name of database this policy belongs to.
DB string
Database string
// Duration data written to this policy will be retained.
Duration time.Duration
@ -352,7 +367,7 @@ func (s *CreateRetentionPolicyStatement) String() string {
_, _ = buf.WriteString("CREATE RETENTION POLICY ")
_, _ = buf.WriteString(s.Name)
_, _ = buf.WriteString(" ON ")
_, _ = buf.WriteString(s.DB)
_, _ = buf.WriteString(s.Database)
_, _ = buf.WriteString(" DURATION ")
_, _ = buf.WriteString(FormatDuration(s.Duration))
_, _ = buf.WriteString(" REPLICATION ")
@ -369,7 +384,7 @@ type AlterRetentionPolicyStatement struct {
Name string
// Name of the database this policy belongs to.
DB string
Database string
// Duration data written to this policy will be retained.
Duration *time.Duration
@ -387,7 +402,7 @@ func (s *AlterRetentionPolicyStatement) String() string {
_, _ = buf.WriteString("ALTER RETENTION POLICY ")
_, _ = buf.WriteString(s.Name)
_, _ = buf.WriteString(" ON ")
_, _ = buf.WriteString(s.DB)
_, _ = buf.WriteString(s.Database)
if s.Duration != nil {
_, _ = buf.WriteString(" DURATION ")
@ -411,6 +426,9 @@ type SelectStatement struct {
// Expressions returned from the selection.
Fields Fields
// Target (destination) for the result of the select.
Target *Target
// Expressions used for grouping the selection.
Dimensions Dimensions
@ -433,6 +451,11 @@ func (s *SelectStatement) String() string {
var buf bytes.Buffer
_, _ = buf.WriteString("SELECT ")
_, _ = buf.WriteString(s.Fields.String())
if s.Target != nil {
_, _ = buf.WriteString(" ")
_, _ = buf.WriteString(s.Target.String())
}
_, _ = buf.WriteString(" FROM ")
_, _ = buf.WriteString(s.Source.String())
if s.Condition != nil {
@ -591,6 +614,38 @@ func MatchSource(src Source, name string) string {
return ""
}
// Target represents a target (destination) policy, measurment, and DB.
type Target struct {
// Retention policy to write into.
RetentionPolicy string
// Measurement to write into.
Measurement string
// Database to write into.
Database string
}
// String returns a string representation of the Target.
func (t *Target) String() string {
var buf bytes.Buffer
_, _ = buf.WriteString("INTO ")
if t.RetentionPolicy != "" {
_, _ = buf.WriteString(t.RetentionPolicy)
_, _ = buf.WriteString(".")
}
_, _ = buf.WriteString(t.Measurement)
if t.Database != "" {
_, _ = buf.WriteString(" ON ")
_, _ = buf.WriteString(t.Database)
}
return buf.String()
}
// DeleteStatement represents a command for removing data from the database.
type DeleteStatement struct {
// Data source that values are removed from.
@ -659,16 +714,27 @@ type ListContinuousQueriesStatement struct{}
// String returns a string representation of the list continuous queries statement.
func (s *ListContinuousQueriesStatement) String() string { return "LIST CONTINUOUS QUERIES" }
// ListDatabasesStatement represents a command for listing all databases in the cluster.
type ListDatabasesStatement struct{}
// String returns a string representation of the list databases command.
func (s *ListDatabasesStatement) String() string { return "LIST DATABASES" }
// CreateContinuousQueriesStatement represents a command for creating a continuous query.
type CreateContinuousQueryStatement struct {
Name string
// Name of the continuous query to be created.
Name string
// Name of the database to create the continuous query on.
Database string
// Source of data (SELECT statement).
Source *SelectStatement
Target string
}
// String returns a string representation of the statement.
func (s *CreateContinuousQueryStatement) String() string {
return fmt.Sprintf("CREATE CONTINUOUS QUERY %s AS %s INTO %s", s.Name, s.Source.String(), s.Target)
return fmt.Sprintf("CREATE CONTINUOUS QUERY %s ON %s BEGIN %s END", s.Name, s.Database, s.Source.String())
}
// DropContinuousQueriesStatement represents a command for removing a continuous query.

View File

@ -65,7 +65,7 @@ func (p *Parser) ParseStatement() (Statement, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
switch tok {
case SELECT:
return p.parseSelectStatement()
return p.parseSelectStatement(targetNotRequired)
case DELETE:
return p.parseDeleteStatement()
case LIST:
@ -93,6 +93,8 @@ func (p *Parser) parseListStatement() (Statement, error) {
return p.parseListSeriesStatement()
} else if tok == CONTINUOUS {
return p.parseListContinuousQueriesStatement()
} else if tok == DATABASES {
return p.parseListDatabasesStatement()
} else if tok == MEASUREMENTS {
return p.parseListMeasurementsStatement()
} else if tok == TAG {
@ -190,7 +192,7 @@ func (p *Parser) parseCreateRetentionPolicyStatement() (*CreateRetentionPolicySt
if err != nil {
return nil, err
}
stmt.DB = ident
stmt.Database = ident
// Parse required DURATION token.
tok, pos, lit := p.scanIgnoreWhitespace()
@ -249,9 +251,9 @@ func (p *Parser) parseAlterRetentionPolicyStatement() (*AlterRetentionPolicyStat
if err != nil {
return nil, err
}
stmt.DB = ident
stmt.Database = ident
// Loop through option tokens (DURATION, RETENTION, DEFAULT, etc.).
// Loop through option tokens (DURATION, REPLICATION, DEFAULT, etc.).
maxNumOptions := 3
Loop:
for i := 0; i < maxNumOptions; i++ {
@ -442,7 +444,7 @@ func (p *Parser) parsePrivilege() (Privilege, error) {
// parseSelectStatement parses a select string and returns a Statement AST object.
// This function assumes the SELECT token has already been consumed.
func (p *Parser) parseSelectStatement() (*SelectStatement, error) {
func (p *Parser) parseSelectStatement(tr targetRequirement) (*SelectStatement, error) {
stmt := &SelectStatement{}
// Parse fields: "SELECT FIELD+".
@ -452,6 +454,14 @@ func (p *Parser) parseSelectStatement() (*SelectStatement, error) {
}
stmt.Fields = fields
// Parse target: "INTO"
target, err := p.parseTarget(tr)
if err != nil {
return nil, err
} else if target != nil {
stmt.Target = target
}
// Parse source.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != FROM {
return nil, newParseError(tokstr(tok, lit), []string{"FROM"}, pos)
@ -493,6 +503,63 @@ func (p *Parser) parseSelectStatement() (*SelectStatement, error) {
return stmt, nil
}
// targetRequirement specifies whether or not a target clause is required.
type targetRequirement int
const (
targetRequired targetRequirement = iota
targetNotRequired
)
// parseTarget parses a string and returns a Target.
func (p *Parser) parseTarget(tr targetRequirement) (*Target, error) {
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != INTO {
if tr == targetRequired {
return nil, newParseError(tokstr(tok, lit), []string{"INTO"}, pos)
}
p.unscan()
return nil, nil
}
// Parse identifier. Could be policy or measurement name.
ident, err := p.parseIdentifier()
if err != nil {
return nil, err
}
target := &Target{}
tok, _, _ := p.scanIgnoreWhitespace()
if tok == DOT {
// Previous identifier was retention policy name.
target.RetentionPolicy = ident
// Parse required measurement.
ident, err = p.parseIdentifier()
if err != nil {
return nil, err
}
} else {
p.unscan()
}
target.Measurement = ident
// Parse optional ON.
if tok, _, _ := p.scanIgnoreWhitespace(); tok != ON {
p.unscan()
return target, nil
}
// Found an ON token so parse required identifier.
if ident, err = p.parseIdentifier(); err != nil {
return nil, err
}
target.Database = ident
return target, nil
}
// parseDeleteStatement parses a delete string and returns a DeleteStatement.
// This function assumes the DELETE token has already been consumed.
func (p *Parser) parseDeleteStatement() (*DeleteStatement, error) {
@ -760,6 +827,13 @@ func (p *Parser) parseListContinuousQueriesStatement() (*ListContinuousQueriesSt
return stmt, nil
}
// parseListDatabasesStatement parses a string and returns a ListDatabasesStatement.
// This function assumes the "LIST DATABASE" tokens have already been consumed.
func (p *Parser) parseListDatabasesStatement() (*ListDatabasesStatement, error) {
stmt := &ListDatabasesStatement{}
return stmt, nil
}
// parseCreateContinuousQueriesStatement parses a string and returns a CreateContinuousQueryStatement.
// This function assumes the "CREATE CONTINUOUS" tokens have already been consumed.
func (p *Parser) parseCreateContinuousQueryStatement() (*CreateContinuousQueryStatement, error) {
@ -771,39 +845,40 @@ func (p *Parser) parseCreateContinuousQueryStatement() (*CreateContinuousQuerySt
}
// Read the id of the query to create.
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string"}, pos)
ident, err := p.parseIdentifier()
if err != nil {
return nil, err
}
stmt.Name = lit
stmt.Name = ident
// Expect an "AS SELECT" keyword.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != AS {
return nil, newParseError(tokstr(tok, lit), []string{"AS"}, pos)
// Expect an "ON" keyword.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != ON {
return nil, newParseError(tokstr(tok, lit), []string{"ON"}, pos)
}
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != SELECT {
return nil, newParseError(tokstr(tok, lit), []string{"SELECT"}, pos)
// Read the name of the database to create the query on.
if ident, err = p.parseIdentifier(); err != nil {
return nil, err
}
stmt.Database = ident
// Expect a "BEGIN SELECT" tokens.
if err := p.parseTokens([]Token{BEGIN, SELECT}); err != nil {
return nil, err
}
// Read the select statement to be used as the source.
source, err := p.parseSelectStatement()
source, err := p.parseSelectStatement(targetRequired)
if err != nil {
return nil, err
}
stmt.Source = source
// Expect an INTO keyword.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != INTO {
return nil, newParseError(tokstr(tok, lit), []string{"INTO"}, pos)
// Expect a "END" keyword.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != END {
return nil, newParseError(tokstr(tok, lit), []string{"END"}, pos)
}
// Read the target of the query.
tok, pos, lit = p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string"}, pos)
}
stmt.Target = lit
return stmt, nil
}
@ -843,11 +918,11 @@ func (p *Parser) parseCreateUserStatement() (*CreateUserStatement, error) {
stmt := &CreateUserStatement{}
// Parse name of the user to be created.
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string"}, pos)
ident, err := p.parseIdentifier()
if err != nil {
return nil, err
}
stmt.Name = lit
stmt.Name = ident
// Consume "WITH PASSWORD" tokens
if err := p.parseTokens([]Token{WITH, PASSWORD}); err != nil {
@ -855,11 +930,23 @@ func (p *Parser) parseCreateUserStatement() (*CreateUserStatement, error) {
}
// Parse new user's password
tok, pos, lit = p.scanIgnoreWhitespace()
if tok != IDENT && tok != STRING {
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "string"}, pos)
if ident, err = p.parseIdentifier(); err != nil {
return nil, err
}
stmt.Password = lit
stmt.Password = ident
// Check for option WITH clause.
if tok, _, _ := p.scanIgnoreWhitespace(); tok != WITH {
p.unscan()
return stmt, nil
}
// We only allow granting of "ALL PRIVILEGES" during CREATE USER.
// All other privileges must be granted using a GRANT statement.
if err := p.parseTokens([]Token{ALL, PRIVILEGES}); err != nil {
return nil, err
}
stmt.Privilege = NewPrivilege(AllPrivileges)
return stmt, nil
}

View File

@ -147,6 +147,12 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
// LIST DATABASES
{
s: `LIST DATABASES`,
stmt: &influxql.ListDatabasesStatement{},
},
// LIST SERIES statement
{
s: `LIST SERIES`,
@ -277,16 +283,34 @@ func TestParser_ParseStatement(t *testing.T) {
stmt: &influxql.ListContinuousQueriesStatement{},
},
// CREATE CONTINUOUS QUERY statement
// CREATE CONTINUOUS QUERY ... INTO <measurement>
{
s: `CREATE CONTINUOUS QUERY myquery AS SELECT count() FROM myseries INTO foo`,
s: `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT count() INTO measure1 FROM myseries END`,
stmt: &influxql.CreateContinuousQueryStatement{
Name: "myquery",
Name: "myquery",
Database: "testdb",
Source: &influxql.SelectStatement{
Fields: influxql.Fields{&influxql.Field{Expr: &influxql.Call{Name: "count"}}},
Target: &influxql.Target{Measurement: "measure1"},
Source: &influxql.Measurement{Name: "myseries"},
},
},
},
// CREATE CONTINUOUS QUERY ... INTO <retention-policy>.<measurement>
{
s: `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT count() INTO "1h.policy1"."cpu.load" FROM myseries END`,
stmt: &influxql.CreateContinuousQueryStatement{
Name: "myquery",
Database: "testdb",
Source: &influxql.SelectStatement{
Fields: influxql.Fields{&influxql.Field{Expr: &influxql.Call{Name: "count"}}},
Target: &influxql.Target{
RetentionPolicy: "1h.policy1",
Measurement: "cpu.load",
},
Source: &influxql.Measurement{Name: "myseries"},
},
Target: "foo",
},
},
@ -307,6 +331,16 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
// CREATE USER ... WITH ALL PRIVILEGES
{
s: `CREATE USER testuser WITH PASSWORD pwd1337 WITH ALL PRIVILEGES`,
stmt: &influxql.CreateUserStatement{
Name: "testuser",
Password: "pwd1337",
Privilege: influxql.NewPrivilege(influxql.AllPrivileges),
},
},
// DROP CONTINUOUS QUERY statement
{
s: `DROP CONTINUOUS QUERY myquery`,
@ -428,7 +462,7 @@ func TestParser_ParseStatement(t *testing.T) {
s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 2`,
stmt: &influxql.CreateRetentionPolicyStatement{
Name: "policy1",
DB: "testdb",
Database: "testdb",
Duration: time.Hour,
Replication: 2,
},
@ -439,7 +473,7 @@ func TestParser_ParseStatement(t *testing.T) {
s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 2m REPLICATION 4 DEFAULT`,
stmt: &influxql.CreateRetentionPolicyStatement{
Name: "policy1",
DB: "testdb",
Database: "testdb",
Duration: 2 * time.Minute,
Replication: 4,
Default: true,
@ -506,6 +540,10 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `DROP DATABASE`, err: `found EOF, expected identifier at line 1, char 15`},
{s: `DROP USER`, err: `found EOF, expected identifier at line 1, char 11`},
{s: `CREATE USER testuser`, err: `found EOF, expected WITH at line 1, char 22`},
{s: `CREATE USER testuser WITH`, err: `found EOF, expected PASSWORD at line 1, char 27`},
{s: `CREATE USER testuser WITH PASSWORD`, err: `found EOF, expected identifier at line 1, char 36`},
{s: `CREATE USER testuser WITH PASSWORD "pwd" WITH`, err: `found EOF, expected ALL at line 1, char 47`},
{s: `CREATE USER testuser WITH PASSWORD "pwd" WITH ALL`, err: `found EOF, expected PRIVILEGES at line 1, char 51`},
{s: `GRANT`, err: `found EOF, expected READ, WRITE, ALL [PRIVILEGES] at line 1, char 7`},
{s: `GRANT BOGUS`, err: `found BOGUS, expected READ, WRITE, ALL [PRIVILEGES] at line 1, char 7`},
{s: `GRANT READ`, err: `found EOF, expected ON at line 1, char 12`},
@ -545,6 +583,10 @@ func TestParser_ParseStatement(t *testing.T) {
t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
} else if tt.err == "" && !reflect.DeepEqual(tt.stmt, stmt) {
t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.s, tt.stmt, stmt)
exp := tt.stmt.(*influxql.CreateContinuousQueryStatement).Source.Target
got := stmt.(*influxql.CreateContinuousQueryStatement).Source.Target
t.Errorf("exp.String() = %#v\n", *exp)
t.Errorf("got.String() = %#v\n", *got)
}
}
}
@ -833,9 +875,9 @@ func errstring(err error) string {
// newAlterRetentionPolicyStatement creates an initialized AlterRetentionPolicyStatement.
func newAlterRetentionPolicyStatement(name string, DB string, d time.Duration, replication int, dfault bool) *influxql.AlterRetentionPolicyStatement {
stmt := &influxql.AlterRetentionPolicyStatement{
Name: name,
DB: DB,
Default: dfault,
Name: name,
Database: DB,
Default: dfault,
}
if d > -1 {

View File

@ -42,7 +42,14 @@ func (s *Scanner) Scan() (tok Token, pos Pos, lit string) {
return EOF, pos, ""
case '"', '\'':
return s.scanString()
case '.', '+', '-':
case '.':
ch1, _ := s.r.read()
s.r.unread()
if isDigit(ch1) {
return s.scanNumber()
}
return DOT, pos, ""
case '+', '-':
return s.scanNumber()
case '*':
return MUL, pos, ""
@ -233,7 +240,6 @@ func (s *Scanner) scanNumber() (tok Token, pos Pos, lit string) {
}
s.r.unread()
}
return NUMBER, pos, buf.String()
}

View File

@ -54,6 +54,7 @@ func TestScanner_Scan(t *testing.T) {
{s: `)`, tok: influxql.RPAREN},
{s: `,`, tok: influxql.COMMA},
{s: `;`, tok: influxql.SEMICOLON},
{s: `.`, tok: influxql.DOT},
// Identifiers
{s: `foo`, tok: influxql.IDENT, lit: `foo`},
@ -79,7 +80,7 @@ func TestScanner_Scan(t *testing.T) {
{s: `.23`, tok: influxql.NUMBER, lit: `.23`},
{s: `+.23`, tok: influxql.NUMBER, lit: `+.23`},
{s: `-.23`, tok: influxql.NUMBER, lit: `-.23`},
{s: `.`, tok: influxql.ILLEGAL, lit: `.`},
//{s: `.`, tok: influxql.ILLEGAL, lit: `.`},
{s: `-.`, tok: influxql.SUB, lit: ``},
{s: `+.`, tok: influxql.ADD, lit: ``},
{s: `10.3s`, tok: influxql.NUMBER, lit: `10.3`},
@ -100,15 +101,18 @@ func TestScanner_Scan(t *testing.T) {
{s: `ALTER`, tok: influxql.ALTER},
{s: `AS`, tok: influxql.AS},
{s: `ASC`, tok: influxql.ASC},
{s: `BEGIN`, tok: influxql.BEGIN},
{s: `BY`, tok: influxql.BY},
{s: `CREATE`, tok: influxql.CREATE},
{s: `CONTINUOUS`, tok: influxql.CONTINUOUS},
{s: `DATABASE`, tok: influxql.DATABASE},
{s: `DATABASES`, tok: influxql.DATABASES},
{s: `DEFAULT`, tok: influxql.DEFAULT},
{s: `DELETE`, tok: influxql.DELETE},
{s: `DESC`, tok: influxql.DESC},
{s: `DROP`, tok: influxql.DROP},
{s: `DURATION`, tok: influxql.DURATION},
{s: `END`, tok: influxql.END},
{s: `EXISTS`, tok: influxql.EXISTS},
{s: `EXPLAIN`, tok: influxql.EXPLAIN},
{s: `FIELD`, tok: influxql.FIELD},

View File

@ -47,6 +47,7 @@ const (
RPAREN // )
COMMA // ,
SEMICOLON // ;
DOT // .
keyword_beg
// Keywords
@ -54,15 +55,18 @@ const (
ALTER
AS
ASC
BEGIN
BY
CREATE
CONTINUOUS
DATABASE
DATABASES
DEFAULT
DELETE
DESC
DROP
DURATION
END
EXISTS
EXPLAIN
FIELD
@ -132,20 +136,24 @@ var tokens = [...]string{
RPAREN: ")",
COMMA: ",",
SEMICOLON: ";",
DOT: ".",
ALL: "ALL",
ALTER: "ALTER",
AS: "AS",
ASC: "ASC",
BEGIN: "BEGIN",
BY: "BY",
CREATE: "CREATE",
CONTINUOUS: "CONTINUOUS",
DATABASE: "DATABASE",
DATABASES: "DATABASES",
DEFAULT: "DEFAULT",
DELETE: "DELETE",
DESC: "DESC",
DROP: "DROP",
DURATION: "DURATION",
END: "END",
EXISTS: "EXISTS",
EXPLAIN: "EXPLAIN",
FIELD: "FIELD",

View File

@ -355,10 +355,6 @@ type brokerFSM Broker
func (fsm *brokerFSM) MustApply(e *raft.LogEntry) {
b := (*Broker)(fsm)
// Save highest applied index.
// TODO: Persist to disk for raft commands.
b.index = e.Index
// Create a message with the same index as Raft.
m := &Message{}
@ -394,6 +390,10 @@ func (fsm *brokerFSM) MustApply(e *raft.LogEntry) {
if err := t.encode(m); err != nil {
panic("encode: " + err.Error())
}
// Save highest applied index.
// TODO: Persist to disk for raft commands.
b.index = e.Index
}
// Index returns the highest index that the broker has seen.
@ -410,14 +410,14 @@ func (fsm *brokerFSM) Snapshot(w io.Writer) (uint64, error) {
// Calculate header under lock.
b.mu.RLock()
s, err := fsm.createSnapshot()
hdr, err := fsm.createSnapshotHeader()
b.mu.RUnlock()
if err != nil {
return 0, fmt.Errorf("create snapshot: %s", err)
}
// Encode snapshot header.
buf, err := json.Marshal(&s)
buf, err := json.Marshal(&hdr)
if err != nil {
return 0, fmt.Errorf("encode snapshot header: %s", err)
}
@ -431,22 +431,22 @@ func (fsm *brokerFSM) Snapshot(w io.Writer) (uint64, error) {
}
// Stream each topic sequentially.
for _, t := range s.Topics {
for _, t := range hdr.Topics {
if _, err := copyFileN(w, t.path, t.Size); err != nil {
return 0, err
}
}
// Return the snapshot and its current index.
return s.maxIndex(), nil
// Return the snapshot and its last applied index.
return hdr.maxIndex(), nil
}
// createSnapshot creates a snapshot header.
func (fsm *brokerFSM) createSnapshot() (*snapshot, error) {
// createSnapshotHeader creates a snapshot header.
func (fsm *brokerFSM) createSnapshotHeader() (*snapshotHeader, error) {
b := (*Broker)(fsm)
// Create parent header.
s := &snapshot{}
s := &snapshotHeader{}
// Append topics.
for _, t := range b.topics {
@ -500,7 +500,7 @@ func (fsm *brokerFSM) Restore(r io.Reader) error {
}
// Decode header.
s := &snapshot{}
s := &snapshotHeader{}
if err := json.Unmarshal(buf, &s); err != nil {
return fmt.Errorf("decode header: %s", err)
}
@ -558,14 +558,14 @@ func copyFileN(w io.Writer, path string, n int64) (int64, error) {
return io.CopyN(w, f, n)
}
// snapshot represents the header of a snapshot.
type snapshot struct {
// snapshotHeader represents the header of a snapshot.
type snapshotHeader struct {
Replicas []*snapshotReplica `json:"replicas"`
Topics []*snapshotTopic `json:"topics"`
}
// maxIndex returns the highest index across all topics.
func (s *snapshot) maxIndex() uint64 {
// maxIndex returns the highest applied index across all topics.
func (s *snapshotHeader) maxIndex() uint64 {
var idx uint64
for _, t := range s.Topics {
if t.Index > idx {

View File

@ -1211,6 +1211,15 @@ func (s *Server) WriteSeries(database, retentionPolicy, name string, tags map[st
return err
}
// If the retention policy is not set, use the default for this database.
if retentionPolicy == "" {
rp, err := s.DefaultRetentionPolicy(database)
if err != nil {
return fmt.Errorf("failed to determine default retention policy: %s", err.Error())
}
retentionPolicy = rp.Name
}
// Retrieve measurement.
m, err := s.measurement(database, name)
if err != nil {