Merge branch 'master' of https://github.com/influxdb/influxdb into election
commit
232b4cfb34
|
@ -30,8 +30,15 @@ protocol/protocol.pb.go
|
|||
build/
|
||||
|
||||
# executables
|
||||
/influxd
|
||||
/influxdb
|
||||
|
||||
influxd
|
||||
**/influxd
|
||||
!**/influxd/
|
||||
|
||||
influxdb
|
||||
**/influxdb
|
||||
!**/influxdb/
|
||||
|
||||
/benchmark-tool
|
||||
/main
|
||||
/benchmark-storage
|
||||
|
|
|
@ -113,6 +113,11 @@ go test -run=TestDatabase . -v
|
|||
go test -coverprofile /tmp/cover . && go tool cover -html /tmp/cover
|
||||
```
|
||||
|
||||
To install go cover, run the following command:
|
||||
```
|
||||
go get golang.org/x/tools/cmd/cover
|
||||
```
|
||||
|
||||
Useful links
|
||||
------------
|
||||
- [Useful techniques in Go](http://arslan.io/ten-useful-techniques-in-go)
|
||||
|
|
|
@ -35,5 +35,5 @@ You don't need to build the project to use it. Pre-built
|
|||
the recommended way to get it running. However, if you want to
|
||||
contribute to the core of InfluxDB, you'll need to build. For those
|
||||
adventurous enough, you can
|
||||
[follow along on our docs](http://github.com/influxdb/influxdb/blob/master/docs/contributing.md)
|
||||
[follow along on our docs](http://github.com/influxdb/influxdb/blob/master/CONTRIBUTING.md).
|
||||
|
||||
|
|
|
@ -0,0 +1,128 @@
|
|||
package influxdb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/messaging"
|
||||
)
|
||||
|
||||
// Broker represents an InfluxDB specific messaging broker.
|
||||
type Broker struct {
|
||||
*messaging.Broker
|
||||
|
||||
done chan struct{}
|
||||
|
||||
// send CQ processing requests to the same data node
|
||||
currentCQProcessingNode *messaging.Replica
|
||||
|
||||
// variables to control when to trigger processing and when to timeout
|
||||
TriggerInterval time.Duration
|
||||
TriggerTimeout time.Duration
|
||||
TriggerFailurePause time.Duration
|
||||
}
|
||||
|
||||
const (
|
||||
// DefaultContinuousQueryCheckTime is how frequently the broker will ask a data node
|
||||
// in the cluster to run any continuous queries that should be run.
|
||||
DefaultContinuousQueryCheckTime = 1 * time.Second
|
||||
|
||||
// DefaultDataNodeTimeout is how long the broker will wait before timing out on a data node
|
||||
// that it has requested process continuous queries.
|
||||
DefaultDataNodeTimeout = 1 * time.Second
|
||||
|
||||
// DefaultFailureSleep is how long the broker will sleep before trying the next data node in
|
||||
// the cluster if the current data node failed to respond
|
||||
DefaultFailureSleep = 100 * time.Millisecond
|
||||
)
|
||||
|
||||
// NewBroker returns a new instance of a Broker with default values.
|
||||
func NewBroker() *Broker {
|
||||
b := &Broker{
|
||||
TriggerInterval: 5 * time.Second,
|
||||
TriggerTimeout: 20 * time.Second,
|
||||
TriggerFailurePause: 1 * time.Second,
|
||||
}
|
||||
b.Broker = messaging.NewBroker()
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *Broker) RunContinuousQueryLoop() {
|
||||
b.done = make(chan struct{})
|
||||
go b.continuousQueryLoop(b.done)
|
||||
}
|
||||
|
||||
func (b *Broker) Close() error {
|
||||
if b.done != nil {
|
||||
close(b.done)
|
||||
b.done = nil
|
||||
}
|
||||
return b.Broker.Close()
|
||||
}
|
||||
|
||||
func (b *Broker) continuousQueryLoop(done chan struct{}) {
|
||||
for {
|
||||
// Check if broker is currently leader.
|
||||
if b.Broker.IsLeader() {
|
||||
b.runContinuousQueries()
|
||||
}
|
||||
|
||||
// Sleep until either the broker is closed or we need to run continuous queries again
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-time.After(DefaultContinuousQueryCheckTime):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Broker) runContinuousQueries() {
|
||||
next := 0
|
||||
for {
|
||||
// if the current node hasn't been set it's our first time or we're reset. move to the next one
|
||||
if b.currentCQProcessingNode == nil {
|
||||
dataNodes := b.Broker.Replicas()
|
||||
if len(dataNodes) == 0 {
|
||||
return // don't have any nodes to try, give it up
|
||||
}
|
||||
next = next % len(dataNodes)
|
||||
b.currentCQProcessingNode = dataNodes[next]
|
||||
next++
|
||||
}
|
||||
|
||||
// if no error, we're all good
|
||||
err := b.requestContinuousQueryProcessing()
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
log.Printf("broker cq: error hitting data node: %s: %s\n", b.currentCQProcessingNode.URL, err.Error())
|
||||
|
||||
// reset and let the loop try the next data node in the cluster
|
||||
b.currentCQProcessingNode = nil
|
||||
<-time.After(DefaultFailureSleep)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Broker) requestContinuousQueryProcessing() error {
|
||||
// Send request.
|
||||
cqURL := copyURL(b.currentCQProcessingNode.URL)
|
||||
cqURL.Path = "/process_continuous_queries"
|
||||
cqURL.Scheme = "http"
|
||||
client := &http.Client{
|
||||
Timeout: DefaultDataNodeTimeout,
|
||||
}
|
||||
resp, err := client.Post(cqURL.String(), "application/octet-stream", nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Check if created.
|
||||
if resp.StatusCode != http.StatusAccepted {
|
||||
return fmt.Errorf("request returned status %s", resp.Status)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,93 @@
|
|||
package influxdb_test
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb"
|
||||
)
|
||||
|
||||
func TestBroker_WillRunQueries(t *testing.T) {
|
||||
// this handler should just work
|
||||
testHandler := &BrokerTestHandler{}
|
||||
server := httptest.NewServer(testHandler)
|
||||
defer server.Close()
|
||||
// this will timeout on the trigger request
|
||||
timeoutHandler := &BrokerTestHandler{wait: 1100 * time.Millisecond}
|
||||
timeoutServer := httptest.NewServer(timeoutHandler)
|
||||
defer timeoutServer.Close()
|
||||
// this will return a 500
|
||||
badHandler := &BrokerTestHandler{sendError: true}
|
||||
badServer := httptest.NewServer(badHandler)
|
||||
defer badServer.Close()
|
||||
|
||||
b := influxdb.NewBroker()
|
||||
|
||||
// set the trigger times and failure sleeps for the test
|
||||
b.TriggerInterval = 2 * time.Millisecond
|
||||
b.TriggerTimeout = 100 * time.Millisecond
|
||||
b.TriggerFailurePause = 2 * time.Millisecond
|
||||
|
||||
f := tempfile()
|
||||
defer os.Remove(f)
|
||||
|
||||
if err := b.Open(f, &url.URL{Host: "127.0.0.1:8080"}); err != nil {
|
||||
t.Fatalf("error opening broker: %s", err)
|
||||
}
|
||||
if err := b.Initialize(); err != nil {
|
||||
t.Fatalf("error initializing broker: %s", err)
|
||||
}
|
||||
defer b.Close()
|
||||
|
||||
// set the data nodes (replicas) so all the failure cases get hit first
|
||||
if err := b.Broker.CreateReplica(1, &url.URL{Host: "127.0.0.1:8090"}); err != nil {
|
||||
t.Fatalf("couldn't create replica %s", err.Error())
|
||||
}
|
||||
b.Broker.CreateReplica(2, &url.URL{Host: timeoutServer.URL[7:]})
|
||||
b.Broker.CreateReplica(3, &url.URL{Host: badServer.URL[7:]})
|
||||
b.Broker.CreateReplica(4, &url.URL{Host: server.URL[7:]})
|
||||
|
||||
b.RunContinuousQueryLoop()
|
||||
// every failure and success case should be hit in this time frame
|
||||
time.Sleep(1400 * time.Millisecond)
|
||||
if timeoutHandler.requestCount != 1 {
|
||||
t.Fatal("broker should have only sent 1 request to the server that times out.")
|
||||
}
|
||||
if badHandler.requestCount != 1 {
|
||||
t.Fatal("broker should have only sent 1 request to the bad server. i.e. it didn't keep the state to make request to the good server")
|
||||
}
|
||||
if testHandler.requestCount < 1 || testHandler.processRequestCount < 1 {
|
||||
t.Fatal("broker failed to send multiple continuous query requests to the data node")
|
||||
}
|
||||
}
|
||||
|
||||
type BrokerTestHandler struct {
|
||||
requestCount int
|
||||
processRequestCount int
|
||||
sendError bool
|
||||
wait time.Duration
|
||||
}
|
||||
|
||||
// ServeHTTP serves an HTTP request.
|
||||
func (h *BrokerTestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
h.requestCount++
|
||||
<-time.After(h.wait)
|
||||
if h.sendError {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}
|
||||
switch r.URL.Path {
|
||||
case "/process_continuous_queries":
|
||||
if r.Method == "POST" {
|
||||
h.processRequestCount++
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
} else {
|
||||
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
|
||||
}
|
||||
default:
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}
|
|
@ -13,9 +13,10 @@ import (
|
|||
)
|
||||
|
||||
type Config struct {
|
||||
URL url.URL
|
||||
Username string
|
||||
Password string
|
||||
URL url.URL
|
||||
Username string
|
||||
Password string
|
||||
UserAgent string
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
|
@ -23,6 +24,7 @@ type Client struct {
|
|||
username string
|
||||
password string
|
||||
httpClient *http.Client
|
||||
userAgent string
|
||||
}
|
||||
|
||||
type Query struct {
|
||||
|
@ -42,6 +44,7 @@ func NewClient(c Config) (*Client, error) {
|
|||
username: c.Username,
|
||||
password: c.Password,
|
||||
httpClient: &http.Client{},
|
||||
userAgent: c.UserAgent,
|
||||
}
|
||||
return &client, nil
|
||||
}
|
||||
|
@ -55,7 +58,14 @@ func (c *Client) Query(q Query) (*Results, error) {
|
|||
values.Set("db", q.Database)
|
||||
u.RawQuery = values.Encode()
|
||||
|
||||
resp, err := c.httpClient.Get(u.String())
|
||||
req, err := http.NewRequest("GET", u.String(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if c.userAgent != "" {
|
||||
req.Header.Set("User-Agent", c.userAgent)
|
||||
}
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -87,7 +97,15 @@ func (c *Client) Write(writes ...Write) (*Results, error) {
|
|||
b := []byte{}
|
||||
err := json.Unmarshal(b, &d)
|
||||
|
||||
resp, err := c.httpClient.Post(c.url.String(), "application/json", bytes.NewBuffer(b))
|
||||
req, err := http.NewRequest("POST", c.url.String(), bytes.NewBuffer(b))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
if c.userAgent != "" {
|
||||
req.Header.Set("User-Agent", c.userAgent)
|
||||
}
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -108,7 +126,15 @@ func (c *Client) Ping() (time.Duration, string, error) {
|
|||
now := time.Now()
|
||||
u := c.url
|
||||
u.Path = "ping"
|
||||
resp, err := c.httpClient.Get(u.String())
|
||||
|
||||
req, err := http.NewRequest("GET", u.String(), nil)
|
||||
if err != nil {
|
||||
return 0, "", err
|
||||
}
|
||||
if c.userAgent != "" {
|
||||
req.Header.Set("User-Agent", c.userAgent)
|
||||
}
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return 0, "", err
|
||||
}
|
||||
|
|
|
@ -118,6 +118,80 @@ func TestClient_Write(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestClient_UserAgent(t *testing.T) {
|
||||
receivedUserAgent := ""
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
receivedUserAgent = r.UserAgent()
|
||||
|
||||
var data influxdb.Results
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_ = json.NewEncoder(w).Encode(data)
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
_, err := http.Get(ts.URL)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
|
||||
}
|
||||
|
||||
defaultUserAgent := receivedUserAgent
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
userAgent string
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "Empty user agent",
|
||||
userAgent: "",
|
||||
expected: defaultUserAgent,
|
||||
},
|
||||
{
|
||||
name: "Custom user agent",
|
||||
userAgent: "Test Influx Client",
|
||||
expected: "Test Influx Client",
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
u, _ := url.Parse(ts.URL)
|
||||
config := client.Config{URL: *u, UserAgent: test.userAgent}
|
||||
c, err := client.NewClient(config)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
|
||||
}
|
||||
|
||||
receivedUserAgent = ""
|
||||
query := client.Query{}
|
||||
_, err = c.Query(query)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
|
||||
}
|
||||
if receivedUserAgent != test.expected {
|
||||
t.Fatalf("Unexpected user agent. expected %v, actual %v", test.expected, receivedUserAgent)
|
||||
}
|
||||
|
||||
receivedUserAgent = ""
|
||||
write := client.Write{}
|
||||
_, err = c.Write(write)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
|
||||
}
|
||||
if receivedUserAgent != test.expected {
|
||||
t.Fatalf("Unexpected user agent. expected %v, actual %v", test.expected, receivedUserAgent)
|
||||
}
|
||||
|
||||
receivedUserAgent = ""
|
||||
_, _, err = c.Ping()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
|
||||
}
|
||||
if receivedUserAgent != test.expected {
|
||||
t.Fatalf("Unexpected user agent. expected %v, actual %v", test.expected, receivedUserAgent)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPoint_UnmarshalEpoch(t *testing.T) {
|
||||
now := time.Now()
|
||||
tests := []struct {
|
||||
|
|
|
@ -18,6 +18,11 @@ import (
|
|||
"github.com/peterh/liner"
|
||||
)
|
||||
|
||||
// These variables are populated via the Go linker.
|
||||
var (
|
||||
version string = "0.9"
|
||||
)
|
||||
|
||||
const (
|
||||
default_host = "localhost"
|
||||
default_port = 8086
|
||||
|
@ -60,7 +65,7 @@ func main() {
|
|||
}
|
||||
|
||||
// TODO Determine if we are an ineractive shell or running commands
|
||||
fmt.Println("InfluxDB shell")
|
||||
fmt.Println("InfluxDB shell " + version)
|
||||
|
||||
c.Line = liner.NewLiner()
|
||||
defer c.Line.Close()
|
||||
|
@ -182,9 +187,10 @@ func (c *CommandLine) connect(cmd string) {
|
|||
}
|
||||
cl, err := client.NewClient(
|
||||
client.Config{
|
||||
URL: u,
|
||||
Username: c.Username,
|
||||
Password: c.Password,
|
||||
URL: u,
|
||||
Username: c.Username,
|
||||
Password: c.Password,
|
||||
UserAgent: "InfluxDBShell/" + version,
|
||||
})
|
||||
if err != nil {
|
||||
fmt.Printf("Could not create client %s", err)
|
||||
|
|
|
@ -105,6 +105,35 @@ type Config struct {
|
|||
Logging struct {
|
||||
File string `toml:"file"`
|
||||
} `toml:"logging"`
|
||||
|
||||
ContinuousQuery struct {
|
||||
// when continuous queries are run we'll automatically recompute previous intervals
|
||||
// in case lagged data came in. Set to zero if you never have lagged data. We do
|
||||
// it this way because invalidating previously computed intervals would be insanely hard
|
||||
// and expensive.
|
||||
RecomputePreviousN int `toml:"recompute-previous-n"`
|
||||
|
||||
// The RecomputePreviousN setting provides guidance for how far back to recompute, the RecomputeNoOlderThan
|
||||
// setting sets a ceiling on how far back in time it will go. For example, if you have 2 PreviousN
|
||||
// and have this set to 10m, then we'd only compute the previous two intervals for any
|
||||
// CQs that have a group by time <= 5m. For all others, we'd only recompute the previous window
|
||||
RecomputeNoOlderThan Duration `toml:"recompute-no-older-than"`
|
||||
|
||||
// ComputeRunsPerInterval will determine how many times the current and previous N intervals
|
||||
// will be computed. The group by time will be divided by this and it will get computed this many times:
|
||||
// group by time seconds / runs per interval
|
||||
// This will give partial results for current group by intervals and will determine how long it will
|
||||
// be until lagged data is recomputed. For example, if this number is 10 and the group by time is 10m, it
|
||||
// will be a minute past the previous 10m bucket of time before lagged data is picked up
|
||||
ComputeRunsPerInterval int `toml:"compute-runs-per-interval"`
|
||||
|
||||
// ComputeNoMoreThan paired with the RunsPerInterval will determine the ceiling of how many times smaller
|
||||
// group by times will be computed. For example, if you have RunsPerInterval set to 10 and this setting
|
||||
// to 1m. Then for a group by time(1m) will actually only get computed once per interval (and once per PreviousN).
|
||||
// If you have a group by time(5m) then you'll get five computes per interval. Any group by time window larger
|
||||
// than 10m will get computed 10 times for each interval.
|
||||
ComputeNoMoreThan Duration `toml:"compute-no-more-than"`
|
||||
} `toml:"continuous_queries"`
|
||||
}
|
||||
|
||||
// NewConfig returns an instance of Config with reasonable defaults.
|
||||
|
@ -121,6 +150,10 @@ func NewConfig() *Config {
|
|||
c.Data.RetentionCheckPeriod = Duration(10 * time.Minute)
|
||||
c.Admin.Enabled = true
|
||||
c.Admin.Port = 8083
|
||||
c.ContinuousQuery.RecomputePreviousN = 2
|
||||
c.ContinuousQuery.RecomputeNoOlderThan = Duration(10 * time.Minute)
|
||||
c.ContinuousQuery.ComputeRunsPerInterval = 10
|
||||
c.ContinuousQuery.ComputeNoMoreThan = Duration(2 * time.Minute)
|
||||
|
||||
// Detect hostname (or set to localhost).
|
||||
if c.Hostname, _ = os.Hostname(); c.Hostname == "" {
|
||||
|
|
|
@ -44,7 +44,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
|
|||
// Start the broker handler.
|
||||
var h *Handler
|
||||
if b != nil {
|
||||
h = &Handler{brokerHandler: messaging.NewHandler(b)}
|
||||
h = &Handler{brokerHandler: messaging.NewHandler(b.Broker)}
|
||||
// We want to make sure we are spun up before we exit this function, so we manually listen and serve
|
||||
listener, err := net.Listen("tcp", config.BrokerAddr())
|
||||
if err != nil {
|
||||
|
@ -52,10 +52,13 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
|
|||
}
|
||||
go func() { log.Fatal(http.Serve(listener, h)) }()
|
||||
log.Printf("broker listening on %s", config.BrokerAddr())
|
||||
|
||||
// have it occasionally tell a data node in the cluster to run continuous queries
|
||||
b.RunContinuousQueryLoop()
|
||||
}
|
||||
|
||||
// Open server, initialize or join as necessary.
|
||||
s := openServer(config.DataDir(), config.DataURL(), b, initializing, configExists, joinURLs, logWriter)
|
||||
s := openServer(config, b, initializing, configExists, joinURLs, logWriter)
|
||||
s.SetAuthenticationEnabled(config.Authentication.Enabled)
|
||||
|
||||
// Enable retention policy enforcement if requested.
|
||||
|
@ -132,7 +135,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
|
|||
}
|
||||
}
|
||||
}
|
||||
return b, s
|
||||
return b.Broker, s
|
||||
}
|
||||
|
||||
// write the current process id to a file specified by path.
|
||||
|
@ -176,15 +179,16 @@ func parseConfig(path, hostname string) *Config {
|
|||
}
|
||||
|
||||
// creates and initializes a broker.
|
||||
func openBroker(path string, u *url.URL, initializing bool, joinURLs []*url.URL, w io.Writer) *messaging.Broker {
|
||||
func openBroker(path string, u *url.URL, initializing bool, joinURLs []*url.URL, w io.Writer) *influxdb.Broker {
|
||||
// Ignore if there's no existing broker and we're not initializing or joining.
|
||||
if !fileExists(path) && !initializing && len(joinURLs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create broker.
|
||||
b := messaging.NewBroker()
|
||||
b := influxdb.NewBroker()
|
||||
b.SetLogOutput(w)
|
||||
|
||||
if err := b.Open(path, u); err != nil {
|
||||
log.Fatalf("failed to open broker: %s", err)
|
||||
}
|
||||
|
@ -204,14 +208,14 @@ func openBroker(path string, u *url.URL, initializing bool, joinURLs []*url.URL,
|
|||
}
|
||||
|
||||
// initializes a new broker.
|
||||
func initializeBroker(b *messaging.Broker) {
|
||||
func initializeBroker(b *influxdb.Broker) {
|
||||
if err := b.Initialize(); err != nil {
|
||||
log.Fatalf("initialize: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// joins a broker to an existing cluster.
|
||||
func joinBroker(b *messaging.Broker, joinURLs []*url.URL) {
|
||||
func joinBroker(b *influxdb.Broker, joinURLs []*url.URL) {
|
||||
// Attempts to join each server until successful.
|
||||
for _, u := range joinURLs {
|
||||
if err := b.Join(u); err != nil {
|
||||
|
@ -225,25 +229,30 @@ func joinBroker(b *messaging.Broker, joinURLs []*url.URL) {
|
|||
}
|
||||
|
||||
// creates and initializes a server.
|
||||
func openServer(path string, u *url.URL, b *messaging.Broker, initializing, configExists bool, joinURLs []*url.URL, w io.Writer) *influxdb.Server {
|
||||
func openServer(config *Config, b *influxdb.Broker, initializing, configExists bool, joinURLs []*url.URL, w io.Writer) *influxdb.Server {
|
||||
// Ignore if there's no existing server and we're not initializing or joining.
|
||||
if !fileExists(path) && !initializing && len(joinURLs) == 0 {
|
||||
if !fileExists(config.Data.Dir) && !initializing && len(joinURLs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create and open the server.
|
||||
s := influxdb.NewServer()
|
||||
s.SetLogOutput(w)
|
||||
if err := s.Open(path); err != nil {
|
||||
s.RecomputePreviousN = config.ContinuousQuery.RecomputePreviousN
|
||||
s.RecomputeNoOlderThan = time.Duration(config.ContinuousQuery.RecomputeNoOlderThan)
|
||||
s.ComputeRunsPerInterval = config.ContinuousQuery.ComputeRunsPerInterval
|
||||
s.ComputeNoMoreThan = time.Duration(config.ContinuousQuery.ComputeNoMoreThan)
|
||||
|
||||
if err := s.Open(config.Data.Dir); err != nil {
|
||||
log.Fatalf("failed to open data server: %v", err.Error())
|
||||
}
|
||||
|
||||
// If the server is uninitialized then initialize or join it.
|
||||
if initializing {
|
||||
if len(joinURLs) == 0 {
|
||||
initializeServer(s, b, w)
|
||||
initializeServer(config.DataURL(), s, b, w)
|
||||
} else {
|
||||
joinServer(s, u, joinURLs)
|
||||
joinServer(s, config.DataURL(), joinURLs)
|
||||
openServerClient(s, joinURLs, w)
|
||||
}
|
||||
} else if !configExists {
|
||||
|
@ -264,11 +273,11 @@ func openServer(path string, u *url.URL, b *messaging.Broker, initializing, conf
|
|||
}
|
||||
|
||||
// initializes a new server that does not yet have an ID.
|
||||
func initializeServer(s *influxdb.Server, b *messaging.Broker, w io.Writer) {
|
||||
func initializeServer(u *url.URL, s *influxdb.Server, b *influxdb.Broker, w io.Writer) {
|
||||
// TODO: Create replica using the messaging client.
|
||||
|
||||
// Create replica on broker.
|
||||
if err := b.CreateReplica(1); err != nil {
|
||||
if err := b.CreateReplica(1, u); err != nil {
|
||||
log.Fatalf("replica creation error: %s", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,236 @@
|
|||
# Continuous Queries
|
||||
|
||||
This document lays out continuous queries and a proposed architecture for how they'll work within an InfluxDB cluster.
|
||||
|
||||
## Definition of Continuous Queries
|
||||
|
||||
Continuous queries serve two purposes in InfluxDB:
|
||||
|
||||
1. Combining many series into a single series (i.e. removing 1 or more tag dimensions to make queries more efficient)
|
||||
2. Aggregating and downsampling series
|
||||
|
||||
The purpose of both types of continuous queries is to duplicate or downsample data automatically in the background to make querying thier results fast and efficient. Think of them as another way to create indexes on data.
|
||||
|
||||
Generally, there are continuous queries that create copyies of data into another measurement or tagset and queries that downsample and aggregate data. The only difference between the two types is if the query has a `GROUP BY time` clause.
|
||||
|
||||
Before we get to the continuous query examples, we need to define the `INTO` syntax of queries.
|
||||
|
||||
### INTO
|
||||
|
||||
`INTO` is a method for running a query and having it output into either another measurement name, retention policy, or database. The syntax looks like this:
|
||||
|
||||
```sql
|
||||
SELECT *
|
||||
INTO [<retention policy>.]<measurement> [ON <database>]
|
||||
FROM <measurement>
|
||||
[WHERE ...]
|
||||
[GROUP BY ...]
|
||||
```
|
||||
|
||||
The syntax states that the retention policy, database, where clause, and group by clause are all optional. If a retention policy isn't specified, the database's default retention policy will be written into. If the database isn't specified, the database the query is running from will be written into.
|
||||
|
||||
By selecting specific fields, `INTO` can merge many series into one that will go into a new either a new measurement, retention policy, or database. For example:
|
||||
|
||||
```sql
|
||||
SELECT mean(value) as value, region
|
||||
INTO 1h.cpu_load
|
||||
FROM cpu_load
|
||||
GROUP BY time(1h), region
|
||||
```
|
||||
|
||||
That will give 1h summaries of the mean value of the `cpu_load` for each `region`. Specifying `region` in the `GROUP BY` clause is unnecessary since having it in the `SELECT` clause forces it to be grouped by that tag, we've just included it in the example for clarity.
|
||||
|
||||
With `SELECT ... INTO`, fields will be written as fields and tags will be written as tags.
|
||||
|
||||
### Continuous Query Syntax
|
||||
|
||||
The `INTO` queries run once. Continuous queries will turn `INTO` queries into something that run in the background in the cluster. They're kind of like triggers in SQL.
|
||||
|
||||
```sql
|
||||
CREATE CONTINUOUS QUERY 1h_cpu_load
|
||||
ON database_name
|
||||
BEGIN
|
||||
SELECT mean(value) as value, region
|
||||
INTO 1h.cpu_load
|
||||
FROM cpu_load
|
||||
GROUP BY time(1h), region
|
||||
END
|
||||
```
|
||||
|
||||
Or chain them together:
|
||||
|
||||
```sql
|
||||
CREATE CONTINUOUS QUERY 10m_event_count
|
||||
ON database_name
|
||||
BEGIN
|
||||
SELECT count(value)
|
||||
INTO 10m.events
|
||||
FROM events
|
||||
GROUP BY time(10m)
|
||||
END
|
||||
|
||||
-- this selects from the output of one continuous query and outputs to another series
|
||||
CREATE CONTINUOUS QUERY 1h_event_count
|
||||
ON database_name
|
||||
BEGIN
|
||||
SELECT sum(count) as count
|
||||
INTO 1h.events
|
||||
FROM events
|
||||
GROUP BY time(1h)
|
||||
END
|
||||
```
|
||||
|
||||
Or multiple aggregations from all series in a measurement. This example assumes you have a retention policy named `1h`.
|
||||
|
||||
```sql
|
||||
CREATE CONTINUOUS QUERY 1h_cpu_load
|
||||
ON database_name
|
||||
BEGIN
|
||||
SELECT mean(value), percentile(80, value) as percentile_80, percentile(95, value) as percentile_95
|
||||
INTO 1h.cpu_load
|
||||
FROM cpu_load
|
||||
GROUP BY time(1h), *
|
||||
END
|
||||
```
|
||||
|
||||
The `GROUP BY *` indicates that we want to group by the tagset of the points written in. The same tags will be written to the output series. The multiple aggregates in the `SELECT` clause (percentile, mean) will be written in as fields to the resulting series.
|
||||
|
||||
Showing what continuous queries we have:
|
||||
|
||||
```sql
|
||||
LIST CONTINUOUS QUERIES
|
||||
```
|
||||
|
||||
Dropping continuous queries:
|
||||
|
||||
```sql
|
||||
DROP CONTINUOUS QUERY <name>
|
||||
ON <database>
|
||||
```
|
||||
|
||||
### Security
|
||||
|
||||
To create or drop a continuous query, the user must be an admin.
|
||||
|
||||
### Limitations
|
||||
|
||||
In order to prevent cycles and endless copying of data, the following limitation is enforced on continuous queries at create time:
|
||||
|
||||
*The output of a continuous query must go to either a different measurement or to a different retention policy.*
|
||||
|
||||
In theory they'd still be able to create a cycle with multiple continuous queries. We should check for these and disallow.
|
||||
|
||||
## Proposed Architecture
|
||||
|
||||
Continuous queries should be stored in the metastore cluster wide. That is, they amount to database schema that should be stored in every server in a cluster.
|
||||
|
||||
Continuous queries will have to be handled in a different way for two different use cases: those that simply copy data (CQs without a group by time) and those that aggregate and downsample data (those with a group by time).
|
||||
|
||||
### No group by time
|
||||
|
||||
For CQs that have no `GROUP BY time` clause, they should be evaluated at the data node as part of the write. The single write should create any other writes for the CQ and submit those in the same request to the brokers to ensure that all writes succeed (both the original and the new CQ writes) or none do.
|
||||
|
||||
I imagine the process going something like this:
|
||||
|
||||
1. Convert the data point into its compact form `<series id><time><values>`
|
||||
2. For each CQ on the measurement and retention policy without a group by time:
|
||||
3. Run the data point through a special query engine that will output 0 or 1 data point
|
||||
4. Goto #1 for each newly generated data point
|
||||
5. Write all the data points in a single call to the brokers
|
||||
6. Return success to the user
|
||||
|
||||
Note that for the generated data points, we need to go through and run this process against them since they can feed into different retention policies, measurements, and new tagsets. On #3 I mention that the output will either be a data point or not. That's because of `WHERE` clauses on the query. However, it will never be more than a single data point.
|
||||
|
||||
I mention that we'll need a special query engine for these types of queries. In this case, they never have an aggregate function. Any query with an aggregate function also has a group by time, and these queries by definition don't have that.
|
||||
|
||||
The only thing we have to worry about is which fields are being selected, and what the where clause looks like. We should be able to put the raw data point through a simple transform function that either outputs another raw points or doesn't.
|
||||
|
||||
I think this transform function be something separate from the regular query planner and engine. It can be in `influxQL` but it should be something fairly simply since the only purpose of these types of queries is to either filter some data out and output to a new series or transform into a new series by dropping tags.
|
||||
|
||||
### Has group by time
|
||||
|
||||
CQs that have a `GROUP BY time` (or aggregate CQs) will need to be handled differently.
|
||||
|
||||
One key point on continuous queries with a group by time is that all their writes should always be `overwrite = true`. That is, they should only have a single data point for each timestamp. This distinction means that continuous queries for previous blocks of time can be safely run multiple times without duplicating data (i.e. they're idempotent).
|
||||
|
||||
There are two different ideas I have for how CQs with group by time could be handled. The first is through periodic updates handled by the Raft Leader. The second would be to expand out writes for each CQ and handle them on the data node.
|
||||
|
||||
#### Periodic Updates
|
||||
|
||||
In this approach the management of how CQs run in a cluster will be centrally located on the Raft Leader. It will be responsible for orchestrating which data nodes run CQs and when.
|
||||
|
||||
The naive approach would be to have the leader hand out each CQ for a block of time periodically. The leader could also rerun CQ for periods of time that have recently passed. This would be an easy way to handle the "lagging data" problem, but it's not precise.
|
||||
|
||||
Unfortunately, there's no easy way to tell cluster wide if there were data points written in an already passed window of time for a CQ. We might be able to add this at the data nodes and have them track it, but it would be quite a bit more work.
|
||||
|
||||
The easy way would just be to have CQs re-execute for periods that recently passed and have some user-configurable window of time that they stop checking after. Then we could give the user the ability to recalculate CQs ranges of time if they need to correct for some problem that occurred or the loading of a bunch of historical data.
|
||||
|
||||
With this approach, we'd have the metadata in the database store the last time each CQ was run. Whenever the Raft leader sent out a command to a data node to handle a CQ, the data node would use this metadata to determine which windows of time it should compute.
|
||||
|
||||
This approach is like what exists in 0.8, with the exception that it will automatically catch data that is lagged behind in a small window of time and give the user the ability to force recalculation.
|
||||
|
||||
#### Expanding writes
|
||||
|
||||
When a write comes into a data node, we could have it evaluated against group by CQs in addition to the non-group by ones. It would then create writes that would then go through the brokers. When the CQ writes arrive at the data nodes, they would have to handle each write differently depending on if it was a write to a raw series or if it was a CQ write.
|
||||
|
||||
Let's lay out a concrete example.
|
||||
|
||||
```sql
|
||||
CREATE CONTINUOUS QUERY 10m_cpu_by_region
|
||||
ON foo
|
||||
BEGIN
|
||||
SELECT mean(value)
|
||||
INTO cpu_by_region
|
||||
FROM cpu
|
||||
GROUP BY time(10m), region
|
||||
END
|
||||
```
|
||||
|
||||
In this example we write values into `cpu` with the tags `region` and `host`.
|
||||
|
||||
Here's another example CQ:
|
||||
|
||||
```sql
|
||||
CREATE CONTINUOUS QUERY 1h_cpu
|
||||
ON foo
|
||||
BEGIN
|
||||
SELECT mean(value)
|
||||
INTO 1h.cpu
|
||||
FROM raw.cpu
|
||||
GROUP BY time(10m), *
|
||||
END
|
||||
```
|
||||
|
||||
That would output one series into the `1h` retention policy for the `cpu` measurement for every series from the `raw` retention policy and the `cpu` measurement.
|
||||
|
||||
Both of these examples would be handled the same way despite one being a big merge of a bunch of series into one and the other being an aggregation of series in a 1-to-1 mapping.
|
||||
|
||||
Say we're collecting data for two hosts in a single region. Then we'd have two distinct series like this:
|
||||
|
||||
```
|
||||
1 - cpu host=serverA region=uswest
|
||||
2 - cpu host=serverB region=uswest
|
||||
```
|
||||
|
||||
Whenever a write came into a server, we'd look at the continuous queries and see if we needed to create new writes. If we had the two CQ examples above, we'd have to expand a single write into two more writes (one for each CQ).
|
||||
|
||||
The first CQ would have to create a new series:
|
||||
|
||||
```
|
||||
3 - cpu_by_region region=uswest
|
||||
```
|
||||
|
||||
The second CQ would use the same series id as the write, but would send it to another retention policy (and thus shard).
|
||||
|
||||
We'd need to keep track of which series + retention policy combinations were the result of a CQ. When the data nodes get writes replicated downward, they would have to handle them like this:
|
||||
|
||||
1. If write is normal, write through
|
||||
2. If write is CQ write, compute based on existing values, write to DB
|
||||
|
||||
#### Approach tradeoffs
|
||||
|
||||
The first approach of periodically running queries would almost certainly be the easiest to implement quickly. It also has the added advantage of not putting additional load on the brokers by ballooning up the number of writes that go through the system.
|
||||
|
||||
The second approach is appealing because it would be accurate regardless of when writes come in. However, it would take more work and cause the number of writes going through the brokers to be multiplied by the number of continuous queries, which might not scale to where we need.
|
||||
|
||||
Also, if the data nodes write for every single update, the load on the underlying storage engine would go up significantly as well.
|
269
database.go
269
database.go
|
@ -1,6 +1,7 @@
|
|||
package influxdb
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
|
@ -12,6 +13,10 @@ import (
|
|||
"github.com/influxdb/influxdb/influxql"
|
||||
)
|
||||
|
||||
const (
|
||||
maxStringLength = 64 * 1024
|
||||
)
|
||||
|
||||
// database is a collection of retention policies and shards. It also has methods
|
||||
// for keeping an in memory index of all the measurements, series, and tags in the database.
|
||||
// Methods on this struct aren't goroutine safe. They assume that the server is handling
|
||||
|
@ -19,7 +24,8 @@ import (
|
|||
type database struct {
|
||||
name string
|
||||
|
||||
policies map[string]*RetentionPolicy // retention policies by name
|
||||
policies map[string]*RetentionPolicy // retention policies by name
|
||||
continuousQueries []*ContinuousQuery // continuous queries
|
||||
|
||||
defaultRetentionPolicy string
|
||||
|
||||
|
@ -32,10 +38,11 @@ type database struct {
|
|||
// newDatabase returns an instance of database.
|
||||
func newDatabase() *database {
|
||||
return &database{
|
||||
policies: make(map[string]*RetentionPolicy),
|
||||
measurements: make(map[string]*Measurement),
|
||||
series: make(map[uint32]*Series),
|
||||
names: make([]string, 0),
|
||||
policies: make(map[string]*RetentionPolicy),
|
||||
continuousQueries: make([]*ContinuousQuery, 0),
|
||||
measurements: make(map[string]*Measurement),
|
||||
series: make(map[uint32]*Series),
|
||||
names: make([]string, 0),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -48,15 +55,6 @@ func (db *database) shardGroupByTimestamp(policy string, timestamp time.Time) (*
|
|||
return p.shardGroupByTimestamp(timestamp), nil
|
||||
}
|
||||
|
||||
// MeasurementNames returns a list of measurement names.
|
||||
func (db *database) MeasurementNames() []string {
|
||||
names := make([]string, 0, len(db.measurements))
|
||||
for k := range db.measurements {
|
||||
names = append(names, k)
|
||||
}
|
||||
return names
|
||||
}
|
||||
|
||||
// Series takes a series ID and returns a series.
|
||||
func (db *database) Series(id uint32) *Series {
|
||||
return db.series[id]
|
||||
|
@ -71,6 +69,7 @@ func (db *database) MarshalJSON() ([]byte, error) {
|
|||
for _, rp := range db.policies {
|
||||
o.Policies = append(o.Policies, rp)
|
||||
}
|
||||
o.ContinuousQueries = db.continuousQueries
|
||||
return json.Marshal(&o)
|
||||
}
|
||||
|
||||
|
@ -92,6 +91,13 @@ func (db *database) UnmarshalJSON(data []byte) error {
|
|||
db.policies[rp.Name] = rp
|
||||
}
|
||||
|
||||
// we need the parsed continuous queries to be in the in memory index
|
||||
db.continuousQueries = make([]*ContinuousQuery, 0, len(o.ContinuousQueries))
|
||||
for _, cq := range o.ContinuousQueries {
|
||||
c, _ := NewContinuousQuery(cq.Query)
|
||||
db.continuousQueries = append(db.continuousQueries, c)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -100,6 +106,7 @@ type databaseJSON struct {
|
|||
Name string `json:"name,omitempty"`
|
||||
DefaultRetentionPolicy string `json:"defaultRetentionPolicy,omitempty"`
|
||||
Policies []*RetentionPolicy `json:"policies,omitempty"`
|
||||
ContinuousQueries []*ContinuousQuery `json:"continuousQueries,omitempty"`
|
||||
}
|
||||
|
||||
// Measurement represents a collection of time series in a database. It also contains in memory
|
||||
|
@ -219,22 +226,6 @@ func (m *Measurement) seriesByTags(tags map[string]string) *Series {
|
|||
return m.series[string(marshalTags(tags))]
|
||||
}
|
||||
|
||||
// mapValues converts a map of values with string keys to field id keys.
|
||||
// Returns nil if any field doesn't exist.
|
||||
func (m *Measurement) mapValues(values map[string]interface{}) map[uint8]interface{} {
|
||||
other := make(map[uint8]interface{}, len(values))
|
||||
for k, v := range values {
|
||||
// TODO: Cast value to original field type.
|
||||
|
||||
f := m.FieldByName(k)
|
||||
if f == nil {
|
||||
panic(fmt.Sprintf("Field does not exist for %s", k))
|
||||
}
|
||||
other[f.ID] = v
|
||||
}
|
||||
return other
|
||||
}
|
||||
|
||||
func (m *Measurement) seriesIDsAndFilters(stmt *influxql.SelectStatement) (seriesIDs, map[uint32]influxql.Expr) {
|
||||
seriesIdsToExpr := make(map[uint32]influxql.Expr)
|
||||
if stmt.Condition == nil {
|
||||
|
@ -635,6 +626,195 @@ type Field struct {
|
|||
// Fields represents a list of fields.
|
||||
type Fields []*Field
|
||||
|
||||
// FieldCodec providecs encoding and decoding functionality for the fields of a given
|
||||
// Measurement. It is a distinct type to avoid locking writes on this node while
|
||||
// potentially long-running queries are executing.
|
||||
//
|
||||
// It is not affected by changes to the Measurement object after codec creation.
|
||||
type FieldCodec struct {
|
||||
fieldsByID map[uint8]*Field
|
||||
fieldsByName map[string]*Field
|
||||
}
|
||||
|
||||
// NewFieldCodec returns a FieldCodec for the given Measurement. Must be called with
|
||||
// a RLock that protects the Measurement.
|
||||
func NewFieldCodec(m *Measurement) *FieldCodec {
|
||||
fieldsByID := make(map[uint8]*Field, len(m.Fields))
|
||||
fieldsByName := make(map[string]*Field, len(m.Fields))
|
||||
for _, f := range m.Fields {
|
||||
fieldsByID[f.ID] = f
|
||||
fieldsByName[f.Name] = f
|
||||
}
|
||||
return &FieldCodec{fieldsByID: fieldsByID, fieldsByName: fieldsByName}
|
||||
}
|
||||
|
||||
// EncodeFields converts a map of values with string keys to a byte slice of field
|
||||
// IDs and values.
|
||||
//
|
||||
// If a field exists in the codec, but its type is different, an error is returned. If
|
||||
// a field is not present in the codec, the system panics.
|
||||
func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error) {
|
||||
// Allocate byte slice and write field count.
|
||||
b := make([]byte, 1, 10)
|
||||
b[0] = byte(len(values))
|
||||
|
||||
for k, v := range values {
|
||||
field := f.fieldsByName[k]
|
||||
if field == nil {
|
||||
panic(fmt.Sprintf("field does not exist for %s", k))
|
||||
} else if influxql.InspectDataType(v) != field.Type {
|
||||
return nil, fmt.Errorf("field %s is not of type %s", k, field.Type)
|
||||
}
|
||||
|
||||
var buf []byte
|
||||
|
||||
switch field.Type {
|
||||
case influxql.Number:
|
||||
var value float64
|
||||
// Convert integers to floats.
|
||||
if intval, ok := v.(int); ok {
|
||||
value = float64(intval)
|
||||
} else {
|
||||
value = v.(float64)
|
||||
}
|
||||
|
||||
buf = make([]byte, 9)
|
||||
binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(value))
|
||||
case influxql.Boolean:
|
||||
value := v.(bool)
|
||||
|
||||
// Only 1 byte need for a boolean.
|
||||
buf = make([]byte, 2)
|
||||
if value {
|
||||
buf[1] = byte(1)
|
||||
}
|
||||
case influxql.String:
|
||||
value := v.(string)
|
||||
if len(value) > maxStringLength {
|
||||
value = value[:maxStringLength]
|
||||
}
|
||||
// Make a buffer for field ID (1 bytes), the string length (2 bytes), and the string.
|
||||
buf = make([]byte, len(value)+3)
|
||||
|
||||
// Set the string length, then copy the string itself.
|
||||
binary.BigEndian.PutUint16(buf[1:3], uint16(len(value)))
|
||||
for i, c := range []byte(value) {
|
||||
buf[i+3] = byte(c)
|
||||
}
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported value type: %T", v))
|
||||
}
|
||||
|
||||
// Always set the field ID as the leading byte.
|
||||
buf[0] = field.ID
|
||||
|
||||
// Append temp buffer to the end.
|
||||
b = append(b, buf...)
|
||||
}
|
||||
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// DecodeByID scans a byte slice for a field with the given ID, converts it to its
|
||||
// expected type, and return that value.
|
||||
func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) {
|
||||
if len(b) == 0 {
|
||||
return 0, ErrFieldNotFound
|
||||
}
|
||||
|
||||
// Read the field count from the field byte.
|
||||
n := int(b[0])
|
||||
// Start from the second byte and iterate over until we're done decoding.
|
||||
b = b[1:]
|
||||
for i := 0; i < n; i++ {
|
||||
field, ok := f.fieldsByID[b[0]]
|
||||
if !ok {
|
||||
panic(fmt.Sprintf("field ID %d has no mapping", b[0]))
|
||||
}
|
||||
|
||||
var value interface{}
|
||||
switch field.Type {
|
||||
case influxql.Number:
|
||||
// Move bytes forward.
|
||||
value = math.Float64frombits(binary.BigEndian.Uint64(b[1:9]))
|
||||
b = b[9:]
|
||||
case influxql.Boolean:
|
||||
if b[1] == 1 {
|
||||
value = true
|
||||
} else {
|
||||
value = false
|
||||
}
|
||||
// Move bytes forward.
|
||||
b = b[2:]
|
||||
case influxql.String:
|
||||
size := binary.BigEndian.Uint16(b[1:3])
|
||||
value = string(b[3 : 3+size])
|
||||
// Move bytes forward.
|
||||
b = b[size+3:]
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported value type: %T", field.Type))
|
||||
}
|
||||
|
||||
if field.ID == targetID {
|
||||
return value, nil
|
||||
}
|
||||
}
|
||||
|
||||
return 0, ErrFieldNotFound
|
||||
}
|
||||
|
||||
// DecodeFields decodes a byte slice into a set of field ids and values.
|
||||
func (f *FieldCodec) DecodeFields(b []byte) map[uint8]interface{} {
|
||||
if len(b) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Read the field count from the field byte.
|
||||
n := int(b[0])
|
||||
|
||||
// Create a map to hold the decoded data.
|
||||
values := make(map[uint8]interface{}, n)
|
||||
|
||||
// Start from the second byte and iterate over until we're done decoding.
|
||||
b = b[1:]
|
||||
for i := 0; i < n; i++ {
|
||||
// First byte is the field identifier.
|
||||
fieldID := b[0]
|
||||
field := f.fieldsByID[fieldID]
|
||||
if field == nil {
|
||||
panic(fmt.Sprintf("field ID %d has no mapping", fieldID))
|
||||
}
|
||||
|
||||
var value interface{}
|
||||
switch field.Type {
|
||||
case influxql.Number:
|
||||
value = math.Float64frombits(binary.BigEndian.Uint64(b[1:9]))
|
||||
// Move bytes forward.
|
||||
b = b[9:]
|
||||
case influxql.Boolean:
|
||||
if b[1] == 1 {
|
||||
value = true
|
||||
} else {
|
||||
value = false
|
||||
}
|
||||
// Move bytes forward.
|
||||
b = b[2:]
|
||||
case influxql.String:
|
||||
size := binary.BigEndian.Uint16(b[1:3])
|
||||
value = string(b[3:size])
|
||||
// Move bytes forward.
|
||||
b = b[size+3:]
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported value type: %T", f.fieldsByID[fieldID]))
|
||||
}
|
||||
|
||||
values[fieldID] = value
|
||||
|
||||
}
|
||||
|
||||
return values
|
||||
}
|
||||
|
||||
// Series belong to a Measurement and represent unique time series in a database
|
||||
type Series struct {
|
||||
ID uint32
|
||||
|
@ -1016,6 +1196,35 @@ func (db *database) MeasurementAndSeries(name string, tags map[string]string) (*
|
|||
return idx, idx.seriesByTags(tags)
|
||||
}
|
||||
|
||||
// SeriesByID returns the Series that has the given id.
|
||||
func (d *database) SeriesByID(id uint32) *Series {
|
||||
return d.series[id]
|
||||
}
|
||||
|
||||
// Names returns all measurement names in sorted order.
|
||||
func (d *database) MeasurementNames() []string {
|
||||
return d.names
|
||||
}
|
||||
|
||||
// DropSeries will clear the index of all references to a series.
|
||||
func (d *database) DropSeries(id uint32) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
// DropMeasurement will clear the index of all references to a measurement and its child series.
|
||||
func (d *database) DropMeasurement(name string) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (d *database) continuousQueryByName(name string) *ContinuousQuery {
|
||||
for _, cq := range d.continuousQueries {
|
||||
if cq.cq.Name == name {
|
||||
return cq
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// used to convert the tag set to bytes for use as a lookup key
|
||||
func marshalTags(tags map[string]string) []byte {
|
||||
s := make([]string, 0, len(tags))
|
||||
|
|
|
@ -94,6 +94,10 @@ func NewHandler(s *influxdb.Server, requireAuthentication bool, version string)
|
|||
"ping-head",
|
||||
"HEAD", "/ping", h.servePing, true,
|
||||
},
|
||||
route{ // Tell data node to run CQs that should be run
|
||||
"process_continuous_queries",
|
||||
"POST", "/process_continuous_queries", h.serveProcessContinuousQueries, false,
|
||||
},
|
||||
)
|
||||
|
||||
for _, r := range h.routes {
|
||||
|
@ -290,7 +294,7 @@ func (h *Handler) serveCreateDataNode(w http.ResponseWriter, r *http.Request) {
|
|||
node := h.server.DataNodeByURL(u)
|
||||
|
||||
// Create a new replica on the broker.
|
||||
if err := h.server.Client().CreateReplica(node.ID); err != nil {
|
||||
if err := h.server.Client().CreateReplica(node.ID, node.URL); err != nil {
|
||||
httpError(w, err.Error(), false, http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
|
@ -322,6 +326,16 @@ func (h *Handler) serveDeleteDataNode(w http.ResponseWriter, r *http.Request) {
|
|||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// serveProcessContinuousQueries will execute any continuous queries that should be run
|
||||
func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.Request, u *influxdb.User) {
|
||||
if err := h.server.RunContinuousQueries(); err != nil {
|
||||
httpError(w, err.Error(), false, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
}
|
||||
|
||||
type dataNodeJSON struct {
|
||||
ID uint64 `json:"id"`
|
||||
URL string `json:"url"`
|
||||
|
|
|
@ -1040,6 +1040,36 @@ func TestHandler_RevokeDBPrivilege(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestHandler_ShowContinuousQueries(t *testing.T) {
|
||||
srvr := OpenAuthlessServer(NewMessagingClient())
|
||||
srvr.CreateDatabase("foo")
|
||||
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
|
||||
srvr.SetDefaultRetentionPolicy("foo", "bar")
|
||||
|
||||
// create and check
|
||||
q := "CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT count() INTO measure1 FROM myseries GROUP BY time(10m) END"
|
||||
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||
if err != nil {
|
||||
t.Fatalf("error parsing query %s", err.Error())
|
||||
}
|
||||
cq := stmt.(*influxql.CreateContinuousQueryStatement)
|
||||
if err := srvr.CreateContinuousQuery(cq); err != nil {
|
||||
t.Fatalf("error creating continuous query %s", err.Error())
|
||||
}
|
||||
|
||||
s := NewHTTPServer(srvr)
|
||||
defer s.Close()
|
||||
|
||||
query := map[string]string{"q": "SHOW CONTINUOUS QUERIES"}
|
||||
status, body := MustHTTP("GET", s.URL+`/query`, query, nil, "")
|
||||
if status != http.StatusOK {
|
||||
t.Fatalf("unexpected status: %d", status)
|
||||
} else if body != `{"results":[{"rows":[{"name":"foo","columns":["name","query"],"values":[["myquery","CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT count() INTO measure1 FROM myseries GROUP BY time(10m) END"]]}]}]}` {
|
||||
t.Fatalf("unexpected body: %s", body)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestHandler_serveWriteSeries(t *testing.T) {
|
||||
srvr := OpenAuthenticatedServer(NewMessagingClient())
|
||||
srvr.CreateDatabase("foo")
|
||||
|
@ -1215,7 +1245,89 @@ func TestHandler_serveWriteSeriesZeroTime(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestHandler_serveWriteSeriesInvalidField(t *testing.T) {
|
||||
func TestHandler_serveWriteSeriesStringValues(t *testing.T) {
|
||||
srvr := OpenAuthlessServer(NewMessagingClient())
|
||||
srvr.CreateDatabase("foo")
|
||||
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
|
||||
srvr.SetDefaultRetentionPolicy("foo", "bar")
|
||||
|
||||
s := NewHTTPServer(srvr)
|
||||
defer s.Close()
|
||||
|
||||
status, _ := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "logs", "tags": {"host": "server01"},"values": {"event": "disk full"}}]}`)
|
||||
if status != http.StatusOK {
|
||||
t.Fatalf("unexpected status: %d", status)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond) // Ensure data node picks up write.
|
||||
|
||||
query := map[string]string{"db": "foo", "q": "select event from logs"}
|
||||
status, body := MustHTTP("GET", s.URL+`/query`, query, nil, "")
|
||||
if status != http.StatusOK {
|
||||
t.Logf("query %s\n", query)
|
||||
t.Log(body)
|
||||
t.Errorf("unexpected status: %d", status)
|
||||
}
|
||||
|
||||
r := &influxdb.Results{}
|
||||
if err := json.Unmarshal([]byte(body), r); err != nil {
|
||||
t.Logf("query : %s\n", query)
|
||||
t.Log(body)
|
||||
t.Error(err)
|
||||
}
|
||||
if len(r.Results) != 1 {
|
||||
t.Fatalf("unexpected results count")
|
||||
}
|
||||
result := r.Results[0]
|
||||
if len(result.Rows) != 1 {
|
||||
t.Fatalf("unexpected row count, expected: %d, actual: %d", 1, len(result.Rows))
|
||||
}
|
||||
if result.Rows[0].Values[0][1] != "disk full" {
|
||||
t.Fatalf("unexpected string value, actual: %s", result.Rows[0].Values[0][1])
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandler_serveWriteSeriesBoolValues(t *testing.T) {
|
||||
srvr := OpenAuthlessServer(NewMessagingClient())
|
||||
srvr.CreateDatabase("foo")
|
||||
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
|
||||
srvr.SetDefaultRetentionPolicy("foo", "bar")
|
||||
|
||||
s := NewHTTPServer(srvr)
|
||||
defer s.Close()
|
||||
|
||||
status, _ := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "disk", "tags": {"host": "server01"},"values": {"full": false}}]}`)
|
||||
if status != http.StatusOK {
|
||||
t.Fatalf("unexpected status: %d", status)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond) // Ensure data node picks up write.
|
||||
|
||||
query := map[string]string{"db": "foo", "q": "select full from disk"}
|
||||
status, body := MustHTTP("GET", s.URL+`/query`, query, nil, "")
|
||||
if status != http.StatusOK {
|
||||
t.Logf("query %s\n", query)
|
||||
t.Log(body)
|
||||
t.Errorf("unexpected status: %d", status)
|
||||
}
|
||||
|
||||
r := &influxdb.Results{}
|
||||
if err := json.Unmarshal([]byte(body), r); err != nil {
|
||||
t.Logf("query : %s\n", query)
|
||||
t.Log(body)
|
||||
t.Error(err)
|
||||
}
|
||||
if len(r.Results) != 1 {
|
||||
t.Fatalf("unexpected results count")
|
||||
}
|
||||
result := r.Results[0]
|
||||
if len(result.Rows) != 1 {
|
||||
t.Fatalf("unexpected row count, expected: %d, actual: %d", 1, len(result.Rows))
|
||||
}
|
||||
if result.Rows[0].Values[0][1] != false {
|
||||
t.Fatalf("unexpected string value, actual: %s", result.Rows[0].Values[0][1])
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandler_serveWriteSeriesInvalidQueryField(t *testing.T) {
|
||||
srvr := OpenAuthlessServer(NewMessagingClient())
|
||||
srvr.CreateDatabase("foo")
|
||||
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
|
||||
|
@ -2010,7 +2122,7 @@ type MessagingClient struct {
|
|||
mu sync.Mutex // Ensure all publishing is serialized.
|
||||
|
||||
PublishFunc func(*messaging.Message) (uint64, error)
|
||||
CreateReplicaFunc func(replicaID uint64) error
|
||||
CreateReplicaFunc func(replicaID uint64, connectURL *url.URL) error
|
||||
DeleteReplicaFunc func(replicaID uint64) error
|
||||
SubscribeFunc func(replicaID, topicID uint64) error
|
||||
UnsubscribeFunc func(replicaID, topicID uint64) error
|
||||
|
@ -2020,7 +2132,7 @@ type MessagingClient struct {
|
|||
func NewMessagingClient() *MessagingClient {
|
||||
c := &MessagingClient{c: make(chan *messaging.Message, 1)}
|
||||
c.PublishFunc = c.send
|
||||
c.CreateReplicaFunc = func(replicaID uint64) error { return nil }
|
||||
c.CreateReplicaFunc = func(replicaID uint64, connectURL *url.URL) error { return nil }
|
||||
c.DeleteReplicaFunc = func(replicaID uint64) error { return nil }
|
||||
c.SubscribeFunc = func(replicaID, topicID uint64) error { return nil }
|
||||
c.UnsubscribeFunc = func(replicaID, topicID uint64) error { return nil }
|
||||
|
@ -2045,8 +2157,8 @@ func (c *MessagingClient) send(m *messaging.Message) (uint64, error) {
|
|||
}
|
||||
|
||||
// Creates a new replica with a given ID on the broker.
|
||||
func (c *MessagingClient) CreateReplica(replicaID uint64) error {
|
||||
return c.CreateReplicaFunc(replicaID)
|
||||
func (c *MessagingClient) CreateReplica(replicaID uint64, connectURL *url.URL) error {
|
||||
return c.CreateReplicaFunc(replicaID, connectURL)
|
||||
}
|
||||
|
||||
// Deletes an existing replica with a given ID from the broker.
|
||||
|
|
|
@ -106,6 +106,9 @@ var (
|
|||
// ErrFieldTypeConflict is returned when a new field already exists with a different type.
|
||||
ErrFieldTypeConflict = errors.New("field type conflict")
|
||||
|
||||
// ErrFieldNotFound
|
||||
ErrFieldNotFound = errors.New("field not found")
|
||||
|
||||
// ErrSeriesNotFound is returned when looking up a non-existent series by database, name and tags
|
||||
ErrSeriesNotFound = errors.New("series not found")
|
||||
|
||||
|
@ -119,6 +122,9 @@ var (
|
|||
// ErrInvalidGrantRevoke is returned when a statement requests an invalid
|
||||
// privilege for a user on the cluster or a database.
|
||||
ErrInvalidGrantRevoke = errors.New("invalid privilege requested")
|
||||
|
||||
// ErrContinuousQueryExists is returned when creating a duplicate continuous query.
|
||||
ErrContinuousQueryExists = errors.New("continuous query already exists")
|
||||
)
|
||||
|
||||
// BatchPoints is used to send batched data in a single write.
|
||||
|
|
|
@ -548,6 +548,9 @@ type SelectStatement struct {
|
|||
|
||||
// Returns rows starting at an offset from the first row.
|
||||
Offset int
|
||||
|
||||
// memoize the group by interval
|
||||
groupByInterval time.Duration
|
||||
}
|
||||
|
||||
// Clone returns a deep copy of the statement.
|
||||
|
@ -682,6 +685,76 @@ func (s *SelectStatement) walkForTime(node Node) bool {
|
|||
}
|
||||
}
|
||||
|
||||
// GroupByIterval extracts the time interval, if specified.
|
||||
func (s *SelectStatement) GroupByInterval() (time.Duration, error) {
|
||||
// return if we've already pulled it out
|
||||
if s.groupByInterval != 0 {
|
||||
return s.groupByInterval, nil
|
||||
}
|
||||
|
||||
// Ignore if there are no dimensions.
|
||||
if len(s.Dimensions) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
for _, d := range s.Dimensions {
|
||||
if call, ok := d.Expr.(*Call); ok && strings.ToLower(call.Name) == "time" {
|
||||
// Make sure there is exactly one argument.
|
||||
if len(call.Args) != 1 {
|
||||
return 0, errors.New("time dimension expected one argument")
|
||||
}
|
||||
|
||||
// Ensure the argument is a duration.
|
||||
lit, ok := call.Args[0].(*DurationLiteral)
|
||||
if !ok {
|
||||
return 0, errors.New("time dimension must have one duration argument")
|
||||
}
|
||||
s.groupByInterval = lit.Val
|
||||
return lit.Val, nil
|
||||
}
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// SetTimeRange sets the start and end time of the select statement to [start, end). i.e. start inclusive, end exclusive.
|
||||
// This is used commonly for continuous queries so the start and end are in buckets.
|
||||
func (s *SelectStatement) SetTimeRange(start, end time.Time) error {
|
||||
cond := fmt.Sprintf("time >= '%s' AND time < '%s'", start.UTC().Format(time.RFC3339Nano), end.UTC().Format(time.RFC3339Nano))
|
||||
if s.Condition != nil {
|
||||
cond = fmt.Sprintf("%s AND %s", s.rewriteWithoutTimeDimensions(), cond)
|
||||
}
|
||||
|
||||
expr, err := NewParser(strings.NewReader(cond)).ParseExpr()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// fold out any previously replaced time dimensios and set the condition
|
||||
s.Condition = Reduce(expr, nil)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// rewriteWithoutTimeDimensions will remove any WHERE time... clauses from the select statement
|
||||
// This is necessary when setting an explicit time range to override any that previously existed.
|
||||
func (s *SelectStatement) rewriteWithoutTimeDimensions() string {
|
||||
n := RewriteFunc(s.Condition, func(n Node) Node {
|
||||
switch n := n.(type) {
|
||||
case *BinaryExpr:
|
||||
if n.LHS.String() == "time" {
|
||||
return &BooleanLiteral{Val: true}
|
||||
}
|
||||
return n
|
||||
case *Call:
|
||||
return &BooleanLiteral{Val: true}
|
||||
default:
|
||||
return n
|
||||
}
|
||||
})
|
||||
|
||||
return n.String()
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
BinaryExpr
|
||||
|
@ -809,6 +882,7 @@ func MatchSource(src Source, name string) string {
|
|||
return ""
|
||||
}
|
||||
|
||||
// TODO pauldix: Target should actually have a Database, RetentionPolicy, and Measurement. These should be set based on the ON part of the query, and the SplitIdent of the INTO name
|
||||
// Target represents a target (destination) policy, measurment, and DB.
|
||||
type Target struct {
|
||||
// Measurement to write into.
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
package influxql_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/influxql"
|
||||
)
|
||||
|
@ -94,6 +96,125 @@ func TestSelectStatement_Substatement(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure the SELECT statement can extract GROUP BY interval.
|
||||
func TestSelectStatement_GroupByInterval(t *testing.T) {
|
||||
q := "SELECT sum(value) from foo GROUP BY time(10m)"
|
||||
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||
if err != nil {
|
||||
t.Fatalf("invalid statement: %q: %s", stmt, err)
|
||||
}
|
||||
|
||||
s := stmt.(*influxql.SelectStatement)
|
||||
d, err := s.GroupByInterval()
|
||||
if d != 10*time.Minute {
|
||||
t.Fatalf("group by interval not equal:\nexp=%s\ngot=%s", 10*time.Minute, d)
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("error parsing group by interval: %s", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the SELECT statment can have its start and end time set
|
||||
func TestSelectStatement_SetTimeRange(t *testing.T) {
|
||||
q := "SELECT sum(value) from foo GROUP BY time(10m)"
|
||||
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||
if err != nil {
|
||||
t.Fatalf("invalid statement: %q: %s", stmt, err)
|
||||
}
|
||||
|
||||
s := stmt.(*influxql.SelectStatement)
|
||||
min, max := influxql.TimeRange(s.Condition)
|
||||
start := time.Now().Add(-20 * time.Hour).Round(time.Second).UTC()
|
||||
end := time.Now().Add(10 * time.Hour).Round(time.Second).UTC()
|
||||
s.SetTimeRange(start, end)
|
||||
min, max = influxql.TimeRange(s.Condition)
|
||||
|
||||
if min != start {
|
||||
t.Fatalf("start time wasn't set properly.\n exp: %s\n got: %s", start, min)
|
||||
}
|
||||
// the end range is actually one microsecond before the given one since end is exclusive
|
||||
end = end.Add(-time.Microsecond)
|
||||
if max != end {
|
||||
t.Fatalf("end time wasn't set properly.\n exp: %s\n got: %s", end, max)
|
||||
}
|
||||
|
||||
// ensure we can set a time on a select that already has one set
|
||||
start = time.Now().Add(-20 * time.Hour).Round(time.Second).UTC()
|
||||
end = time.Now().Add(10 * time.Hour).Round(time.Second).UTC()
|
||||
q = fmt.Sprintf("SELECT sum(value) from foo WHERE time >= %ds and time <= %ds GROUP BY time(10m)", start.Unix(), end.Unix())
|
||||
stmt, err = influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||
if err != nil {
|
||||
t.Fatalf("invalid statement: %q: %s", stmt, err)
|
||||
}
|
||||
|
||||
s = stmt.(*influxql.SelectStatement)
|
||||
min, max = influxql.TimeRange(s.Condition)
|
||||
if start != min || end != max {
|
||||
t.Fatalf("start and end times weren't equal:\n exp: %s\n got: %s\n exp: %s\n got:%s\n", start, min, end, max)
|
||||
}
|
||||
|
||||
// update and ensure it saves it
|
||||
start = time.Now().Add(-40 * time.Hour).Round(time.Second).UTC()
|
||||
end = time.Now().Add(20 * time.Hour).Round(time.Second).UTC()
|
||||
s.SetTimeRange(start, end)
|
||||
min, max = influxql.TimeRange(s.Condition)
|
||||
|
||||
// TODO: right now the SetTimeRange can't override the start time if it's more recent than what they're trying to set it to.
|
||||
// shouldn't matter for our purposes with continuous queries, but fix this later
|
||||
|
||||
if min != start {
|
||||
t.Fatalf("start time wasn't set properly.\n exp: %s\n got: %s", start, min)
|
||||
}
|
||||
// the end range is actually one microsecond before the given one since end is exclusive
|
||||
end = end.Add(-time.Microsecond)
|
||||
if max != end {
|
||||
t.Fatalf("end time wasn't set properly.\n exp: %s\n got: %s", end, max)
|
||||
}
|
||||
|
||||
// ensure that when we set a time range other where clause conditions are still there
|
||||
q = "SELECT sum(value) from foo WHERE foo = 'bar' GROUP BY time(10m)"
|
||||
stmt, err = influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||
if err != nil {
|
||||
t.Fatalf("invalid statement: %q: %s", stmt, err)
|
||||
}
|
||||
|
||||
s = stmt.(*influxql.SelectStatement)
|
||||
|
||||
// update and ensure it saves it
|
||||
start = time.Now().Add(-40 * time.Hour).Round(time.Second).UTC()
|
||||
end = time.Now().Add(20 * time.Hour).Round(time.Second).UTC()
|
||||
s.SetTimeRange(start, end)
|
||||
min, max = influxql.TimeRange(s.Condition)
|
||||
|
||||
if min != start {
|
||||
t.Fatalf("start time wasn't set properly.\n exp: %s\n got: %s", start, min)
|
||||
}
|
||||
// the end range is actually one microsecond before the given one since end is exclusive
|
||||
end = end.Add(-time.Microsecond)
|
||||
if max != end {
|
||||
t.Fatalf("end time wasn't set properly.\n exp: %s\n got: %s", end, max)
|
||||
}
|
||||
|
||||
// ensure the where clause is there
|
||||
hasWhere := false
|
||||
influxql.WalkFunc(s.Condition, func(n influxql.Node) {
|
||||
if ex, ok := n.(*influxql.BinaryExpr); ok {
|
||||
if lhs, ok := ex.LHS.(*influxql.VarRef); ok {
|
||||
if lhs.Val == "foo" {
|
||||
if rhs, ok := ex.RHS.(*influxql.StringLiteral); ok {
|
||||
if rhs.Val == "bar" {
|
||||
hasWhere = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
if !hasWhere {
|
||||
t.Fatal("set time range cleared out the where clause")
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the time range of an expression can be extracted.
|
||||
func TestTimeRange(t *testing.T) {
|
||||
for i, tt := range []struct {
|
||||
|
@ -157,19 +278,19 @@ func TestSelectStatement_OnlyTimeDimensions(t *testing.T) {
|
|||
exp: false,
|
||||
},
|
||||
{
|
||||
stmt: `SELECT value FROM foo WHERE time >= '2000-01-01T00:00:05'`,
|
||||
stmt: `SELECT value FROM foo WHERE time >= '2000-01-01T00:00:05Z'`,
|
||||
exp: true,
|
||||
},
|
||||
{
|
||||
stmt: `SELECT value FROM foo WHERE time >= '2000-01-01T00:00:05' AND time < '2000-01-01T00:00:05'`,
|
||||
stmt: `SELECT value FROM foo WHERE time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:05Z'`,
|
||||
exp: true,
|
||||
},
|
||||
{
|
||||
stmt: `SELECT value FROM foo WHERE time >= '2000-01-01T00:00:05' AND asdf = 'bar'`,
|
||||
stmt: `SELECT value FROM foo WHERE time >= '2000-01-01T00:00:05Z' AND asdf = 'bar'`,
|
||||
exp: false,
|
||||
},
|
||||
{
|
||||
stmt: `SELECT value FROM foo WHERE asdf = 'jkl' AND (time >= '2000-01-01T00:00:05' AND time < '2000-01-01T00:00:05')`,
|
||||
stmt: `SELECT value FROM foo WHERE asdf = 'jkl' AND (time >= '2000-01-01T00:00:05Z' AND time < '2000-01-01T00:00:05Z')`,
|
||||
exp: false,
|
||||
},
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ type Iterator interface {
|
|||
Tags() string
|
||||
|
||||
// Next returns the next value from the iterator.
|
||||
Next() (key int64, value interface{})
|
||||
Next() (key int64, data []byte, value interface{})
|
||||
}
|
||||
|
||||
// Planner represents an object for creating execution plans.
|
||||
|
@ -319,7 +319,7 @@ loop:
|
|||
for _, row := range rows {
|
||||
for _, values := range row.Values {
|
||||
t := time.Unix(0, values[0].(int64))
|
||||
values[0] = t.UTC().Format(time.RFC3339Nano)
|
||||
values[0] = t.UTC()
|
||||
}
|
||||
a = append(a, row)
|
||||
}
|
||||
|
@ -409,7 +409,7 @@ func (m *Mapper) run(e *Emitter) {
|
|||
var tmin int64
|
||||
if m.interval > 0 {
|
||||
// Align start time to interval.
|
||||
tmin, _ = bufItr.Peek()
|
||||
tmin, _, _ = bufItr.Peek()
|
||||
tmin -= (tmin % m.interval)
|
||||
}
|
||||
|
||||
|
@ -439,6 +439,7 @@ type bufIterator struct {
|
|||
|
||||
buf struct {
|
||||
key int64
|
||||
data []byte
|
||||
value interface{}
|
||||
}
|
||||
buffered bool
|
||||
|
@ -448,27 +449,27 @@ type bufIterator struct {
|
|||
func (i *bufIterator) Tags() string { return i.itr.Tags() }
|
||||
|
||||
// Next returns the next key/value pair from the iterator.
|
||||
func (i *bufIterator) Next() (key int64, value interface{}) {
|
||||
func (i *bufIterator) Next() (key int64, data []byte, value interface{}) {
|
||||
// Read the key/value pair off the buffer or underlying iterator.
|
||||
if i.buffered {
|
||||
i.buffered = false
|
||||
} else {
|
||||
i.buf.key, i.buf.value = i.itr.Next()
|
||||
i.buf.key, i.buf.data, i.buf.value = i.itr.Next()
|
||||
}
|
||||
key, value = i.buf.key, i.buf.value
|
||||
key, data, value = i.buf.key, i.buf.data, i.buf.value
|
||||
|
||||
// If key is greater than tmax then put it back on the buffer.
|
||||
if i.tmax != 0 && key > i.tmax {
|
||||
i.buffered = true
|
||||
return 0, nil
|
||||
return 0, nil, nil
|
||||
}
|
||||
|
||||
return key, value
|
||||
return key, data, value
|
||||
}
|
||||
|
||||
// Peek returns the next key/value pair but does not move the iterator forward.
|
||||
func (i *bufIterator) Peek() (key int64, value interface{}) {
|
||||
key, value = i.Next()
|
||||
func (i *bufIterator) Peek() (key int64, data []byte, value interface{}) {
|
||||
key, data, value = i.Next()
|
||||
i.buffered = true
|
||||
return
|
||||
}
|
||||
|
@ -482,7 +483,7 @@ type MapFunc func(Iterator, *Emitter, int64)
|
|||
// MapCount computes the number of values in an iterator.
|
||||
func MapCount(itr Iterator, e *Emitter, tmin int64) {
|
||||
n := 0
|
||||
for k, _ := itr.Next(); k != 0; k, _ = itr.Next() {
|
||||
for k, _, _ := itr.Next(); k != 0; k, _, _ = itr.Next() {
|
||||
n++
|
||||
}
|
||||
e.Emit(Key{tmin, itr.Tags()}, float64(n))
|
||||
|
@ -491,7 +492,7 @@ func MapCount(itr Iterator, e *Emitter, tmin int64) {
|
|||
// MapSum computes the summation of values in an iterator.
|
||||
func MapSum(itr Iterator, e *Emitter, tmin int64) {
|
||||
n := float64(0)
|
||||
for k, v := itr.Next(); k != 0; k, v = itr.Next() {
|
||||
for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
|
||||
n += v.(float64)
|
||||
}
|
||||
e.Emit(Key{tmin, itr.Tags()}, n)
|
||||
|
@ -655,7 +656,7 @@ func ReduceSum(key Key, values []interface{}, e *Emitter) {
|
|||
func MapMean(itr Iterator, e *Emitter, tmin int64) {
|
||||
out := &meanMapOutput{}
|
||||
|
||||
for k, v := itr.Next(); k != 0; k, v = itr.Next() {
|
||||
for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
|
||||
out.Count++
|
||||
out.Sum += v.(float64)
|
||||
}
|
||||
|
@ -683,7 +684,7 @@ func MapMin(itr Iterator, e *Emitter, tmin int64) {
|
|||
var min float64
|
||||
pointsYielded := false
|
||||
|
||||
for k, v := itr.Next(); k != 0; k, v = itr.Next() {
|
||||
for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
|
||||
val := v.(float64)
|
||||
// Initialize min
|
||||
if !pointsYielded {
|
||||
|
@ -722,7 +723,7 @@ func MapMax(itr Iterator, e *Emitter, tmax int64) {
|
|||
var max float64
|
||||
pointsYielded := false
|
||||
|
||||
for k, v := itr.Next(); k != 0; k, v = itr.Next() {
|
||||
for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
|
||||
val := v.(float64)
|
||||
// Initialize max
|
||||
if !pointsYielded {
|
||||
|
@ -764,7 +765,7 @@ func MapSpread(itr Iterator, e *Emitter, tmax int64) {
|
|||
var out spreadMapOutput
|
||||
pointsYielded := false
|
||||
|
||||
for k, v := itr.Next(); k != 0; k, v = itr.Next() {
|
||||
for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
|
||||
val := v.(float64)
|
||||
// Initialize
|
||||
if !pointsYielded {
|
||||
|
@ -805,7 +806,7 @@ func ReduceSpread(key Key, values []interface{}, e *Emitter) {
|
|||
func MapStddev(itr Iterator, e *Emitter, tmax int64) {
|
||||
var values []float64
|
||||
|
||||
for k, v := itr.Next(); k != 0; k, v = itr.Next() {
|
||||
for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
|
||||
values = append(values, v.(float64))
|
||||
// Emit in batches.
|
||||
// unbounded emission of data can lead to excessive memory use
|
||||
|
@ -866,7 +867,7 @@ func MapFirst(itr Iterator, e *Emitter, tmax int64) {
|
|||
out := firstLastMapOutput{}
|
||||
pointsYielded := false
|
||||
|
||||
for k, v := itr.Next(); k != 0; k, v = itr.Next() {
|
||||
for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
|
||||
// Initialize first
|
||||
if !pointsYielded {
|
||||
out.Time = k
|
||||
|
@ -911,7 +912,7 @@ func MapLast(itr Iterator, e *Emitter, tmax int64) {
|
|||
out := firstLastMapOutput{}
|
||||
pointsYielded := false
|
||||
|
||||
for k, v := itr.Next(); k != 0; k, v = itr.Next() {
|
||||
for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
|
||||
// Initialize last
|
||||
if !pointsYielded {
|
||||
out.Time = k
|
||||
|
@ -955,7 +956,7 @@ func ReduceLast(key Key, values []interface{}, e *Emitter) {
|
|||
func MapEcho(itr Iterator, e *Emitter, tmin int64) {
|
||||
var values []interface{}
|
||||
|
||||
for k, v := itr.Next(); k != 0; k, v = itr.Next() {
|
||||
for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
|
||||
values = append(values, v)
|
||||
}
|
||||
e.Emit(Key{tmin, itr.Tags()}, values)
|
||||
|
@ -986,7 +987,7 @@ func ReducePercentile(percentile float64) ReduceFunc {
|
|||
}
|
||||
|
||||
func MapRawQuery(itr Iterator, e *Emitter, tmin int64) {
|
||||
for k, v := itr.Next(); k != 0; k, v = itr.Next() {
|
||||
for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
|
||||
e.Emit(Key{k, itr.Tags()}, v)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -765,16 +765,16 @@ func NewIterator(tags []string, points []Point) *Iterator {
|
|||
func (i *Iterator) Tags() string { return i.tags }
|
||||
|
||||
// Next returns the next point's timestamp and field value.
|
||||
func (i *Iterator) Next() (key int64, value interface{}) {
|
||||
func (i *Iterator) Next() (key int64, data []byte, value interface{}) {
|
||||
// If index is beyond points range then return nil.
|
||||
if i.index > len(i.points)-1 {
|
||||
return 0, nil
|
||||
return 0, nil, nil
|
||||
}
|
||||
|
||||
// Retrieve point and extract value.
|
||||
p := i.points[i.index]
|
||||
i.index++
|
||||
return p.Time(), p.Value
|
||||
return p.Time(), nil, p.Value
|
||||
}
|
||||
|
||||
// Point represents a single value at a given time.
|
||||
|
|
|
@ -935,6 +935,22 @@ func (p *Parser) parseCreateContinuousQueryStatement() (*CreateContinuousQuerySt
|
|||
}
|
||||
stmt.Source = source
|
||||
|
||||
// validate that the statement has a non-zero group by interval if it is aggregated
|
||||
if source.Aggregated() {
|
||||
d, err := source.GroupByInterval()
|
||||
if d == 0 || err != nil {
|
||||
// rewind so we can output an error with some info
|
||||
p.unscan() // unscan the whitespace
|
||||
p.unscan() // unscan the last token
|
||||
tok, pos, lit := p.scanIgnoreWhitespace()
|
||||
expected := []string{"GROUP BY time(...)"}
|
||||
if err != nil {
|
||||
expected = append(expected, err.Error())
|
||||
}
|
||||
return nil, newParseError(tokstr(tok, lit), expected, pos)
|
||||
}
|
||||
}
|
||||
|
||||
// Expect a "END" keyword.
|
||||
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != END {
|
||||
return nil, newParseError(tokstr(tok, lit), []string{"END"}, pos)
|
||||
|
@ -1465,7 +1481,12 @@ func (p *Parser) parseUnaryExpr() (Expr, error) {
|
|||
if isDateTimeString(lit) {
|
||||
t, err := time.Parse(DateTimeFormat, lit)
|
||||
if err != nil {
|
||||
return nil, &ParseError{Message: "unable to parse datetime", Pos: pos}
|
||||
// try to parse it as an RFCNano time
|
||||
t, err := time.Parse(time.RFC3339Nano, lit)
|
||||
if err != nil {
|
||||
return nil, &ParseError{Message: "unable to parse datetime", Pos: pos}
|
||||
}
|
||||
return &TimeLiteral{Val: t}, nil
|
||||
}
|
||||
return &TimeLiteral{Val: t}, nil
|
||||
} else if isDateString(lit) {
|
||||
|
@ -1672,7 +1693,7 @@ func isDateString(s string) bool { return dateStringRegexp.MatchString(s) }
|
|||
func isDateTimeString(s string) bool { return dateTimeStringRegexp.MatchString(s) }
|
||||
|
||||
var dateStringRegexp = regexp.MustCompile(`^\d{4}-\d{2}-\d{2}$`)
|
||||
var dateTimeStringRegexp = regexp.MustCompile(`^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}(?:\.\d+)?$`)
|
||||
var dateTimeStringRegexp = regexp.MustCompile(`^\d{4}-\d{2}-\d{2}.+`)
|
||||
|
||||
// ErrInvalidDuration is returned when parsing a malformatted duration.
|
||||
var ErrInvalidDuration = errors.New("invalid duration")
|
||||
|
|
|
@ -331,7 +331,7 @@ func TestParser_ParseStatement(t *testing.T) {
|
|||
|
||||
// CREATE CONTINUOUS QUERY ... INTO <measurement>
|
||||
{
|
||||
s: `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT count() INTO measure1 FROM myseries END`,
|
||||
s: `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT count() INTO measure1 FROM myseries GROUP BY time(5m) END`,
|
||||
stmt: &influxql.CreateContinuousQueryStatement{
|
||||
Name: "myquery",
|
||||
Database: "testdb",
|
||||
|
@ -339,13 +339,23 @@ func TestParser_ParseStatement(t *testing.T) {
|
|||
Fields: []*influxql.Field{{Expr: &influxql.Call{Name: "count"}}},
|
||||
Target: &influxql.Target{Measurement: "measure1"},
|
||||
Source: &influxql.Measurement{Name: "myseries"},
|
||||
Dimensions: []*influxql.Dimension{
|
||||
&influxql.Dimension{
|
||||
Expr: &influxql.Call{
|
||||
Name: "time",
|
||||
Args: []influxql.Expr{
|
||||
&influxql.DurationLiteral{Val: 5 * time.Minute},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
// CREATE CONTINUOUS QUERY ... INTO <retention-policy>.<measurement>
|
||||
{
|
||||
s: `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT count() INTO "1h.policy1"."cpu.load" FROM myseries END`,
|
||||
s: `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT count() INTO "1h.policy1"."cpu.load" FROM myseries GROUP BY time(5m) END`,
|
||||
stmt: &influxql.CreateContinuousQueryStatement{
|
||||
Name: "myquery",
|
||||
Database: "testdb",
|
||||
|
@ -355,6 +365,16 @@ func TestParser_ParseStatement(t *testing.T) {
|
|||
Measurement: `"1h.policy1"."cpu.load"`,
|
||||
},
|
||||
Source: &influxql.Measurement{Name: "myseries"},
|
||||
Dimensions: []*influxql.Dimension{
|
||||
&influxql.Dimension{
|
||||
Expr: &influxql.Call{
|
||||
Name: "time",
|
||||
Args: []influxql.Expr{
|
||||
&influxql.DurationLiteral{Val: 5 * time.Minute},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -642,6 +662,12 @@ func TestParser_ParseStatement(t *testing.T) {
|
|||
|
||||
for i, tt := range tests {
|
||||
stmt, err := influxql.NewParser(strings.NewReader(tt.s)).ParseStatement()
|
||||
|
||||
// if it's a CQ, there is a non-exported field that gets memoized during parsing that needs to be set
|
||||
if _, ok := stmt.(*influxql.CreateContinuousQueryStatement); ok {
|
||||
tt.stmt.(*influxql.CreateContinuousQueryStatement).Source.GroupByInterval()
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(tt.err, errstring(err)) {
|
||||
t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
|
||||
} else if tt.err == "" && !reflect.DeepEqual(tt.stmt, stmt) {
|
||||
|
|
|
@ -172,7 +172,7 @@ func (b *Broker) load() error {
|
|||
// Update the replicas.
|
||||
for _, sr := range hdr.Replicas {
|
||||
// Create replica.
|
||||
r := newReplica(b, sr.ID)
|
||||
r := newReplica(b, sr.ID, sr.URL)
|
||||
b.replicas[r.id] = r
|
||||
|
||||
// Append replica's topics.
|
||||
|
@ -249,7 +249,7 @@ func (b *Broker) createSnapshotHeader() (*snapshotHeader, error) {
|
|||
|
||||
// Append replicas and the current index for each topic.
|
||||
for _, r := range b.replicas {
|
||||
sr := &snapshotReplica{ID: r.id}
|
||||
sr := &snapshotReplica{ID: r.id, URL: r.URL.String()}
|
||||
|
||||
for topicID, index := range r.topics {
|
||||
sr.Topics = append(sr.Topics, &snapshotReplicaTopic{
|
||||
|
@ -275,6 +275,9 @@ func (b *Broker) LeaderURL() *url.URL {
|
|||
return u
|
||||
}
|
||||
|
||||
// IsLeader returns true if the broker is the current leader.
|
||||
func (b *Broker) IsLeader() bool { return b.log.State() == raft.Leader }
|
||||
|
||||
// Initialize creates a new cluster.
|
||||
func (b *Broker) Initialize() error {
|
||||
if err := b.log.Initialize(); err != nil {
|
||||
|
@ -326,6 +329,18 @@ func (b *Broker) Replica(id uint64) *Replica {
|
|||
return b.replicas[id]
|
||||
}
|
||||
|
||||
// Replicas returns a list of the replicas in the system
|
||||
func (b *Broker) Replicas() []*Replica {
|
||||
b.mu.RLock()
|
||||
defer b.mu.RUnlock()
|
||||
a := make([]*Replica, 0, len(b.replicas))
|
||||
for _, r := range b.replicas {
|
||||
a = append(a, r)
|
||||
}
|
||||
sort.Sort(replicas(a))
|
||||
return a
|
||||
}
|
||||
|
||||
// initializes a new topic object.
|
||||
func (b *Broker) createTopic(id uint64) *topic {
|
||||
t := &topic{
|
||||
|
@ -348,7 +363,7 @@ func (b *Broker) createTopicIfNotExists(id uint64) *topic {
|
|||
}
|
||||
|
||||
// CreateReplica creates a new named replica.
|
||||
func (b *Broker) CreateReplica(id uint64) error {
|
||||
func (b *Broker) CreateReplica(id uint64, connectURL *url.URL) error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
|
@ -361,7 +376,7 @@ func (b *Broker) CreateReplica(id uint64) error {
|
|||
// Add command to create replica.
|
||||
return b.PublishSync(&Message{
|
||||
Type: CreateReplicaMessageType,
|
||||
Data: mustMarshalJSON(&CreateReplicaCommand{ID: id}),
|
||||
Data: mustMarshalJSON(&CreateReplicaCommand{ID: id, URL: connectURL.String()}),
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -370,7 +385,7 @@ func (b *Broker) mustApplyCreateReplica(m *Message) {
|
|||
mustUnmarshalJSON(m.Data, &c)
|
||||
|
||||
// Create replica.
|
||||
r := newReplica(b, c.ID)
|
||||
r := newReplica(b, c.ID, c.URL)
|
||||
|
||||
// Automatically subscribe to the config topic.
|
||||
t := b.createTopicIfNotExists(BroadcastTopicID)
|
||||
|
@ -657,7 +672,7 @@ func (fsm *brokerFSM) Restore(r io.Reader) error {
|
|||
// Update the replicas.
|
||||
for _, sr := range s.Replicas {
|
||||
// Create replica.
|
||||
r := newReplica(b, sr.ID)
|
||||
r := newReplica(b, sr.ID, sr.URL)
|
||||
b.replicas[r.id] = r
|
||||
|
||||
// Append replica's topics.
|
||||
|
@ -702,6 +717,7 @@ func (s *snapshotHeader) maxIndex() uint64 {
|
|||
type snapshotReplica struct {
|
||||
ID uint64 `json:"id"`
|
||||
Topics []*snapshotReplicaTopic `json:"topics"`
|
||||
URL string `json:"url"`
|
||||
}
|
||||
|
||||
type snapshotTopic struct {
|
||||
|
@ -834,12 +850,19 @@ func (t *topic) encode(m *Message) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type replicas []*Replica
|
||||
|
||||
func (a replicas) Len() int { return len(a) }
|
||||
func (a replicas) Less(i, j int) bool { return a[i].id < a[j].id }
|
||||
func (a replicas) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
||||
// Replica represents a collection of subscriptions to topics on the broker.
|
||||
// The replica maintains the highest index read for each topic so that the
|
||||
// broker can use this high water mark for trimming the topic logs.
|
||||
type Replica struct {
|
||||
URL *url.URL
|
||||
|
||||
id uint64
|
||||
url *url.URL // TODO
|
||||
broker *Broker
|
||||
|
||||
writer io.Writer // currently attached writer
|
||||
|
@ -849,8 +872,15 @@ type Replica struct {
|
|||
}
|
||||
|
||||
// newReplica returns a new Replica instance associated with a broker.
|
||||
func newReplica(b *Broker, id uint64) *Replica {
|
||||
func newReplica(b *Broker, id uint64, urlstr string) *Replica {
|
||||
// get the url of the replica
|
||||
u, err := url.Parse(urlstr)
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
|
||||
return &Replica{
|
||||
URL: u,
|
||||
broker: b,
|
||||
id: id,
|
||||
topics: make(map[uint64]uint64),
|
||||
|
@ -943,7 +973,8 @@ func (r *Replica) WriteTo(w io.Writer) (int64, error) {
|
|||
|
||||
// CreateReplica creates a new replica.
|
||||
type CreateReplicaCommand struct {
|
||||
ID uint64 `json:"id"`
|
||||
ID uint64 `json:"id"`
|
||||
URL string `json:"url"`
|
||||
}
|
||||
|
||||
// DeleteReplicaCommand removes a replica.
|
||||
|
|
|
@ -48,7 +48,7 @@ func TestBroker_Publish(t *testing.T) {
|
|||
defer b.Close()
|
||||
|
||||
// Create a new named replica.
|
||||
if err := b.CreateReplica(2000); err != nil {
|
||||
if err := b.CreateReplica(2000, &url.URL{Host: "localhost"}); err != nil {
|
||||
t.Fatalf("create replica: %s", err)
|
||||
}
|
||||
|
||||
|
@ -122,8 +122,8 @@ func TestBroker_CreateReplica_ErrReplicaExists(t *testing.T) {
|
|||
defer b.Close()
|
||||
|
||||
// Create a replica twice.
|
||||
b.CreateReplica(2000)
|
||||
if err := b.CreateReplica(2000); err != messaging.ErrReplicaExists {
|
||||
b.CreateReplica(2000, &url.URL{Host: "localhost"})
|
||||
if err := b.CreateReplica(2000, &url.URL{Host: "localhost"}); err != messaging.ErrReplicaExists {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -134,7 +134,7 @@ func TestBroker_DeleteReplica(t *testing.T) {
|
|||
defer b.Close()
|
||||
|
||||
// Create a new named replica.
|
||||
if err := b.CreateReplica(2000); err != nil {
|
||||
if err := b.CreateReplica(2000, &url.URL{Host: "localhost"}); err != nil {
|
||||
t.Fatalf("create replica: %s", err)
|
||||
}
|
||||
|
||||
|
@ -179,7 +179,7 @@ func TestBroker_DeleteReplica_ErrReplicaNotFound(t *testing.T) {
|
|||
func TestBroker_Subscribe_ErrReplicaNotFound(t *testing.T) {
|
||||
b := NewBroker(nil)
|
||||
defer b.Close()
|
||||
b.CreateReplica(2000)
|
||||
b.CreateReplica(2000, &url.URL{Host: "localhost"})
|
||||
if err := b.Subscribe(3000, 20); err != messaging.ErrReplicaNotFound {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
|
|
|
@ -225,15 +225,18 @@ func (c *Client) Publish(m *Message) (uint64, error) {
|
|||
}
|
||||
|
||||
// CreateReplica creates a replica on the broker.
|
||||
func (c *Client) CreateReplica(id uint64) error {
|
||||
func (c *Client) CreateReplica(id uint64, u *url.URL) error {
|
||||
var resp *http.Response
|
||||
var err error
|
||||
|
||||
u := *c.LeaderURL()
|
||||
leaderURL := *c.LeaderURL()
|
||||
for {
|
||||
u.Path = "/messaging/replicas"
|
||||
u.RawQuery = url.Values{"id": {strconv.FormatUint(id, 10)}}.Encode()
|
||||
resp, err = http.Post(u.String(), "application/octet-stream", nil)
|
||||
leaderURL.Path = "/messaging/replicas"
|
||||
leaderURL.RawQuery = url.Values{
|
||||
"id": {strconv.FormatUint(id, 10)},
|
||||
"url": {u.String()},
|
||||
}.Encode()
|
||||
resp, err = http.Post(leaderURL.String(), "application/octet-stream", nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -246,7 +249,7 @@ func (c *Client) CreateReplica(id uint64) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("bad redirect: %s", resp.Header.Get("Location"))
|
||||
}
|
||||
u = *redirectURL
|
||||
leaderURL = *redirectURL
|
||||
continue
|
||||
} else if resp.StatusCode != http.StatusCreated {
|
||||
return errors.New(resp.Header.Get("X-Broker-Error"))
|
||||
|
|
|
@ -27,7 +27,7 @@ func TestClient_Open(t *testing.T) {
|
|||
defer c.Close()
|
||||
|
||||
// Create replica on broker.
|
||||
c.Server.Handler.Broker().CreateReplica(1000)
|
||||
c.Server.Handler.Broker().CreateReplica(1000, &url.URL{Host: "localhost"})
|
||||
|
||||
// Open client to broker.
|
||||
f := NewTempFile()
|
||||
|
@ -81,7 +81,7 @@ func TestClient_Close(t *testing.T) {
|
|||
defer c.Close()
|
||||
|
||||
// Create replica on broker.
|
||||
c.Server.Handler.Broker().CreateReplica(1000)
|
||||
c.Server.Handler.Broker().CreateReplica(1000, &url.URL{Host: "localhost"})
|
||||
|
||||
// Open client to broker.
|
||||
f := NewTempFile()
|
||||
|
@ -141,7 +141,7 @@ func TestClient_CreateReplica(t *testing.T) {
|
|||
defer c.Close()
|
||||
|
||||
// Create replica through client.
|
||||
if err := c.CreateReplica(123); err != nil {
|
||||
if err := c.CreateReplica(123, &url.URL{Host: "localhost"}); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
|
@ -155,8 +155,8 @@ func TestClient_CreateReplica(t *testing.T) {
|
|||
func TestClient_CreateReplica_Err(t *testing.T) {
|
||||
c := OpenClient(0)
|
||||
defer c.Close()
|
||||
c.Server.Handler.Broker().CreateReplica(123)
|
||||
if err := c.CreateReplica(123); err == nil || err.Error() != `replica already exists` {
|
||||
c.Server.Handler.Broker().CreateReplica(123, &url.URL{Host: "localhost"})
|
||||
if err := c.CreateReplica(123, &url.URL{Host: "localhost"}); err == nil || err.Error() != `replica already exists` {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -165,7 +165,7 @@ func TestClient_CreateReplica_Err(t *testing.T) {
|
|||
func TestClient_DeleteReplica(t *testing.T) {
|
||||
c := OpenClient(0)
|
||||
defer c.Close()
|
||||
c.Server.Handler.Broker().CreateReplica(123)
|
||||
c.Server.Handler.Broker().CreateReplica(123, &url.URL{Host: "localhost"})
|
||||
|
||||
// Delete replica through client.
|
||||
if err := c.DeleteReplica(123); err != nil {
|
||||
|
@ -182,7 +182,7 @@ func TestClient_DeleteReplica(t *testing.T) {
|
|||
func TestClient_Subscribe(t *testing.T) {
|
||||
c := OpenClient(0)
|
||||
defer c.Close()
|
||||
c.Server.Broker().CreateReplica(100)
|
||||
c.Server.Broker().CreateReplica(100, &url.URL{Host: "localhost"})
|
||||
|
||||
// Create subscription through client.
|
||||
if err := c.Subscribe(100, 200); err != nil {
|
||||
|
@ -208,7 +208,7 @@ func TestClient_Subscribe_Err(t *testing.T) {
|
|||
func TestClient_Unsubscribe(t *testing.T) {
|
||||
c := OpenClient(0)
|
||||
defer c.Close()
|
||||
c.Server.Broker().CreateReplica(100)
|
||||
c.Server.Broker().CreateReplica(100, &url.URL{Host: "localhost"})
|
||||
c.Server.Broker().Subscribe(100, 200)
|
||||
|
||||
// Remove subscription through client.
|
||||
|
@ -250,7 +250,7 @@ func NewClient(replicaID uint64) *Client {
|
|||
// OpenClient returns a new, open instance of Client.
|
||||
func OpenClient(replicaID uint64) *Client {
|
||||
c := NewClient(replicaID)
|
||||
c.Server.Handler.Broker().CreateReplica(replicaID)
|
||||
c.Server.Handler.Broker().CreateReplica(replicaID, &url.URL{Host: "localhost"})
|
||||
|
||||
// Open client to broker.
|
||||
c.clientConfig = NewTempFile()
|
||||
|
|
|
@ -3,6 +3,7 @@ package messaging
|
|||
import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
|
@ -152,9 +153,14 @@ func (h *Handler) createReplica(w http.ResponseWriter, r *http.Request) {
|
|||
} else {
|
||||
replicaID = uint64(n)
|
||||
}
|
||||
u, err := url.Parse(r.URL.Query().Get("url"))
|
||||
if err != nil {
|
||||
h.error(w, err, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Create a new replica on the broker.
|
||||
if err := h.broker.CreateReplica(replicaID); err == raft.ErrNotLeader {
|
||||
if err := h.broker.CreateReplica(replicaID, u); err == raft.ErrNotLeader {
|
||||
h.redirectToLeader(w, r)
|
||||
return
|
||||
} else if err == ErrReplicaExists {
|
||||
|
|
|
@ -18,7 +18,7 @@ func TestHandler_stream(t *testing.T) {
|
|||
defer s.Close()
|
||||
|
||||
// Create replica.
|
||||
s.Handler.Broker().CreateReplica(2000)
|
||||
s.Handler.Broker().CreateReplica(2000, &url.URL{Host: "localhost"})
|
||||
|
||||
// Send request to stream the replica.
|
||||
resp, err := http.Get(s.URL + `/messaging/messages?replicaID=2000`)
|
||||
|
@ -83,7 +83,7 @@ func TestHandler_publish(t *testing.T) {
|
|||
|
||||
// Stream subscription for a replica.
|
||||
var m messaging.Message
|
||||
s.Handler.Broker().CreateReplica(2000)
|
||||
s.Handler.Broker().CreateReplica(2000, &url.URL{Host: "localhost"})
|
||||
s.Handler.Broker().Subscribe(2000, 200)
|
||||
go func() {
|
||||
resp, _ := http.Get(s.URL + `/messaging/messages?replicaID=2000`)
|
||||
|
@ -220,7 +220,7 @@ func TestHandler_createReplica_ErrReplicaIDRequired(t *testing.T) {
|
|||
func TestHandler_createReplica_ErrReplicaExists(t *testing.T) {
|
||||
s := NewServer()
|
||||
defer s.Close()
|
||||
s.Handler.Broker().CreateReplica(200)
|
||||
s.Handler.Broker().CreateReplica(200, &url.URL{Host: "localhost"})
|
||||
|
||||
// Send request to the broker.
|
||||
resp, _ := http.Post(s.URL+`/messaging/replicas?id=200`, "application/octet-stream", nil)
|
||||
|
@ -236,7 +236,7 @@ func TestHandler_createReplica_ErrReplicaExists(t *testing.T) {
|
|||
func TestHandler_deleteReplica(t *testing.T) {
|
||||
s := NewServer()
|
||||
defer s.Close()
|
||||
s.Handler.Broker().CreateReplica(200)
|
||||
s.Handler.Broker().CreateReplica(200, &url.URL{Host: "localhost"})
|
||||
|
||||
// Send request to the broker.
|
||||
req, _ := http.NewRequest("DELETE", s.URL+`/messaging/replicas?id=200`, nil)
|
||||
|
@ -272,7 +272,7 @@ func TestHandler_deleteReplica_ErrReplicaIDRequired(t *testing.T) {
|
|||
func TestHandler_subscribe(t *testing.T) {
|
||||
s := NewServer()
|
||||
defer s.Close()
|
||||
s.Broker().CreateReplica(100)
|
||||
s.Broker().CreateReplica(100, &url.URL{Host: "localhost"})
|
||||
|
||||
// Send request to the broker.
|
||||
resp, _ := http.Post(s.URL+`/messaging/subscriptions?replicaID=100&topicID=200`, "application/octet-stream", nil)
|
||||
|
@ -330,7 +330,7 @@ func TestHandler_subscribe_ErrReplicaNotFound(t *testing.T) {
|
|||
func TestHandler_unsubscribe(t *testing.T) {
|
||||
s := NewServer()
|
||||
defer s.Close()
|
||||
s.Handler.Broker().CreateReplica(200)
|
||||
s.Handler.Broker().CreateReplica(200, &url.URL{Host: "localhost"})
|
||||
s.Handler.Broker().Subscribe(200, 100)
|
||||
|
||||
// Send request to the broker.
|
||||
|
|
|
@ -19,7 +19,7 @@ func TestBroker_Join(t *testing.T) {
|
|||
b0, b1 := s0.Broker(), s1.Broker()
|
||||
|
||||
// Create data on the first server.
|
||||
b0.CreateReplica(20)
|
||||
b0.CreateReplica(20, &url.URL{Host: "localhost"})
|
||||
b0.Subscribe(20, 1000)
|
||||
index, _ := b0.Publish(&messaging.Message{Type: 100, TopicID: 1000, Data: []byte("XXXX")})
|
||||
b0.Sync(index)
|
||||
|
@ -69,7 +69,7 @@ func BenchmarkCluster_Publish(b *testing.B) {
|
|||
defer c.Close()
|
||||
|
||||
// Create replica and connect client.
|
||||
c.Leader().Broker().CreateReplica(100)
|
||||
c.Leader().Broker().CreateReplica(100, &url.URL{Host: "localhost"})
|
||||
client := messaging.NewClient(100)
|
||||
client.Open("", []*url.URL{c.URL()})
|
||||
|
||||
|
|
351
server.go
351
server.go
|
@ -74,6 +74,9 @@ const (
|
|||
// Measurement messages
|
||||
createFieldsIfNotExistsMessageType = messaging.MessageType(0x60)
|
||||
|
||||
// Continuous Query messages
|
||||
createContinuousQueryMessageType = messaging.MessageType(0x70)
|
||||
|
||||
// Write series data messages (per-topic)
|
||||
writeRawSeriesMessageType = messaging.MessageType(0x80)
|
||||
|
||||
|
@ -105,6 +108,19 @@ type Server struct {
|
|||
Logger *log.Logger
|
||||
|
||||
authenticationEnabled bool
|
||||
|
||||
// continuous query settings
|
||||
RecomputePreviousN int
|
||||
RecomputeNoOlderThan time.Duration
|
||||
ComputeRunsPerInterval int
|
||||
ComputeNoMoreThan time.Duration
|
||||
|
||||
// This is the last time this data node has run continuous queries.
|
||||
// Keep this state in memory so if a broker makes a request in another second
|
||||
// to compute, it won't rerun CQs that have already been run. If this data node
|
||||
// is just getting the request after being off duty for running CQs then
|
||||
// it will recompute all of them
|
||||
lastContinuousQueryRun time.Time
|
||||
}
|
||||
|
||||
// NewServer returns a new instance of Server.
|
||||
|
@ -1665,9 +1681,23 @@ func (s *Server) writePoint(database, retentionPolicy string, point *Point) (uin
|
|||
return 0, err
|
||||
}
|
||||
|
||||
// Get a field codec.
|
||||
s.mu.RLock()
|
||||
codec := NewFieldCodec(m)
|
||||
s.mu.RUnlock()
|
||||
if codec == nil {
|
||||
panic("field codec is nil")
|
||||
}
|
||||
|
||||
// Convert string-key/values to encoded fields.
|
||||
encodedFields, err := codec.EncodeFields(values)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Encode point header.
|
||||
data := marshalPointHeader(seriesID, timestamp.UnixNano())
|
||||
data = append(data, marshalValues(m.mapValues(values))...)
|
||||
data = append(data, encodedFields...)
|
||||
|
||||
// Publish "raw write series" message on shard's topic to broker.
|
||||
return s.client.Publish(&messaging.Message{
|
||||
|
@ -1787,7 +1817,7 @@ func (s *Server) createFieldsIfNotExists(database string, measurement string, va
|
|||
return nil
|
||||
}
|
||||
|
||||
// ReadSeries reads a single point from a series in the database.
|
||||
// ReadSeries reads a single point from a series in the database. It is used for debug and test only.
|
||||
func (s *Server) ReadSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time) (map[string]interface{}, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
@ -1837,7 +1867,8 @@ func (s *Server) ReadSeries(database, retentionPolicy, name string, tags map[str
|
|||
}
|
||||
|
||||
// Decode into a raw value map.
|
||||
rawValues := unmarshalValues(data)
|
||||
codec := NewFieldCodec(mm)
|
||||
rawValues := codec.DecodeFields(data)
|
||||
if rawValues == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -1918,11 +1949,11 @@ func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User) Re
|
|||
case *influxql.ShowRetentionPoliciesStatement:
|
||||
res = s.executeShowRetentionPoliciesStatement(stmt, user)
|
||||
case *influxql.CreateContinuousQueryStatement:
|
||||
continue
|
||||
res = s.executeCreateContinuousQueryStatement(stmt, user)
|
||||
case *influxql.DropContinuousQueryStatement:
|
||||
continue
|
||||
case *influxql.ShowContinuousQueriesStatement:
|
||||
continue
|
||||
res = s.executeShowContinuousQueriesStatement(stmt, database, user)
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported statement type: %T", stmt))
|
||||
}
|
||||
|
@ -2006,6 +2037,7 @@ func (s *Server) planSelectStatement(stmt *influxql.SelectStatement) (*influxql.
|
|||
|
||||
// Plan query.
|
||||
p := influxql.NewPlanner(s)
|
||||
|
||||
return p.Plan(stmt)
|
||||
}
|
||||
|
||||
|
@ -2280,6 +2312,18 @@ func (s *Server) executeShowTagValuesStatement(stmt *influxql.ShowTagValuesState
|
|||
return result
|
||||
}
|
||||
|
||||
func (s *Server) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement, database string, user *User) *Result {
|
||||
rows := make([]*influxql.Row, 0)
|
||||
for _, name := range s.Databases() {
|
||||
row := &influxql.Row{Columns: []string{"name", "query"}, Name: name}
|
||||
for _, cq := range s.ContinuousQueries(name) {
|
||||
row.Values = append(row.Values, []interface{}{cq.cq.Name, cq.Query})
|
||||
}
|
||||
rows = append(rows, row)
|
||||
}
|
||||
return &Result{Rows: rows}
|
||||
}
|
||||
|
||||
// filterMeasurementsByExpr filters a list of measurements by a tags expression.
|
||||
func filterMeasurementsByExpr(measurements Measurements, expr influxql.Expr) (Measurements, error) {
|
||||
// Create a list to hold result measurements.
|
||||
|
@ -2465,6 +2509,28 @@ func (s *Server) executeShowRetentionPoliciesStatement(q *influxql.ShowRetention
|
|||
return &Result{Rows: []*influxql.Row{row}}
|
||||
}
|
||||
|
||||
func (s *Server) executeCreateContinuousQueryStatement(q *influxql.CreateContinuousQueryStatement, user *User) *Result {
|
||||
return &Result{Err: s.CreateContinuousQuery(q)}
|
||||
}
|
||||
|
||||
func (s *Server) CreateContinuousQuery(q *influxql.CreateContinuousQueryStatement) error {
|
||||
c := &createContinuousQueryCommand{Query: q.String()}
|
||||
_, err := s.broadcast(createContinuousQueryMessageType, c)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Server) ContinuousQueries(database string) []*ContinuousQuery {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
db := s.databases[database]
|
||||
if db == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return db.continuousQueries
|
||||
}
|
||||
|
||||
// MeasurementNames returns a list of all measurements for the specified database.
|
||||
func (s *Server) MeasurementNames(database string) []string {
|
||||
s.mu.RLock()
|
||||
|
@ -2659,6 +2725,8 @@ func (s *Server) processor(client MessagingClient, done chan struct{}) {
|
|||
err = s.applyCreateSeriesIfNotExists(m)
|
||||
case setPrivilegeMessageType:
|
||||
err = s.applySetPrivilege(m)
|
||||
case createContinuousQueryMessageType:
|
||||
err = s.applyCreateContinuousQueryCommand(m)
|
||||
}
|
||||
|
||||
// Sync high water mark and errors.
|
||||
|
@ -2718,7 +2786,7 @@ type Results struct {
|
|||
Err error
|
||||
}
|
||||
|
||||
// MarshalJSON encodes a Results stuct into JSON.
|
||||
// MarshalJSON encodes a Results struct into JSON.
|
||||
func (r Results) MarshalJSON() ([]byte, error) {
|
||||
// Define a struct that outputs "error" as a string.
|
||||
var o struct {
|
||||
|
@ -2773,7 +2841,7 @@ type MessagingClient interface {
|
|||
Publish(m *messaging.Message) (index uint64, err error)
|
||||
|
||||
// Creates a new replica with a given ID on the broker.
|
||||
CreateReplica(replicaID uint64) error
|
||||
CreateReplica(replicaID uint64, connectURL *url.URL) error
|
||||
|
||||
// Deletes an existing replica with a given ID from the broker.
|
||||
DeleteReplica(replicaID uint64) error
|
||||
|
@ -2911,9 +2979,272 @@ func HashPassword(password string) ([]byte, error) {
|
|||
// ContinuousQuery represents a query that exists on the server and processes
|
||||
// each incoming event.
|
||||
type ContinuousQuery struct {
|
||||
ID uint32
|
||||
Query string
|
||||
// TODO: ParsedQuery *parser.SelectQuery
|
||||
Query string `json:"query"`
|
||||
|
||||
mu sync.Mutex
|
||||
cq *influxql.CreateContinuousQueryStatement
|
||||
lastRun time.Time
|
||||
intoDB string
|
||||
intoRP string
|
||||
intoMeasurement string
|
||||
}
|
||||
|
||||
// NewContinuousQuery returns a ContinuousQuery object with a parsed influxql.CreateContinuousQueryStatement
|
||||
func NewContinuousQuery(q string) (*ContinuousQuery, error) {
|
||||
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cq, ok := stmt.(*influxql.CreateContinuousQueryStatement)
|
||||
if !ok {
|
||||
return nil, errors.New("query isn't a continuous query")
|
||||
}
|
||||
|
||||
cquery := &ContinuousQuery{
|
||||
Query: q,
|
||||
cq: cq,
|
||||
}
|
||||
|
||||
// set which database and retention policy, and measuremet a CQ is writing into
|
||||
a, err := influxql.SplitIdent(cq.Source.Target.Measurement)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// set the default into database to the same as the from database
|
||||
cquery.intoDB = cq.Database
|
||||
|
||||
if len(a) == 1 { // into only set the measurement name. keep default db and rp
|
||||
cquery.intoMeasurement = a[0]
|
||||
} else if len(a) == 2 { // into set the rp and the measurement
|
||||
cquery.intoRP = a[0]
|
||||
cquery.intoMeasurement = a[1]
|
||||
} else { // into set db, rp, and measurement
|
||||
cquery.intoDB = a[0]
|
||||
cquery.intoRP = a[1]
|
||||
cquery.intoMeasurement = a[2]
|
||||
}
|
||||
|
||||
return cquery, nil
|
||||
}
|
||||
|
||||
// applyCreateContinuousQueryCommand adds the continuous query to the database object and saves it to the metastore
|
||||
func (s *Server) applyCreateContinuousQueryCommand(m *messaging.Message) error {
|
||||
var c createContinuousQueryCommand
|
||||
mustUnmarshalJSON(m.Data, &c)
|
||||
|
||||
cq, err := NewContinuousQuery(c.Query)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// normalize the select statement in the CQ so that it has the database and retention policy inserted
|
||||
if err := s.NormalizeStatement(cq.cq.Source, cq.cq.Database); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// ensure the into database exists
|
||||
if s.databases[cq.intoDB] == nil {
|
||||
return ErrDatabaseNotFound
|
||||
}
|
||||
|
||||
// Retrieve the database.
|
||||
db := s.databases[cq.cq.Database]
|
||||
if db == nil {
|
||||
return ErrDatabaseNotFound
|
||||
} else if db.continuousQueryByName(cq.cq.Name) != nil {
|
||||
return ErrContinuousQueryExists
|
||||
}
|
||||
|
||||
// Add cq to the database.
|
||||
db.continuousQueries = append(db.continuousQueries, cq)
|
||||
|
||||
// Persist to metastore.
|
||||
s.meta.mustUpdate(func(tx *metatx) error {
|
||||
return tx.saveDatabase(db)
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RunContinuousQueries will run any continuous queries that are due to run and write the
|
||||
// results back into the database
|
||||
func (s *Server) RunContinuousQueries() error {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
for _, d := range s.databases {
|
||||
for _, c := range d.continuousQueries {
|
||||
if s.shouldRunContinuousQuery(c) {
|
||||
// set the into retention policy based on what is now the default
|
||||
if c.intoRP == "" {
|
||||
c.intoRP = d.defaultRetentionPolicy
|
||||
}
|
||||
go func(cq *ContinuousQuery) {
|
||||
s.runContinuousQuery(c)
|
||||
}(c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// shouldRunContinuousQuery returns true if the CQ should be schedule to run. It will use the
|
||||
// lastRunTime of the CQ and the rules for when to run set through the config to determine
|
||||
// if this CQ should be run
|
||||
func (s *Server) shouldRunContinuousQuery(cq *ContinuousQuery) bool {
|
||||
// if it's not aggregated we don't run it
|
||||
if !cq.cq.Source.Aggregated() {
|
||||
return false
|
||||
}
|
||||
|
||||
// since it's aggregated we need to figure how often it should be run
|
||||
interval, err := cq.cq.Source.GroupByInterval()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// determine how often we should run this continuous query.
|
||||
// group by time / the number of times to compute
|
||||
computeEvery := time.Duration(interval.Nanoseconds()/int64(s.ComputeRunsPerInterval)) * time.Nanosecond
|
||||
// make sure we're running no more frequently than the setting in the config
|
||||
if computeEvery < s.ComputeNoMoreThan {
|
||||
computeEvery = s.ComputeNoMoreThan
|
||||
}
|
||||
|
||||
// if we've passed the amount of time since the last run, do it up
|
||||
if cq.lastRun.Add(computeEvery).UnixNano() <= time.Now().UnixNano() {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// runContinuousQuery will execute a continuous query
|
||||
// TODO: make this fan out to the cluster instead of running all the queries on this single data node
|
||||
func (s *Server) runContinuousQuery(cq *ContinuousQuery) {
|
||||
cq.mu.Lock()
|
||||
defer cq.mu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
cq.lastRun = now
|
||||
|
||||
interval, err := cq.cq.Source.GroupByInterval()
|
||||
if err != nil || interval == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
startTime := now.Round(interval)
|
||||
if startTime.UnixNano() > now.UnixNano() {
|
||||
startTime = startTime.Add(-interval)
|
||||
}
|
||||
|
||||
if err := cq.cq.Source.SetTimeRange(startTime, startTime.Add(interval)); err != nil {
|
||||
log.Printf("cq error setting time range: %s\n", err.Error())
|
||||
}
|
||||
|
||||
if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
|
||||
log.Printf("cq error: %s. running: %s\n", err.Error(), cq.cq.String())
|
||||
}
|
||||
|
||||
for i := 0; i < s.RecomputePreviousN; i++ {
|
||||
// if we're already more time past the previous window than we're going to look back, stop
|
||||
if now.Sub(startTime) > s.RecomputeNoOlderThan {
|
||||
return
|
||||
}
|
||||
newStartTime := startTime.Add(-interval)
|
||||
|
||||
if err := cq.cq.Source.SetTimeRange(newStartTime, startTime); err != nil {
|
||||
log.Printf("cq error setting time range: %s\n", err.Error())
|
||||
}
|
||||
|
||||
if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
|
||||
log.Printf("cq error: %s. running: %s\n", err.Error(), cq.cq.String())
|
||||
}
|
||||
|
||||
startTime = newStartTime
|
||||
}
|
||||
}
|
||||
|
||||
// runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in
|
||||
func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
|
||||
e, err := s.planSelectStatement(cq.cq.Source)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Execute plan.
|
||||
ch, err := e.Execute()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Read all rows from channel and write them in
|
||||
for row := range ch {
|
||||
points, err := s.convertRowToPoints(cq.intoMeasurement, row)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
|
||||
if len(points) > 0 {
|
||||
_, err = s.WriteSeries(cq.intoDB, cq.intoRP, points)
|
||||
if err != nil {
|
||||
log.Printf("[cq] err: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// convertRowToPoints will convert a query result Row into Points that can be written back in.
|
||||
// Used for continuous and INTO queries
|
||||
func (s *Server) convertRowToPoints(measurementName string, row *influxql.Row) ([]Point, error) {
|
||||
// figure out which parts of the result are the time and which are the fields
|
||||
timeIndex := -1
|
||||
fieldIndexes := make(map[string]int)
|
||||
for i, c := range row.Columns {
|
||||
if c == "time" {
|
||||
timeIndex = i
|
||||
} else {
|
||||
fieldIndexes[c] = i
|
||||
}
|
||||
}
|
||||
|
||||
if timeIndex == -1 {
|
||||
return nil, errors.New("cq error finding time index in result")
|
||||
}
|
||||
|
||||
points := make([]Point, 0, len(row.Values))
|
||||
for _, v := range row.Values {
|
||||
vals := make(map[string]interface{})
|
||||
for fieldName, fieldIndex := range fieldIndexes {
|
||||
vals[fieldName] = v[fieldIndex]
|
||||
}
|
||||
|
||||
p := &Point{
|
||||
Name: measurementName,
|
||||
Tags: row.Tags,
|
||||
Timestamp: v[timeIndex].(time.Time),
|
||||
Values: vals,
|
||||
}
|
||||
|
||||
points = append(points, *p)
|
||||
}
|
||||
|
||||
return points, nil
|
||||
}
|
||||
|
||||
// createContinuousQueryCommand is the raft command for creating a continuous query on a database
|
||||
type createContinuousQueryCommand struct {
|
||||
Query string `json:"query"`
|
||||
}
|
||||
|
||||
// copyURL returns a copy of the the URL.
|
||||
|
|
145
server_test.go
145
server_test.go
|
@ -1099,6 +1099,143 @@ func TestServer_NormalizeQuery(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure the server can create a continuous query
|
||||
func TestServer_CreateContinuousQuery(t *testing.T) {
|
||||
s := OpenServer(NewMessagingClient())
|
||||
defer s.Close()
|
||||
|
||||
// Create the "foo" database.
|
||||
if err := s.CreateDatabase("foo"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
s.SetDefaultRetentionPolicy("foo", "bar")
|
||||
|
||||
// create and check
|
||||
q := "CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT count() INTO measure1 FROM myseries GROUP BY time(10m) END"
|
||||
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||
if err != nil {
|
||||
t.Fatalf("error parsing query %s", err.Error())
|
||||
}
|
||||
cq := stmt.(*influxql.CreateContinuousQueryStatement)
|
||||
if err := s.CreateContinuousQuery(cq); err != nil {
|
||||
t.Fatalf("error creating continuous query %s", err.Error())
|
||||
}
|
||||
|
||||
queries := s.ContinuousQueries("foo")
|
||||
cqObj, _ := influxdb.NewContinuousQuery(q)
|
||||
expected := []*influxdb.ContinuousQuery{cqObj}
|
||||
if mustMarshalJSON(expected) != mustMarshalJSON(queries) {
|
||||
t.Fatalf("query not saved:\n\texp: %s\n\tgot: %s", mustMarshalJSON(expected), mustMarshalJSON(queries))
|
||||
}
|
||||
s.Restart()
|
||||
|
||||
// check again
|
||||
queries = s.ContinuousQueries("foo")
|
||||
if !reflect.DeepEqual(queries, expected) {
|
||||
t.Fatalf("query not saved:\n\texp: %s\ngot: %s", mustMarshalJSON(expected), mustMarshalJSON(queries))
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the server prevents a duplicate named continuous query from being created
|
||||
func TestServer_CreateContinuousQuery_ErrContinuousQueryExists(t *testing.T) {
|
||||
t.Skip("pending")
|
||||
}
|
||||
|
||||
// Ensure the server returns an error when creating a continuous query on a database that doesn't exist
|
||||
func TestServer_CreateContinuousQuery_ErrDatabaseNotFound(t *testing.T) {
|
||||
t.Skip("pending")
|
||||
}
|
||||
|
||||
// Ensure the server returns an error when creating a continuous query on a retention policy that doesn't exist
|
||||
func TestServer_CreateContinuousQuery_ErrRetentionPolicyNotFound(t *testing.T) {
|
||||
t.Skip("pending")
|
||||
}
|
||||
|
||||
func TestServer_CreateContinuousQuery_ErrInfinteLoop(t *testing.T) {
|
||||
t.Skip("pending")
|
||||
}
|
||||
|
||||
// Ensure
|
||||
func TestServer_RunContinuousQueries(t *testing.T) {
|
||||
s := OpenServer(NewMessagingClient())
|
||||
defer s.Close()
|
||||
|
||||
// Create the "foo" database.
|
||||
if err := s.CreateDatabase("foo"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
s.SetDefaultRetentionPolicy("foo", "raw")
|
||||
|
||||
s.RecomputePreviousN = 50
|
||||
s.RecomputeNoOlderThan = time.Second
|
||||
s.ComputeRunsPerInterval = 5
|
||||
s.ComputeNoMoreThan = 2 * time.Millisecond
|
||||
|
||||
// create cq and check
|
||||
q := `CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT mean(value) INTO cpu_region FROM cpu GROUP BY time(5ms), region END`
|
||||
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
|
||||
if err != nil {
|
||||
t.Fatalf("error parsing query %s", err.Error())
|
||||
}
|
||||
cq := stmt.(*influxql.CreateContinuousQueryStatement)
|
||||
if err := s.CreateContinuousQuery(cq); err != nil {
|
||||
t.Fatalf("error creating continuous query %s", err.Error())
|
||||
}
|
||||
if err := s.RunContinuousQueries(); err != nil {
|
||||
t.Fatalf("error running cqs when no data exists: %s", err.Error())
|
||||
}
|
||||
|
||||
// set a test time in the middle of a 5 second interval that we can work with
|
||||
testTime := time.Now().UTC().Round(5 * time.Millisecond)
|
||||
if testTime.UnixNano() > time.Now().UnixNano() {
|
||||
testTime = testTime.Add(-5 * time.Millisecond)
|
||||
}
|
||||
testTime.Add(time.Millisecond * 2)
|
||||
|
||||
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east"}, Timestamp: testTime, Values: map[string]interface{}{"value": float64(30)}}})
|
||||
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east"}, Timestamp: testTime.Add(-time.Millisecond * 5), Values: map[string]interface{}{"value": float64(20)}}})
|
||||
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-west"}, Timestamp: testTime, Values: map[string]interface{}{"value": float64(100)}}})
|
||||
|
||||
// Run CQs after a period of time
|
||||
time.Sleep(time.Millisecond * 50)
|
||||
s.RunContinuousQueries()
|
||||
// give the CQs time to run
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
|
||||
verify := func(num int, exp string) {
|
||||
results := s.ExecuteQuery(MustParseQuery(`SELECT mean(mean) FROM cpu_region GROUP BY region`), "foo", nil)
|
||||
if res := results.Results[0]; res.Err != nil {
|
||||
t.Fatalf("unexpected error verify %d: %s", num, res.Err)
|
||||
} else if len(res.Rows) != 2 {
|
||||
t.Fatalf("unexpected row count on verify %d: %d", num, len(res.Rows))
|
||||
} else if s := mustMarshalJSON(res); s != exp {
|
||||
t.Fatalf("unexpected row(0) on verify %d: %s", num, s)
|
||||
}
|
||||
}
|
||||
|
||||
// ensure CQ results were saved
|
||||
verify(1, `{"rows":[{"name":"cpu_region","tags":{"region":"us-east"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",25]]},{"name":"cpu_region","tags":{"region":"us-west"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",100]]}]}`)
|
||||
|
||||
// ensure that repeated runs don't cause duplicate data
|
||||
s.RunContinuousQueries()
|
||||
verify(2, `{"rows":[{"name":"cpu_region","tags":{"region":"us-east"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",25]]},{"name":"cpu_region","tags":{"region":"us-west"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",100]]}]}`)
|
||||
|
||||
// ensure that data written into a previous window is picked up and the result recomputed.
|
||||
time.Sleep(time.Millisecond * 2)
|
||||
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-west"}, Timestamp: testTime.Add(-time.Millisecond), Values: map[string]interface{}{"value": float64(50)}}})
|
||||
s.RunContinuousQueries()
|
||||
// give CQs time to run
|
||||
time.Sleep(time.Millisecond * 50)
|
||||
|
||||
verify(3, `{"rows":[{"name":"cpu_region","tags":{"region":"us-east"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",25]]},{"name":"cpu_region","tags":{"region":"us-west"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",75]]}]}`)
|
||||
}
|
||||
|
||||
func mustMarshalJSON(v interface{}) string {
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
|
@ -1242,7 +1379,7 @@ type MessagingClient struct {
|
|||
c chan *messaging.Message
|
||||
|
||||
PublishFunc func(*messaging.Message) (uint64, error)
|
||||
CreateReplicaFunc func(replicaID uint64) error
|
||||
CreateReplicaFunc func(replicaID uint64, connectURL *url.URL) error
|
||||
DeleteReplicaFunc func(replicaID uint64) error
|
||||
SubscribeFunc func(replicaID, topicID uint64) error
|
||||
UnsubscribeFunc func(replicaID, topicID uint64) error
|
||||
|
@ -1252,7 +1389,7 @@ type MessagingClient struct {
|
|||
func NewMessagingClient() *MessagingClient {
|
||||
c := &MessagingClient{c: make(chan *messaging.Message, 1)}
|
||||
c.PublishFunc = c.send
|
||||
c.CreateReplicaFunc = func(replicaID uint64) error { return nil }
|
||||
c.CreateReplicaFunc = func(replicaID uint64, connectURL *url.URL) error { return nil }
|
||||
c.DeleteReplicaFunc = func(replicaID uint64) error { return nil }
|
||||
c.SubscribeFunc = func(replicaID, topicID uint64) error { return nil }
|
||||
c.UnsubscribeFunc = func(replicaID, topicID uint64) error { return nil }
|
||||
|
@ -1275,8 +1412,8 @@ func (c *MessagingClient) send(m *messaging.Message) (uint64, error) {
|
|||
}
|
||||
|
||||
// Creates a new replica with a given ID on the broker.
|
||||
func (c *MessagingClient) CreateReplica(replicaID uint64) error {
|
||||
return c.CreateReplicaFunc(replicaID)
|
||||
func (c *MessagingClient) CreateReplica(replicaID uint64, connectURL *url.URL) error {
|
||||
return c.CreateReplicaFunc(replicaID, connectURL)
|
||||
}
|
||||
|
||||
// Deletes an existing replica with a given ID from the broker.
|
||||
|
|
82
shard.go
82
shard.go
|
@ -4,8 +4,6 @@ import (
|
|||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
|
@ -120,7 +118,6 @@ func (s *Shard) writeSeries(seriesID uint32, timestamp int64, values []byte, ove
|
|||
}
|
||||
|
||||
// Insert the values by timestamp.
|
||||
warn("[write]", seriesID, time.Unix(0, timestamp))
|
||||
if err := b.Put(u64tob(uint64(timestamp)), values); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -154,85 +151,6 @@ func unmarshalPointHeader(b []byte) (seriesID uint32, timestamp int64) {
|
|||
return
|
||||
}
|
||||
|
||||
// marshalValues encodes a set of field ids and values to a byte slice.
|
||||
func marshalValues(values map[uint8]interface{}) []byte {
|
||||
// Sort fields for consistency.
|
||||
fieldIDs := make([]uint8, 0, len(values))
|
||||
for fieldID := range values {
|
||||
fieldIDs = append(fieldIDs, fieldID)
|
||||
}
|
||||
sort.Sort(uint8Slice(fieldIDs))
|
||||
|
||||
// Allocate byte slice and write field count.
|
||||
b := make([]byte, 1, 10)
|
||||
b[0] = byte(len(values))
|
||||
|
||||
// Write out each field.
|
||||
for _, fieldID := range fieldIDs {
|
||||
// Create a temporary buffer for this field.
|
||||
buf := make([]byte, 9)
|
||||
buf[0] = fieldID
|
||||
|
||||
// Convert integers to floats.
|
||||
v := values[fieldID]
|
||||
if intval, ok := v.(int); ok {
|
||||
v = float64(intval)
|
||||
}
|
||||
|
||||
// Encode value after field id.
|
||||
// TODO: Support non-float types.
|
||||
switch v := v.(type) {
|
||||
case float64:
|
||||
binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(v))
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported value type: %T", v))
|
||||
}
|
||||
|
||||
// Append temp buffer to the end.
|
||||
b = append(b, buf...)
|
||||
}
|
||||
|
||||
return b
|
||||
}
|
||||
|
||||
// unmarshalValues decodes a byte slice into a set of field ids and values.
|
||||
func unmarshalValues(b []byte) map[uint8]interface{} {
|
||||
if len(b) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Read the field count from the field byte.
|
||||
n := int(b[0])
|
||||
|
||||
// Create a map to hold the decoded data.
|
||||
values := make(map[uint8]interface{}, n)
|
||||
|
||||
// Start from the second byte and iterate over until we're done decoding.
|
||||
b = b[1:]
|
||||
for i := 0; i < n; i++ {
|
||||
// First byte is the field identifier.
|
||||
fieldID := b[0]
|
||||
|
||||
// Decode value.
|
||||
// TODO: Support non-float types.
|
||||
value := math.Float64frombits(binary.BigEndian.Uint64(b[1:9]))
|
||||
|
||||
values[fieldID] = value
|
||||
|
||||
// Move bytes forward.
|
||||
b = b[9:]
|
||||
}
|
||||
|
||||
return values
|
||||
}
|
||||
|
||||
// unmarshalValue extracts a single value by field id from an encoded byte slice.
|
||||
func unmarshalValue(b []byte, fieldID uint8) interface{} {
|
||||
// OPTIMIZE: Don't materialize entire map. Just search for value.
|
||||
values := unmarshalValues(b)
|
||||
return values[fieldID]
|
||||
}
|
||||
|
||||
type uint8Slice []uint8
|
||||
|
||||
func (p uint8Slice) Len() int { return len(p) }
|
||||
|
|
70
tx.go
70
tx.go
|
@ -129,6 +129,9 @@ func (tx *tx) CreateIterators(stmt *influxql.SelectStatement) ([]influxql.Iterat
|
|||
}
|
||||
tagSets := m.tagSets(stmt, dimensions)
|
||||
|
||||
// Get a field decoder.
|
||||
d := NewFieldCodec(m)
|
||||
|
||||
// Create an iterator for every shard.
|
||||
var itrs []influxql.Iterator
|
||||
for tag, set := range tagSets {
|
||||
|
@ -139,18 +142,19 @@ func (tx *tx) CreateIterators(stmt *influxql.SelectStatement) ([]influxql.Iterat
|
|||
// create a series cursor for each unique series id
|
||||
cursors := make([]*seriesCursor, 0, len(set))
|
||||
for id, cond := range set {
|
||||
cursors = append(cursors, &seriesCursor{id: id, condition: cond})
|
||||
cursors = append(cursors, &seriesCursor{id: id, condition: cond, decoder: d})
|
||||
}
|
||||
|
||||
// create the shard iterator that will map over all series for the shard
|
||||
itr := &shardIterator{
|
||||
fieldName: f.Name,
|
||||
fieldID: f.ID,
|
||||
tags: tag,
|
||||
db: sh.store,
|
||||
cursors: cursors,
|
||||
tmin: tmin.UnixNano(),
|
||||
tmax: tmax.UnixNano(),
|
||||
measurement: m,
|
||||
fieldName: f.Name,
|
||||
fieldID: f.ID,
|
||||
tags: tag,
|
||||
db: sh.store,
|
||||
cursors: cursors,
|
||||
tmin: tmin.UnixNano(),
|
||||
tmax: tmax.UnixNano(),
|
||||
}
|
||||
|
||||
// Add to tx so the bolt transaction can be opened/closed.
|
||||
|
@ -177,14 +181,15 @@ func splitIdent(s string) (db, rp, m string, err error) {
|
|||
|
||||
// shardIterator represents an iterator for traversing over a single series.
|
||||
type shardIterator struct {
|
||||
fieldName string
|
||||
fieldID uint8
|
||||
tags string // encoded dimensional tag values
|
||||
cursors []*seriesCursor
|
||||
keyValues []keyValue
|
||||
db *bolt.DB // data stores by shard id
|
||||
txn *bolt.Tx // read transactions by shard id
|
||||
tmin, tmax int64
|
||||
fieldName string
|
||||
fieldID uint8
|
||||
measurement *Measurement
|
||||
tags string // encoded dimensional tag values
|
||||
cursors []*seriesCursor
|
||||
keyValues []keyValue
|
||||
db *bolt.DB // data stores by shard id
|
||||
txn *bolt.Tx // read transactions by shard id
|
||||
tmin, tmax int64
|
||||
}
|
||||
|
||||
func (i *shardIterator) open() error {
|
||||
|
@ -207,7 +212,7 @@ func (i *shardIterator) open() error {
|
|||
|
||||
i.keyValues = make([]keyValue, len(i.cursors))
|
||||
for j, cur := range i.cursors {
|
||||
i.keyValues[j].key, i.keyValues[j].value = cur.Next(i.fieldName, i.fieldID, i.tmin, i.tmax)
|
||||
i.keyValues[j].key, i.keyValues[j].data, i.keyValues[j].value = cur.Next(i.fieldName, i.fieldID, i.tmin, i.tmax)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -220,7 +225,7 @@ func (i *shardIterator) close() error {
|
|||
|
||||
func (i *shardIterator) Tags() string { return i.tags }
|
||||
|
||||
func (i *shardIterator) Next() (key int64, value interface{}) {
|
||||
func (i *shardIterator) Next() (key int64, data []byte, value interface{}) {
|
||||
min := -1
|
||||
|
||||
for ind, kv := range i.keyValues {
|
||||
|
@ -231,35 +236,42 @@ func (i *shardIterator) Next() (key int64, value interface{}) {
|
|||
|
||||
// if min is -1 we've exhausted all cursors for the given time range
|
||||
if min == -1 {
|
||||
return 0, nil
|
||||
return 0, nil, nil
|
||||
}
|
||||
|
||||
kv := i.keyValues[min]
|
||||
key = kv.key
|
||||
data = kv.data
|
||||
value = kv.value
|
||||
|
||||
i.keyValues[min].key, i.keyValues[min].value = i.cursors[min].Next(i.fieldName, i.fieldID, i.tmin, i.tmax)
|
||||
return key, value
|
||||
i.keyValues[min].key, i.keyValues[min].data, i.keyValues[min].value = i.cursors[min].Next(i.fieldName, i.fieldID, i.tmin, i.tmax)
|
||||
return key, data, value
|
||||
}
|
||||
|
||||
type keyValue struct {
|
||||
key int64
|
||||
data []byte
|
||||
value interface{}
|
||||
}
|
||||
|
||||
type fieldDecoder interface {
|
||||
DecodeByID(fieldID uint8, b []byte) (interface{}, error)
|
||||
}
|
||||
|
||||
type seriesCursor struct {
|
||||
id uint32
|
||||
condition influxql.Expr
|
||||
cur *bolt.Cursor
|
||||
initialized bool
|
||||
decoder fieldDecoder
|
||||
}
|
||||
|
||||
func (c *seriesCursor) Next(fieldName string, fieldID uint8, tmin, tmax int64) (key int64, value interface{}) {
|
||||
func (c *seriesCursor) Next(fieldName string, fieldID uint8, tmin, tmax int64) (key int64, data []byte, value interface{}) {
|
||||
// TODO: clean this up when we make it so series ids are only queried against the shards they exist in.
|
||||
// Right now we query for all series ids on a query against each shard, even if that shard may not have the
|
||||
// data, so cur could be nil.
|
||||
if c.cur == nil {
|
||||
return 0, nil
|
||||
return 0, nil, nil
|
||||
}
|
||||
|
||||
for {
|
||||
|
@ -273,14 +285,18 @@ func (c *seriesCursor) Next(fieldName string, fieldID uint8, tmin, tmax int64) (
|
|||
|
||||
// Exit if there is no more data.
|
||||
if k == nil {
|
||||
return 0, nil
|
||||
return 0, nil, nil
|
||||
}
|
||||
|
||||
// Marshal key & value.
|
||||
key, value = int64(btou64(k)), unmarshalValue(v, fieldID)
|
||||
key := int64(btou64(k))
|
||||
value, err := c.decoder.DecodeByID(fieldID, v)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if key > tmax {
|
||||
return 0, nil
|
||||
return 0, nil, nil
|
||||
}
|
||||
|
||||
// Skip to the next if we don't have a field value for this field for this point
|
||||
|
@ -295,6 +311,6 @@ func (c *seriesCursor) Next(fieldName string, fieldID uint8, tmin, tmax int64) (
|
|||
}
|
||||
}
|
||||
|
||||
return key, value
|
||||
return key, v, value
|
||||
}
|
||||
}
|
||||
|
|
|
@ -72,7 +72,7 @@ func TestTx_CreateIterators(t *testing.T) {
|
|||
func slurp(itrs []influxql.Iterator) []keyValue {
|
||||
var rows []keyValue
|
||||
for _, itr := range itrs {
|
||||
for k, v := itr.Next(); k != 0; k, v = itr.Next() {
|
||||
for k, _, v := itr.Next(); k != 0; k, _, v = itr.Next() {
|
||||
rows = append(rows, keyValue{key: k, value: v, tags: itr.Tags()})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue