Basic query engine integration.

pull/1330/head
Ben Johnson 2015-01-14 16:44:09 -07:00
commit 0e2aae61e3
24 changed files with 1240 additions and 62 deletions

View File

@ -7,7 +7,7 @@ install:
- go get -d -v ./... && go build -v ./...
- go get -u golang.org/x/tools/cmd/vet;
script: go test -v . ./messaging ./influxql; go tool vet -methods=false .
script: go test -v . ./messaging ./influxql; go tool vet .
notifications:
email:

View File

@ -70,6 +70,9 @@ DROP MEASUREMENT cpu WHERE region = 'uswest'
List series queries are for pulling out individual series from measurement names and tag data. They're useful for discovery.
```sql
-- list all databases
LIST DATABASES
-- list measurement names
LIST MEASUREMENTS
LIST MEASUREMENTS WHERE service = 'redis'
@ -82,6 +85,9 @@ LIST SERIES WHERE region = 'uswest'
LIST SERIES FROM cpu_load WHERE region = 'uswest' LIMIT 10
-- list all retention policies on a database
LIST RETENTION POLICIES mydb
-- get a list of all tag keys across all measurements
LIST TAG KEYS

View File

@ -12,6 +12,7 @@ import (
"time"
"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/collectd"
"github.com/influxdb/influxdb/graphite"
)
@ -53,7 +54,15 @@ type Config struct {
Assets string `toml:"assets"`
} `toml:"admin"`
HTTPAPI struct {
Port int `toml:"port"`
SSLPort int `toml:"ssl-port"`
SSLCertPath string `toml:"ssl-cert"`
ReadTimeout Duration `toml:"read-timeout"`
} `toml:"api"`
Graphites []Graphite `toml:"graphite"`
Collectd Collectd `toml:"collectd"`
InputPlugins struct {
UDPInput struct {
@ -103,16 +112,6 @@ type Config struct {
} `toml:"logging"`
}
type Graphite struct {
Addr string `toml:"address"`
Database string `toml:"database"`
Enabled bool `toml:"enabled"`
Port uint16 `toml:"port"`
Protocol string `toml:"protocol"`
NamePosition string `toml:"name-position"`
NameSeparator string `toml:"name-separator"`
}
// NewConfig returns an instance of Config with reasonable defaults.
func NewConfig() *Config {
u, _ := user.Current()
@ -269,6 +268,43 @@ func ParseConfig(s string) (*Config, error) {
return c, nil
}
type Collectd struct {
Addr string `toml:"address"`
Port uint16 `toml:"port"`
Database string `toml:"database"`
Enabled bool `toml:"enabled"`
TypesDB string `toml:"typesdb"`
}
// ConnnectionString returns the connection string for this collectd config in the form host:port.
func (c *Collectd) ConnectionString(defaultBindAddr string) string {
addr := c.Addr
// If no address specified, use default.
if addr == "" {
addr = defaultBindAddr
}
port := c.Port
// If no port specified, use default.
if port == 0 {
port = collectd.DefaultPort
}
return fmt.Sprintf("%s:%d", addr, port)
}
type Graphite struct {
Addr string `toml:"address"`
Port uint16 `toml:"port"`
Database string `toml:"database"`
Enabled bool `toml:"enabled"`
Protocol string `toml:"protocol"`
NamePosition string `toml:"name-position"`
NameSeparator string `toml:"name-separator"`
}
// ConnnectionString returns the connection string for this Graphite config in the form host:port.
func (g *Graphite) ConnectionString(defaultBindAddr string) string {

View File

@ -98,6 +98,19 @@ func TestParseConfig(t *testing.T) {
t.Fatalf("graphite udp protocol mismatch: expected %v, got %v", "udp", strings.ToLower(udpGraphite.Protocol))
}
switch {
case c.Collectd.Enabled != true:
t.Errorf("collectd enabled mismatch: expected: %v, got %v", true, c.Collectd.Enabled)
case c.Collectd.Addr != "192.168.0.3":
t.Errorf("collectd address mismatch: expected %v, got %v", "192.168.0.3", c.Collectd.Addr)
case c.Collectd.Port != 25827:
t.Errorf("collectd port mismatch: expected %v, got %v", 2005, c.Collectd.Port)
case c.Collectd.Database != "collectd_database":
t.Errorf("collectdabase mismatch: expected %v, got %v", "collectd_database", c.Collectd.Database)
case c.Collectd.TypesDB != "foo-db-type":
t.Errorf("collectd typesdb mismatch: expected %v, got %v", "foo-db-type", c.Collectd.TypesDB)
}
if c.Broker.Port != 8090 {
t.Fatalf("broker port mismatch: %v", c.Broker.Port)
} else if c.Broker.Dir != "/tmp/influxdb/development/broker" {
@ -190,6 +203,14 @@ address = "192.168.0.2"
port = 2005
database = "graphite_udp" # store graphite data in this database
# Configure collectd server
[collectd]
enabled = true
address = "192.168.0.3"
port = 25827
database = "collectd_database"
typesdb = "foo-db-type"
# Raft configuration
[raft]
# The raft port should be open between all servers in a cluster.
@ -270,3 +291,45 @@ lru-cache-size = "200m"
# they get flushed into backend.
point-batch-size = 50
`
func TestCollectd_ConnectionString(t *testing.T) {
var tests = []struct {
name string
defaultBindAddr string
connectionString string
config main.Collectd
}{
{
name: "No address or port provided from config",
defaultBindAddr: "192.168.0.1",
connectionString: "192.168.0.1:25826",
config: main.Collectd{},
},
{
name: "address provided, no port provided from config",
defaultBindAddr: "192.168.0.1",
connectionString: "192.168.0.2:25826",
config: main.Collectd{Addr: "192.168.0.2"},
},
{
name: "no address provided, port provided from config",
defaultBindAddr: "192.168.0.1",
connectionString: "192.168.0.1:25827",
config: main.Collectd{Port: 25827},
},
{
name: "both address and port provided from config",
defaultBindAddr: "192.168.0.1",
connectionString: "192.168.0.2:25827",
config: main.Collectd{Addr: "192.168.0.2", Port: 25827},
},
}
for _, test := range tests {
t.Logf("test: %q", test.name)
s := test.config.ConnectionString(test.defaultBindAddr)
if s != test.connectionString {
t.Errorf("connection string mismatch, expected: %q, got: %q", test.connectionString, s)
}
}
}

View File

@ -12,6 +12,7 @@ import (
"strings"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/collectd"
"github.com/influxdb/influxdb/graphite"
"github.com/influxdb/influxdb/messaging"
)
@ -71,6 +72,16 @@ func execRun(args []string) {
}
log.Printf("data node #%d listening on %s", s.ID(), config.DataAddr())
// Spin up the collectd server
if config.Collectd.Enabled {
c := config.Collectd
cs := collectd.NewServer(s, c.TypesDB)
cs.Database = c.Database
err := collectd.ListenAndServe(cs, c.ConnectionString(config.BindAddress))
if err != nil {
log.Printf("failed to start collectd Server: %v\n", err.Error())
}
}
// Spin up any Graphite servers
for _, c := range config.Graphites {
if !c.Enabled {
@ -88,14 +99,14 @@ func execRun(args []string) {
g.Database = c.Database
err := g.ListenAndServe(c.ConnectionString(config.BindAddress))
if err != nil {
log.Println("failed to start TCP Graphite Server", err.Error())
log.Printf("failed to start TCP Graphite Server: %v\n", err.Error())
}
} else if strings.ToLower(c.Protocol) == "udp" {
g := graphite.NewUDPServer(parser, s)
g.Database = c.Database
err := g.ListenAndServe(c.ConnectionString(config.BindAddress))
if err != nil {
log.Println("failed to start UDP Graphite Server", err.Error())
log.Printf("failed to start UDP Graphite Server: %v\n", err.Error())
}
} else {
log.Fatalf("unrecognized Graphite Server prototcol %s", c.Protocol)

190
collectd/collectd.go Normal file
View File

@ -0,0 +1,190 @@
package collectd
import (
"errors"
"fmt"
"log"
"net"
"sync"
"time"
"github.com/kimor79/gollectd"
)
// DefaultPort for collectd is 25826
const DefaultPort = 25826
// SeriesWriter defines the interface for the destination of the data.
type SeriesWriter interface {
WriteSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) (uint64, error)
}
type Server struct {
mu sync.Mutex
wg sync.WaitGroup
conn *net.UDPConn
writer SeriesWriter
Database string
typesdb gollectd.Types
typesdbpath string
}
func NewServer(w SeriesWriter, typesDBPath string) *Server {
s := Server{
writer: w,
typesdbpath: typesDBPath,
typesdb: make(gollectd.Types),
}
return &s
}
func ListenAndServe(s *Server, iface string) error {
if iface == "" { // Make sure we have an address
return errors.New("bind address required")
} else if s.Database == "" { // Make sure they have a database
return errors.New("database was not specified in config")
}
addr, err := net.ResolveUDPAddr("udp", iface)
if err != nil {
return fmt.Errorf("unable to resolve UDP address: %v", err)
}
s.typesdb, err = gollectd.TypesDBFile(s.typesdbpath)
if err != nil {
return fmt.Errorf("unable to parse typesDBFile: %v", err)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return fmt.Errorf("unable to listen on UDP: %v", err)
}
s.conn = conn
s.wg.Add(1)
go s.serve(conn)
return nil
}
func (s *Server) serve(conn *net.UDPConn) {
defer s.wg.Done()
// From https://collectd.org/wiki/index.php/Binary_protocol
// 1024 bytes (payload only, not including UDP / IP headers)
// In versions 4.0 through 4.7, the receive buffer has a fixed size
// of 1024 bytes. When longer packets are received, the trailing data
// is simply ignored. Since version 4.8, the buffer size can be
// configured. Version 5.0 will increase the default buffer size to
// 1452 bytes (the maximum payload size when using UDP/IPv6 over
// Ethernet).
buffer := make([]byte, 1452)
for {
n, _, err := conn.ReadFromUDP(buffer)
if err != nil && s.conn != nil {
log.Printf("Collectd ReadFromUDP error: %s", err)
continue
}
log.Printf("received %d bytes", n)
if n > 0 {
s.handleMessage(buffer[:n])
}
if s.conn == nil {
// we closed the connection, time to go
return
}
}
}
func (s *Server) handleMessage(buffer []byte) {
log.Printf("handling message")
packets, err := gollectd.Packets(buffer, s.typesdb)
if err != nil {
log.Printf("Collectd parse error: %s", err)
return
}
for _, packet := range *packets {
metrics := Unmarshal(&packet)
for _, m := range metrics {
// Convert metric to a field value.
var values = make(map[string]interface{})
values[m.Name] = m.Value
_, err := s.writer.WriteSeries(s.Database, "", m.Name, m.Tags, m.Timestamp, values)
if err != nil {
log.Printf("Collectd cannot write data: %s", err)
continue
}
}
}
}
// Close shuts down the server's listeners.
func (s *Server) Close() error {
// Notify other goroutines of shutdown.
s.mu.Lock()
defer s.mu.Unlock()
if s.conn == nil {
return errors.New("server already closed")
}
s.conn.Close()
s.conn = nil
// Wait for all goroutines to shutdown.
s.wg.Wait()
log.Printf("all waitgroups finished")
return nil
}
// TODO corylanou: This needs to be made a public `main.Point` so we can share this across packages.
type Metric struct {
Name string
Tags map[string]string
Value interface{}
Timestamp time.Time
}
func Unmarshal(data *gollectd.Packet) []Metric {
// Prefer high resolution timestamp.
var timestamp time.Time
if data.TimeHR > 0 {
// TimeHR is "near" nanosecond measurement, but not exactly nanasecond time
// Since we store time in microseconds, we round here (mostly so tests will work easier)
sec := data.TimeHR >> 30
// Shifting, masking, and dividing by 1 billion to get nanoseconds.
nsec := ((data.TimeHR & 0x3FFFFFFF) << 30) / 1000 / 1000 / 1000
timestamp = time.Unix(int64(sec), int64(nsec)).UTC().Round(time.Microsecond)
} else {
// If we don't have high resolution time, fall back to basic unix time
timestamp = time.Unix(int64(data.Time), 0).UTC()
}
var m []Metric
for i, _ := range data.Values {
metric := Metric{Name: fmt.Sprintf("%s_%s", data.Plugin, data.Values[i].Name)}
metric.Value = data.Values[i].Value
metric.Timestamp = timestamp
metric.Tags = make(map[string]string)
if data.Hostname != "" {
metric.Tags["host"] = data.Hostname
}
if data.PluginInstance != "" {
metric.Tags["instance"] = data.PluginInstance
}
if data.Type != "" {
metric.Tags["type"] = data.Type
}
if data.TypeInstance != "" {
metric.Tags["type_instance"] = data.TypeInstance
}
m = append(m, metric)
}
return m
}

209
collectd/collectd_test.conf Normal file
View File

@ -0,0 +1,209 @@
absolute value:ABSOLUTE:0:U
apache_bytes value:DERIVE:0:U
apache_connections value:GAUGE:0:65535
apache_idle_workers value:GAUGE:0:65535
apache_requests value:DERIVE:0:U
apache_scoreboard value:GAUGE:0:65535
ath_nodes value:GAUGE:0:65535
ath_stat value:DERIVE:0:U
backends value:GAUGE:0:65535
bitrate value:GAUGE:0:4294967295
bytes value:GAUGE:0:U
cache_eviction value:DERIVE:0:U
cache_operation value:DERIVE:0:U
cache_ratio value:GAUGE:0:100
cache_result value:DERIVE:0:U
cache_size value:GAUGE:0:U
charge value:GAUGE:0:U
compression_ratio value:GAUGE:0:2
compression uncompressed:DERIVE:0:U, compressed:DERIVE:0:U
connections value:DERIVE:0:U
conntrack value:GAUGE:0:4294967295
contextswitch value:DERIVE:0:U
counter value:COUNTER:U:U
cpufreq value:GAUGE:0:U
cpu value:DERIVE:0:U
current_connections value:GAUGE:0:U
current_sessions value:GAUGE:0:U
current value:GAUGE:U:U
delay value:GAUGE:-1000000:1000000
derive value:DERIVE:0:U
df_complex value:GAUGE:0:U
df_inodes value:GAUGE:0:U
df used:GAUGE:0:1125899906842623, free:GAUGE:0:1125899906842623
disk_latency read:GAUGE:0:U, write:GAUGE:0:U
disk_merged read:DERIVE:0:U, write:DERIVE:0:U
disk_octets read:DERIVE:0:U, write:DERIVE:0:U
disk_ops_complex value:DERIVE:0:U
disk_ops read:DERIVE:0:U, write:DERIVE:0:U
disk_time read:DERIVE:0:U, write:DERIVE:0:U
dns_answer value:DERIVE:0:U
dns_notify value:DERIVE:0:U
dns_octets queries:DERIVE:0:U, responses:DERIVE:0:U
dns_opcode value:DERIVE:0:U
dns_qtype_cached value:GAUGE:0:4294967295
dns_qtype value:DERIVE:0:U
dns_query value:DERIVE:0:U
dns_question value:DERIVE:0:U
dns_rcode value:DERIVE:0:U
dns_reject value:DERIVE:0:U
dns_request value:DERIVE:0:U
dns_resolver value:DERIVE:0:U
dns_response value:DERIVE:0:U
dns_transfer value:DERIVE:0:U
dns_update value:DERIVE:0:U
dns_zops value:DERIVE:0:U
duration seconds:GAUGE:0:U
email_check value:GAUGE:0:U
email_count value:GAUGE:0:U
email_size value:GAUGE:0:U
entropy value:GAUGE:0:4294967295
fanspeed value:GAUGE:0:U
file_size value:GAUGE:0:U
files value:GAUGE:0:U
flow value:GAUGE:0:U
fork_rate value:DERIVE:0:U
frequency_offset value:GAUGE:-1000000:1000000
frequency value:GAUGE:0:U
fscache_stat value:DERIVE:0:U
gauge value:GAUGE:U:U
hash_collisions value:DERIVE:0:U
http_request_methods value:DERIVE:0:U
http_requests value:DERIVE:0:U
http_response_codes value:DERIVE:0:U
humidity value:GAUGE:0:100
if_collisions value:DERIVE:0:U
if_dropped rx:DERIVE:0:U, tx:DERIVE:0:U
if_errors rx:DERIVE:0:U, tx:DERIVE:0:U
if_multicast value:DERIVE:0:U
if_octets rx:DERIVE:0:U, tx:DERIVE:0:U
if_packets rx:DERIVE:0:U, tx:DERIVE:0:U
if_rx_errors value:DERIVE:0:U
if_rx_octets value:DERIVE:0:U
if_tx_errors value:DERIVE:0:U
if_tx_octets value:DERIVE:0:U
invocations value:DERIVE:0:U
io_octets rx:DERIVE:0:U, tx:DERIVE:0:U
io_packets rx:DERIVE:0:U, tx:DERIVE:0:U
ipt_bytes value:DERIVE:0:U
ipt_packets value:DERIVE:0:U
irq value:DERIVE:0:U
latency value:GAUGE:0:U
links value:GAUGE:0:U
load shortterm:GAUGE:0:5000, midterm:GAUGE:0:5000, longterm:GAUGE:0:5000
md_disks value:GAUGE:0:U
memcached_command value:DERIVE:0:U
memcached_connections value:GAUGE:0:U
memcached_items value:GAUGE:0:U
memcached_octets rx:DERIVE:0:U, tx:DERIVE:0:U
memcached_ops value:DERIVE:0:U
memory value:GAUGE:0:281474976710656
multimeter value:GAUGE:U:U
mutex_operations value:DERIVE:0:U
mysql_commands value:DERIVE:0:U
mysql_handler value:DERIVE:0:U
mysql_locks value:DERIVE:0:U
mysql_log_position value:DERIVE:0:U
mysql_octets rx:DERIVE:0:U, tx:DERIVE:0:U
nfs_procedure value:DERIVE:0:U
nginx_connections value:GAUGE:0:U
nginx_requests value:DERIVE:0:U
node_octets rx:DERIVE:0:U, tx:DERIVE:0:U
node_rssi value:GAUGE:0:255
node_stat value:DERIVE:0:U
node_tx_rate value:GAUGE:0:127
objects value:GAUGE:0:U
operations value:DERIVE:0:U
percent value:GAUGE:0:100.1
percent_bytes value:GAUGE:0:100.1
percent_inodes value:GAUGE:0:100.1
pf_counters value:DERIVE:0:U
pf_limits value:DERIVE:0:U
pf_source value:DERIVE:0:U
pf_states value:GAUGE:0:U
pf_state value:DERIVE:0:U
pg_blks value:DERIVE:0:U
pg_db_size value:GAUGE:0:U
pg_n_tup_c value:DERIVE:0:U
pg_n_tup_g value:GAUGE:0:U
pg_numbackends value:GAUGE:0:U
pg_scan value:DERIVE:0:U
pg_xact value:DERIVE:0:U
ping_droprate value:GAUGE:0:100
ping_stddev value:GAUGE:0:65535
ping value:GAUGE:0:65535
players value:GAUGE:0:1000000
power value:GAUGE:0:U
protocol_counter value:DERIVE:0:U
ps_code value:GAUGE:0:9223372036854775807
ps_count processes:GAUGE:0:1000000, threads:GAUGE:0:1000000
ps_cputime user:DERIVE:0:U, syst:DERIVE:0:U
ps_data value:GAUGE:0:9223372036854775807
ps_disk_octets read:DERIVE:0:U, write:DERIVE:0:U
ps_disk_ops read:DERIVE:0:U, write:DERIVE:0:U
ps_pagefaults minflt:DERIVE:0:U, majflt:DERIVE:0:U
ps_rss value:GAUGE:0:9223372036854775807
ps_stacksize value:GAUGE:0:9223372036854775807
ps_state value:GAUGE:0:65535
ps_vm value:GAUGE:0:9223372036854775807
queue_length value:GAUGE:0:U
records value:GAUGE:0:U
requests value:GAUGE:0:U
response_time value:GAUGE:0:U
response_code value:GAUGE:0:U
route_etx value:GAUGE:0:U
route_metric value:GAUGE:0:U
routes value:GAUGE:0:U
serial_octets rx:DERIVE:0:U, tx:DERIVE:0:U
signal_noise value:GAUGE:U:0
signal_power value:GAUGE:U:0
signal_quality value:GAUGE:0:U
snr value:GAUGE:0:U
spam_check value:GAUGE:0:U
spam_score value:GAUGE:U:U
spl value:GAUGE:U:U
swap_io value:DERIVE:0:U
swap value:GAUGE:0:1099511627776
tcp_connections value:GAUGE:0:4294967295
temperature value:GAUGE:U:U
threads value:GAUGE:0:U
time_dispersion value:GAUGE:-1000000:1000000
timeleft value:GAUGE:0:U
time_offset value:GAUGE:-1000000:1000000
total_bytes value:DERIVE:0:U
total_connections value:DERIVE:0:U
total_objects value:DERIVE:0:U
total_operations value:DERIVE:0:U
total_requests value:DERIVE:0:U
total_sessions value:DERIVE:0:U
total_threads value:DERIVE:0:U
total_time_in_ms value:DERIVE:0:U
total_values value:DERIVE:0:U
uptime value:GAUGE:0:4294967295
users value:GAUGE:0:65535
vcl value:GAUGE:0:65535
vcpu value:GAUGE:0:U
virt_cpu_total value:DERIVE:0:U
virt_vcpu value:DERIVE:0:U
vmpage_action value:DERIVE:0:U
vmpage_faults minflt:DERIVE:0:U, majflt:DERIVE:0:U
vmpage_io in:DERIVE:0:U, out:DERIVE:0:U
vmpage_number value:GAUGE:0:4294967295
volatile_changes value:GAUGE:0:U
voltage_threshold value:GAUGE:U:U, threshold:GAUGE:U:U
voltage value:GAUGE:U:U
vs_memory value:GAUGE:0:9223372036854775807
vs_processes value:GAUGE:0:65535
vs_threads value:GAUGE:0:65535
#
# Legacy types
# (required for the v5 upgrade target)
#
arc_counts demand_data:COUNTER:0:U, demand_metadata:COUNTER:0:U, prefetch_data:COUNTER:0:U, prefetch_metadata:COUNTER:0:U
arc_l2_bytes read:COUNTER:0:U, write:COUNTER:0:U
arc_l2_size value:GAUGE:0:U
arc_ratio value:GAUGE:0:U
arc_size current:GAUGE:0:U, target:GAUGE:0:U, minlimit:GAUGE:0:U, maxlimit:GAUGE:0:U
mysql_qcache hits:COUNTER:0:U, inserts:COUNTER:0:U, not_cached:COUNTER:0:U, lowmem_prunes:COUNTER:0:U, queries_in_cache:GAUGE:0:U
mysql_threads running:GAUGE:0:U, connected:GAUGE:0:U, cached:GAUGE:0:U, created:COUNTER:0:U

362
collectd/collectd_test.go Normal file
View File

@ -0,0 +1,362 @@
package collectd_test
import (
"encoding/hex"
"fmt"
"net"
"testing"
"time"
"github.com/influxdb/influxdb/collectd"
"github.com/kimor79/gollectd"
)
type testServer string
type serverResponses []serverResponse
type serverResponse struct {
database, retentionPolicy, name string
tags map[string]string
timestamp time.Time
values map[string]interface{}
}
var responses = make(chan *serverResponse, 1024)
func (testServer) WriteSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) (uint64, error) {
responses <- &serverResponse{
database: database,
retentionPolicy: retentionPolicy,
name: name,
tags: tags,
timestamp: timestamp,
values: values,
}
return 0, nil
}
func (testServer) ResponseN(n int) ([]*serverResponse, error) {
var a []*serverResponse
for {
select {
case r := <-responses:
a = append(a, r)
if len(a) == n {
return a, nil
}
case <-time.After(time.Second):
return a, fmt.Errorf("unexpected response count: expected: %d, actual: %d", n, len(a))
}
}
}
func TestServer_ListenAndServe_ErrBindAddressRequired(t *testing.T) {
var (
ts testServer
s = collectd.NewServer(ts, "foo")
)
e := collectd.ListenAndServe(s, "")
if e == nil {
t.Fatalf("expected an error, got %v", e)
}
}
func TestServer_ListenAndServe_ErrDatabaseNotSpecified(t *testing.T) {
var (
ts testServer
s = collectd.NewServer(ts, "foo")
)
e := collectd.ListenAndServe(s, "127.0.0.1:25826")
if e == nil {
t.Fatalf("expected an error, got %v", e)
}
}
func TestServer_ListenAndServe_ErrCouldNotParseTypesDBFile(t *testing.T) {
var (
ts testServer
s = collectd.NewServer(ts, "foo")
)
s.Database = "foo"
e := collectd.ListenAndServe(s, "127.0.0.1:25829")
if e == nil {
t.Fatalf("expected an error, got %v", e)
}
}
func TestServer_ListenAndServe_Success(t *testing.T) {
var (
ts testServer
// You can typically find this on your mac here: "/usr/local/Cellar/collectd/5.4.1/share/collectd/types.db"
s = collectd.NewServer(ts, "./collectd_test.conf")
)
s.Database = "counter"
e := collectd.ListenAndServe(s, "127.0.0.1:25830")
defer s.Close()
if e != nil {
t.Fatalf("err does not match. expected %v, got %v", nil, e)
}
}
func TestServer_Close_ErrServerClosed(t *testing.T) {
var (
ts testServer
// You can typically find this on your mac here: "/usr/local/Cellar/collectd/5.4.1/share/collectd/types.db"
s = collectd.NewServer(ts, "./collectd_test.conf")
)
s.Database = "counter"
e := collectd.ListenAndServe(s, "127.0.0.1:25830")
if e != nil {
t.Fatalf("err does not match. expected %v, got %v", nil, e)
}
s.Close()
e = s.Close()
if e == nil {
t.Fatalf("expected an error, got %v", e)
}
}
func TestServer_ListenAndServe_ErrResolveUDPAddr(t *testing.T) {
var (
ts testServer
s = collectd.NewServer(ts, "./collectd_test.conf")
)
s.Database = "counter"
e := collectd.ListenAndServe(s, "foo")
if e == nil {
t.Fatalf("expected an error, got %v", e)
}
}
func TestServer_ListenAndServe_ErrListenUDP(t *testing.T) {
var (
ts testServer
s = collectd.NewServer(ts, "./collectd_test.conf")
)
//Open a udp listener on the port prior to force it to err
addr, _ := net.ResolveUDPAddr("udp", "127.0.0.1:25826")
conn, _ := net.ListenUDP("udp", addr)
defer conn.Close()
s.Database = "counter"
e := collectd.ListenAndServe(s, "127.0.0.1:25826")
if e == nil {
t.Fatalf("expected an error, got %v", e)
}
}
func TestServer_Serve_Success(t *testing.T) {
// clear any previous responses
var (
ts testServer
// You can typically find this on your mac here: "/usr/local/Cellar/collectd/5.4.1/share/collectd/types.db"
s = collectd.NewServer(ts, "./collectd_test.conf")
addr = "127.0.0.1:25830"
)
s.Database = "counter"
e := collectd.ListenAndServe(s, addr)
defer s.Close()
if e != nil {
t.Fatalf("err does not match. expected %v, got %v", nil, e)
}
conn, e := net.Dial("udp", addr)
defer conn.Close()
if e != nil {
t.Fatalf("err does not match. expected %v, got %v", nil, e)
}
buf, e := hex.DecodeString("0000000e6c6f63616c686f7374000008000c1512b2e40f5da16f0009000c00000002800000000002000e70726f636573736573000004000d70735f7374617465000005000c72756e6e696e67000006000f000101000000000000f03f0008000c1512b2e40f5db90f0005000d736c656570696e67000006000f0001010000000000c06f400008000c1512b2e40f5dc4a40005000c7a6f6d62696573000006000f00010100000000000000000008000c1512b2e40f5de10b0005000c73746f70706564000006000f00010100000000000000000008000c1512b2e40f5deac20005000b706167696e67000006000f00010100000000000000000008000c1512b2e40f5df59b0005000c626c6f636b6564000006000f00010100000000000000000008000c1512b2e40f7ee0610004000e666f726b5f726174650000050005000006000f000102000000000004572f0008000c1512b2e68e0635e6000200086370750000030006300000040008637075000005000975736572000006000f0001020000000000204f9c0008000c1512b2e68e0665d6000500096e696365000006000f000102000000000000caa30008000c1512b2e68e06789c0005000b73797374656d000006000f00010200000000000607050008000c1512b2e68e06818e0005000969646c65000006000f0001020000000003b090ae0008000c1512b2e68e068bcf0005000977616974000006000f000102000000000000f6810008000c1512b2e68e069c7d0005000e696e74657272757074000006000f000102000000000000001d0008000c1512b2e68e069fec0005000c736f6674697271000006000f0001020000000000000a2a0008000c1512b2e68e06a2b20005000a737465616c000006000f00010200000000000000000008000c1512b2e68e0708d60003000631000005000975736572000006000f00010200000000001d48c60008000c1512b2e68e070c16000500096e696365000006000f0001020000000000007fe60008000c1512b2e68e0710790005000b73797374656d000006000f00010200000000000667890008000c1512b2e68e0713bb0005000969646c65000006000f00010200000000025d0e470008000c1512b2e68e0717790005000977616974000006000f000102000000000002500e0008000c1512b2e68e071bc00005000e696e74657272757074000006000f00010200000000000000000008000c1512b2e68e071f800005000c736f6674697271000006000f00010200000000000006050008000c1512b2e68e07221e0005000a737465616c000006000f00010200000000000000000008000c1512b2e68e0726eb0003000632000005000975736572000006000f00010200000000001ff3e40008000c1512b2e68e0728cb000500096e696365000006000f000102000000000000ca210008000c1512b2e68e072ae70005000b73797374656d000006000f000102000000000006eabe0008000c1512b2e68e072f2f0005000977616974000006000f000102000000000000c1300008000c1512b2e68e072ccb0005000969646c65000006000f00010200000000025b5abb0008000c1512b2e68e07312c0005000e696e74657272757074000006000f00010200000000000000070008000c1512b2e68e0733520005000c736f6674697271000006000f00010200000000000007260008000c1512b2e68e0735b60005000a737465616c000006000f00010200000000000000000008000c1512b2e68e07828d0003000633000005000975736572000006000f000102000000000020f50a0008000c1512b2e68e0787ac000500096e696365000006000f0001020000000000008368")
if e != nil {
t.Fatalf("err from hex.DecodeString does not match. expected %v, got %v", nil, e)
}
_, e = conn.Write(buf)
if e != nil {
t.Fatalf("err does not match. expected %v, got %v", nil, e)
}
if _, err := ts.ResponseN(33); err != nil {
t.Fatal(err)
}
}
func TestUnmarshal_Metrics(t *testing.T) {
/*
This is a sample of what data can be represented like in json
[
{
"values": [197141504, 175136768],
"dstypes": ["counter", "counter"],
"dsnames": ["read", "write"],
"time": 1251533299,
"interval": 10,
"host": "leeloo.lan.home.verplant.org",
"plugin": "disk",
"plugin_instance": "sda",
"type": "disk_octets",
"type_instance": ""
},
]
*/
var tests = []struct {
name string
packet gollectd.Packet
metrics []collectd.Metric
}{
{
name: "single value",
metrics: []collectd.Metric{
collectd.Metric{Name: "disk_read", Value: float64(1)},
},
packet: gollectd.Packet{
Plugin: "disk",
Values: []gollectd.Value{
gollectd.Value{Name: "read", Value: 1},
},
},
},
{
name: "multi value",
metrics: []collectd.Metric{
collectd.Metric{Name: "disk_read", Value: float64(1)},
collectd.Metric{Name: "disk_write", Value: float64(5)},
},
packet: gollectd.Packet{
Plugin: "disk",
Values: []gollectd.Value{
gollectd.Value{Name: "read", Value: 1},
gollectd.Value{Name: "write", Value: 5},
},
},
},
{
name: "tags",
metrics: []collectd.Metric{
collectd.Metric{
Name: "disk_read",
Value: float64(1),
Tags: map[string]string{"host": "server01", "instance": "sdk", "type": "disk_octets", "type_instance": "single"},
},
},
packet: gollectd.Packet{
Plugin: "disk",
Hostname: "server01",
PluginInstance: "sdk",
Type: "disk_octets",
TypeInstance: "single",
Values: []gollectd.Value{
gollectd.Value{Name: "read", Value: 1},
},
},
},
}
for _, test := range tests {
t.Logf("testing %q", test.name)
metrics := collectd.Unmarshal(&test.packet)
if len(metrics) != len(test.metrics) {
t.Errorf("metric len mismatch. expected %d, got %d", len(test.metrics), len(metrics))
}
for i, m := range test.metrics {
// test name
name := fmt.Sprintf("%s_%s", test.packet.Plugin, test.packet.Values[i].Name)
if m.Name != name {
t.Errorf("metric name mismatch. expected %q, got %q", name, m.Name)
}
// test value
mv := m.Value.(float64)
pv := test.packet.Values[i].Value
if mv != pv {
t.Errorf("metric value mismatch. expected %v, got %v", pv, mv)
}
// test tags
if test.packet.Hostname != m.Tags["host"] {
t.Errorf(`metric tags["host"] mismatch. expected %q, got %q`, test.packet.Hostname, m.Tags["host"])
}
if test.packet.PluginInstance != m.Tags["instance"] {
t.Errorf(`metric tags["instance"] mismatch. expected %q, got %q`, test.packet.PluginInstance, m.Tags["instance"])
}
if test.packet.Type != m.Tags["type"] {
t.Errorf(`metric tags["type"] mismatch. expected %q, got %q`, test.packet.Type, m.Tags["type"])
}
if test.packet.TypeInstance != m.Tags["type_instance"] {
t.Errorf(`metric tags["type_instance"] mismatch. expected %q, got %q`, test.packet.TypeInstance, m.Tags["type_instance"])
}
}
}
}
func TestUnmarshal_Time(t *testing.T) {
// Its important to remember that collectd stores high resolution time
// as "near" nanoseconds (2^30) so we have to take that into account
// when feeding time into the test.
// Since we only store microseconds, we round it off (mostly to make testing easier)
testTime := time.Now().UTC().Round(time.Microsecond)
var timeHR = func(tm time.Time) uint64 {
sec, nsec := tm.Unix(), tm.UnixNano()%1000000000
hr := (sec << 30) + (nsec * 1000000000 / 1073741824)
return uint64(hr)
}
var tests = []struct {
name string
packet gollectd.Packet
metrics []collectd.Metric
}{
{
name: "Should parse timeHR properly",
packet: gollectd.Packet{
TimeHR: timeHR(testTime),
Values: []gollectd.Value{
gollectd.Value{
Value: 1,
},
},
},
metrics: []collectd.Metric{
collectd.Metric{Timestamp: testTime},
},
},
{
name: "Should parse time properly",
packet: gollectd.Packet{
Time: uint64(testTime.Round(time.Second).Unix()),
Values: []gollectd.Value{
gollectd.Value{
Value: 1,
},
},
},
metrics: []collectd.Metric{
collectd.Metric{
Timestamp: testTime.Round(time.Second),
},
},
},
}
for _, test := range tests {
t.Logf("testing %q", test.name)
metrics := collectd.Unmarshal(&test.packet)
if len(metrics) != len(test.metrics) {
t.Errorf("metric len mismatch. expected %d, got %d", len(test.metrics), len(metrics))
}
for _, m := range metrics {
if test.packet.TimeHR > 0 {
if m.Timestamp.Format(time.RFC3339Nano) != testTime.Format(time.RFC3339Nano) {
t.Errorf("timestamp mis-match, got %v, expected %v", m.Timestamp.Format(time.RFC3339Nano), testTime.Format(time.RFC3339Nano))
} else if m.Timestamp.Format(time.RFC3339) != testTime.Format(time.RFC3339) {
t.Errorf("timestamp mis-match, got %v, expected %v", m.Timestamp.Format(time.RFC3339), testTime.Format(time.RFC3339))
}
}
}
}
}

View File

@ -8,6 +8,7 @@ import (
"strings"
"time"
"github.com/boltdb/bolt"
"github.com/influxdb/influxdb/influxql"
)
@ -120,7 +121,7 @@ func NewMeasurement(name string) *Measurement {
}
}
// CreateFieldIfNotExists creates a new field with an autoincrementing ID.
// createFieldIfNotExists creates a new field with an autoincrementing ID.
// Returns an error if 255 fields have already been created on the measurement.
func (m *Measurement) createFieldIfNotExists(name string, typ influxql.DataType) (*Field, error) {
// Ignore if the field already exists.
@ -129,13 +130,13 @@ func (m *Measurement) createFieldIfNotExists(name string, typ influxql.DataType)
}
// Only 255 fields are allowed. If we go over that then return an error.
if len(m.Fields) > math.MaxUint8 {
if len(m.Fields)+1 > math.MaxUint8 {
return nil, ErrFieldOverflow
}
// Create and append a new field.
f := &Field{
ID: uint8(len(m.Fields)),
ID: uint8(len(m.Fields) + 1),
Name: name,
Type: typ,
}
@ -787,7 +788,7 @@ func (dbi *dbi) Field(name, field string) (fieldID uint8, typ influxql.DataType)
}
// Find field by name.
f := m.FieldByName(name)
f := m.FieldByName(field)
if f == nil {
return 0, influxql.Unknown
}
@ -797,9 +798,145 @@ func (dbi *dbi) Field(name, field string) (fieldID uint8, typ influxql.DataType)
// CreateIterator returns an iterator given a series data id, field id, & field data type.
func (dbi *dbi) CreateIterator(seriesID uint32, fieldID uint8, typ influxql.DataType, min, max time.Time, interval time.Duration) influxql.Iterator {
// TODO: Find shard group.
// TODO: Find shard for series.
// TODO: Open bolt cursor.
// TODO: Return wrapper cursor.
panic("TODO")
// TODO: Add retention policy to the arguments.
// Create an iterator to hold the transaction and series ids.
itr := &iterator{
seriesID: seriesID,
fieldID: fieldID,
typ: typ,
imin: -1,
interval: int64(interval),
}
if !min.IsZero() {
itr.min = min.UnixNano()
}
if !max.IsZero() {
itr.max = max.UnixNano()
}
// Retrieve the policy.
// Ignore if there are no groups created on the retention policy.
rp := dbi.db.policies[dbi.db.defaultRetentionPolicy]
if len(rp.groups) == 0 {
return itr
}
// Find all shards which match the the time range and series id.
// TODO: Support multiple groups.
g := rp.groups[0]
// Ignore shard groups that our time range does not cross.
if !timeBetweenInclusive(g.StartTime, min, max) &&
!timeBetweenInclusive(g.EndTime, min, max) {
return itr
}
// Find appropriate shard by series id.
sh := g.ShardBySeriesID(seriesID)
// Open a transaction on the shard.
tx, err := sh.store.Begin(false)
assert(err == nil, "read-only tx error: %s", err)
itr.tx = tx
// Open and position cursor.
b := tx.Bucket(u32tob(seriesID))
if b != nil {
cur := b.Cursor()
itr.k, itr.v = cur.Seek(u64tob(uint64(itr.min)))
itr.cur = cur
}
return itr
}
// iterator represents a series data iterator for a shard.
// It can iterate over all data for a given time range for multiple series in a shard.
type iterator struct {
tx *bolt.Tx
cur *bolt.Cursor
seriesID uint32
fieldID uint8
typ influxql.DataType
k, v []byte // lookahead buffer
min, max int64 // time range
imin, imax int64 // interval time range
interval int64 // interval duration
}
// close closes the iterator.
func (i *iterator) Close() error {
if i.tx != nil {
return i.tx.Rollback()
}
return nil
}
// Next returns the next value from the iterator.
func (i *iterator) Next() (key int64, value interface{}) {
for {
// Read raw key/value from lookhead buffer, if available.
// Otherwise read from cursor.
var k, v []byte
if i.k != nil {
k, v = i.k, i.v
i.k, i.v = nil, nil
} else if i.cur != nil {
k, v = i.cur.Next()
}
// Exit at the end of the cursor.
if k == nil {
return 0, nil
}
// Extract timestamp & field value.
key = int64(btou64(k))
value = unmarshalValue(v, i.fieldID)
// If timestamp is beyond interval time range then push onto lookahead buffer.
if key >= i.imax && i.imax != 0 {
i.k, i.v = k, v
return 0, nil
}
// Return value if it is non-nil.
// Otherwise loop again and try the next point.
if value != nil {
return
}
}
}
// NextIterval moves to the next iterval. Returns true unless EOF.
func (i *iterator) NextIterval() bool {
// Determine the next interval's lower bound.
imin := i.imin + i.interval
// Initialize or move interval forward.
if i.imin == -1 { // initialize first interval
i.imin = i.min
} else if i.interval != 0 && (i.max == 0 || imin < i.max) { // move forward
i.imin = imin
} else { // no interval or beyond max time.
return false
}
// Interval end time should be the start time plus interval duration.
// If the end time is beyond the iterator end time then shorten it.
i.imax = i.imin + i.interval
if max := i.max; i.imax > max {
i.imax = max
}
return true
}
// Time returns start time of the current interval.
func (i *iterator) Time() int64 { return i.imin }
// Interval returns the group by duration.
func (i *iterator) Interval() time.Duration { return time.Duration(i.interval) }

View File

@ -33,7 +33,7 @@ var (
// SeriesWriter defines the interface for the destination of the data.
type SeriesWriter interface {
WriteSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) error
WriteSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) (uint64, error)
}
// Metric represents a metric as processed by the Graphite parser.

View File

@ -71,6 +71,10 @@ var (
// ErrRetentionPolicyNameRequired is returned using a blank shard space name.
ErrRetentionPolicyNameRequired = errors.New("retention policy name required")
// ErrDefaultRetentionPolicyNotFound is returned when using the default
// policy on a database but the default has not been set.
ErrDefaultRetentionPolicyNotFound = errors.New("default retention policy not found")
// ErrShardNotFound is returned writing to a non-existent shard.
ErrShardNotFound = errors.New("shard not found")

View File

@ -118,6 +118,7 @@ statement = alter_retention_policy_stmt |
list_field_key_stmt |
list_field_value_stmt |
list_measurements_stmt |
list_retention_policies |
list_series_stmt |
list_tag_key_stmt |
list_tag_value_stmt |
@ -264,6 +265,32 @@ GRANT ALL TO jdoe;
GRANT READ ON mydb TO jdoe;
```
### LIST DATABASES
```
list_databases_stmt = "LIST DATABASES"
```
#### Example:
```sql
-- list all databases
LIST DATABASES;
```
### LIST RETENTION POLICIES
```
list_retention_policies = "LIST RETENTION POLICIES" db_name
```
#### Example:
```sql
-- list all retention policies on a database
LIST RETENTION POLICIES mydb;
```
## Clauses
```

View File

@ -62,6 +62,7 @@ func (_ *ListContinuousQueriesStatement) node() {}
func (_ *ListDatabasesStatement) node() {}
func (_ *ListFieldKeysStatement) node() {}
func (_ *ListFieldValuesStatement) node() {}
func (_ *ListRetentionPoliciesStatement) node() {}
func (_ *ListMeasurementsStatement) node() {}
func (_ *ListSeriesStatement) node() {}
func (_ *ListTagKeysStatement) node() {}
@ -133,6 +134,7 @@ func (_ *ListDatabasesStatement) stmt() {}
func (_ *ListFieldKeysStatement) stmt() {}
func (_ *ListFieldValuesStatement) stmt() {}
func (_ *ListMeasurementsStatement) stmt() {}
func (_ *ListRetentionPoliciesStatement) stmt() {}
func (_ *ListSeriesStatement) stmt() {}
func (_ *ListTagKeysStatement) stmt() {}
func (_ *ListTagValuesStatement) stmt() {}
@ -780,6 +782,20 @@ func (s *ListMeasurementsStatement) String() string {
return buf.String()
}
// ListRetentionPoliciesStatement represents a command for listing retention policies.
type ListRetentionPoliciesStatement struct {
// Name of the database to list policies for.
Database string
}
// String returns a string representation of a ListRetentionPoliciesStatement.
func (s *ListRetentionPoliciesStatement) String() string {
var buf bytes.Buffer
_, _ = buf.WriteString("LIST RETENTION POLICIES ")
_, _ = buf.WriteString(s.Database)
return buf.String()
}
// ListTagKeysStatement represents a command for listing tag keys.
type ListTagKeysStatement struct {
// Data source that fields are extracted from.

View File

@ -113,9 +113,15 @@ func (p *Parser) parseListStatement() (Statement, error) {
} else {
return nil, newParseError(tokstr(tok, lit), []string{"KEYS", "VALUES"}, pos)
}
} else if tok == RETENTION {
if tok, pos, lit := p.scanIgnoreWhitespace(); tok == POLICIES {
return p.parseListRetentionPoliciesStatement()
} else {
return nil, newParseError(tokstr(tok, lit), []string{"POLICIES"}, pos)
}
}
return nil, newParseError(tokstr(tok, lit), []string{"SERIES", "CONTINUOUS", "MEASUREMENTS", "TAG", "FIELD"}, pos)
return nil, newParseError(tokstr(tok, lit), []string{"SERIES", "CONTINUOUS", "MEASUREMENTS", "TAG", "FIELD", "RETENTION"}, pos)
}
// parseCreateStatement parses a string and returns a create statement.
@ -643,6 +649,20 @@ func (p *Parser) parseListMeasurementsStatement() (*ListMeasurementsStatement, e
return stmt, nil
}
// parseListRetentionPoliciesStatement parses a string and returns a ListRetentionPoliciesStatement.
// This function assumes the "LIST RETENTION POLICIES" tokens have been consumed.
func (p *Parser) parseListRetentionPoliciesStatement() (*ListRetentionPoliciesStatement, error) {
stmt := &ListRetentionPoliciesStatement{}
ident, err := p.parseIdentifier()
if err != nil {
return nil, err
}
stmt.Database = ident
return stmt, nil
}
// parseListTagKeysStatement parses a string and returns a ListSeriesStatement.
// This function assumes the "LIST TAG KEYS" tokens have already been consumed.
func (p *Parser) parseListTagKeysStatement() (*ListTagKeysStatement, error) {

View File

@ -195,6 +195,14 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
// LIST RETENTION POLICIES
{
s: `LIST RETENTION POLICIES mydb`,
stmt: &influxql.ListRetentionPoliciesStatement{
Database: "mydb",
},
},
// LIST TAG KEYS
{
s: `LIST TAG KEYS FROM src WHERE region = 'uswest' ORDER BY ASC, field1, field2 DESC LIMIT 10`,
@ -533,7 +541,9 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `DELETE FROM myseries WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`},
{s: `DROP SERIES`, err: `found EOF, expected identifier, string at line 1, char 13`},
{s: `LIST CONTINUOUS`, err: `found EOF, expected QUERIES at line 1, char 17`},
{s: `LIST FOO`, err: `found FOO, expected SERIES, CONTINUOUS, MEASUREMENTS, TAG, FIELD at line 1, char 6`},
{s: `LIST RETENTION`, err: `found EOF, expected POLICIES at line 1, char 16`},
{s: `LIST RETENTION POLICIES`, err: `found EOF, expected identifier at line 1, char 25`},
{s: `LIST FOO`, err: `found FOO, expected SERIES, CONTINUOUS, MEASUREMENTS, TAG, FIELD, RETENTION at line 1, char 6`},
{s: `DROP CONTINUOUS`, err: `found EOF, expected QUERY at line 1, char 17`},
{s: `DROP CONTINUOUS QUERY`, err: `found EOF, expected identifier, string at line 1, char 23`},
{s: `DROP FOO`, err: `found FOO, expected SERIES, CONTINUOUS at line 1, char 6`},

View File

@ -132,6 +132,7 @@ func TestScanner_Scan(t *testing.T) {
{s: `ORDER`, tok: influxql.ORDER},
{s: `PASSWORD`, tok: influxql.PASSWORD},
{s: `POLICY`, tok: influxql.POLICY},
{s: `POLICIES`, tok: influxql.POLICIES},
{s: `PRIVILEGES`, tok: influxql.PRIVILEGES},
{s: `QUERIES`, tok: influxql.QUERIES},
{s: `QUERY`, tok: influxql.QUERY},

View File

@ -86,6 +86,7 @@ const (
ORDER
PASSWORD
POLICY
POLICIES
PRIVILEGES
QUERIES
QUERY
@ -173,6 +174,7 @@ var tokens = [...]string{
ORDER: "ORDER",
PASSWORD: "PASSWORD",
POLICY: "POLICY",
POLICIES: "POLICIES",
PRIVILEGES: "PRIVILEGES",
QUERIES: "QUERIES",
QUERY: "QUERY",

View File

@ -634,7 +634,7 @@ func (t *topic) Close() error {
// writeTo writes the topic to a replica since a given index.
// Returns an error if the starting index is unavailable.
func (t *topic) writeTo(r *Replica, index uint64) (int, error) {
func (t *topic) writeTo(r *Replica, index uint64) (int64, error) {
// TODO: If index is too old then return an error.
// Open topic file for reading.
@ -648,7 +648,7 @@ func (t *topic) writeTo(r *Replica, index uint64) (int, error) {
defer func() { _ = f.Close() }()
// Stream out all messages until EOF.
total := 0
var total int64
dec := NewMessageDecoder(bufio.NewReader(f))
for {
// Decode message.
@ -775,7 +775,7 @@ func (r *Replica) Write(p []byte) (int, error) {
// WriteTo begins writing messages to a named stream.
// Only one writer is allowed on a stream at a time.
func (r *Replica) WriteTo(w io.Writer) (int, error) {
func (r *Replica) WriteTo(w io.Writer) (int64, error) {
// Close previous writer, if set.
r.closeWriter()
@ -866,14 +866,14 @@ type Message struct {
}
// WriteTo encodes and writes the message to a writer. Implements io.WriterTo.
func (m *Message) WriteTo(w io.Writer) (n int, err error) {
func (m *Message) WriteTo(w io.Writer) (n int64, err error) {
if n, err := w.Write(m.marshalHeader()); err != nil {
return n, err
return int64(n), err
}
if n, err := w.Write(m.Data); err != nil {
return messageHeaderSize + n, err
return int64(messageHeaderSize + n), err
}
return messageHeaderSize + len(m.Data), nil
return int64(messageHeaderSize + len(m.Data)), nil
}
// MarshalBinary returns a binary representation of the message.

View File

@ -66,7 +66,7 @@ func (m *metastore) mustView(fn func(*metatx) error) (err error) {
err = fn(tx)
return nil
}); e != nil {
panic("metastore view: " + err.Error())
panic("metastore view: " + e.Error())
}
return
}
@ -78,7 +78,7 @@ func (m *metastore) mustUpdate(fn func(*metatx) error) (err error) {
err = fn(tx)
return nil
}); e != nil {
panic("metastore update: " + err.Error())
panic("metastore update: " + e.Error())
}
return
}

View File

@ -166,7 +166,7 @@ func (h *HTTPHandler) serveStream(w http.ResponseWriter, r *http.Request) {
// TODO(benbjohnson): Redirect to leader.
// Write to the response.
if err := h.log.WriteTo(w, id, term, index); err != nil && err != io.EOF {
if err := h.log.WriteEntriesTo(w, id, term, index); err != nil && err != io.EOF {
w.Header().Set("X-Raft-Error", err.Error())
w.WriteHeader(http.StatusInternalServerError)
return

View File

@ -1201,9 +1201,9 @@ func (l *Log) elector(done chan chan struct{}) {
}
}
// WriteTo attaches a writer to the log from a given index.
// WriteEntriesTo attaches a writer to the log from a given index.
// The index specified must be a committed index.
func (l *Log) WriteTo(w io.Writer, id, term, index uint64) error {
func (l *Log) WriteEntriesTo(w io.Writer, id, term, index uint64) error {
// Validate and initialize the writer.
writer, err := l.initWriter(w, id, term, index)
if err != nil {

View File

@ -173,15 +173,15 @@ func (s *Server) Close() error {
return ErrServerClosed
}
// Remove path.
s.path = ""
// Close message processing.
s.setClient(nil)
// Close metastore.
_ = s.meta.close()
// Remove path.
s.path = ""
return nil
}
@ -1213,6 +1213,7 @@ type createSeriesIfNotExistsCommand struct {
}
// WriteSeries writes series data to the database.
// Returns the messaging index the data was written to.
func (s *Server) WriteSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) (uint64, error) {
// Find the id for the series and tagset
seriesID, err := s.createSeriesIfNotExists(database, name, tags)
@ -1225,6 +1226,8 @@ func (s *Server) WriteSeries(database, retentionPolicy, name string, tags map[st
rp, err := s.DefaultRetentionPolicy(database)
if err != nil {
return 0, fmt.Errorf("failed to determine default retention policy: %s", err.Error())
} else if rp == nil {
return 0, ErrDefaultRetentionPolicyNotFound
}
retentionPolicy = rp.Name
}
@ -1240,11 +1243,11 @@ func (s *Server) WriteSeries(database, retentionPolicy, name string, tags map[st
// Retrieve shard group.
g, err := s.createShardGroupIfNotExists(database, retentionPolicy, timestamp)
if err != nil {
return 0, fmt.Errorf("create shard(%s/%d): %s", retentionPolicy, timestamp.Format(time.RFC3339Nano), err)
return 0, fmt.Errorf("create shard(%s/%s): %s", retentionPolicy, timestamp.Format(time.RFC3339Nano), err)
}
// Find appropriate shard within the shard group.
sh := g.Shards[int(seriesID)%len(g.Shards)]
sh := g.ShardBySeriesID(seriesID)
// Ignore requests that have no values.
if len(values) == 0 {
@ -1265,12 +1268,11 @@ func (s *Server) WriteSeries(database, retentionPolicy, name string, tags map[st
})
// Publish "write series" message on shard's topic to broker.
_, err = s.client.Publish(&messaging.Message{
return s.client.Publish(&messaging.Message{
Type: writeSeriesMessageType,
TopicID: sh.ID,
Data: data,
})
return 0, err
}
// If we can successfully encode the string keys to raw field ids then
@ -1359,10 +1361,10 @@ func (s *Server) applyWriteSeries(m *messaging.Message) error {
return sh.writeSeries(c.SeriesID, c.Timestamp, data, overwrite)
}
// applyRawWriteSeries writes raw series data to the database.
// applyWriteRawSeries writes raw series data to the database.
// Raw series data has already converted field names to ids so the
// representation is fast and compact.
func (s *Server) applyRawWriteSeries(m *messaging.Message) error {
func (s *Server) applyWriteRawSeries(m *messaging.Message) error {
// Retrieve the shard.
sh := s.Shard(m.TopicID)
if sh == nil {
@ -1458,6 +1460,9 @@ func (s *Server) ReadSeries(database, retentionPolicy, name string, tags map[str
// Decode into a raw value map.
rawValues := unmarshalValues(data)
if rawValues == nil {
return nil, nil
}
// Decode into a string-key value map.
values := make(map[string]interface{}, len(rawValues))
@ -1509,7 +1514,7 @@ func (s *Server) measurement(database, name string) (*Measurement, error) {
// ExecuteQuery executes an InfluxQL query against the server.
// Returns a resultset for each statement in the query.
// Stops on first execution error that occurs.
func (s *Server) Execute(q *influxql.Query, database string) []*Result {
func (s *Server) ExecuteQuery(q *influxql.Query, database string) []*Result {
// Build empty resultsets.
results := make([]*Result, len(q.Statements))
@ -1524,7 +1529,7 @@ func (s *Server) Execute(q *influxql.Query, database string) []*Result {
// Fill any empty results after error.
for i, res := range results {
if res == nil {
results[i] = &Result{Error: ErrNotExecuted}
results[i] = &Result{Err: ErrNotExecuted}
}
}
@ -1536,13 +1541,13 @@ func (s *Server) executeSelectStatement(stmt *influxql.SelectStatement, database
// Plan statement execution.
e, err := s.planSelectStatement(stmt, database)
if err != nil {
return &Result{Error: err}
return &Result{Err: err}
}
// Execute plan.
ch, err := e.Execute()
if err != nil {
return &Result{Error: err}
return &Result{Err: err}
}
// Read all rows from channel.
@ -1575,10 +1580,20 @@ func (s *Server) processor(client MessagingClient, done chan struct{}) {
for {
// Read incoming message.
var m *messaging.Message
var ok bool
select {
case <-done:
return
case m = <-client.C():
case m, ok = <-client.C():
if !ok {
return
}
}
// Exit if closed.
// TODO: Wrap this check in a lock with the apply itself.
if !s.opened() {
continue
}
// Process message.
@ -1587,7 +1602,7 @@ func (s *Server) processor(client MessagingClient, done chan struct{}) {
case writeSeriesMessageType:
err = s.applyWriteSeries(m)
case writeRawSeriesMessageType:
err = s.applyRawWriteSeries(m)
err = s.applyWriteRawSeries(m)
case createDataNodeMessageType:
err = s.applyCreateDataNode(m)
case deleteDataNodeMessageType:
@ -1628,8 +1643,8 @@ func (s *Server) processor(client MessagingClient, done chan struct{}) {
// Result represents a resultset returned from a single statement.
type Result struct {
Rows []*influxql.Row `json:"rows"`
Error error `json:"error"`
Rows []*influxql.Row `json:"rows,omitempty"`
Err error `json:"error,omitempty"`
}
// MessagingClient represents the client used to receive messages from brokers.

View File

@ -7,11 +7,13 @@ import (
"net/url"
"os"
"reflect"
"strings"
"testing"
"time"
"code.google.com/p/go.crypto/bcrypt"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/messaging"
)
@ -505,7 +507,7 @@ func TestServer_WriteSeries(t *testing.T) {
// Write series with one point to the database.
tags := map[string]string{"host": "servera.influx.com", "region": "uswest"}
index, err := s.WriteSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:00Z"), map[string]interface{}{"value": 23.2})
index, err := s.WriteSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:00Z"), map[string]interface{}{"value": float64(23.2)})
if err != nil {
t.Fatal(err)
} else if err = s.Sync(index); err != nil {
@ -513,7 +515,7 @@ func TestServer_WriteSeries(t *testing.T) {
}
// Write another point 10 seconds later so it goes through "raw series".
index, err = s.WriteSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:10Z"), map[string]interface{}{"value": 100})
index, err = s.WriteSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:10Z"), map[string]interface{}{"value": float64(100)})
if err != nil {
t.Fatal(err)
} else if err = s.Sync(index); err != nil {
@ -528,15 +530,15 @@ func TestServer_WriteSeries(t *testing.T) {
// Retrieve first series data point.
if v, err := s.ReadSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:00Z")); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(v, map[string]interface{}{"value": 23.2}) {
} else if !reflect.DeepEqual(v, map[string]interface{}{"value": float64(23.2)}) {
t.Fatalf("values mismatch: %#v", v)
}
// Retrieve second series data point.
if v, err := s.ReadSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:10Z")); err != nil {
t.Fatal(err)
} else if mustMarshalJSON(v) != mustMarshalJSON(map[string]interface{}{"value": 100}) {
t.Fatalf("values mismatch: %#v", mustMarshalJSON(v))
} else if mustMarshalJSON(v) != mustMarshalJSON(map[string]interface{}{"value": float64(100)}) {
t.Fatalf("values mismatch: %#v", v)
}
// Retrieve non-existent series data point.
@ -547,6 +549,34 @@ func TestServer_WriteSeries(t *testing.T) {
}
}
// Ensure the server can execute a query and return the data correctly.
func TestServer_ExecuteQuery(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
s.CreateDatabase("foo")
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour})
s.SetDefaultRetentionPolicy("foo", "raw")
s.CreateUser("susy", "pass", false)
// Write series with one point to the database.
s.MustWriteSeries("foo", "raw", "cpu", map[string]string{"region": "us-east"}, mustParseTime("2000-01-01T00:00:00Z"), map[string]interface{}{"value": float64(20)})
s.MustWriteSeries("foo", "raw", "cpu", map[string]string{"region": "us-east"}, mustParseTime("2000-01-01T00:00:10Z"), map[string]interface{}{"value": float64(30)})
s.MustWriteSeries("foo", "raw", "cpu", map[string]string{"region": "us-west"}, mustParseTime("2000-01-01T00:00:00Z"), map[string]interface{}{"value": float64(100)})
// Select data from the server.
results := s.ExecuteQuery(MustParseQuery("SELECT sum(value) FROM cpu"), "foo")
if len(results) != 1 {
t.Fatalf("unexpected result count: %d", len(results))
}
if res := results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Rows) != 1 {
t.Fatalf("unexpected row count: %s", len(res.Rows))
} else if s := mustMarshalJSON(res); s != `{"rows":[{"name":"cpu","columns":["time","sum"],"values":[[0,150]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
}
func TestServer_CreateShardGroupIfNotExist(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
@ -584,7 +614,7 @@ func TestServer_Measurements(t *testing.T) {
if err != nil {
t.Fatal(err)
} else if err = s.Sync(index); err != nil {
t.Fatal("sync error: %s", err)
t.Fatalf("sync error: %s", err)
}
expectedMeasurementNames := []string{"cpu_load"}
@ -687,6 +717,18 @@ func (s *Server) Close() {
s.Server.Close()
}
// MustWriteSeries writes series data and waits for the data to be applied.
// Returns the messaging index for the write.
func (s *Server) MustWriteSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) uint64 {
index, err := s.WriteSeries(database, retentionPolicy, name, tags, timestamp, values)
if err != nil {
panic(err.Error())
} else if err = s.Sync(index); err != nil {
panic("sync error: " + err.Error())
}
return index
}
// MessagingClient represents a test client for the messaging broker.
type MessagingClient struct {
index uint64
@ -766,6 +808,15 @@ func mustParseTime(s string) time.Time {
return t
}
// MustParseQuery parses an InfluxQL query. Panic on error.
func MustParseQuery(s string) *influxql.Query {
q, err := influxql.NewParser(strings.NewReader(s)).ParseQuery()
if err != nil {
panic(err.Error())
}
return q
}
// errstr is an ease-of-use function to convert an error to a string.
func errstr(err error) string {
if err != nil {

View File

@ -26,6 +26,11 @@ func (g *ShardGroup) close() {
}
}
// ShardBySeriesID returns the shard that a series is assigned to in the group.
func (g *ShardGroup) ShardBySeriesID(seriesID uint32) *Shard {
return g.Shards[int(seriesID)%len(g.Shards)]
}
// Shard represents the logical storage for a given time range.
// The instance on a local server may contain the raw data in "store" if the
// shard is assigned to the server's data node id.
@ -131,7 +136,7 @@ func (s *Shard) deleteSeries(name string) error {
type Shards []*Shard
// pointHeaderSize represents the size of a point header, in bytes.
const pointHeaderSize = 4 + 12 // seriesID + timestamp
const pointHeaderSize = 4 + 8 // seriesID + timestamp
// marshalPointHeader encodes a series id, timestamp, & flagset into a byte slice.
func marshalPointHeader(seriesID uint32, timestamp int64) []byte {
@ -167,9 +172,15 @@ func marshalValues(values map[uint8]interface{}) []byte {
buf := make([]byte, 9)
buf[0] = fieldID
// Convert integers to floats.
v := values[fieldID]
if intval, ok := v.(int); ok {
v = float64(intval)
}
// Encode value after field id.
// TODO: Support non-float types.
switch v := values[fieldID].(type) {
switch v := v.(type) {
case float64:
binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(v))
default:
@ -214,6 +225,13 @@ func unmarshalValues(b []byte) map[uint8]interface{} {
return values
}
// unmarshalValue extracts a single value by field id from an encoded byte slice.
func unmarshalValue(b []byte, fieldID uint8) interface{} {
// OPTIMIZE: Don't materialize entire map. Just search for value.
values := unmarshalValues(b)
return values[fieldID]
}
type uint8Slice []uint8
func (p uint8Slice) Len() int { return len(p) }