Close #293. Merge branch 'pr-293'

pull/336/head v0.5.0
John Shahid 2014-03-24 12:50:29 -04:00
commit a11a11e1a6
11 changed files with 320 additions and 39 deletions

View File

@ -23,6 +23,14 @@ port = 8086 # binding is disabled if the port isn't set
# ssl-port = 8084 # Ssl support is enabled if you set a port and cert
# ssl-cert = /path/to/cert.pem
[input_plugins]
# Configure the graphite api
[input_plugins.graphite]
enabled = false
# port = 2003
# database = "" # store graphite data in this database
# Raft configuration
[raft]
# The raft port should be open between all servers in a cluster.

145
src/api/graphite/api.go Normal file
View File

@ -0,0 +1,145 @@
// Package graphite provides a TCP listener that you can use to ingest metrics into InfluxDB
// via the graphite protocol.
// It behaves like a carbon daemon, except that it does no rounding of timestamps to the
// nearest interval. When multiple datapoints arrive for a given key within the same interval
// (possibly, but not necessarily, with the same timestamp), graphite would keep a single
// value (the latest received) under a rounded timestamp representing that interval. We store
// a value for every timestamp we receive (keeping only the latest value for a given
// metric-timestamp pair), so it's up to the user to feed the data at proper intervals (and
// use round intervals if you plan to rely on that).
package graphite
import (
"bufio"
"cluster"
log "code.google.com/p/log4go"
. "common"
"configuration"
"coordinator"
"net"
"protocol"
"time"
)
type Server struct {
listenAddress string
database string
coordinator coordinator.Coordinator
clusterConfig *cluster.ClusterConfiguration
conn net.Listener
user *cluster.ClusterAdmin
shutdown chan bool
}
// TODO: check that database exists and create it if not
func NewServer(config *configuration.Configuration, coord coordinator.Coordinator, clusterConfig *cluster.ClusterConfiguration) *Server {
self := &Server{}
self.listenAddress = config.GraphitePortString()
self.database = config.GraphiteDatabase
self.coordinator = coord
self.shutdown = make(chan bool, 1)
self.clusterConfig = clusterConfig
return self
}
// getAuth ensures that the user field is set to a user with access to the graphite database.
// Only call this function after everything (i.e. Raft) is initialized, so that there is at least one admin user.
func (self *Server) getAuth() {
// just use any (the first) admin from the list
names := self.clusterConfig.GetClusterAdmins()
self.user = self.clusterConfig.GetClusterAdmin(names[0])
}
func (self *Server) ListenAndServe() {
self.getAuth()
if self.listenAddress == "" {
// no address configured; the caller checks GraphitePort before starting us, so bail out
return
}
var err error
self.conn, err = net.Listen("tcp", self.listenAddress)
if err != nil {
log.Error("GraphiteServer: Listen: ", err)
return
}
self.Serve(self.conn)
}
func (self *Server) Serve(listener net.Listener) {
// signal Close() once we stop accepting, so it doesn't have to wait out its timeout
defer func() { self.shutdown <- true }()
for {
conn_in, err := listener.Accept()
if err != nil {
// Accept keeps failing once the listener is closed, so stop serving instead of spinning on the error
log.Error("GraphiteServer: Accept: ", err)
return
}
go self.handleClient(conn_in)
}
}
func (self *Server) Close() {
if self.conn != nil {
log.Info("GraphiteServer: Closing graphite server")
self.conn.Close()
log.Info("GraphiteServer: Waiting for all graphite requests to finish before killing the process")
select {
case <-time.After(time.Second * 5):
log.Error("GraphiteServer: There seems to be a hanging graphite request. Closing anyway")
case <-self.shutdown:
}
}
}
func (self *Server) writePoints(series *protocol.Series) error {
err := self.coordinator.WriteSeriesData(self.user, self.database, series)
if err != nil {
switch err.(type) {
case AuthorizationError:
// user information got stale, get a fresh one (this should happen rarely)
self.getAuth()
err = self.coordinator.WriteSeriesData(self.user, self.database, series)
if err != nil {
log.Warn("GraphiteServer: failed to write series after getting new auth: %s\n", err.Error())
}
default:
log.Warn("GraphiteServer: failed write series: %s\n", err.Error())
}
}
return err
}
func (self *Server) handleClient(conn net.Conn) {
defer conn.Close()
reader := bufio.NewReader(conn)
for {
graphiteMetric := &GraphiteMetric{}
err := graphiteMetric.Read(reader)
if err != nil {
log.Error(err)
return
}
values := []*protocol.FieldValue{}
if graphiteMetric.isInt {
values = append(values, &protocol.FieldValue{Int64Value: &graphiteMetric.integerValue})
} else {
values = append(values, &protocol.FieldValue{DoubleValue: &graphiteMetric.floatValue})
}
sn := uint64(1) // using the same sequence number ensures we only keep the latest value for a given metric-timestamp pair
point := &protocol.Point{
Timestamp: &graphiteMetric.timestamp,
Values: values,
SequenceNumber: &sn,
}
series := &protocol.Series{
Name: &graphiteMetric.name,
Fields: []string{"value"},
Points: []*protocol.Point{point},
}
// a little inefficient for now; later we might want to batch multiple series into one writePoints request
self.writePoints(series)
}
}

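For reference, a minimal sketch (not part of this change) of a client feeding the listener with the plaintext graphite protocol described in the package comment above. The metric name is made up, and port 2003 assumes the plugin has been enabled on the sample config's default port:

package main

import (
	"fmt"
	"net"
	"time"
)

func main() {
	// Assumes [input_plugins.graphite] is enabled and listening on port 2003.
	conn, err := net.Dial("tcp", "localhost:2003")
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	// Each line is "<metric name> <value> <unix timestamp in seconds>".
	fmt.Fprintf(conn, "servers.host1.cpu.load 0.45 %d\n", time.Now().Unix())
}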
View File

@ -0,0 +1,50 @@
package graphite
import (
"bufio"
"fmt"
"io"
"strconv"
"strings"
)
type GraphiteMetric struct {
name string
isInt bool
integerValue int64
floatValue float64
timestamp int64
}
func (self *GraphiteMetric) Read(reader *bufio.Reader) error {
buf, err := reader.ReadBytes('\n')
str := strings.TrimSpace(string(buf))
if err != nil {
if err != io.EOF {
return fmt.Errorf("GraphiteServer: connection closed uncleanly/broken: %s\n", err.Error())
}
if len(str) > 0 {
return fmt.Errorf("GraphiteServer: incomplete read, line read: '%s'. neglecting line because connection closed because of %s\n", str, err.Error())
}
return err
}
elements := strings.Split(str, " ")
if len(elements) != 3 {
return fmt.Errorf("Received '%s' which doesn't have three fields", str)
}
self.name = elements[0]
self.floatValue, err = strconv.ParseFloat(elements[1], 64)
if err != nil {
return err
}
if i := int64(self.floatValue); float64(i) == self.floatValue {
self.isInt = true
self.integerValue = int64(self.floatValue)
}
timestamp, err := strconv.ParseUint(elements[2], 10, 32)
if err != nil {
return err
}
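// graphite timestamps are in seconds; store them internally with microsecond precision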
self.timestamp = int64(timestamp * 1000000)
return nil
}

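A quick sketch (again, not part of the change) of what the parser does with a single line; the metric is hypothetical and the helper exists only for illustration, placed alongside GraphiteMetric in the same package:

package graphite

import (
	"bufio"
	"fmt"
	"strings"
)

// parseOneLine shows Read in isolation: "100" is detected as an integer value and the
// seconds-resolution timestamp is stored with microsecond precision.
func parseOneLine() {
	reader := bufio.NewReader(strings.NewReader("servers.host1.cpu.load 100 1395678000\n"))
	metric := &GraphiteMetric{}
	if err := metric.Read(reader); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(metric.name, metric.isInt, metric.integerValue, metric.timestamp)
	// prints: servers.host1.cpu.load true 100 1395678000000000
}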
View File

@ -1,7 +1,7 @@
# Welcome to the InfluxDB configuration file.
# If hostname (on the OS) doesn't return a name that can be resolved by the other
# systems in the cluster, you'll have to set the hostname to an IP or something
# that can be resolved here.
# hostname = ""
@ -20,6 +20,14 @@ assets = "./admin"
ssl-port = 8087 # Ssl support is enabled if you set a port and cert
ssl-cert = "../cert.pem"
[input_plugins]
# Configure the graphite api
[input_plugins.graphite]
enabled = false
port = 2003
database = "" # store graphite data in this database
# Raft configuration
[raft]
# The raft port should be open between all servers in a cluster.
@ -76,12 +84,12 @@ lru-cache-size = "200m"
# files. max-open-files is per shard so this * that will be max.
# max-open-shards = 0
# These options specify how data is sharded across the cluster. There are two
# shard configurations that have the same knobs: short term and long term.
# Any series that begins with a capital letter like Exceptions will be written
# into the long term storage. Any series beginning with a lower case letter
# like exceptions will be written into short term. The idea being that you
# can write high precision data into short term and drop it after a couple
# of days. Meanwhile, continuous queries can run downsampling on the short term
# data and write into the long term area.
[sharding]
@ -96,7 +104,7 @@ lru-cache-size = "200m"
# over the network when doing a query.
duration = "7d"
# split will determine how many shards to split each duration into. For example,
# if we created a shard for 2014-02-10 and split was set to 2, then two shards
# would be created that have the data for 2014-02-10. By default, data will
# be split into those two shards deterministically by hashing the (database, series)

View File

@ -62,6 +62,12 @@ type ApiConfig struct {
Port int
}
type GraphiteConfig struct {
Enabled bool
Port int
Database string
}
type RaftConfig struct {
Port int
Dir string
@ -87,10 +93,10 @@ type LoggingConfig struct {
}
type LevelDbConfiguration struct {
MaxOpenFiles int `toml:"max-open-files"`
LruCacheSize size `toml:"lru-cache-size"`
MaxOpenShards int `toml:"max-open-shards"`
PointBatchSize int `toml:"point-batch-size"`
}
type ShardingDefinition struct {
@ -154,18 +160,23 @@ type WalConfig struct {
RequestsPerLogFile int `toml:"requests-per-log-file"`
}
type InputPlugins struct {
Graphite GraphiteConfig `toml:"graphite"`
}
type TomlConfiguration struct {
Admin AdminConfig
Api ApiConfig
Raft RaftConfig
Storage StorageConfig
Cluster ClusterConfig
Logging LoggingConfig
LevelDb LevelDbConfiguration
Hostname string
BindAddress string `toml:"bind-address"`
Sharding ShardingDefinition `toml:"sharding"`
WalConfig WalConfig `toml:"wal"`
Admin AdminConfig
HttpApi ApiConfig `toml:"api"`
InputPlugins InputPlugins `toml:"input_plugins"`
Raft RaftConfig
Storage StorageConfig
Cluster ClusterConfig
Logging LoggingConfig
LevelDb LevelDbConfiguration
Hostname string
BindAddress string `toml:"bind-address"`
Sharding ShardingDefinition `toml:"sharding"`
WalConfig WalConfig `toml:"wal"`
}
type Configuration struct {
@ -174,6 +185,9 @@ type Configuration struct {
ApiHttpSslPort int
ApiHttpCertPath string
ApiHttpPort int
GraphiteEnabled bool
GraphitePort int
GraphiteDatabase string
RaftServerPort int
SeedServers []string
DataDir string
@ -188,7 +202,7 @@ type Configuration struct {
LevelDbMaxOpenFiles int
LevelDbLruCacheSize int
LevelDbMaxOpenShards int
LevelDbPointBatchSize int
ShortTermShard *ShardConfiguration
LongTermShard *ShardConfiguration
ReplicationFactor int
@ -246,9 +260,12 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
config := &Configuration{
AdminHttpPort: tomlConfiguration.Admin.Port,
AdminAssetsDir: tomlConfiguration.Admin.Assets,
ApiHttpPort: tomlConfiguration.Api.Port,
ApiHttpCertPath: tomlConfiguration.Api.SslCertPath,
ApiHttpSslPort: tomlConfiguration.Api.SslPort,
ApiHttpPort: tomlConfiguration.HttpApi.Port,
ApiHttpCertPath: tomlConfiguration.HttpApi.SslCertPath,
ApiHttpSslPort: tomlConfiguration.HttpApi.SslPort,
GraphiteEnabled: tomlConfiguration.InputPlugins.Graphite.Enabled,
GraphitePort: tomlConfiguration.InputPlugins.Graphite.Port,
GraphiteDatabase: tomlConfiguration.InputPlugins.Graphite.Database,
RaftServerPort: tomlConfiguration.Raft.Port,
RaftDir: tomlConfiguration.Raft.Dir,
ProtobufPort: tomlConfiguration.Cluster.ProtobufPort,
@ -339,6 +356,14 @@ func (self *Configuration) ApiHttpSslPortString() string {
return fmt.Sprintf("%s:%d", self.BindAddress, self.ApiHttpSslPort)
}
func (self *Configuration) GraphitePortString() string {
if self.GraphitePort <= 0 {
return ""
}
return fmt.Sprintf("%s:%d", self.BindAddress, self.GraphitePort)
}
func (self *Configuration) ProtobufPortString() string {
return fmt.Sprintf("%s:%d", self.BindAddress, self.ProtobufPort)
}
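A small sketch (hypothetical values, not part of the change) of how the new fields feed GraphitePortString, which the graphite listener uses to decide whether and where to bind:

package main

import (
	"configuration"
	"fmt"
)

func main() {
	// Only the fields relevant to graphite are set here.
	cfg := &configuration.Configuration{BindAddress: "", GraphitePort: 2003}
	fmt.Println(cfg.GraphitePortString()) // ":2003" -> listen on all interfaces
	cfg.GraphitePort = 0
	fmt.Println(cfg.GraphitePortString()) // ""      -> the graphite listener is not started
}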

View File

@ -34,6 +34,10 @@ func (self *LoadConfigurationSuite) TestConfig(c *C) {
c.Assert(config.ApiHttpCertPath, Equals, "../cert.pem")
c.Assert(config.ApiHttpPortString(), Equals, "")
c.Assert(config.GraphiteEnabled, Equals, false)
c.Assert(config.GraphitePort, Equals, 2003)
c.Assert(config.GraphiteDatabase, Equals, "")
c.Assert(config.RaftDir, Equals, "/tmp/influxdb/development/raft")
c.Assert(config.RaftServerPort, Equals, 8090)

View File

@ -9,6 +9,7 @@ import (
"fmt"
"io/ioutil"
. "launchpad.net/gocheck"
"net"
"net/http"
"net/url"
"os"
@ -517,6 +518,26 @@ func (self *ServerSuite) TestGroupByDay(c *C) {
c.Assert(series.GetValueForPointAndColumn(1, "count", c).(float64), Equals, float64(1))
}
func (self *ServerSuite) TestGraphiteInterface(c *C) {
conn, err := net.Dial("tcp", "localhost:60513")
c.Assert(err, IsNil)
now := time.Now().UTC().Truncate(time.Minute)
data := fmt.Sprintf("some_metric 100 %d\nsome_metric 200.5 %d\n", now.Add(-time.Minute).Unix(), now.Unix())
_, err = conn.Write([]byte(data))
c.Assert(err, IsNil)
time.Sleep(time.Second)
collection := self.serverProcesses[0].QueryWithUsername("graphite_db", "select * from some_metric", false, c, "root", "root")
c.Assert(collection.Members, HasLen, 1)
series := collection.GetSeries("some_metric", c)
c.Assert(series.Points, HasLen, 2)
c.Assert(series.GetValueForPointAndColumn(0, "value", c).(float64), Equals, float64(200.5))
c.Assert(series.GetValueForPointAndColumn(1, "value", c).(float64), Equals, float64(100))
}
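Because every graphite point is written with the same sequence number, re-sending a metric with an identical timestamp overwrites the stored value instead of adding a second point. A minimal sketch of that behavior (not part of the test suite; port 60513 comes from the integration config, the metric values are illustrative):

// graphiteOverwriteSketch is an illustration only; it assumes the test listener above is running.
func graphiteOverwriteSketch() error {
	conn, err := net.Dial("tcp", "localhost:60513")
	if err != nil {
		return err
	}
	defer conn.Close()
	ts := time.Now().Unix()
	// Same metric, same timestamp: only the second value (2) is kept.
	_, err = fmt.Fprintf(conn, "some_metric 1 %d\nsome_metric 2 %d\n", ts, ts)
	return err
}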
func (self *ServerSuite) TestLimitQueryOnSingleShard(c *C) {
data := `[{"points": [[4], [10], [5]], "name": "test_limit_query_single_shard", "columns": ["value"]}]`
self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c)

View File

@ -1,7 +1,7 @@
# Welcome to the InfluxDB configuration file.
# If hostname (on the OS) doesn't return a name that can be resolved by the other
# systems in the cluster, you'll have to set the hostname to an IP or something
# that can be resolved here.
# hostname = ""
@ -21,6 +21,14 @@ port = 60500
ssl-port = 60503
ssl-cert = "./cert.pem"
[input_plugins]
# Configure the graphite api
[input_plugins.graphite]
enabled = true
port = 60513
database = "graphite_db" # store graphite data in this database
# Raft configuration
[raft]
# The raft port should be open between all servers in a cluster.
@ -62,12 +70,12 @@ write-buffer-size = 1000
# This setting determines how many responses can be buffered in memory per shard before data starts getting dropped.
query-shard-buffer-size = 500
# These options specify how data is sharded across the cluster. There are two
# shard configurations that have the same knobs: short term and long term.
# Any series that begins with a capital letter like Exceptions will be written
# into the long term storage. Any series beginning with a lower case letter
# like exceptions will be written into short term. The idea being that you
# can write high precision data into short term and drop it after a couple
# of days. Meanwhile, continuous queries can run downsampling on the short term
# data and write into the long term area.
[sharding]
@ -82,7 +90,7 @@ query-shard-buffer-size = 500
# over the network when doing a query.
duration = "1h"
# split will determine how many shards to split each duration into. For example,
# if we created a shard for 2014-02-10 and split was set to 2, then two shards
# would be created that have the data for 2014-02-10. By default, data will
# be split into those two shards deterministically by hashing the (database, series)

View File

@ -1,7 +1,7 @@
# Welcome to the InfluxDB configuration file.
# If hostname (on the OS) doesn't return a name that can be resolved by the other
# systems in the cluster, you'll have to set the hostname to an IP or something
# that can be resolved here.
# hostname = ""
@ -60,12 +60,12 @@ write-buffer-size = 1000
# This setting determines how many responses can be buffered in memory per shard before data starts getting dropped.
query-shard-buffer-size = 500
# These options specify how data is sharded across the cluster. There are two
# shard configurations that have the same knobs: short term and long term.
# Any series that begins with a capital letter like Exceptions will be written
# into the long term storage. Any series beginning with a lower case letter
# like exceptions will be written into short term. The idea being that you
# can write high precision data into short term and drop it after a couple
# of days. Meanwhile, continuous queries can run downsampling on the short term
# data and write into the long term area.
[sharding]
@ -80,7 +80,7 @@ query-shard-buffer-size = 500
# over the network when doing a query.
duration = "1h"
# split will determine how many shards to split each duration into. For example,
# if we created a shard for 2014-02-10 and split was set to 2, then two shards
# would be created that have the data for 2014-02-10. By default, data will
# be split into those two shards deterministically by hashing the (database, series)

View File

@ -1,7 +1,7 @@
# Welcome to the InfluxDB configuration file.
# If hostname (on the OS) doesn't return a name that can be resolved by the other
# systems in the cluster, you'll have to set the hostname to an IP or something
# that can be resolved here.
# hostname = ""
@ -60,12 +60,12 @@ write-buffer-size = 1000
# This setting determines how many responses can be buffered in memory per shard before data starts getting dropped.
query-shard-buffer-size = 500
# These options specify how data is sharded across the cluster. There are two
# shard configurations that have the same knobs: short term and long term.
# Any series that begins with a capital letter like Exceptions will be written
# into the long term storage. Any series beginning with a lower case letter
# like exceptions will be written into short term. The idea being that you
# can write high precision data into short term and drop it after a couple
# of days. Meanwhile, continuous queries can run downsampling on the short term
# data and write into the long term area.
[sharding]
@ -80,7 +80,7 @@ query-shard-buffer-size = 500
# over the network when doing a query.
duration = "1h"
# split will determine how many shards to split each duration into. For example,
# if we created a shard for 2014-02-10 and split was set to 2, then two shards
# would be created that have the data for 2014-02-10. By default, data will
# be split into those two shards deterministically by hashing the (database, series)

View File

@ -2,6 +2,7 @@ package server
import (
"admin"
"api/graphite"
"api/http"
"cluster"
log "code.google.com/p/log4go"
@ -20,6 +21,7 @@ type Server struct {
ProtobufServer *coordinator.ProtobufServer
ClusterConfig *cluster.ClusterConfiguration
HttpApi *http.HttpServer
GraphiteApi *graphite.Server
AdminServer *admin.HttpServer
Coordinator coordinator.Coordinator
Config *configuration.Configuration
@ -57,6 +59,7 @@ func NewServer(config *configuration.Configuration) (*Server, error) {
raftServer.AssignCoordinator(coord)
httpApi := http.NewHttpServer(config.ApiHttpPortString(), config.AdminAssetsDir, coord, coord, clusterConfig, raftServer)
httpApi.EnableSsl(config.ApiHttpSslPortString(), config.ApiHttpCertPath)
graphiteApi := graphite.NewServer(config, coord, clusterConfig)
adminServer := admin.NewHttpServer(config.AdminAssetsDir, config.AdminHttpPortString())
return &Server{
@ -64,6 +67,7 @@ func NewServer(config *configuration.Configuration) (*Server, error) {
ProtobufServer: protobufServer,
ClusterConfig: clusterConfig,
HttpApi: httpApi,
GraphiteApi: graphiteApi,
Coordinator: coord,
AdminServer: adminServer,
Config: config,
@ -100,6 +104,14 @@ func (self *Server) ListenAndServe() error {
go self.ListenForSignals()
log.Info("Starting admin interface on port %d", self.Config.AdminHttpPort)
go self.AdminServer.ListenAndServe()
if self.Config.GraphiteEnabled {
if self.Config.GraphitePort <= 0 || self.Config.GraphiteDatabase == "" {
log.Warn("Cannot start graphite server. please check your configuration")
} else {
log.Info("Starting Graphite Listener on port %d", self.Config.GraphitePort)
go self.GraphiteApi.ListenAndServe()
}
}
log.Info("Starting Http Api server on port %d", self.Config.ApiHttpPort)
self.HttpApi.ListenAndServe()
return nil