Clean up influxdb.
parent
97f9670fa3
commit
cdc5a47efa
146
broker.go
146
broker.go
|
@ -1,146 +0,0 @@
|
||||||
package influxdb
|
|
||||||
|
|
||||||
/*
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"math/rand"
|
|
||||||
"net/http"
|
|
||||||
"net/url"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
// DefaultContinuousQueryCheckTime is how frequently the broker will ask a data node
|
|
||||||
// in the cluster to run any continuous queries that should be run.
|
|
||||||
DefaultContinuousQueryCheckTime = 1 * time.Second
|
|
||||||
|
|
||||||
// DefaultDataNodeTimeout is how long the broker will wait before timing out on a data node
|
|
||||||
// that it has requested process continuous queries.
|
|
||||||
DefaultDataNodeTimeout = 1 * time.Second
|
|
||||||
|
|
||||||
// DefaultFailureSleep is how long the broker will sleep before trying the next data node in
|
|
||||||
// the cluster if the current data node failed to respond
|
|
||||||
DefaultFailureSleep = 100 * time.Millisecond
|
|
||||||
)
|
|
||||||
|
|
||||||
// Broker represents an InfluxDB specific messaging broker.
|
|
||||||
type Broker struct {
|
|
||||||
mu sync.RWMutex
|
|
||||||
*messaging.Broker
|
|
||||||
client *http.Client
|
|
||||||
|
|
||||||
done chan struct{}
|
|
||||||
|
|
||||||
// variables to control when to trigger processing and when to timeout
|
|
||||||
TriggerInterval time.Duration
|
|
||||||
TriggerTimeout time.Duration
|
|
||||||
TriggerFailurePause time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewBroker returns a new instance of a Broker with default values.
|
|
||||||
func NewBroker() *Broker {
|
|
||||||
return &Broker{
|
|
||||||
Broker: messaging.NewBroker(),
|
|
||||||
client: &http.Client{
|
|
||||||
Timeout: DefaultDataNodeTimeout,
|
|
||||||
},
|
|
||||||
|
|
||||||
TriggerInterval: 5 * time.Second,
|
|
||||||
TriggerTimeout: 20 * time.Second,
|
|
||||||
TriggerFailurePause: 1 * time.Second,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// RunContinuousQueryLoop starts running continuous queries on a background goroutine.
|
|
||||||
func (b *Broker) RunContinuousQueryLoop() {
|
|
||||||
b.mu.Lock()
|
|
||||||
defer b.mu.Unlock()
|
|
||||||
|
|
||||||
b.done = make(chan struct{})
|
|
||||||
go b.continuousQueryLoop(b.done)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close closes the broker.
|
|
||||||
func (b *Broker) Close() error {
|
|
||||||
b.mu.Lock()
|
|
||||||
defer b.mu.Unlock()
|
|
||||||
|
|
||||||
if b.done != nil {
|
|
||||||
close(b.done)
|
|
||||||
b.done = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// since the client doesn't specify a Transport when created,
|
|
||||||
// it will use the DefaultTransport.
|
|
||||||
http.DefaultTransport.(*http.Transport).CloseIdleConnections()
|
|
||||||
|
|
||||||
return b.Broker.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Broker) continuousQueryLoop(done chan struct{}) {
|
|
||||||
for {
|
|
||||||
// Check if broker is currently leader.
|
|
||||||
if b.Broker.IsLeader() {
|
|
||||||
b.runContinuousQueries()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sleep until either the broker is closed or we need to run continuous queries again
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
return
|
|
||||||
case <-time.After(DefaultContinuousQueryCheckTime):
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Broker) runContinuousQueries() {
|
|
||||||
topic := b.Broker.Topic(BroadcastTopicID)
|
|
||||||
if topic == nil {
|
|
||||||
log.Println("broker cq: no broadcast topic currently available.")
|
|
||||||
return // don't have any topics to get data urls from, give it up
|
|
||||||
}
|
|
||||||
dataURLs := topic.DataURLs()
|
|
||||||
if len(dataURLs) == 0 {
|
|
||||||
log.Println("broker cq: no data nodes currently available.")
|
|
||||||
return // don't have any data urls to try, give it up
|
|
||||||
}
|
|
||||||
|
|
||||||
rand.Seed(time.Now().UnixNano())
|
|
||||||
// get a set of random indexes so we can randomly distribute cq load over nodes
|
|
||||||
ri := rand.Perm(len(dataURLs))
|
|
||||||
for _, i := range ri {
|
|
||||||
u := dataURLs[i]
|
|
||||||
// if no error, we're all good
|
|
||||||
err := b.requestContinuousQueryProcessing(u)
|
|
||||||
if err == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
log.Printf("broker cq: error hitting data node: %s: %s\n", u.String(), err.Error())
|
|
||||||
|
|
||||||
// let the loop try the next data node in the cluster
|
|
||||||
<-time.After(DefaultFailureSleep)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Broker) requestContinuousQueryProcessing(cqURL url.URL) error {
|
|
||||||
b.mu.RLock()
|
|
||||||
defer b.mu.RUnlock()
|
|
||||||
// Send request.
|
|
||||||
cqURL.Path = "/data/process_continuous_queries"
|
|
||||||
cqURL.Scheme = "http"
|
|
||||||
resp, err := b.client.Post(cqURL.String(), "application/octet-stream", nil)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
// Check if created.
|
|
||||||
if resp.StatusCode != http.StatusAccepted {
|
|
||||||
return fmt.Errorf("request returned status %s", resp.Status)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
*/
|
|
|
@ -1,97 +0,0 @@
|
||||||
package influxdb_test
|
|
||||||
|
|
||||||
/*
|
|
||||||
import (
|
|
||||||
"net/http"
|
|
||||||
"net/http/httptest"
|
|
||||||
"net/url"
|
|
||||||
"os"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/influxdb/influxdb"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestBroker_WillRunQueries(t *testing.T) {
|
|
||||||
// TODO fix the raciness in this test
|
|
||||||
t.Skip()
|
|
||||||
// this handler should just work
|
|
||||||
testHandler := &BrokerTestHandler{}
|
|
||||||
server := httptest.NewServer(testHandler)
|
|
||||||
defer server.Close()
|
|
||||||
// this will timeout on the trigger request
|
|
||||||
timeoutHandler := &BrokerTestHandler{wait: 1100 * time.Millisecond}
|
|
||||||
timeoutServer := httptest.NewServer(timeoutHandler)
|
|
||||||
defer timeoutServer.Close()
|
|
||||||
// this will return a 500
|
|
||||||
badHandler := &BrokerTestHandler{sendError: true}
|
|
||||||
badServer := httptest.NewServer(badHandler)
|
|
||||||
defer badServer.Close()
|
|
||||||
|
|
||||||
b := influxdb.NewBroker()
|
|
||||||
|
|
||||||
// set the trigger times and failure sleeps for the test
|
|
||||||
b.TriggerInterval = 2 * time.Millisecond
|
|
||||||
b.TriggerTimeout = 100 * time.Millisecond
|
|
||||||
b.TriggerFailurePause = 2 * time.Millisecond
|
|
||||||
|
|
||||||
f := tempfile()
|
|
||||||
defer os.Remove(f)
|
|
||||||
|
|
||||||
if err := b.Open(f, &url.URL{Host: "127.0.0.1:8080"}); err != nil {
|
|
||||||
t.Fatalf("error opening broker: %s", err)
|
|
||||||
}
|
|
||||||
if err := b.Initialize(); err != nil {
|
|
||||||
t.Fatalf("error initializing broker: %s", err)
|
|
||||||
}
|
|
||||||
defer b.Close()
|
|
||||||
|
|
||||||
// set the data nodes (replicas) so all the failure cases get hit first
|
|
||||||
if err := b.Broker.CreateReplica(1, &url.URL{Host: "127.0.0.1:8090"}); err != nil {
|
|
||||||
t.Fatalf("couldn't create replica %s", err.Error())
|
|
||||||
}
|
|
||||||
b.Broker.CreateReplica(2, &url.URL{Host: timeoutServer.URL[7:]})
|
|
||||||
b.Broker.CreateReplica(3, &url.URL{Host: badServer.URL[7:]})
|
|
||||||
b.Broker.CreateReplica(4, &url.URL{Host: server.URL[7:]})
|
|
||||||
|
|
||||||
b.RunContinuousQueryLoop()
|
|
||||||
// every failure and success case should be hit in this time frame
|
|
||||||
time.Sleep(1400 * time.Millisecond)
|
|
||||||
if timeoutHandler.requestCount != 1 {
|
|
||||||
t.Fatal("broker should have only sent 1 request to the server that times out.")
|
|
||||||
}
|
|
||||||
if badHandler.requestCount != 1 {
|
|
||||||
t.Fatal("broker should have only sent 1 request to the bad server. i.e. it didn't keep the state to make request to the good server")
|
|
||||||
}
|
|
||||||
if testHandler.requestCount < 1 || testHandler.processRequestCount < 1 {
|
|
||||||
t.Fatal("broker failed to send multiple continuous query requests to the data node")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type BrokerTestHandler struct {
|
|
||||||
requestCount int
|
|
||||||
processRequestCount int
|
|
||||||
sendError bool
|
|
||||||
wait time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
// ServeHTTP serves an HTTP request.
|
|
||||||
func (h *BrokerTestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
||||||
h.requestCount++
|
|
||||||
<-time.After(h.wait)
|
|
||||||
if h.sendError {
|
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
|
||||||
}
|
|
||||||
switch r.URL.Path {
|
|
||||||
case "/data/process_continuous_queries":
|
|
||||||
if r.Method == "POST" {
|
|
||||||
h.processRequestCount++
|
|
||||||
w.WriteHeader(http.StatusAccepted)
|
|
||||||
} else {
|
|
||||||
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
http.NotFound(w, r)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
|
@ -10,7 +10,6 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdb/influxdb"
|
|
||||||
"github.com/influxdb/influxdb/client"
|
"github.com/influxdb/influxdb/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -46,7 +45,7 @@ func TestClient_Ping(t *testing.T) {
|
||||||
|
|
||||||
func TestClient_Query(t *testing.T) {
|
func TestClient_Query(t *testing.T) {
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
var data influxdb.Response
|
var data client.Response
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
_ = json.NewEncoder(w).Encode(data)
|
_ = json.NewEncoder(w).Encode(data)
|
||||||
}))
|
}))
|
||||||
|
@ -99,7 +98,7 @@ func TestClient_BasicAuth(t *testing.T) {
|
||||||
|
|
||||||
func TestClient_Write(t *testing.T) {
|
func TestClient_Write(t *testing.T) {
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
var data influxdb.Response
|
var data client.Response
|
||||||
w.WriteHeader(http.StatusNoContent)
|
w.WriteHeader(http.StatusNoContent)
|
||||||
_ = json.NewEncoder(w).Encode(data)
|
_ = json.NewEncoder(w).Encode(data)
|
||||||
}))
|
}))
|
||||||
|
@ -127,7 +126,7 @@ func TestClient_UserAgent(t *testing.T) {
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
receivedUserAgent = r.UserAgent()
|
receivedUserAgent = r.UserAgent()
|
||||||
|
|
||||||
var data influxdb.Response
|
var data client.Response
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
_ = json.NewEncoder(w).Encode(data)
|
_ = json.NewEncoder(w).Encode(data)
|
||||||
}))
|
}))
|
||||||
|
|
|
@ -3,62 +3,21 @@ package cluster_test
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdb/influxdb/cluster"
|
"github.com/influxdb/influxdb/cluster"
|
||||||
|
"github.com/influxdb/influxdb/influxql"
|
||||||
"github.com/influxdb/influxdb/meta"
|
"github.com/influxdb/influxdb/meta"
|
||||||
"github.com/influxdb/influxdb/test"
|
|
||||||
"github.com/influxdb/influxdb/tsdb"
|
"github.com/influxdb/influxdb/tsdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
type fakeShardWriter struct {
|
|
||||||
ShardWriteFn func(shardID, nodeID uint64, points []tsdb.Point) error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *fakeShardWriter) Write(shardID, nodeID uint64, points []tsdb.Point) error {
|
|
||||||
return f.ShardWriteFn(shardID, nodeID, points)
|
|
||||||
}
|
|
||||||
|
|
||||||
type fakeStore struct {
|
|
||||||
WriteFn func(shardID uint64, points []tsdb.Point) error
|
|
||||||
CreateShardfn func(database, retentionPolicy string, shardID uint64) error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *fakeStore) WriteToShard(shardID uint64, points []tsdb.Point) error {
|
|
||||||
return f.WriteFn(shardID, points)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *fakeStore) CreateShard(database, retentionPolicy string, shardID uint64) error {
|
|
||||||
return f.CreateShardfn(database, retentionPolicy, shardID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func newTestMetaStore() *test.MetaStore {
|
|
||||||
ms := &test.MetaStore{}
|
|
||||||
rp := test.NewRetentionPolicy("myp", time.Hour, 3)
|
|
||||||
test.AttachShardGroupInfo(rp, []uint64{1, 2, 3})
|
|
||||||
test.AttachShardGroupInfo(rp, []uint64{1, 2, 3})
|
|
||||||
|
|
||||||
ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
|
|
||||||
return rp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
|
|
||||||
for i, sg := range rp.ShardGroups {
|
|
||||||
if timestamp.Equal(sg.StartTime) || timestamp.After(sg.StartTime) && timestamp.Before(sg.EndTime) {
|
|
||||||
return &rp.ShardGroups[i], nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
panic("should not get here")
|
|
||||||
}
|
|
||||||
return ms
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestCoordinatorEnsureShardMappingOne tests that a single point maps to
|
// TestCoordinatorEnsureShardMappingOne tests that a single point maps to
|
||||||
// a single shard
|
// a single shard
|
||||||
func TestCoordinatorEnsureShardMappingOne(t *testing.T) {
|
func TestCoordinatorEnsureShardMappingOne(t *testing.T) {
|
||||||
ms := test.MetaStore{}
|
ms := MetaStore{}
|
||||||
rp := test.NewRetentionPolicy("myp", time.Hour, 3)
|
rp := NewRetentionPolicy("myp", time.Hour, 3)
|
||||||
|
|
||||||
ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
|
ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
|
||||||
return rp, nil
|
return rp, nil
|
||||||
|
@ -92,10 +51,10 @@ func TestCoordinatorEnsureShardMappingOne(t *testing.T) {
|
||||||
// TestCoordinatorEnsureShardMappingMultiple tests that MapShards maps multiple points
|
// TestCoordinatorEnsureShardMappingMultiple tests that MapShards maps multiple points
|
||||||
// across shard group boundaries to multiple shards
|
// across shard group boundaries to multiple shards
|
||||||
func TestCoordinatorEnsureShardMappingMultiple(t *testing.T) {
|
func TestCoordinatorEnsureShardMappingMultiple(t *testing.T) {
|
||||||
ms := test.MetaStore{}
|
ms := MetaStore{}
|
||||||
rp := test.NewRetentionPolicy("myp", time.Hour, 3)
|
rp := NewRetentionPolicy("myp", time.Hour, 3)
|
||||||
test.AttachShardGroupInfo(rp, []uint64{1, 2, 3})
|
AttachShardGroupInfo(rp, []uint64{1, 2, 3})
|
||||||
test.AttachShardGroupInfo(rp, []uint64{1, 2, 3})
|
AttachShardGroupInfo(rp, []uint64{1, 2, 3})
|
||||||
|
|
||||||
ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
|
ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
|
||||||
return rp, nil
|
return rp, nil
|
||||||
|
@ -297,3 +256,245 @@ func TestCoordinatorWrite(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
shardID uint64
|
||||||
|
)
|
||||||
|
|
||||||
|
type fakeShardWriter struct {
|
||||||
|
ShardWriteFn func(shardID, nodeID uint64, points []tsdb.Point) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeShardWriter) Write(shardID, nodeID uint64, points []tsdb.Point) error {
|
||||||
|
return f.ShardWriteFn(shardID, nodeID, points)
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakeStore struct {
|
||||||
|
WriteFn func(shardID uint64, points []tsdb.Point) error
|
||||||
|
CreateShardfn func(database, retentionPolicy string, shardID uint64) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeStore) WriteToShard(shardID uint64, points []tsdb.Point) error {
|
||||||
|
return f.WriteFn(shardID, points)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeStore) CreateShard(database, retentionPolicy string, shardID uint64) error {
|
||||||
|
return f.CreateShardfn(database, retentionPolicy, shardID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestMetaStore() *MetaStore {
|
||||||
|
ms := &MetaStore{}
|
||||||
|
rp := NewRetentionPolicy("myp", time.Hour, 3)
|
||||||
|
AttachShardGroupInfo(rp, []uint64{1, 2, 3})
|
||||||
|
AttachShardGroupInfo(rp, []uint64{1, 2, 3})
|
||||||
|
|
||||||
|
ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
|
||||||
|
return rp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
|
||||||
|
for i, sg := range rp.ShardGroups {
|
||||||
|
if timestamp.Equal(sg.StartTime) || timestamp.After(sg.StartTime) && timestamp.Before(sg.EndTime) {
|
||||||
|
return &rp.ShardGroups[i], nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
panic("should not get here")
|
||||||
|
}
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
|
||||||
|
type MetaStore struct {
|
||||||
|
OpenFn func(path string) error
|
||||||
|
CloseFn func() error
|
||||||
|
|
||||||
|
CreateContinuousQueryFn func(query string) (*meta.ContinuousQueryInfo, error)
|
||||||
|
DropContinuousQueryFn func(query string) error
|
||||||
|
|
||||||
|
NodeFn func(id uint64) (*meta.NodeInfo, error)
|
||||||
|
NodeByHostFn func(host string) (*meta.NodeInfo, error)
|
||||||
|
CreateNodeFn func(host string) (*meta.NodeInfo, error)
|
||||||
|
DeleteNodeFn func(id uint64) error
|
||||||
|
|
||||||
|
DatabaseFn func(name string) (*meta.DatabaseInfo, error)
|
||||||
|
CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error)
|
||||||
|
CreateDatabaseIfNotExistsFn func(name string) (*meta.DatabaseInfo, error)
|
||||||
|
DropDatabaseFn func(name string) error
|
||||||
|
|
||||||
|
RetentionPolicyFn func(database, name string) (*meta.RetentionPolicyInfo, error)
|
||||||
|
CreateRetentionPolicyFn func(database string, rp *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
|
||||||
|
CreateRetentionPolicyIfNotExistsFn func(database string, rp *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
|
||||||
|
SetDefaultRetentionPolicyFn func(database, name string) error
|
||||||
|
UpdateRetentionPolicyFn func(database, name string, rpu *meta.RetentionPolicyUpdate) (*meta.RetentionPolicyInfo, error)
|
||||||
|
DeleteRetentionPolicyFn func(database, name string) error
|
||||||
|
|
||||||
|
ShardGroupFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
|
||||||
|
CreateShardGroupIfNotExistsFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
|
||||||
|
DeleteShardGroupFn func(database, policy string, shardID uint64) error
|
||||||
|
|
||||||
|
UserFn func(username string) (*meta.UserInfo, error)
|
||||||
|
CreateUserFn func(username, password string, admin bool) (*meta.UserInfo, error)
|
||||||
|
UpdateUserFn func(username, password string) (*meta.UserInfo, error)
|
||||||
|
DeleteUserFn func(username string) error
|
||||||
|
SetPrivilegeFn func(p influxql.Privilege, username string, dbname string) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) Open(path string) error {
|
||||||
|
return m.OpenFn(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) Close() error {
|
||||||
|
return m.CloseFn()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) CreateContinuousQuery(query string) (*meta.ContinuousQueryInfo, error) {
|
||||||
|
return m.CreateContinuousQueryFn(query)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) DropContinuousQuery(query string) error {
|
||||||
|
return m.DropContinuousQueryFn(query)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) Node(id uint64) (*meta.NodeInfo, error) {
|
||||||
|
return m.NodeFn(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) NodeByHost(host string) (*meta.NodeInfo, error) {
|
||||||
|
return m.NodeByHostFn(host)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) CreateNode(host string) (*meta.NodeInfo, error) {
|
||||||
|
return m.CreateNodeFn(host)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) DeleteNode(id uint64) error {
|
||||||
|
return m.DeleteNodeFn(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) Database(name string) (*meta.DatabaseInfo, error) {
|
||||||
|
return m.DatabaseFn(name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) CreateDatabase(name string) (*meta.DatabaseInfo, error) {
|
||||||
|
return m.CreateDatabaseFn(name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error) {
|
||||||
|
return m.CreateDatabaseIfNotExistsFn(name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) DropDatabase(name string) error {
|
||||||
|
return m.DropDatabaseFn(name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) {
|
||||||
|
return m.RetentionPolicyFn(database, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) CreateRetentionPolicy(database string, rp *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) {
|
||||||
|
return m.CreateRetentionPolicyFn(database, rp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) CreateRetentionPolicyIfNotExists(database string, rp *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) {
|
||||||
|
return m.CreateRetentionPolicyIfNotExistsFn(database, rp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) SetDefaultRetentionPolicy(database, name string) error {
|
||||||
|
return m.SetDefaultRetentionPolicyFn(database, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate) (*meta.RetentionPolicyInfo, error) {
|
||||||
|
return m.UpdateRetentionPolicyFn(database, name, rpu)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) DeleteRetentionPolicy(database, name string) error {
|
||||||
|
return m.DeleteRetentionPolicyFn(database, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) ShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
|
||||||
|
return m.ShardGroupFn(database, policy, timestamp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
|
||||||
|
return m.CreateShardGroupIfNotExistsFn(database, policy, timestamp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) DeleteShardGroup(database, policy string, shardID uint64) error {
|
||||||
|
return m.DeleteShardGroupFn(database, policy, shardID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) User(username string) (*meta.UserInfo, error) {
|
||||||
|
return m.UserFn(username)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) CreateUser(username, password string, admin bool) (*meta.UserInfo, error) {
|
||||||
|
return m.CreateUserFn(username, password, admin)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) UpdateUser(username, password string) (*meta.UserInfo, error) {
|
||||||
|
return m.UpdateUserFn(username, password)
|
||||||
|
}
|
||||||
|
func (m MetaStore) DeleteUser(username string) error {
|
||||||
|
return m.DeleteUserFn(username)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MetaStore) SetPrivilege(p influxql.Privilege, username string, dbname string) error {
|
||||||
|
return m.SetPrivilegeFn(p, username, dbname)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRetentionPolicy(name string, duration time.Duration, nodeCount int) *meta.RetentionPolicyInfo {
|
||||||
|
|
||||||
|
shards := []meta.ShardInfo{}
|
||||||
|
ownerIDs := []uint64{}
|
||||||
|
for i := 1; i <= nodeCount; i++ {
|
||||||
|
ownerIDs = append(ownerIDs, uint64(i))
|
||||||
|
}
|
||||||
|
|
||||||
|
// each node is fully replicated with each other
|
||||||
|
shards = append(shards, meta.ShardInfo{
|
||||||
|
ID: nextShardID(),
|
||||||
|
OwnerIDs: ownerIDs,
|
||||||
|
})
|
||||||
|
|
||||||
|
rp := &meta.RetentionPolicyInfo{
|
||||||
|
Name: "myrp",
|
||||||
|
ReplicaN: nodeCount,
|
||||||
|
Duration: duration,
|
||||||
|
ShardGroupDuration: duration,
|
||||||
|
ShardGroups: []meta.ShardGroupInfo{
|
||||||
|
meta.ShardGroupInfo{
|
||||||
|
ID: nextShardID(),
|
||||||
|
StartTime: time.Unix(0, 0),
|
||||||
|
EndTime: time.Unix(0, 0).Add(duration).Add(-1),
|
||||||
|
Shards: shards,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return rp
|
||||||
|
}
|
||||||
|
|
||||||
|
func AttachShardGroupInfo(rp *meta.RetentionPolicyInfo, ownerIDs []uint64) {
|
||||||
|
var startTime, endTime time.Time
|
||||||
|
if len(rp.ShardGroups) == 0 {
|
||||||
|
startTime = time.Unix(0, 0)
|
||||||
|
} else {
|
||||||
|
startTime = rp.ShardGroups[len(rp.ShardGroups)-1].StartTime.Add(rp.ShardGroupDuration)
|
||||||
|
}
|
||||||
|
endTime = startTime.Add(rp.ShardGroupDuration).Add(-1)
|
||||||
|
|
||||||
|
sh := meta.ShardGroupInfo{
|
||||||
|
ID: uint64(len(rp.ShardGroups) + 1),
|
||||||
|
StartTime: startTime,
|
||||||
|
EndTime: endTime,
|
||||||
|
Shards: []meta.ShardInfo{
|
||||||
|
meta.ShardInfo{
|
||||||
|
ID: nextShardID(),
|
||||||
|
OwnerIDs: ownerIDs,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
rp.ShardGroups = append(rp.ShardGroups, sh)
|
||||||
|
}
|
||||||
|
|
||||||
|
func nextShardID() uint64 {
|
||||||
|
return atomic.AddUint64(&shardID, 1)
|
||||||
|
}
|
||||||
|
|
|
@ -207,7 +207,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
|
||||||
}
|
}
|
||||||
|
|
||||||
// if we're not chunking, this will be the in memory buffer for all results before sending to client
|
// if we're not chunking, this will be the in memory buffer for all results before sending to client
|
||||||
resp := influxdb.Response{Results: make([]*influxql.Result, 0)}
|
resp := Response{Results: make([]*influxql.Result, 0)}
|
||||||
statusWritten := false
|
statusWritten := false
|
||||||
|
|
||||||
// pull all results from the channel
|
// pull all results from the channel
|
||||||
|
@ -233,7 +233,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
|
||||||
|
|
||||||
// Write out result immediately if chunked.
|
// Write out result immediately if chunked.
|
||||||
if chunked {
|
if chunked {
|
||||||
w.Write(MarshalJSON(influxdb.Response{
|
w.Write(MarshalJSON(Response{
|
||||||
Results: []*influxql.Result{r},
|
Results: []*influxql.Result{r},
|
||||||
}, pretty))
|
}, pretty))
|
||||||
w.(http.Flusher).Flush()
|
w.(http.Flusher).Flush()
|
||||||
|
@ -494,7 +494,7 @@ func httpError(w http.ResponseWriter, error string, pretty bool, code int) {
|
||||||
w.Header().Add("content-type", "application/json")
|
w.Header().Add("content-type", "application/json")
|
||||||
w.WriteHeader(code)
|
w.WriteHeader(code)
|
||||||
|
|
||||||
response := influxdb.Response{Err: errors.New(error)}
|
response := Response{Err: errors.New(error)}
|
||||||
var b []byte
|
var b []byte
|
||||||
if pretty {
|
if pretty {
|
||||||
b, _ = json.MarshalIndent(response, "", " ")
|
b, _ = json.MarshalIndent(response, "", " ")
|
||||||
|
@ -676,6 +676,61 @@ func recovery(inner http.Handler, name string, weblog *log.Logger) http.Handler
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Response represents a list of statement results.
|
||||||
|
type Response struct {
|
||||||
|
Results []*influxql.Result
|
||||||
|
Err error
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalJSON encodes a Response struct into JSON.
|
||||||
|
func (r Response) MarshalJSON() ([]byte, error) {
|
||||||
|
// Define a struct that outputs "error" as a string.
|
||||||
|
var o struct {
|
||||||
|
Results []*influxql.Result `json:"results,omitempty"`
|
||||||
|
Err string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy fields to output struct.
|
||||||
|
o.Results = r.Results
|
||||||
|
if r.Err != nil {
|
||||||
|
o.Err = r.Err.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
return json.Marshal(&o)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalJSON decodes the data into the Response struct
|
||||||
|
func (r *Response) UnmarshalJSON(b []byte) error {
|
||||||
|
var o struct {
|
||||||
|
Results []*influxql.Result `json:"results,omitempty"`
|
||||||
|
Err string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
err := json.Unmarshal(b, &o)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
r.Results = o.Results
|
||||||
|
if o.Err != "" {
|
||||||
|
r.Err = errors.New(o.Err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error returns the first error from any statement.
|
||||||
|
// Returns nil if no errors occurred on any statements.
|
||||||
|
func (r *Response) Error() error {
|
||||||
|
if r.Err != nil {
|
||||||
|
return r.Err
|
||||||
|
}
|
||||||
|
for _, rr := range r.Results {
|
||||||
|
if rr.Err != nil {
|
||||||
|
return rr.Err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
FIXME: Convert to line protocol format.
|
FIXME: Convert to line protocol format.
|
||||||
|
|
||||||
|
@ -769,7 +824,7 @@ func (h *Handler) showMeasurements(db string, user *meta.UserInfo) ([]string, er
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return measurements, err
|
return measurements, err
|
||||||
}
|
}
|
||||||
results := influxdb.Response{}
|
results := Response{}
|
||||||
|
|
||||||
for r := range c {
|
for r := range c {
|
||||||
results.Results = append(results.Results, r)
|
results.Results = append(results.Results, r)
|
||||||
|
|
|
@ -1287,7 +1287,7 @@ func TestHandler_serveWriteSeriesNonZeroTime(t *testing.T) {
|
||||||
t.Errorf("unexpected status: %d", status)
|
t.Errorf("unexpected status: %d", status)
|
||||||
}
|
}
|
||||||
|
|
||||||
r := &influxdb.Response{}
|
r := &httpd.Response{}
|
||||||
if err := json.Unmarshal([]byte(body), r); err != nil {
|
if err := json.Unmarshal([]byte(body), r); err != nil {
|
||||||
t.Logf("query : %s\n", query)
|
t.Logf("query : %s\n", query)
|
||||||
t.Log(body)
|
t.Log(body)
|
||||||
|
@ -1332,7 +1332,7 @@ func TestHandler_serveWriteSeriesZeroTime(t *testing.T) {
|
||||||
t.Errorf("unexpected status: %d", status)
|
t.Errorf("unexpected status: %d", status)
|
||||||
}
|
}
|
||||||
|
|
||||||
r := &influxdb.Response{}
|
r := &httpd.Response{}
|
||||||
if err := json.Unmarshal([]byte(body), r); err != nil {
|
if err := json.Unmarshal([]byte(body), r); err != nil {
|
||||||
t.Logf("query : %s\n", query)
|
t.Logf("query : %s\n", query)
|
||||||
t.Log(body)
|
t.Log(body)
|
||||||
|
@ -1418,7 +1418,7 @@ func TestHandler_serveWriteSeriesBatch(t *testing.T) {
|
||||||
t.Errorf("unexpected status: %d", status)
|
t.Errorf("unexpected status: %d", status)
|
||||||
}
|
}
|
||||||
|
|
||||||
r := &influxdb.Response{}
|
r := &httpd.Response{}
|
||||||
if err := json.Unmarshal([]byte(body), r); err != nil {
|
if err := json.Unmarshal([]byte(body), r); err != nil {
|
||||||
t.Logf("query : %s\n", query)
|
t.Logf("query : %s\n", query)
|
||||||
t.Log(body)
|
t.Log(body)
|
||||||
|
@ -1458,7 +1458,7 @@ func TestHandler_serveWriteSeriesFieldTypeConflict(t *testing.T) {
|
||||||
t.Errorf("unexpected status: %d", status)
|
t.Errorf("unexpected status: %d", status)
|
||||||
}
|
}
|
||||||
|
|
||||||
r := &influxdb.Response{}
|
r := &httpd.Response{}
|
||||||
if err := json.Unmarshal([]byte(body), r); err != nil {
|
if err := json.Unmarshal([]byte(body), r); err != nil {
|
||||||
t.Log(body)
|
t.Log(body)
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
|
@ -1559,7 +1559,7 @@ func TestHandler_ChunkedResponses(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error reading response: %s", err.Error())
|
t.Fatalf("error reading response: %s", err.Error())
|
||||||
}
|
}
|
||||||
response := &influxdb.Response{}
|
response := &httpd.Response{}
|
||||||
err = json.Unmarshal(chunk[0:n], response)
|
err = json.Unmarshal(chunk[0:n], response)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error unmarshaling resultsz: %s", err.Error())
|
t.Fatalf("error unmarshaling resultsz: %s", err.Error())
|
||||||
|
|
205
remote_mapper.go
205
remote_mapper.go
|
@ -1,205 +0,0 @@
|
||||||
package influxdb
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"io"
|
|
||||||
"net/http"
|
|
||||||
|
|
||||||
"github.com/influxdb/influxdb/influxql"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
MAX_MAP_RESPONSE_SIZE = 1024 * 1024 * 1024
|
|
||||||
)
|
|
||||||
|
|
||||||
// RemoteMapper implements the influxql.Mapper interface. The engine uses the remote mapper
|
|
||||||
// to pull map results from shards that only exist on other servers in the cluster.
|
|
||||||
type RemoteMapper struct {
|
|
||||||
dataNodes Balancer
|
|
||||||
resp *http.Response
|
|
||||||
results chan interface{}
|
|
||||||
unmarshal influxql.UnmarshalFunc
|
|
||||||
complete bool
|
|
||||||
decoder *json.Decoder
|
|
||||||
|
|
||||||
Call string `json:",omitempty"`
|
|
||||||
Database string `json:",omitempty"`
|
|
||||||
MeasurementName string `json:",omitempty"`
|
|
||||||
TMin int64 `json:",omitempty"`
|
|
||||||
TMax int64 `json:",omitempty"`
|
|
||||||
SeriesIDs []uint64 `json:",omitempty"`
|
|
||||||
ShardID uint64 `json:",omitempty"`
|
|
||||||
Filters []string `json:",omitempty"`
|
|
||||||
// WhereFields []*Field `json:",omitempty"`
|
|
||||||
// SelectFields []*Field `json:",omitempty"`
|
|
||||||
SelectTags []string `json:",omitempty"`
|
|
||||||
Limit int `json:",omitempty"`
|
|
||||||
Offset int `json:",omitempty"`
|
|
||||||
Interval int64 `json:",omitempty"`
|
|
||||||
ChunkSize int `json:",omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// Responses get streamed back to the remote mapper from the remote machine that runs a local mapper
|
|
||||||
type MapResponse struct {
|
|
||||||
Err string `json:",omitempty"`
|
|
||||||
Data []byte
|
|
||||||
Completed bool `json:",omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// Open is a no op, real work is done starting with Being
|
|
||||||
func (m *RemoteMapper) Open() error { return nil }
|
|
||||||
|
|
||||||
// Close the response body
|
|
||||||
func (m *RemoteMapper) Close() {
|
|
||||||
if m.resp != nil && m.resp.Body != nil {
|
|
||||||
m.resp.Body.Close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Begin sends a request to the remote server to start streaming map results
|
|
||||||
func (m *RemoteMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int) error {
|
|
||||||
// get the function for unmarshaling results
|
|
||||||
f, err := influxql.InitializeUnmarshaller(c)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
m.unmarshal = f
|
|
||||||
|
|
||||||
if c != nil {
|
|
||||||
m.Call = c.String()
|
|
||||||
}
|
|
||||||
m.ChunkSize = chunkSize
|
|
||||||
m.TMin = startingTime
|
|
||||||
|
|
||||||
// send the request to map to the remote server
|
|
||||||
b, err := json.Marshal(m)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var resp *http.Response
|
|
||||||
for {
|
|
||||||
node := m.dataNodes.Next()
|
|
||||||
if node == nil {
|
|
||||||
// no data nodes are available to service this query
|
|
||||||
return ErrNoDataNodeAvailable
|
|
||||||
}
|
|
||||||
|
|
||||||
// request to start streaming results
|
|
||||||
resp, err = http.Post("http://"+node.Host+"/data/run_mapper", "application/json", bytes.NewReader(b))
|
|
||||||
if err != nil {
|
|
||||||
warn("NODE DOWN")
|
|
||||||
// node.Down()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Mark the node as up
|
|
||||||
warn("NODE UP")
|
|
||||||
// node.Up()
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
m.resp = resp
|
|
||||||
lr := io.LimitReader(m.resp.Body, MAX_MAP_RESPONSE_SIZE)
|
|
||||||
m.decoder = json.NewDecoder(lr)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// NextInterval is part of the mapper interface. In this case we read the next chunk from the remote mapper
|
|
||||||
func (m *RemoteMapper) NextInterval() (interface{}, error) {
|
|
||||||
// just return nil if the mapper has completed its run
|
|
||||||
if m.complete {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
mr := &MapResponse{}
|
|
||||||
err := m.decoder.Decode(&mr)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if mr.Err != "" {
|
|
||||||
return nil, errors.New(mr.Err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// if it's a complete message, we've emptied this mapper of all data
|
|
||||||
if mr.Completed {
|
|
||||||
m.complete = true
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// marshal the data that came from the MapFN
|
|
||||||
v, err := m.unmarshal(mr.Data)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return v, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// CallExpr will parse the Call string into an expression or return nil
|
|
||||||
func (m *RemoteMapper) CallExpr() (*influxql.Call, error) {
|
|
||||||
if m.Call == "" {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
c, err := influxql.ParseExpr(m.Call)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
call, ok := c.(*influxql.Call)
|
|
||||||
|
|
||||||
if !ok {
|
|
||||||
return nil, errors.New("unable to marshal aggregate call")
|
|
||||||
}
|
|
||||||
return call, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FilterExprs will parse the filter strings and return any expressions. This array
|
|
||||||
// will be the same size as the SeriesIDs array with each element having a filter (which could be nil)
|
|
||||||
func (m *RemoteMapper) FilterExprs() []influxql.Expr {
|
|
||||||
exprs := make([]influxql.Expr, len(m.SeriesIDs), len(m.SeriesIDs))
|
|
||||||
|
|
||||||
// if filters is empty, they're all nil. if filters has one element, all filters
|
|
||||||
// should be set to that. Otherwise marshal each filter
|
|
||||||
if len(m.Filters) == 1 {
|
|
||||||
f, _ := influxql.ParseExpr(m.Filters[0])
|
|
||||||
for i, _ := range exprs {
|
|
||||||
exprs[i] = f
|
|
||||||
}
|
|
||||||
} else if len(m.Filters) > 1 {
|
|
||||||
for i, s := range m.Filters {
|
|
||||||
f, _ := influxql.ParseExpr(s)
|
|
||||||
exprs[i] = f
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return exprs
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetFilters will convert the given arrray of filters into filters that can be marshaled and sent to the remote system
|
|
||||||
func (m *RemoteMapper) SetFilters(filters []influxql.Expr) {
|
|
||||||
l := filters[0]
|
|
||||||
allFiltersTheSame := true
|
|
||||||
for _, f := range filters {
|
|
||||||
if l != f {
|
|
||||||
allFiltersTheSame = false
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// we don't need anything if they're all the same and nil
|
|
||||||
if l == nil && allFiltersTheSame {
|
|
||||||
return
|
|
||||||
} else if allFiltersTheSame { // just set one filter element since they're all the same
|
|
||||||
m.Filters = []string{l.String()}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// marshal all of them since there are different ones
|
|
||||||
m.Filters = make([]string, len(filters), len(filters))
|
|
||||||
for i, f := range filters {
|
|
||||||
m.Filters[i] = f.String()
|
|
||||||
}
|
|
||||||
}
|
|
1467
server_test.go
1467
server_test.go
File diff suppressed because it is too large
Load Diff
116
stats.go
116
stats.go
|
@ -6,98 +6,60 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Int representes a 64-bit signed integer which can be updated atomically.
|
// Stats represents a collection of metrics as key-value pairs.
|
||||||
type Int struct {
|
|
||||||
mu sync.RWMutex
|
|
||||||
i int64
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewInt returns a new Int
|
|
||||||
func NewInt(v int64) *Int {
|
|
||||||
return &Int{i: v}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add atomically adds the given delta to the Int.
|
|
||||||
func (i *Int) Add(delta int64) {
|
|
||||||
i.mu.Lock()
|
|
||||||
defer i.mu.Unlock()
|
|
||||||
i.i += delta
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stats represents a collection of metrics, as key-value pairs.
|
|
||||||
type Stats struct {
|
type Stats struct {
|
||||||
name string
|
mu sync.RWMutex
|
||||||
m map[string]*Int
|
name string
|
||||||
mu sync.RWMutex
|
values map[string]int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStats returns a Stats object with the given name.
|
// NewStats returns a Stats object with the given name.
|
||||||
func NewStats(name string) *Stats {
|
func NewStats(name string) *Stats {
|
||||||
return &Stats{
|
return &Stats{
|
||||||
name: name,
|
name: name,
|
||||||
m: make(map[string]*Int),
|
values: make(map[string]int64),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add adds delta to the stat indiciated by key.
|
// Name returns the name of the Stats object.
|
||||||
func (s *Stats) Add(key string, delta int64) {
|
func (s *Stats) Name() string { return s.name }
|
||||||
s.mu.RLock()
|
|
||||||
i, ok := s.m[key]
|
|
||||||
s.mu.RUnlock()
|
|
||||||
if !ok {
|
|
||||||
// check again under the write lock
|
|
||||||
s.mu.Lock()
|
|
||||||
i, ok = s.m[key]
|
|
||||||
if !ok {
|
|
||||||
i = new(Int)
|
|
||||||
s.m[key] = i
|
|
||||||
}
|
|
||||||
s.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
i.Add(delta)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Inc simply increments the given key by 1.
|
|
||||||
func (s *Stats) Inc(key string) {
|
|
||||||
s.Add(key, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get returns a value for a given key.
|
// Get returns a value for a given key.
|
||||||
func (s *Stats) Get(key string) int64 {
|
func (s *Stats) Get(key string) int64 {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
defer s.mu.RUnlock()
|
v := s.values[key]
|
||||||
return s.m[key].i
|
s.mu.RUnlock()
|
||||||
|
return v
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set sets a value for the given key.
|
// Set sets a value for the given key.
|
||||||
func (s *Stats) Set(key string, v int64) {
|
func (s *Stats) Set(key string, v int64) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
s.values[key] = v
|
||||||
s.m[key] = NewInt(v)
|
s.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Name returns the name of the Stats object.
|
// Add adds delta to the stat indiciated by key.
|
||||||
func (s *Stats) Name() string {
|
func (s *Stats) Add(key string, delta int64) {
|
||||||
return s.name
|
s.mu.Lock()
|
||||||
|
s.values[key] += delta
|
||||||
|
s.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Walk calls f for each entry in the stats. The stats are locked
|
// Inc simply increments the given key by 1.
|
||||||
// during the walk but existing entries may be concurrently updated.
|
func (s *Stats) Inc(key string) { s.Add(key, 1) }
|
||||||
func (s *Stats) Walk(f func(string, int64)) {
|
|
||||||
|
// Walk calls f for each entry in the stats.
|
||||||
|
func (s *Stats) Walk(fn func(string, int64)) {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
defer s.mu.RUnlock()
|
defer s.mu.RUnlock()
|
||||||
|
for k, v := range s.values {
|
||||||
for k, v := range s.m {
|
fn(k, v)
|
||||||
v.mu.RLock()
|
|
||||||
f(k, v.i)
|
|
||||||
v.mu.RUnlock()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Diff returns the difference between two sets of stats. The result is undefined
|
// Diff returns the difference between two sets of stats.
|
||||||
// if the two Stats objects do not contain the same keys.
|
// The result is undefined if the two Stats objects do not contain the same keys.
|
||||||
func (s *Stats) Diff(other *Stats) *Stats {
|
func (s *Stats) Diff(other *Stats) *Stats {
|
||||||
diff := NewStats(s.name)
|
diff := NewStats(s.name)
|
||||||
s.Walk(func(k string, v int64) {
|
s.Walk(func(k string, v int64) {
|
||||||
|
@ -106,30 +68,30 @@ func (s *Stats) Diff(other *Stats) *Stats {
|
||||||
return diff
|
return diff
|
||||||
}
|
}
|
||||||
|
|
||||||
// Snapshot returns a copy of the stats object. Addition and removal of stats keys
|
// Clone returns a copy of the stats object.
|
||||||
// is blocked during the created of the snapshot, but existing entries may be
|
func (s *Stats) Clone() *Stats {
|
||||||
// concurrently updated.
|
other := NewStats(s.name)
|
||||||
func (s *Stats) Snapshot() *Stats {
|
|
||||||
snap := NewStats(s.name)
|
|
||||||
s.Walk(func(k string, v int64) {
|
s.Walk(func(k string, v int64) {
|
||||||
snap.Set(k, s.m[k].i)
|
other.Set(k, s.values[k])
|
||||||
})
|
})
|
||||||
return snap
|
return other
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Stats) String() string {
|
func (s *Stats) String() string {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
var out string
|
var out string
|
||||||
stat := s.Snapshot()
|
|
||||||
var keys []string
|
var keys []string
|
||||||
for k, _ := range stat.m {
|
for k, _ := range s.values {
|
||||||
keys = append(keys, k)
|
keys = append(keys, k)
|
||||||
}
|
}
|
||||||
sort.Strings(keys)
|
sort.Strings(keys)
|
||||||
out += `{"` + stat.name + `":[`
|
out += `{"` + s.name + `":[`
|
||||||
|
|
||||||
var j int
|
var j int
|
||||||
for _, k := range keys {
|
for _, k := range keys {
|
||||||
v := stat.m[k].i
|
out += fmt.Sprintf(`{"%s":%d}`, k, s.values[k])
|
||||||
out += `{"` + k + `":` + fmt.Sprintf("%d", v) + `}`
|
|
||||||
j++
|
j++
|
||||||
if j != len(keys) {
|
if j != len(keys) {
|
||||||
out += `,`
|
out += `,`
|
||||||
|
|
|
@ -2,20 +2,20 @@ package influxdb_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/influxdb/influxdb"
|
"github.com/influxdb/influxdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestStats_SetAndGet(t *testing.T) {
|
// Ensure the stats can set and retrieve a value for a key.
|
||||||
|
func TestStats_Set(t *testing.T) {
|
||||||
s := influxdb.NewStats("foo")
|
s := influxdb.NewStats("foo")
|
||||||
|
|
||||||
s.Set("a", 100)
|
s.Set("a", 100)
|
||||||
if s.Get("a") != 100 {
|
if s.Get("a") != 100 {
|
||||||
t.Fatalf("stats set failed, expected 100, got %d", s.Get("a"))
|
t.Fatalf("stats set failed, expected 100, got %d", s.Get("a"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure that stats can add a value to a key.
|
||||||
func TestStats_Add(t *testing.T) {
|
func TestStats_Add(t *testing.T) {
|
||||||
s := influxdb.NewStats("foo")
|
s := influxdb.NewStats("foo")
|
||||||
|
|
||||||
|
@ -25,6 +25,17 @@ func TestStats_Add(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure that stats can subtract by adding a negative number.
|
||||||
|
func TestStats_Add_Negative(t *testing.T) {
|
||||||
|
s := influxdb.NewStats("foo")
|
||||||
|
|
||||||
|
s.Add("a", -200)
|
||||||
|
if s.Get("a") != -200 {
|
||||||
|
t.Fatalf("stats set failed, expected -200, got %d", s.Get("a"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure that stats can increment a value by 1.
|
||||||
func TestStats_Inc(t *testing.T) {
|
func TestStats_Inc(t *testing.T) {
|
||||||
s := influxdb.NewStats("foo")
|
s := influxdb.NewStats("foo")
|
||||||
|
|
||||||
|
@ -40,15 +51,6 @@ func TestStats_Inc(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStats_AddNegative(t *testing.T) {
|
|
||||||
s := influxdb.NewStats("foo")
|
|
||||||
|
|
||||||
s.Add("a", -200)
|
|
||||||
if s.Get("a") != -200 {
|
|
||||||
t.Fatalf("stats set failed, expected -200, got %d", s.Get("a"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestStats_SetAndAdd(t *testing.T) {
|
func TestStats_SetAndAdd(t *testing.T) {
|
||||||
s := influxdb.NewStats("foo")
|
s := influxdb.NewStats("foo")
|
||||||
|
|
||||||
|
@ -78,55 +80,24 @@ func TestStats_Diff(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStats_Snapshot(t *testing.T) {
|
func TestStats_Snapshot(t *testing.T) {
|
||||||
foo := influxdb.NewStats("server")
|
s := influxdb.NewStats("server")
|
||||||
foo.Set("a", 100)
|
s.Set("a", 100)
|
||||||
foo.Set("b", 600)
|
s.Set("b", 600)
|
||||||
|
|
||||||
bar := foo.Snapshot()
|
other := s.Clone()
|
||||||
if bar.Name() != "server" || bar.Get("a") != 100 || bar.Get("b") != 600 {
|
if other.Name() != "server" || other.Get("a") != 100 || other.Get("b") != 600 {
|
||||||
t.Fatalf("stats snapshot returned unexpected result: %#v", bar)
|
t.Fatalf("stats snapshot returned unexpected result: %#v", other)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStats_String(t *testing.T) {
|
func TestStats_String(t *testing.T) {
|
||||||
foo := influxdb.NewStats("server")
|
s := influxdb.NewStats("server")
|
||||||
foo.Set("a", 100)
|
s.Set("a", 100)
|
||||||
foo.Set("b", 600)
|
s.Set("b", 600)
|
||||||
|
|
||||||
if exp, got := `{"server":[{"a":100},{"b":600}]}`, foo.String(); exp != got {
|
if exp, got := `{"server":[{"a":100},{"b":600}]}`, s.String(); exp != got {
|
||||||
t.Log("exp: ", exp)
|
t.Log("exp: ", exp)
|
||||||
t.Log("got: ", got)
|
t.Log("got: ", got)
|
||||||
t.Fatalf("failed to get string")
|
t.Fatalf("failed to get string")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestStats_RaceCheck is meant to be run with race-detection enabled.
|
|
||||||
func TestStats_RaceCheck(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping long-runnning Stats race checking")
|
|
||||||
}
|
|
||||||
|
|
||||||
foo := influxdb.NewStats("server")
|
|
||||||
foo.Set("a", 100)
|
|
||||||
|
|
||||||
walker := func(s string, i int64) {
|
|
||||||
if i == -1 {
|
|
||||||
return // Will never happen.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
foo.Add("a", 1)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
foo.Walk(walker)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
|
||||||
t.Log("TestStats_RaceCheck completed")
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,210 +0,0 @@
|
||||||
package test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"sync/atomic"
|
|
||||||
|
|
||||||
"github.com/influxdb/influxdb/influxql"
|
|
||||||
"github.com/influxdb/influxdb/meta"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
shardID uint64
|
|
||||||
)
|
|
||||||
|
|
||||||
type MetaStore struct {
|
|
||||||
OpenFn func(path string) error
|
|
||||||
CloseFn func() error
|
|
||||||
|
|
||||||
CreateContinuousQueryFn func(query string) (*meta.ContinuousQueryInfo, error)
|
|
||||||
DropContinuousQueryFn func(query string) error
|
|
||||||
|
|
||||||
NodeFn func(id uint64) (*meta.NodeInfo, error)
|
|
||||||
NodeByHostFn func(host string) (*meta.NodeInfo, error)
|
|
||||||
CreateNodeFn func(host string) (*meta.NodeInfo, error)
|
|
||||||
DeleteNodeFn func(id uint64) error
|
|
||||||
|
|
||||||
DatabaseFn func(name string) (*meta.DatabaseInfo, error)
|
|
||||||
CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error)
|
|
||||||
CreateDatabaseIfNotExistsFn func(name string) (*meta.DatabaseInfo, error)
|
|
||||||
DropDatabaseFn func(name string) error
|
|
||||||
|
|
||||||
RetentionPolicyFn func(database, name string) (*meta.RetentionPolicyInfo, error)
|
|
||||||
CreateRetentionPolicyFn func(database string, rp *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
|
|
||||||
CreateRetentionPolicyIfNotExistsFn func(database string, rp *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
|
|
||||||
SetDefaultRetentionPolicyFn func(database, name string) error
|
|
||||||
UpdateRetentionPolicyFn func(database, name string, rpu *meta.RetentionPolicyUpdate) (*meta.RetentionPolicyInfo, error)
|
|
||||||
DeleteRetentionPolicyFn func(database, name string) error
|
|
||||||
|
|
||||||
ShardGroupFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
|
|
||||||
CreateShardGroupIfNotExistsFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
|
|
||||||
DeleteShardGroupFn func(database, policy string, shardID uint64) error
|
|
||||||
|
|
||||||
UserFn func(username string) (*meta.UserInfo, error)
|
|
||||||
CreateUserFn func(username, password string, admin bool) (*meta.UserInfo, error)
|
|
||||||
UpdateUserFn func(username, password string) (*meta.UserInfo, error)
|
|
||||||
DeleteUserFn func(username string) error
|
|
||||||
SetPrivilegeFn func(p influxql.Privilege, username string, dbname string) error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) Open(path string) error {
|
|
||||||
return m.OpenFn(path)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) Close() error {
|
|
||||||
return m.CloseFn()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) CreateContinuousQuery(query string) (*meta.ContinuousQueryInfo, error) {
|
|
||||||
return m.CreateContinuousQueryFn(query)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) DropContinuousQuery(query string) error {
|
|
||||||
return m.DropContinuousQueryFn(query)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) Node(id uint64) (*meta.NodeInfo, error) {
|
|
||||||
return m.NodeFn(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) NodeByHost(host string) (*meta.NodeInfo, error) {
|
|
||||||
return m.NodeByHostFn(host)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) CreateNode(host string) (*meta.NodeInfo, error) {
|
|
||||||
return m.CreateNodeFn(host)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) DeleteNode(id uint64) error {
|
|
||||||
return m.DeleteNodeFn(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) Database(name string) (*meta.DatabaseInfo, error) {
|
|
||||||
return m.DatabaseFn(name)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) CreateDatabase(name string) (*meta.DatabaseInfo, error) {
|
|
||||||
return m.CreateDatabaseFn(name)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error) {
|
|
||||||
return m.CreateDatabaseIfNotExistsFn(name)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) DropDatabase(name string) error {
|
|
||||||
return m.DropDatabaseFn(name)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) {
|
|
||||||
return m.RetentionPolicyFn(database, name)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) CreateRetentionPolicy(database string, rp *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) {
|
|
||||||
return m.CreateRetentionPolicyFn(database, rp)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) CreateRetentionPolicyIfNotExists(database string, rp *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) {
|
|
||||||
return m.CreateRetentionPolicyIfNotExistsFn(database, rp)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) SetDefaultRetentionPolicy(database, name string) error {
|
|
||||||
return m.SetDefaultRetentionPolicyFn(database, name)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate) (*meta.RetentionPolicyInfo, error) {
|
|
||||||
return m.UpdateRetentionPolicyFn(database, name, rpu)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) DeleteRetentionPolicy(database, name string) error {
|
|
||||||
return m.DeleteRetentionPolicyFn(database, name)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) ShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
|
|
||||||
return m.ShardGroupFn(database, policy, timestamp)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
|
|
||||||
return m.CreateShardGroupIfNotExistsFn(database, policy, timestamp)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) DeleteShardGroup(database, policy string, shardID uint64) error {
|
|
||||||
return m.DeleteShardGroupFn(database, policy, shardID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) User(username string) (*meta.UserInfo, error) {
|
|
||||||
return m.UserFn(username)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) CreateUser(username, password string, admin bool) (*meta.UserInfo, error) {
|
|
||||||
return m.CreateUserFn(username, password, admin)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) UpdateUser(username, password string) (*meta.UserInfo, error) {
|
|
||||||
return m.UpdateUserFn(username, password)
|
|
||||||
}
|
|
||||||
func (m MetaStore) DeleteUser(username string) error {
|
|
||||||
return m.DeleteUserFn(username)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m MetaStore) SetPrivilege(p influxql.Privilege, username string, dbname string) error {
|
|
||||||
return m.SetPrivilegeFn(p, username, dbname)
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewRetentionPolicy(name string, duration time.Duration, nodeCount int) *meta.RetentionPolicyInfo {
|
|
||||||
|
|
||||||
shards := []meta.ShardInfo{}
|
|
||||||
ownerIDs := []uint64{}
|
|
||||||
for i := 1; i <= nodeCount; i++ {
|
|
||||||
ownerIDs = append(ownerIDs, uint64(i))
|
|
||||||
}
|
|
||||||
|
|
||||||
// each node is fully replicated with each other
|
|
||||||
shards = append(shards, meta.ShardInfo{
|
|
||||||
ID: nextShardID(),
|
|
||||||
OwnerIDs: ownerIDs,
|
|
||||||
})
|
|
||||||
|
|
||||||
rp := &meta.RetentionPolicyInfo{
|
|
||||||
Name: "myrp",
|
|
||||||
ReplicaN: nodeCount,
|
|
||||||
Duration: duration,
|
|
||||||
ShardGroupDuration: duration,
|
|
||||||
ShardGroups: []meta.ShardGroupInfo{
|
|
||||||
meta.ShardGroupInfo{
|
|
||||||
ID: nextShardID(),
|
|
||||||
StartTime: time.Unix(0, 0),
|
|
||||||
EndTime: time.Unix(0, 0).Add(duration).Add(-1),
|
|
||||||
Shards: shards,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
return rp
|
|
||||||
}
|
|
||||||
|
|
||||||
func AttachShardGroupInfo(rp *meta.RetentionPolicyInfo, ownerIDs []uint64) {
|
|
||||||
var startTime, endTime time.Time
|
|
||||||
if len(rp.ShardGroups) == 0 {
|
|
||||||
startTime = time.Unix(0, 0)
|
|
||||||
} else {
|
|
||||||
startTime = rp.ShardGroups[len(rp.ShardGroups)-1].StartTime.Add(rp.ShardGroupDuration)
|
|
||||||
}
|
|
||||||
endTime = startTime.Add(rp.ShardGroupDuration).Add(-1)
|
|
||||||
|
|
||||||
sh := meta.ShardGroupInfo{
|
|
||||||
ID: uint64(len(rp.ShardGroups) + 1),
|
|
||||||
StartTime: startTime,
|
|
||||||
EndTime: endTime,
|
|
||||||
Shards: []meta.ShardInfo{
|
|
||||||
meta.ShardInfo{
|
|
||||||
ID: nextShardID(),
|
|
||||||
OwnerIDs: ownerIDs,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
rp.ShardGroups = append(rp.ShardGroups, sh)
|
|
||||||
}
|
|
||||||
|
|
||||||
func nextShardID() uint64 {
|
|
||||||
return atomic.AddUint64(&shardID, 1)
|
|
||||||
}
|
|
|
@ -1,4 +1,4 @@
|
||||||
package influxdb
|
package tsdb
|
||||||
|
|
||||||
/*
|
/*
|
||||||
import (
|
import (
|
|
@ -1,4 +1,4 @@
|
||||||
package influxdb_test
|
package tsdb_test
|
||||||
|
|
||||||
/*
|
/*
|
||||||
import (
|
import (
|
Loading…
Reference in New Issue