influxdb/server.go

package influxdb
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/messaging"
"golang.org/x/crypto/bcrypt"
)
const (
// DefaultRootPassword is the password initially set for the root user.
// It is also used when resetting the root user's password.
DefaultRootPassword = "root"
// DefaultRetentionPolicyName is the name of a database's default retention policy.
DefaultRetentionPolicyName = "default"
// DefaultSplitN represents the number of partitions a shard is split into.
DefaultSplitN = 1
// DefaultReplicaN represents the number of replicas data is written to.
DefaultReplicaN = 1
// DefaultShardDuration is the time period held by a shard.
DefaultShardDuration = 7 * (24 * time.Hour)
// DefaultShardRetention is the length of time before a shard is dropped.
DefaultShardRetention = 7 * (24 * time.Hour)
)
const (
// Data node messages
createDataNodeMessageType = messaging.MessageType(0x00)
deleteDataNodeMessageType = messaging.MessageType(0x01)
// Database messages
createDatabaseMessageType = messaging.MessageType(0x10)
deleteDatabaseMessageType = messaging.MessageType(0x11)
// Retention policy messages
createRetentionPolicyMessageType = messaging.MessageType(0x20)
updateRetentionPolicyMessageType = messaging.MessageType(0x21)
deleteRetentionPolicyMessageType = messaging.MessageType(0x22)
setDefaultRetentionPolicyMessageType = messaging.MessageType(0x23)
// User messages
createUserMessageType = messaging.MessageType(0x30)
updateUserMessageType = messaging.MessageType(0x31)
deleteUserMessageType = messaging.MessageType(0x32)
// Shard messages
createShardGroupIfNotExistsMessageType = messaging.MessageType(0x40)
deleteShardGroupMessageType = messaging.MessageType(0x41)
// Series messages
createSeriesIfNotExistsMessageType = messaging.MessageType(0x50)
// Measurement messages
createFieldsIfNotExistsMessageType = messaging.MessageType(0x60)
// Write series data messages (per-topic)
writeRawSeriesMessageType = messaging.MessageType(0x80)
// Privilege messages
setPrivilegeMessageType = messaging.MessageType(0x90)
)
// Server represents a collection of metadata and raw metric data.
type Server struct {
mu sync.RWMutex
id uint64
path string
done chan struct{} // goroutine close notification
rpDone chan struct{} // retention policies goroutine close notification
client MessagingClient // broker client
index uint64 // highest broadcast index seen
errors map[uint64]error // message errors
meta *metastore // metadata store
dataNodes map[uint64]*DataNode // data nodes by id
databases map[string]*database // databases by name
users map[string]*User // user by name
shards map[uint64]*Shard // shards by shard id
shardsBySeriesID map[uint32][]*Shard // shards by series id
Logger *log.Logger
authenticationEnabled bool
}
// NewServer returns a new instance of Server.
func NewServer() *Server {
s := Server{
meta: &metastore{},
errors: make(map[uint64]error),
dataNodes: make(map[uint64]*DataNode),
databases: make(map[string]*database),
users: make(map[string]*User),
shards: make(map[uint64]*Shard),
shardsBySeriesID: make(map[uint32][]*Shard),
Logger: log.New(os.Stderr, "[server] ", log.LstdFlags),
}
// The server is created with authentication enabled by default.
// This ensures that disabling authentication must be an explicit decision.
// To set the server to 'authless mode', call server.SetAuthenticationEnabled(false).
s.authenticationEnabled = true
return &s
}
// SetAuthenticationEnabled turns on or off server authentication
func (s *Server) SetAuthenticationEnabled(enabled bool) {
s.authenticationEnabled = enabled
}
// ID returns the data node id for the server.
// Returns zero if the server is closed or the server has not joined a cluster.
func (s *Server) ID() uint64 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.id
}
// Index returns the index for the server.
func (s *Server) Index() uint64 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.index
}
// Path returns the path used when opening the server.
// Returns an empty string when the server is closed.
func (s *Server) Path() string {
s.mu.RLock()
defer s.mu.RUnlock()
return s.path
}
// shardPath returns the path for a shard.
func (s *Server) shardPath(id uint64) string {
if s.path == "" {
return ""
}
return filepath.Join(s.path, "shards", strconv.FormatUint(id, 10))
}
// metaPath returns the path for the metastore.
func (s *Server) metaPath() string {
if s.path == "" {
return ""
}
return filepath.Join(s.path, "meta")
}
// SetLogOutput sets the writer for all Server log output.
func (s *Server) SetLogOutput(w io.Writer) {
s.Logger = log.New(w, "[server] ", log.LstdFlags)
}
// Open initializes the server from a given path.
func (s *Server) Open(path string) error {
// Ensure the server isn't already open and there's a path provided.
if s.opened() {
return ErrServerOpen
} else if path == "" {
return ErrPathRequired
}
// Set the server path.
s.path = path
// Create required directories.
if err := os.MkdirAll(path, 0700); err != nil {
return err
}
if err := os.MkdirAll(filepath.Join(path, "shards"), 0700); err != nil {
return err
}
// Open metadata store.
if err := s.meta.open(s.metaPath()); err != nil {
return fmt.Errorf("meta: %s", err)
}
// Load state from metastore.
if err := s.load(); err != nil {
return fmt.Errorf("load: %s", err)
}
// TODO: Open shard data stores.
// TODO: Associate series ids with shards.
return nil
}
// opened returns true when the server is open.
func (s *Server) opened() bool { return s.path != "" }
// Close shuts down the server.
func (s *Server) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
if !s.opened() {
return ErrServerClosed
}
if s.rpDone != nil {
close(s.rpDone)
}
// Remove path.
s.path = ""
// Close message processing.
s.setClient(nil)
// Close metastore.
_ = s.meta.close()
// Close shards.
for _, sh := range s.shards {
_ = sh.close()
}
return nil
}
// load reads the state of the server from the metastore.
func (s *Server) load() error {
return s.meta.view(func(tx *metatx) error {
// Read server id.
s.id = tx.id()
// Load data nodes.
s.dataNodes = make(map[uint64]*DataNode)
for _, node := range tx.dataNodes() {
s.dataNodes[node.ID] = node
}
// Load databases.
s.databases = make(map[string]*database)
for _, db := range tx.databases() {
s.databases[db.name] = db
// load the index
log.Printf("Loading metadata index for %s\n", db.name)
err := s.meta.view(func(tx *metatx) error {
tx.indexDatabase(db)
return nil
})
if err != nil {
return err
}
}
// Open all shards.
for _, db := range s.databases {
for _, rp := range db.policies {
for _, g := range rp.shardGroups {
for _, sh := range g.Shards {
if err := sh.open(s.shardPath(sh.ID)); err != nil {
return fmt.Errorf("cannot open shard store: id=%d, err=%s", sh.ID, err)
}
}
}
}
}
// Load users.
s.users = make(map[string]*User)
for _, u := range tx.users() {
s.users[u.Name] = u
}
return nil
})
}
// StartRetentionPolicyEnforcement launches retention policy enforcement.
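// A minimal usage sketch (the 30-minute interval below is illustrative, not a default):
//
//	if err := s.StartRetentionPolicyEnforcement(30 * time.Minute); err != nil {
//		log.Fatal(err)
//	}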
func (s *Server) StartRetentionPolicyEnforcement(checkInterval time.Duration) error {
if checkInterval == 0 {
return fmt.Errorf("retention policy check interval must be non-zero")
}
rpDone := make(chan struct{})
s.rpDone = rpDone
go func() {
for {
select {
case <-rpDone:
return
case <-time.After(checkInterval):
s.EnforceRetentionPolicies()
}
}
}()
return nil
}
// EnforceRetentionPolicies ensures that data that is aging-out due to retention policies
// is removed from the server.
func (s *Server) EnforceRetentionPolicies() {
log.Println("retention policy enforcement check commencing")
// Check all shard groups.
for _, db := range s.databases {
for _, rp := range db.policies {
for _, g := range rp.shardGroups {
if g.EndTime.Add(rp.Duration).Before(time.Now()) {
log.Printf("shard group %d, retention policy %s, database %s due for deletion",
g.ID, rp.Name, db.name)
if err := s.DeleteShardGroup(db.name, rp.Name, g.ID); err != nil {
log.Printf("failed to request deletion of shard group %d: %s", g.ID, err.Error())
}
}
}
}
}
}
// Client retrieves the current messaging client.
func (s *Server) Client() MessagingClient {
s.mu.RLock()
defer s.mu.RUnlock()
return s.client
}
// SetClient sets the messaging client on the server.
func (s *Server) SetClient(client MessagingClient) error {
s.mu.Lock()
defer s.mu.Unlock()
return s.setClient(client)
}
func (s *Server) setClient(client MessagingClient) error {
// Ensure the server is open.
if !s.opened() {
return ErrServerClosed
}
// Stop previous processor, if running.
if s.done != nil {
close(s.done)
s.done = nil
}
// Set the messaging client.
s.client = client
// Start goroutine to read messages from the broker.
if client != nil {
done := make(chan struct{})
s.done = done
go s.processor(client, done)
}
return nil
}
// broadcast encodes a message as JSON and sends it to the broker's broadcast topic.
// This function waits until the message has been processed by the server.
// Returns the broker log index of the message or an error.
func (s *Server) broadcast(typ messaging.MessageType, c interface{}) (uint64, error) {
// Encode the command.
data, err := json.Marshal(c)
if err != nil {
return 0, err
}
// Publish the message.
m := &messaging.Message{
Type: typ,
TopicID: messaging.BroadcastTopicID,
Data: data,
}
index, err := s.client.Publish(m)
if err != nil {
return 0, err
}
// Wait for the server to receive the message.
err = s.Sync(index)
return index, err
}
// Sync blocks until a given index (or a higher index) has been applied.
// Returns any error associated with the command.
func (s *Server) Sync(index uint64) error {
for {
// Check if the index has been applied. If so, retrieve the error and return.
s.mu.RLock()
if s.index >= index {
err, ok := s.errors[index]
if ok {
delete(s.errors, index)
}
s.mu.RUnlock()
return err
}
s.mu.RUnlock()
// Otherwise wait momentarily and check again.
time.Sleep(1 * time.Millisecond)
}
}
// Initialize creates a new data node and initializes the server's id to 1.
func (s *Server) Initialize(u *url.URL) error {
// Create a new data node.
if err := s.CreateDataNode(u); err != nil {
return err
}
// Ensure the data node returns with an ID of 1.
// If it doesn't then something went really wrong. We have to panic because
// the messaging client relies on the first server being assigned ID 1.
n := s.DataNodeByURL(u)
assert(n != nil && n.ID == 1, "invalid initial server id: %d", n.ID)
// Set the ID on the metastore.
if err := s.meta.mustUpdate(func(tx *metatx) error {
return tx.setID(n.ID)
}); err != nil {
return err
}
// Set the ID on the server.
s.id = 1
return nil
}
// dataNodeJSON is the same struct used in the httpd package, but it seems
// overkill to export it and share it.
type dataNodeJSON struct {
ID uint64 `json:"id"`
URL string `json:"url"`
}
// Join creates a new data node in an existing cluster, copies the metastore,
// and initializes the ID.
func (s *Server) Join(u *url.URL, joinURL *url.URL) error {
s.mu.Lock()
defer s.mu.Unlock()
// Encode data node request.
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(&dataNodeJSON{URL: u.String()}); err != nil {
return err
}
// Send request.
joinURL = copyURL(joinURL)
joinURL.Path = "/data_nodes"
resp, err := http.Post(joinURL.String(), "application/octet-stream", &buf)
if err != nil {
return err
}
defer resp.Body.Close()
// Check if created.
if resp.StatusCode != http.StatusCreated {
return ErrUnableToJoin
}
// Decode response.
var n dataNodeJSON
if err := json.NewDecoder(resp.Body).Decode(&n); err != nil {
return err
}
assert(n.ID > 0, "invalid join node id returned: %d", n.ID)
// Download the metastore from joining server.
joinURL.Path = "/metastore"
resp, err = http.Get(joinURL.String())
if err != nil {
return err
}
defer resp.Body.Close()
// Check response & parse content length.
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unsuccessful meta copy: status=%d (%s)", resp.StatusCode, joinURL.String())
}
sz, err := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
if err != nil {
return fmt.Errorf("cannot parse meta size: %s", err)
}
// Close the metastore.
_ = s.meta.close()
// Overwrite the metastore.
f, err := os.Create(s.metaPath())
if err != nil {
return fmt.Errorf("create meta file: %s", err)
}
// Copy and check size.
if _, err := io.CopyN(f, resp.Body, sz); err != nil {
_ = f.Close()
return fmt.Errorf("copy meta file: %s", err)
}
_ = f.Close()
// Reopen metastore.
s.meta = &metastore{}
if err := s.meta.open(s.metaPath()); err != nil {
return fmt.Errorf("reopen meta: %s", err)
}
// Update the ID on the metastore.
if err := s.meta.mustUpdate(func(tx *metatx) error {
return tx.setID(n.ID)
}); err != nil {
return err
}
// Reload the server.
log.Printf("reloading metadata")
if err := s.load(); err != nil {
return fmt.Errorf("reload: %s", err)
}
return nil
}
// CopyMetastore writes the underlying metastore data file to a writer.
func (s *Server) CopyMetastore(w io.Writer) error {
return s.meta.mustView(func(tx *metatx) error {
// Set the content length if this is an HTTP connection.
if w, ok := w.(http.ResponseWriter); ok {
w.Header().Set("Content-Length", strconv.Itoa(int(tx.Size())))
}
// Write entire database to the writer.
return tx.Copy(w)
})
}
// DataNode returns a data node by id.
func (s *Server) DataNode(id uint64) *DataNode {
s.mu.RLock()
defer s.mu.RUnlock()
return s.dataNodes[id]
}
// DataNodeByURL returns a data node by url.
func (s *Server) DataNodeByURL(u *url.URL) *DataNode {
s.mu.RLock()
defer s.mu.RUnlock()
for _, n := range s.dataNodes {
if n.URL.String() == u.String() {
return n
}
}
return nil
}
// DataNodes returns a list of data nodes.
func (s *Server) DataNodes() (a []*DataNode) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, n := range s.dataNodes {
a = append(a, n)
}
sort.Sort(dataNodes(a))
return
}
// CreateDataNode creates a new data node with a given URL.
func (s *Server) CreateDataNode(u *url.URL) error {
c := &createDataNodeCommand{URL: u.String()}
_, err := s.broadcast(createDataNodeMessageType, c)
return err
}
func (s *Server) applyCreateDataNode(m *messaging.Message) (err error) {
var c createDataNodeCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Validate parameters.
if c.URL == "" {
return ErrDataNodeURLRequired
}
// Check that another node with the same URL doesn't already exist.
u, _ := url.Parse(c.URL)
for _, n := range s.dataNodes {
if n.URL.String() == u.String() {
return ErrDataNodeExists
}
}
// Create data node.
n := newDataNode()
n.URL = u
// Persist to metastore.
err = s.meta.mustUpdate(func(tx *metatx) error {
n.ID = tx.nextDataNodeID()
return tx.saveDataNode(n)
})
// Add the node to the server.
s.dataNodes[n.ID] = n
return
}
type createDataNodeCommand struct {
URL string `json:"url"`
}
// DeleteDataNode deletes an existing data node.
func (s *Server) DeleteDataNode(id uint64) error {
c := &deleteDataNodeCommand{ID: id}
_, err := s.broadcast(deleteDataNodeMessageType, c)
return err
}
func (s *Server) applyDeleteDataNode(m *messaging.Message) (err error) {
var c deleteDataNodeCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
n := s.dataNodes[c.ID]
if n == nil {
return ErrDataNodeNotFound
}
// Remove from metastore.
err = s.meta.mustUpdate(func(tx *metatx) error { return tx.deleteDataNode(c.ID) })
// Delete the node.
delete(s.dataNodes, n.ID)
return
}
type deleteDataNodeCommand struct {
ID uint64 `json:"id"`
}
// DatabaseExists returns true if a database exists.
func (s *Server) DatabaseExists(name string) bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.databases[name] != nil
}
// Databases returns a sorted list of all database names.
func (s *Server) Databases() (a []string) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, db := range s.databases {
a = append(a, db.name)
}
sort.Strings(a)
return
}
// CreateDatabase creates a new database.
func (s *Server) CreateDatabase(name string) error {
c := &createDatabaseCommand{Name: name}
_, err := s.broadcast(createDatabaseMessageType, c)
return err
}
func (s *Server) applyCreateDatabase(m *messaging.Message) (err error) {
var c createDatabaseCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
if s.databases[c.Name] != nil {
return ErrDatabaseExists
}
// Create database entry.
db := newDatabase()
db.name = c.Name
// Persist to metastore.
err = s.meta.mustUpdate(func(tx *metatx) error { return tx.saveDatabase(db) })
// Add to databases on server.
s.databases[c.Name] = db
return
}
type createDatabaseCommand struct {
Name string `json:"name"`
}
// DeleteDatabase deletes an existing database.
func (s *Server) DeleteDatabase(name string) error {
c := &deleteDatabaseCommand{Name: name}
_, err := s.broadcast(deleteDatabaseMessageType, c)
return err
}
func (s *Server) applyDeleteDatabase(m *messaging.Message) (err error) {
var c deleteDatabaseCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
if s.databases[c.Name] == nil {
return ErrDatabaseNotFound
}
// Remove from metastore.
err = s.meta.mustUpdate(func(tx *metatx) error { return tx.deleteDatabase(c.Name) })
// Delete the database entry.
delete(s.databases, c.Name)
return
}
type deleteDatabaseCommand struct {
Name string `json:"name"`
}
// Shard returns a shard by ID.
func (s *Server) Shard(id uint64) *Shard {
s.mu.RLock()
defer s.mu.RUnlock()
return s.shards[id]
}
// shardGroupByTimestamp returns a group for a database, policy & timestamp.
func (s *Server) shardGroupByTimestamp(database, policy string, timestamp time.Time) (*ShardGroup, error) {
db := s.databases[database]
if db == nil {
return nil, ErrDatabaseNotFound
}
return db.shardGroupByTimestamp(policy, timestamp)
}
// ShardGroups returns a list of all shard groups for a database.
// Returns an error if the database doesn't exist.
func (s *Server) ShardGroups(database string) ([]*ShardGroup, error) {
s.mu.RLock()
defer s.mu.RUnlock()
// Lookup database.
db := s.databases[database]
if db == nil {
return nil, ErrDatabaseNotFound
}
// Retrieve groups from database.
var a []*ShardGroup
for _, rp := range db.policies {
for _, g := range rp.shardGroups {
a = append(a, g)
}
}
return a, nil
}
// CreateShardGroupIfNotExists creates the shard group for a retention policy for the interval a timestamp falls into.
func (s *Server) CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) error {
c := &createShardGroupIfNotExistsCommand{Database: database, Policy: policy, Timestamp: timestamp}
_, err := s.broadcast(createShardGroupIfNotExistsMessageType, c)
return err
}
// createShardGroupIfNotExists returns the shard group for a database, policy, and timestamp.
// If the group doesn't exist then one will be created automatically.
func (s *Server) createShardGroupIfNotExists(database, policy string, timestamp time.Time) (*ShardGroup, error) {
// Check if shard group exists first.
g, err := s.shardGroupByTimestamp(database, policy, timestamp)
if err != nil {
return nil, err
} else if g != nil {
return g, nil
}
// If the shard group doesn't exist then create it.
if err := s.CreateShardGroupIfNotExists(database, policy, timestamp); err != nil {
return nil, err
}
// Look up the shard group again.
return s.shardGroupByTimestamp(database, policy, timestamp)
}
func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err error) {
var c createShardGroupIfNotExistsCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Retrieve database.
db := s.databases[c.Database]
if s.databases[c.Database] == nil {
return ErrDatabaseNotFound
}
// Validate retention policy.
rp := db.policies[c.Policy]
if rp == nil {
return ErrRetentionPolicyNotFound
}
// If the timestamp matches an existing shard group's date range then ignore the request.
if g := rp.shardGroupByTimestamp(c.Timestamp); g != nil {
return nil
}
// If no shard group matches then create a new one.
g := newShardGroup()
g.StartTime = c.Timestamp.Truncate(rp.Duration).UTC()
g.EndTime = g.StartTime.Add(rp.Duration).UTC()
// Sort nodes so they're consistently assigned to the shards.
nodes := make([]*DataNode, 0, len(s.dataNodes))
for _, n := range s.dataNodes {
nodes = append(nodes, n)
}
sort.Sort(dataNodes(nodes))
// Require at least one replica but no more replicas than nodes.
replicaN := int(rp.ReplicaN)
if replicaN == 0 {
replicaN = 1
} else if replicaN > len(nodes) {
replicaN = len(nodes)
}
// Determine shard count by node count divided by replication factor.
// This ensures shards are distributed across nodes evenly and
// replicated the correct number of times.
shardN := len(nodes) / replicaN
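// For example, 4 data nodes with a replication factor of 2 yields 2 shards,
// each of which is assigned to 2 of the nodes.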
// Create shards based on the node count and replication factor.
g.Shards = make([]*Shard, shardN)
for i := range g.Shards {
g.Shards[i] = newShard()
}
// Persist to metastore if a shard was created.
if err = s.meta.mustUpdate(func(tx *metatx) error {
// Generate an ID for the group.
g.ID = tx.nextShardGroupID()
// Generate an ID for each shard.
for _, sh := range g.Shards {
sh.ID = tx.nextShardID()
}
// Assign data nodes to shards via round robin.
// Start from a repeatably "random" place in the node list.
nodeIndex := int(m.Index % uint64(len(nodes)))
for _, sh := range g.Shards {
for i := 0; i < replicaN; i++ {
node := nodes[nodeIndex%len(nodes)]
sh.DataNodeIDs = append(sh.DataNodeIDs, node.ID)
nodeIndex++
}
}
// Retention policy has a new shard group, so update the policy.
rp.shardGroups = append(rp.shardGroups, g)
return tx.saveDatabase(db)
}); err != nil {
g.close()
return
}
// Open shards assigned to this server.
for _, sh := range g.Shards {
// Ignore if this server is not assigned.
if !sh.HasDataNodeID(s.id) {
continue
}
// Open shard store. Panic if an error occurs so the process can restart and retry.
if err := sh.open(s.shardPath(sh.ID)); err != nil {
panic("unable to open shard: " + err.Error())
}
}
// Add to lookups.
for _, sh := range g.Shards {
s.shards[sh.ID] = sh
}
// Subscribe to shards assigned to this server.
// TODO: Move subscription outside of command processing.
// TODO: Retry subscriptions on failure.
for _, sh := range g.Shards {
// Ignore if this server is not assigned.
if !sh.HasDataNodeID(s.id) {
continue
}
// Subscribe on the broker.
if err := s.client.Subscribe(s.id, sh.ID); err != nil {
log.Printf("unable to subscribe: replica=%d, topic=%d, err=%s", s.id, sh.ID, err)
}
}
return
}
type createShardGroupIfNotExistsCommand struct {
Database string `json:"database"`
Policy string `json:"policy"`
Timestamp time.Time `json:"timestamp"`
}
// DeleteShardGroup deletes the shard group identified by shardID.
func (s *Server) DeleteShardGroup(database, policy string, shardID uint64) error {
c := &deleteShardGroupCommand{Database: database, Policy: policy, ID: shardID}
_, err := s.broadcast(deleteShardGroupMessageType, c)
return err
}
// applyDeleteShardGroup deletes shard data from disk and updates the metastore.
func (s *Server) applyDeleteShardGroup(m *messaging.Message) (err error) {
var c deleteShardGroupCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Retrieve database.
db := s.databases[c.Database]
if s.databases[c.Database] == nil {
return ErrDatabaseNotFound
}
// Validate retention policy.
rp := db.policies[c.Policy]
if rp == nil {
return ErrRetentionPolicyNotFound
}
// If shard group no longer exists, then ignore request. This can occur if multiple
// data nodes triggered the deletion.
g := rp.shardGroupByID(c.ID)
if g == nil {
return nil
}
for _, shard := range g.Shards {
// Ignore shards not on this server.
if !shard.HasDataNodeID(s.id) {
continue
}
path := shard.store.Path()
shard.close()
if err := os.Remove(path); err != nil {
// Log, but keep going. This can happen if shards were deleted, but the server exited
// before it acknowledged the delete command.
log.Printf("error deleting shard %s, group ID %d, policy %s: %s", path, g.ID, rp.Name, err.Error())
}
}
// Remove from metastore.
rp.removeShardGroupByID(c.ID)
err = s.meta.mustUpdate(func(tx *metatx) error {
return tx.saveDatabase(db)
})
return
}
type deleteShardGroupCommand struct {
Database string `json:"database"`
Policy string `json:"policy"`
ID uint64 `json:"id"`
}
// User returns a user by username
// Returns nil if the user does not exist.
func (s *Server) User(name string) *User {
s.mu.Lock()
defer s.mu.Unlock()
return s.users[name]
}
// Users returns a list of all users, sorted by name.
func (s *Server) Users() (a []*User) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, u := range s.users {
a = append(a, u)
}
sort.Sort(users(a))
return a
}
// UserCount returns the number of users.
func (s *Server) UserCount() int {
s.mu.RLock()
defer s.mu.RUnlock()
return len(s.users)
}
// AdminUserExists returns whether at least 1 admin-level user exists.
func (s *Server) AdminUserExists() bool {
for _, u := range s.users {
if u.Admin {
return true
}
}
return false
}
// Authenticate returns an authenticated user by username. If any error occurs,
// or the authentication credentials are invalid, an error is returned.
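// A minimal usage sketch (the credentials are illustrative):
//
//	u, err := s.Authenticate("susy", "pass")
//	if err != nil {
//		// reject the request
//	}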
func (s *Server) Authenticate(username, password string) (*User, error) {
s.mu.Lock()
defer s.mu.Unlock()
u := s.users[username]
// If authentication is not enabled and the user is nil, treat the request as authenticated.
if u == nil && !s.authenticationEnabled {
return nil, nil
}
if u == nil {
return nil, fmt.Errorf("invalid username or password")
}
err := u.Authenticate(password)
if err != nil {
return nil, fmt.Errorf("invalid username or password")
}
return u, nil
}
// CreateUser creates a user on the server.
func (s *Server) CreateUser(username, password string, admin bool) error {
c := &createUserCommand{Username: username, Password: password, Admin: admin}
_, err := s.broadcast(createUserMessageType, c)
return err
}
func (s *Server) applyCreateUser(m *messaging.Message) (err error) {
var c createUserCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Validate user.
if c.Username == "" {
return ErrUsernameRequired
} else if s.users[c.Username] != nil {
return ErrUserExists
}
// Generate the hash of the password.
hash, err := HashPassword(c.Password)
if err != nil {
return err
}
// Create the user.
u := &User{
Name: c.Username,
Hash: string(hash),
Privileges: make(map[string]influxql.Privilege),
Admin: c.Admin,
}
// Persist to metastore.
err = s.meta.mustUpdate(func(tx *metatx) error {
return tx.saveUser(u)
})
s.users[u.Name] = u
return
}
type createUserCommand struct {
Username string `json:"username"`
Password string `json:"password"`
Admin bool `json:"admin,omitempty"`
}
// UpdateUser updates an existing user on the server.
func (s *Server) UpdateUser(username, password string) error {
c := &updateUserCommand{Username: username, Password: password}
_, err := s.broadcast(updateUserMessageType, c)
return err
}
func (s *Server) applyUpdateUser(m *messaging.Message) (err error) {
var c updateUserCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Validate command.
u := s.users[c.Username]
if u == nil {
return ErrUserNotFound
}
// Update the user's password, if set.
if c.Password != "" {
hash, err := HashPassword(c.Password)
if err != nil {
return err
}
u.Hash = string(hash)
}
// Persist to metastore.
return s.meta.mustUpdate(func(tx *metatx) error {
return tx.saveUser(u)
})
}
type updateUserCommand struct {
Username string `json:"username"`
Password string `json:"password,omitempty"`
}
// DeleteUser removes a user from the server.
func (s *Server) DeleteUser(username string) error {
c := &deleteUserCommand{Username: username}
_, err := s.broadcast(deleteUserMessageType, c)
return err
}
func (s *Server) applyDeleteUser(m *messaging.Message) error {
var c deleteUserCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Validate user.
if c.Username == "" {
return ErrUsernameRequired
} else if s.users[c.Username] == nil {
return ErrUserNotFound
}
// Remove from metastore.
s.meta.mustUpdate(func(tx *metatx) error {
return tx.deleteUser(c.Username)
})
// Delete the user.
delete(s.users, c.Username)
return nil
}
type deleteUserCommand struct {
Username string `json:"username"`
}
// SetPrivilege grants or revokes a privilege to a user.
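// A minimal usage sketch (the username is illustrative; an empty database name with
// influxql.AllPrivileges toggles the user's admin flag, per applySetPrivilege below):
//
//	if err := s.SetPrivilege(influxql.AllPrivileges, "susy", ""); err != nil {
//		// handle error
//	}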
func (s *Server) SetPrivilege(p influxql.Privilege, username string, dbname string) error {
c := &setPrivilegeCommand{p, username, dbname}
_, err := s.broadcast(setPrivilegeMessageType, c)
return err
}
func (s *Server) applySetPrivilege(m *messaging.Message) error {
var c setPrivilegeCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Validate user.
if c.Username == "" {
return ErrUsernameRequired
}
u := s.users[c.Username]
if u == nil {
return ErrUserNotFound
}
// If dbname is empty, update user's Admin flag.
if c.Database == "" && (c.Privilege == influxql.AllPrivileges || c.Privilege == influxql.NoPrivileges) {
u.Admin = (c.Privilege == influxql.AllPrivileges)
} else if c.Database != "" {
// Update user's privilege for the database.
u.Privileges[c.Database] = c.Privilege
} else {
return ErrInvalidGrantRevoke
}
// Persist to metastore.
return s.meta.mustUpdate(func(tx *metatx) error {
return tx.saveUser(u)
})
}
type setPrivilegeCommand struct {
Privilege influxql.Privilege `json:"privilege"`
Username string `json:"username"`
Database string `json:"database"`
}
// RetentionPolicy returns a retention policy by name.
// Returns an error if the database doesn't exist.
func (s *Server) RetentionPolicy(database, name string) (*RetentionPolicy, error) {
s.mu.Lock()
defer s.mu.Unlock()
// Lookup database.
db := s.databases[database]
if db == nil {
return nil, ErrDatabaseNotFound
}
return db.policies[name], nil
}
// DefaultRetentionPolicy returns the default retention policy for a database.
// Returns an error if the database doesn't exist.
func (s *Server) DefaultRetentionPolicy(database string) (*RetentionPolicy, error) {
s.mu.RLock()
defer s.mu.RUnlock()
// Lookup database.
db := s.databases[database]
if db == nil {
return nil, ErrDatabaseNotFound
}
return db.policies[db.defaultRetentionPolicy], nil
}
// RetentionPolicies returns a list of retention policies for a database.
// Returns an error if the database doesn't exist.
func (s *Server) RetentionPolicies(database string) ([]*RetentionPolicy, error) {
s.mu.RLock()
defer s.mu.RUnlock()
// Lookup database.
db := s.databases[database]
if db == nil {
return nil, ErrDatabaseNotFound
}
// Retrieve the policies.
a := make([]*RetentionPolicy, 0, len(db.policies))
for _, p := range db.policies {
a = append(a, p)
}
return a, nil
}
// CreateRetentionPolicy creates a retention policy for a database.
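// A minimal usage sketch (the database name, policy name, and duration are illustrative):
//
//	err := s.CreateRetentionPolicy("mydb", &RetentionPolicy{
//		Name:     "7d",
//		Duration: 7 * 24 * time.Hour,
//		ReplicaN: 1,
//	})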
func (s *Server) CreateRetentionPolicy(database string, rp *RetentionPolicy) error {
c := &createRetentionPolicyCommand{
Database: database,
Name: rp.Name,
Duration: rp.Duration,
ReplicaN: rp.ReplicaN,
}
_, err := s.broadcast(createRetentionPolicyMessageType, c)
return err
}
func (s *Server) applyCreateRetentionPolicy(m *messaging.Message) error {
var c createRetentionPolicyCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Retrieve the database.
db := s.databases[c.Database]
if s.databases[c.Database] == nil {
return ErrDatabaseNotFound
} else if c.Name == "" {
return ErrRetentionPolicyNameRequired
} else if db.policies[c.Name] != nil {
return ErrRetentionPolicyExists
}
// Add policy to the database.
db.policies[c.Name] = &RetentionPolicy{
Name: c.Name,
Duration: c.Duration,
ReplicaN: c.ReplicaN,
}
// Persist to metastore.
s.meta.mustUpdate(func(tx *metatx) error {
return tx.saveDatabase(db)
})
return nil
}
type createRetentionPolicyCommand struct {
Database string `json:"database"`
Name string `json:"name"`
Duration time.Duration `json:"duration"`
ReplicaN uint32 `json:"replicaN"`
SplitN uint32 `json:"splitN"`
}
// RetentionPolicyUpdate represents retention policy fields that
// need to be updated.
type RetentionPolicyUpdate struct {
Name *string `json:"name,omitempty"`
Duration *time.Duration `json:"duration,omitempty"`
ReplicaN *uint32 `json:"replicaN,omitempty"`
}
// UpdateRetentionPolicy updates an existing retention policy on a database.
func (s *Server) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error {
c := &updateRetentionPolicyCommand{Database: database, Name: name, Policy: rpu}
_, err := s.broadcast(updateRetentionPolicyMessageType, c)
return err
}
type updateRetentionPolicyCommand struct {
Database string `json:"database"`
Name string `json:"name"`
Policy *RetentionPolicyUpdate `json:"policy"`
}
func (s *Server) applyUpdateRetentionPolicy(m *messaging.Message) (err error) {
var c updateRetentionPolicyCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Validate command.
db := s.databases[c.Database]
if s.databases[c.Database] == nil {
return ErrDatabaseNotFound
} else if c.Name == "" {
return ErrRetentionPolicyNameRequired
}
// Retrieve the policy.
p := db.policies[c.Name]
if db.policies[c.Name] == nil {
return ErrRetentionPolicyNotFound
}
// Update the policy name.
if c.Policy.Name != nil {
delete(db.policies, p.Name)
p.Name = *c.Policy.Name
db.policies[p.Name] = p
}
// Update duration.
if c.Policy.Duration != nil {
p.Duration = *c.Policy.Duration
}
// Update replication factor.
if c.Policy.ReplicaN != nil {
p.ReplicaN = *c.Policy.ReplicaN
}
// Persist to metastore.
err = s.meta.mustUpdate(func(tx *metatx) error {
return tx.saveDatabase(db)
})
return
}
// DeleteRetentionPolicy removes a retention policy from a database.
func (s *Server) DeleteRetentionPolicy(database, name string) error {
c := &deleteRetentionPolicyCommand{Database: database, Name: name}
_, err := s.broadcast(deleteRetentionPolicyMessageType, c)
return err
}
func (s *Server) applyDeleteRetentionPolicy(m *messaging.Message) (err error) {
var c deleteRetentionPolicyCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Retrieve the database.
db := s.databases[c.Database]
if s.databases[c.Database] == nil {
return ErrDatabaseNotFound
} else if c.Name == "" {
return ErrRetentionPolicyNameRequired
} else if db.policies[c.Name] == nil {
return ErrRetentionPolicyNotFound
}
// Remove retention policy.
delete(db.policies, c.Name)
// Persist to metastore.
err = s.meta.mustUpdate(func(tx *metatx) error {
return tx.saveDatabase(db)
})
return
}
type deleteRetentionPolicyCommand struct {
Database string `json:"database"`
Name string `json:"name"`
}
// SetDefaultRetentionPolicy sets the default policy to write data into and query from on a database.
func (s *Server) SetDefaultRetentionPolicy(database, name string) error {
c := &setDefaultRetentionPolicyCommand{Database: database, Name: name}
_, err := s.broadcast(setDefaultRetentionPolicyMessageType, c)
return err
}
func (s *Server) applySetDefaultRetentionPolicy(m *messaging.Message) (err error) {
var c setDefaultRetentionPolicyCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Validate command.
db := s.databases[c.Database]
if s.databases[c.Database] == nil {
return ErrDatabaseNotFound
} else if db.policies[c.Name] == nil {
return ErrRetentionPolicyNotFound
}
// Update default policy.
db.defaultRetentionPolicy = c.Name
// Persist to metastore.
err = s.meta.mustUpdate(func(tx *metatx) error {
return tx.saveDatabase(db)
})
return
}
type setDefaultRetentionPolicyCommand struct {
Database string `json:"database"`
Name string `json:"name"`
}
type createFieldsIfNotExistCommand struct {
Database string `json:"database"`
Measurement string `json:"measurement"`
Fields map[string]influxql.DataType `json:"fields"`
}
func (s *Server) applyCreateFieldsIfNotExist(m *messaging.Message) error {
var c createFieldsIfNotExistCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Validate command.
db := s.databases[c.Database]
if db == nil {
return ErrDatabaseNotFound
}
mm := db.measurements[c.Measurement]
if mm == nil {
return ErrMeasurementNotFound
}
// Create fields in Metastore.
for k, v := range c.Fields {
if err := mm.createFieldIfNotExists(k, v); err != nil {
if err == ErrFieldOverflow {
log.Printf("no more fields allowed: %s::%s", mm.Name, k)
continue
} else if err == ErrFieldTypeConflict {
log.Printf("field type conflict: %s::%s", mm.Name, k)
continue
}
return err
}
}
// Update metastore.
if err := s.meta.mustUpdate(func(tx *metatx) error {
if err := tx.saveMeasurement(db.name, mm); err != nil {
return fmt.Errorf("save measurement: %s", err)
}
return tx.saveDatabase(db)
}); err != nil {
return err
}
return nil
}
func (s *Server) applyCreateSeriesIfNotExists(m *messaging.Message) error {
var c createSeriesIfNotExistsCommand
mustUnmarshalJSON(m.Data, &c)
s.mu.Lock()
defer s.mu.Unlock()
// Validate command.
db := s.databases[c.Database]
if db == nil {
return ErrDatabaseNotFound
}
if _, series := db.MeasurementAndSeries(c.Name, c.Tags); series != nil {
return nil
}
// Save the series to the metastore and add it to the in-memory index.
var series *Series
if err := s.meta.mustUpdate(func(tx *metatx) error {
var err error
series, err = tx.createSeries(db.name, c.Name, c.Tags)
return err
}); err != nil {
return err
}
db.addSeriesToIndex(c.Name, series)
return nil
}
type createSeriesIfNotExistsCommand struct {
Database string `json:"database"`
Name string `json:"name"`
Tags map[string]string `json:"tags"`
}
// Point defines the values that will be written to the database
type Point struct {
Name string
Tags map[string]string
Timestamp time.Time
Values map[string]interface{}
}
// WriteSeries writes series data to the database.
// Returns the messaging index the data was written to.
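// A minimal usage sketch (database, measurement, and values are illustrative; an empty
// retention policy name falls back to the database's default policy):
//
//	_, err := s.WriteSeries("mydb", "", []Point{{
//		Name:      "cpu",
//		Tags:      map[string]string{"host": "server01"},
//		Timestamp: time.Now(),
//		Values:    map[string]interface{}{"value": 0.64},
//	}})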
func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (uint64, error) {
// If the retention policy is not set, use the default for this database.
if retentionPolicy == "" {
rp, err := s.DefaultRetentionPolicy(database)
if err != nil {
return 0, fmt.Errorf("failed to determine default retention policy: %s", err.Error())
} else if rp == nil {
return 0, ErrDefaultRetentionPolicyNotFound
}
retentionPolicy = rp.Name
}
// Collect responses for each channel.
type resp struct {
index uint64
err error
}
ch := make(chan resp, len(points))
// Write each point in parallel.
var wg sync.WaitGroup
for i := range points {
wg.Add(1)
go func(p *Point) {
index, err := s.writePoint(database, retentionPolicy, p)
ch <- resp{index, err}
wg.Done()
}(&points[i])
}
wg.Wait()
close(ch)
// Calculate max index and check for errors.
var index uint64
var err error
for resp := range ch {
if resp.index > index {
index = resp.index
}
if err == nil && resp.err != nil {
err = resp.err
}
}
return index, err
}
func (s *Server) writePoint(database, retentionPolicy string, point *Point) (uint64, error) {
measurement, tags, timestamp, values := point.Name, point.Tags, point.Timestamp, point.Values
// Sanity-check the data point.
if measurement == "" {
return 0, ErrMeasurementNameRequired
}
if len(values) == 0 {
return 0, ErrValuesRequired
}
// Find the id for the series and tagset
seriesID, err := s.createSeriesIfNotExists(database, measurement, tags)
if err != nil {
return 0, err
}
// Retrieve measurement.
m, err := s.measurement(database, measurement)
if err != nil {
return 0, err
} else if m == nil {
return 0, ErrMeasurementNotFound
}
// Retrieve shard group.
g, err := s.createShardGroupIfNotExists(database, retentionPolicy, timestamp)
if err != nil {
return 0, fmt.Errorf("create shard(%s/%s): %s", retentionPolicy, timestamp.Format(time.RFC3339Nano), err)
}
// Find appropriate shard within the shard group.
sh := g.ShardBySeriesID(seriesID)
// Ensure fields are created as necessary.
err = s.createFieldsIfNotExists(database, measurement, values)
if err != nil {
return 0, err
}
// Convert string-key/values to fieldID-key/values.
rawValues, err := m.mapValues(values)
if err != nil {
return 0, err
}
// Encode point header.
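// The header carries the series id and timestamp; the encoded field data is appended
// after it and unpacked again in applyWriteRawSeries.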
data := marshalPointHeader(seriesID, timestamp.UnixNano())
data = append(data, m.EncodeFields(rawValues)...)
// Publish "raw write series" message on shard's topic to broker.
return s.client.Publish(&messaging.Message{
Type: writeRawSeriesMessageType,
TopicID: sh.ID,
Data: data,
})
}
// applyWriteRawSeries writes raw series data to the database.
// Raw series data already has field names converted to ids, so the
// representation is fast and compact.
func (s *Server) applyWriteRawSeries(m *messaging.Message) error {
// Retrieve the shard.
sh := s.Shard(m.TopicID)
if sh == nil {
return ErrShardNotFound
}
// Extract the series id and timestamp from the header.
// Everything after the header is the marshalled value.
seriesID, timestamp := unmarshalPointHeader(m.Data[:pointHeaderSize])
data := m.Data[pointHeaderSize:]
// Add to lookup.
s.addShardBySeriesID(sh, seriesID)
// TODO: Enable some way to specify if the data should be overwritten
overwrite := true
// Write to shard.
return sh.writeSeries(seriesID, timestamp, data, overwrite)
}
func (s *Server) addShardBySeriesID(sh *Shard, seriesID uint32) {
for _, other := range s.shardsBySeriesID[seriesID] {
if other.ID == sh.ID {
return
}
}
s.shardsBySeriesID[seriesID] = append(s.shardsBySeriesID[seriesID], sh)
}
func (s *Server) createSeriesIfNotExists(database, name string, tags map[string]string) (uint32, error) {
// Try to find series locally first.
s.mu.RLock()
db := s.databases[database]
if db == nil {
s.mu.RUnlock()
return 0, fmt.Errorf("database not found %q", database)
}
if _, series := db.MeasurementAndSeries(name, tags); series != nil {
s.mu.RUnlock()
return series.ID, nil
}
// Release the read lock so the broadcast can go through and acquire the write lock.
s.mu.RUnlock()
// If it doesn't exist then create a message and broadcast.
c := &createSeriesIfNotExistsCommand{Database: database, Name: name, Tags: tags}
_, err := s.broadcast(createSeriesIfNotExistsMessageType, c)
if err != nil {
return 0, err
}
// Lookup series again.
_, series := db.MeasurementAndSeries(name, tags)
if series == nil {
return 0, ErrSeriesNotFound
}
return series.ID, nil
}
func (s *Server) createFieldsIfNotExists(database string, measurement string, values map[string]interface{}) error {
// Local function keeps locking foolproof.
f := func(database string, measurement string, values map[string]interface{}) (map[string]influxql.DataType, error) {
s.mu.RLock()
defer s.mu.RUnlock()
// Check to see if the fields already exist.
m, err := s.measurement(database, measurement)
if err != nil {
return nil, err
} else if m == nil {
return nil, ErrMeasurementNotFound
}
newFields := make(map[string]influxql.DataType)
for k, v := range values {
f := m.FieldByName(k)
if f == nil {
newFields[k] = influxql.InspectDataType(v)
} else {
if f.Type != influxql.InspectDataType(v) {
return nil, fmt.Errorf("field %q is type %T, mapped as type %s", k, v, f.Type)
}
}
}
return newFields, nil
}
newFields, err := f(database, measurement, values)
if err != nil {
return err
}
if len(newFields) == 0 {
return nil
}
// There are some new fields, so create field type mappings on the cluster.
c := &createFieldsIfNotExistCommand{Database: database, Measurement: measurement, Fields: newFields}
_, err = s.broadcast(createFieldsIfNotExistsMessageType, c)
if err != nil {
return err
}
return nil
}
// ReadSeries reads a single point from a series in the database. It is used for debugging and testing only.
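// A minimal usage sketch (all names are illustrative; an empty retention policy name uses the default):
//
//	values, err := s.ReadSeries("mydb", "", "cpu", map[string]string{"host": "server01"}, timestamp)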
func (s *Server) ReadSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time) (map[string]interface{}, error) {
s.mu.RLock()
defer s.mu.RUnlock()
// Find database.
db := s.databases[database]
if db == nil {
return nil, ErrDatabaseNotFound
}
// Find series.
mm, series := db.MeasurementAndSeries(name, tags)
if mm == nil {
return nil, ErrMeasurementNotFound
} else if series == nil {
return nil, ErrSeriesNotFound
}
// If the retention policy is not specified, use the default for this database.
if retentionPolicy == "" {
retentionPolicy = db.defaultRetentionPolicy
}
// Retrieve retention policy.
rp := db.policies[retentionPolicy]
if rp == nil {
return nil, ErrRetentionPolicyNotFound
}
// Retrieve shard group.
g, err := s.shardGroupByTimestamp(database, retentionPolicy, timestamp)
if err != nil {
return nil, err
} else if g == nil {
return nil, nil
}
// TODO: Verify that server owns shard.
// Find appropriate shard within the shard group.
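// Series data is distributed across the group's shards by series ID modulo the shard count.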
sh := g.Shards[int(series.ID)%len(g.Shards)]
// Read raw encoded series data.
data, err := sh.readSeries(series.ID, timestamp.UnixNano())
if err != nil {
return nil, err
}
// Decode into a raw value map.
rawValues := mm.DecodeFields(data)
if rawValues == nil {
return nil, nil
}
// Decode into a string-key value map.
values := make(map[string]interface{}, len(rawValues))
for fieldID, value := range rawValues {
f := mm.Field(fieldID)
if f == nil {
continue
}
values[f.Name] = value
}
return values, nil
}
// ExecuteQuery executes an InfluxQL query against the server.
// Returns a resultset for each statement in the query.
// Stops on first execution error that occurs.
func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User) Results {
// Authorize user to execute the query.
if s.authenticationEnabled {
if err := s.Authorize(user, q, database); err != nil {
return Results{Err: err}
}
}
// Build empty resultsets.
results := Results{Results: make([]*Result, len(q.Statements))}
// Execute each statement.
for i, stmt := range q.Statements {
// Set default database and policy on the statement.
if err := s.NormalizeStatement(stmt, database); err != nil {
results.Results[i] = &Result{Err: err}
break
}
var res *Result
switch stmt := stmt.(type) {
case *influxql.SelectStatement:
res = s.executeSelectStatement(stmt, database, user)
case *influxql.CreateDatabaseStatement:
res = s.executeCreateDatabaseStatement(stmt, user)
case *influxql.DropDatabaseStatement:
res = s.executeDropDatabaseStatement(stmt, user)
case *influxql.ShowDatabasesStatement:
res = s.executeShowDatabasesStatement(stmt, user)
case *influxql.CreateUserStatement:
res = s.executeCreateUserStatement(stmt, user)
case *influxql.DropUserStatement:
res = s.executeDropUserStatement(stmt, user)
case *influxql.ShowUsersStatement:
res = s.executeShowUsersStatement(stmt, user)
case *influxql.DropSeriesStatement:
continue
case *influxql.ShowSeriesStatement:
res = s.executeShowSeriesStatement(stmt, database, user)
case *influxql.ShowMeasurementsStatement:
res = s.executeShowMeasurementsStatement(stmt, database, user)
case *influxql.ShowTagKeysStatement:
res = s.executeShowTagKeysStatement(stmt, database, user)
case *influxql.ShowTagValuesStatement:
res = s.executeShowTagValuesStatement(stmt, database, user)
case *influxql.ShowFieldKeysStatement:
res = s.executeShowFieldKeysStatement(stmt, database, user)
case *influxql.GrantStatement:
res = s.executeGrantStatement(stmt, user)
case *influxql.RevokeStatement:
res = s.executeRevokeStatement(stmt, user)
case *influxql.CreateRetentionPolicyStatement:
res = s.executeCreateRetentionPolicyStatement(stmt, user)
case *influxql.AlterRetentionPolicyStatement:
res = s.executeAlterRetentionPolicyStatement(stmt, user)
case *influxql.DropRetentionPolicyStatement:
res = s.executeDropRetentionPolicyStatement(stmt, user)
case *influxql.ShowRetentionPoliciesStatement:
res = s.executeShowRetentionPoliciesStatement(stmt, user)
case *influxql.CreateContinuousQueryStatement:
continue
case *influxql.DropContinuousQueryStatement:
continue
case *influxql.ShowContinuousQueriesStatement:
continue
default:
panic(fmt.Sprintf("unsupported statement type: %T", stmt))
}
// If an error occurs then stop processing remaining statements.
results.Results[i] = res
if res.Err != nil {
break
}
}
// Fill any empty results after error.
for i, res := range results.Results {
if res == nil {
results.Results[i] = &Result{Err: ErrNotExecuted}
}
}
return results
}
// executeSelectStatement plans and executes a select statement against a database.
func (s *Server) executeSelectStatement(stmt *influxql.SelectStatement, database string, user *User) *Result {
// Plan statement execution.
e, err := s.planSelectStatement(stmt)
if err != nil {
return &Result{Err: err}
}
// Execute plan.
ch, err := e.Execute()
if err != nil {
return &Result{Err: err}
}
// Read all rows from channel.
res := &Result{Rows: make([]*influxql.Row, 0)}
for row := range ch {
res.Rows = append(res.Rows, row)
}
return res
}
// planSelectStatement plans a SELECT statement under lock.
func (s *Server) planSelectStatement(stmt *influxql.SelectStatement) (*influxql.Executor, error) {
s.mu.RLock()
defer s.mu.RUnlock()
// If this is a wildcard statement, expand this to use the fields
// This is temporary until we move other parts of the system further
isWildcard := false
for _, f := range stmt.Fields {
if _, ok := f.Expr.(*influxql.Wildcard); ok {
isWildcard = true
break
}
}
if len(stmt.Fields) != 1 && isWildcard {
return nil, fmt.Errorf("unsupported query: %s. currently only single wildcard is supported.", stmt.String())
}
if isWildcard {
if measurement, ok := stmt.Source.(*influxql.Measurement); ok {
segments, err := influxql.SplitIdent(measurement.Name)
if err != nil {
return nil, fmt.Errorf("unable to parse measurement %s", measurement.Name)
}
db, m := segments[0], segments[2]
if s.databases[db].measurements[m] == nil {
return nil, fmt.Errorf("measurement %s does not exist.", measurement.Name)
}
var fields influxql.Fields
for _, f := range s.databases[db].measurements[m].Fields {
fields = append(fields, &influxql.Field{Expr: &influxql.VarRef{Val: f.Name}})
}
stmt.Fields = fields
}
}
// Plan query.
p := influxql.NewPlanner(s)
return p.Plan(stmt)
}
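// executeCreateDatabaseStatement creates a new database on the server.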
func (s *Server) executeCreateDatabaseStatement(q *influxql.CreateDatabaseStatement, user *User) *Result {
return &Result{Err: s.CreateDatabase(q.Name)}
}
func (s *Server) executeDropDatabaseStatement(q *influxql.DropDatabaseStatement, user *User) *Result {
return &Result{Err: s.DeleteDatabase(q.Name)}
}
func (s *Server) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatement, user *User) *Result {
row := &influxql.Row{Columns: []string{"name"}}
for _, name := range s.Databases() {
row.Values = append(row.Values, []interface{}{name})
}
return &Result{Rows: []*influxql.Row{row}}
}
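// executeCreateUserStatement creates a new user, granting cluster admin rights when
// the statement specifies ALL PRIVILEGES.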
func (s *Server) executeCreateUserStatement(q *influxql.CreateUserStatement, user *User) *Result {
isAdmin := false
if q.Privilege != nil {
isAdmin = *q.Privilege == influxql.AllPrivileges
}
return &Result{Err: s.CreateUser(q.Name, q.Password, isAdmin)}
}
func (s *Server) executeDropUserStatement(q *influxql.DropUserStatement, user *User) *Result {
return &Result{Err: s.DeleteUser(q.Name)}
}
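// executeShowSeriesStatement returns one row per matching measurement, listing the tag
// sets of the series that match the statement's optional WHERE clause.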
func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string, user *User) *Result {
s.mu.RLock()
defer s.mu.RUnlock()
// Find the database.
db := s.databases[database]
if db == nil {
return &Result{Err: ErrDatabaseNotFound}
}
// Get the list of measurements we're interested in.
measurements, err := measurementsFromSourceOrDB(stmt.Source, db)
if err != nil {
return &Result{Err: err}
}
// // If OFFSET is past the end of the array, return empty results.
// if stmt.Offset > len(measurements)-1 {
// return &Result{}
// }
// Create result struct that will be populated and returned.
result := &Result{
Rows: make(influxql.Rows, 0, len(measurements)),
}
// Loop through measurements to build result. One result row / measurement.
for _, m := range measurements {
var ids seriesIDs
if stmt.Condition != nil {
// Get series IDs that match the WHERE clause.
filters := map[uint32]influxql.Expr{}
ids, _, _ = m.walkWhereForSeriesIds(stmt.Condition, filters)
// If no series matched, then go to the next measurement.
if len(ids) == 0 {
continue
}
// TODO: check return of walkWhereForSeriesIds for fields
} else {
// No WHERE clause so get all series IDs for this measurement.
ids = m.seriesIDs
}
// Make a new row for this measurement.
r := &influxql.Row{
Name: m.Name,
Columns: m.tagKeys(),
}
// Loop through series IDs getting matching tag sets.
for _, id := range ids {
if s, ok := m.seriesByID[id]; ok {
values := make([]interface{}, 0, len(r.Columns))
for _, column := range r.Columns {
values = append(values, s.Tags[column])
}
// Add the tag values to the row.
r.Values = append(r.Values, values)
}
}
// Append the row to the result.
result.Rows = append(result.Rows, r)
}
return result
}
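// executeShowMeasurementsStatement returns the names of the measurements in the database,
// optionally filtered by a WHERE clause and bounded by LIMIT and OFFSET.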
func (s *Server) executeShowMeasurementsStatement(stmt *influxql.ShowMeasurementsStatement, database string, user *User) *Result {
s.mu.RLock()
defer s.mu.RUnlock()
// Find the database.
db := s.databases[database]
if db == nil {
return &Result{Err: ErrDatabaseNotFound}
}
// Get all measurements in sorted order.
measurements := db.Measurements()
sort.Sort(measurements)
// If a WHERE clause was specified, filter the measurements.
if stmt.Condition != nil {
var err error
measurements, err = db.measurementsByExpr(stmt.Condition)
if err != nil {
return &Result{Err: err}
}
}
offset := stmt.Offset
limit := stmt.Limit
// If OFFSET is past the end of the array, return empty results.
if offset > len(measurements)-1 {
return &Result{}
}
// Calculate last index based on LIMIT.
end := len(measurements)
if limit > 0 && offset+limit < end {
limit = offset + limit
} else {
limit = end
}
// Make a result row to hold all measurement names.
row := &influxql.Row{
Name: "measurements",
Columns: []string{"name"},
}
// Add one value to the row for each measurement name.
for i := offset; i < limit; i++ {
m := measurements[i]
v := interface{}(m.Name)
row.Values = append(row.Values, []interface{}{v})
}
// Make a result.
result := &Result{
Rows: influxql.Rows{row},
}
return result
}
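// executeShowTagKeysStatement returns one row per measurement listing its tag keys in sorted order.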
func (s *Server) executeShowTagKeysStatement(stmt *influxql.ShowTagKeysStatement, database string, user *User) *Result {
s.mu.RLock()
defer s.mu.RUnlock()
// Find the database.
db := s.databases[database]
if db == nil {
return &Result{Err: ErrDatabaseNotFound}
}
// Get the list of measurements we're interested in.
measurements, err := measurementsFromSourceOrDB(stmt.Source, db)
if err != nil {
return &Result{Err: err}
}
// Make result.
result := &Result{
Rows: make(influxql.Rows, 0, len(measurements)),
}
// Add one row per measurement to the result.
for _, m := range measurements {
// TODO: filter tag keys by stmt.Condition
// Get the tag keys in sorted order.
keys := m.tagKeys()
// Convert keys to an [][]interface{}.
values := make([][]interface{}, 0, len(m.seriesByTagKeyValue))
for _, k := range keys {
v := interface{}(k)
values = append(values, []interface{}{v})
}
// Make a result row for the measurement.
r := &influxql.Row{
Name: m.Name,
Columns: []string{"tagKey"},
Values: values,
}
result.Rows = append(result.Rows, r)
}
// TODO: LIMIT & OFFSET
return result
}
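// executeShowTagValuesStatement returns the values of the requested tag keys across the
// series that match the statement's optional WHERE clause.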
func (s *Server) executeShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string, user *User) *Result {
s.mu.RLock()
defer s.mu.RUnlock()
// Find the database.
db := s.databases[database]
if db == nil {
return &Result{Err: ErrDatabaseNotFound}
}
// Get the list of measurements we're interested in.
measurements, err := measurementsFromSourceOrDB(stmt.Source, db)
if err != nil {
return &Result{Err: err}
}
// Make result.
result := &Result{
Rows: make(influxql.Rows, 0, len(measurements)),
}
for _, m := range measurements {
var ids seriesIDs
if stmt.Condition != nil {
// Get series IDs that match the WHERE clause.
filters := map[uint32]influxql.Expr{}
ids, _, _ = m.walkWhereForSeriesIds(stmt.Condition, filters)
// If no series matched, then go to the next measurement.
if len(ids) == 0 {
continue
}
// TODO: check return of walkWhereForSeriesIds for fields
} else {
// No WHERE clause so get all series IDs for this measurement.
ids = m.seriesIDs
}
tagValues := m.tagValuesByKeyAndSeriesID(stmt.TagKeys, ids)
r := &influxql.Row{
Name: m.Name,
Columns: []string{"tagValue"},
}
vals := tagValues.list()
sort.Strings(vals)
for _, val := range vals {
v := interface{}(val)
r.Values = append(r.Values, []interface{}{v})
}
result.Rows = append(result.Rows, r)
}
return result
}
// filterMeasurementsByExpr filters a list of measurements by a tags expression.
func filterMeasurementsByExpr(measurements Measurements, expr influxql.Expr) (Measurements, error) {
// Create a list to hold result measurements.
filtered := make(Measurements, 0)
// Iterate measurements adding the ones that match to the result.
for _, m := range measurements {
// Look up series IDs that match the tags expression.
ids, err := m.seriesIDsAllOrByExpr(expr)
if err != nil {
return nil, err
} else if len(ids) > 0 {
filtered = append(filtered, m)
}
}
sort.Sort(filtered)
return filtered, nil
}
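// executeShowFieldKeysStatement returns one row per measurement listing its field names
// in sorted order.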
func (s *Server) executeShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string, user *User) *Result {
s.mu.RLock()
defer s.mu.RUnlock()
var err error
// Find the database.
db := s.databases[database]
if db == nil {
return &Result{Err: ErrDatabaseNotFound}
}
// Get the list of measurements we're interested in.
measurements, err := measurementsFromSourceOrDB(stmt.Source, db)
if err != nil {
return &Result{Err: err}
}
// If the statement has a where clause, filter the measurements by it.
if stmt.Condition != nil {
if measurements, err = filterMeasurementsByExpr(measurements, stmt.Condition); err != nil {
return &Result{Err: err}
}
}
// Make result.
result := &Result{
Rows: make(influxql.Rows, 0, len(measurements)),
}
// Loop through measurements, adding a result row for each.
for _, m := range measurements {
// Create a new row.
r := &influxql.Row{
Name: m.Name,
Columns: []string{"fieldKey"},
}
// Get a list of field names from the measurement then sort them.
names := make([]string, 0, len(m.Fields))
for _, f := range m.Fields {
names = append(names, f.Name)
}
sort.Strings(names)
// Add the field names to the result row values.
for _, n := range names {
v := interface{}(n)
r.Values = append(r.Values, []interface{}{v})
}
// Append the row to the result.
result.Rows = append(result.Rows, r)
}
return result
}
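// executeGrantStatement sets a user's privilege on a database.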
func (s *Server) executeGrantStatement(stmt *influxql.GrantStatement, user *User) *Result {
return &Result{Err: s.SetPrivilege(stmt.Privilege, stmt.User, stmt.On)}
}
func (s *Server) executeRevokeStatement(stmt *influxql.RevokeStatement, user *User) *Result {
return &Result{Err: s.SetPrivilege(influxql.NoPrivileges, stmt.User, stmt.On)}
}
// measurementsFromSourceOrDB returns a list of measurements from the
// statement passed in or, if the statement is nil, a list of all
// measurement names from the database passed in.
func measurementsFromSourceOrDB(stmt influxql.Source, db *database) (Measurements, error) {
var measurements Measurements
if stmt != nil {
// TODO: handle multiple measurement sources
if m, ok := stmt.(*influxql.Measurement); ok {
segments, err := influxql.SplitIdent(m.Name)
if err != nil {
return nil, err
}
name := m.Name
if len(segments) == 3 {
name = segments[2]
}
measurement := db.measurements[name]
if measurement == nil {
return nil, fmt.Errorf(`measurement "%s" not found`, name)
}
measurements = append(measurements, db.measurements[name])
} else {
return nil, errors.New("identifiers in FROM clause must be measurement names")
}
} else {
// No measurements specified in FROM clause so get all measurements.
measurements = db.Measurements()
}
sort.Sort(measurements)
return measurements, nil
}
func (s *Server) executeShowUsersStatement(q *influxql.ShowUsersStatement, user *User) *Result {
row := &influxql.Row{Columns: []string{"user", "admin"}}
for _, user := range s.Users() {
row.Values = append(row.Values, []interface{}{user.Name, user.Admin})
}
return &Result{Rows: []*influxql.Row{row}}
}
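// executeCreateRetentionPolicyStatement creates a retention policy and optionally sets it
// as the database's default.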
func (s *Server) executeCreateRetentionPolicyStatement(q *influxql.CreateRetentionPolicyStatement, user *User) *Result {
rp := NewRetentionPolicy(q.Name)
rp.Duration = q.Duration
rp.ReplicaN = uint32(q.Replication)
// Create new retention policy.
err := s.CreateRetentionPolicy(q.Database, rp)
if err != nil {
return &Result{Err: err}
}
// If requested, set new policy as the default.
if q.Default {
err = s.SetDefaultRetentionPolicy(q.Database, q.Name)
}
return &Result{Err: err}
}
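// executeAlterRetentionPolicyStatement updates a retention policy's duration and
// replication factor and optionally sets it as the database's default.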
func (s *Server) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement, user *User) *Result {
rpu := &RetentionPolicyUpdate{
Duration: stmt.Duration,
ReplicaN: func() *uint32 { n := uint32(*stmt.Replication); return &n }(),
}
// Update the retention policy.
err := s.UpdateRetentionPolicy(stmt.Database, stmt.Name, rpu)
if err != nil {
return &Result{Err: err}
}
// If requested, set as default retention policy.
if stmt.Default {
err = s.SetDefaultRetentionPolicy(stmt.Database, stmt.Name)
}
return &Result{Err: err}
}
func (s *Server) executeDropRetentionPolicyStatement(q *influxql.DropRetentionPolicyStatement, user *User) *Result {
return &Result{Err: s.DeleteRetentionPolicy(q.Database, q.Name)}
}
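// executeShowRetentionPoliciesStatement returns the name, duration, and replication
// factor of each retention policy in the database.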
func (s *Server) executeShowRetentionPoliciesStatement(q *influxql.ShowRetentionPoliciesStatement, user *User) *Result {
a, err := s.RetentionPolicies(q.Database)
if err != nil {
return &Result{Err: err}
}
row := &influxql.Row{Columns: []string{"name", "duration", "replicaN"}}
for _, rp := range a {
row.Values = append(row.Values, []interface{}{rp.Name, rp.Duration.String(), rp.ReplicaN})
}
return &Result{Rows: []*influxql.Row{row}}
}
// MeasurementNames returns a list of all measurements for the specified database.
func (s *Server) MeasurementNames(database string) []string {
s.mu.RLock()
defer s.mu.RUnlock()
db := s.databases[database]
if db == nil {
return nil
}
return db.names
}
/*
func (s *Server) MeasurementSeriesIDs(database, measurement string) []uint32 {
s.mu.RLock()
defer s.mu.RUnlock()
db := s.databases[database]
if db == nil {
return nil
}
return []uint32(db.SeriesIDs([]string{measurement}, nil))
}
*/
// measurement returns a measurement by database and name.
func (s *Server) measurement(database, name string) (*Measurement, error) {
db := s.databases[database]
if db == nil {
return nil, ErrDatabaseNotFound
}
return db.measurements[name], nil
}
// Begin returns an unopened transaction associated with the server.
func (s *Server) Begin() (influxql.Tx, error) { return newTx(s), nil }
// NormalizeStatement adds a default database and policy to the measurements in statement.
func (s *Server) NormalizeStatement(stmt influxql.Statement, defaultDatabase string) (err error) {
s.mu.RLock()
defer s.mu.RUnlock()
// Track prefixes for replacing field names.
prefixes := make(map[string]string)
// Qualify all measurements.
influxql.WalkFunc(stmt, func(n influxql.Node) {
if err != nil {
return
}
switch n := n.(type) {
case *influxql.Measurement:
name, e := s.normalizeMeasurement(n.Name, defaultDatabase)
if e != nil {
err = e
return
}
prefixes[n.Name] = name
n.Name = name
}
})
if err != nil {
return err
}
// Replace all variable references that used measurement prefixes.
influxql.WalkFunc(stmt, func(n influxql.Node) {
switch n := n.(type) {
case *influxql.VarRef:
for k, v := range prefixes {
if strings.HasPrefix(n.Val, k+".") {
n.Val = v + "." + influxql.QuoteIdent([]string{n.Val[len(k)+1:]})
}
}
}
})
return
}
// NormalizeMeasurement inserts the default database and retention policy into a measurement name if they are missing.
func (s *Server) NormalizeMeasurement(name string, defaultDatabase string) (string, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.normalizeMeasurement(name, defaultDatabase)
}
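// normalizeMeasurement expands a measurement name into its fully qualified, quoted
// "database"."retention policy"."measurement" form, filling in the default database and
// the database's default retention policy when they are not specified. Both the database
// and the retention policy must exist.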
func (s *Server) normalizeMeasurement(name string, defaultDatabase string) (string, error) {
// Split name into segments.
segments, err := influxql.SplitIdent(name)
if err != nil {
return "", fmt.Errorf("invalid measurement: %s", name)
}
// Normalize to 3 segments.
switch len(segments) {
case 1:
segments = append([]string{"", ""}, segments...)
case 2:
segments = append([]string{""}, segments...)
case 3:
// nop
default:
return "", fmt.Errorf("invalid measurement: %s", name)
}
// Set database if unset.
if segment := segments[0]; segment == `` {
segments[0] = defaultDatabase
}
// Find database.
db := s.databases[segments[0]]
if db == nil {
return "", fmt.Errorf("database not found: %s", segments[0])
}
// Set retention policy if unset.
if segment := segments[1]; segment == `` {
if db.defaultRetentionPolicy == "" {
return "", fmt.Errorf("default retention policy not set for: %s", db.name)
}
segments[1] = db.defaultRetentionPolicy
}
// Check if retention policy exists.
if _, ok := db.policies[segments[1]]; !ok {
return "", fmt.Errorf("retention policy does not exist: %s.%s", segments[0], segments[1])
}
return influxql.QuoteIdent(segments), nil
}
// processor runs in a separate goroutine and processes all incoming broker messages.
func (s *Server) processor(client MessagingClient, done chan struct{}) {
for {
// Read incoming message.
var m *messaging.Message
var ok bool
select {
case <-done:
return
case m, ok = <-client.C():
if !ok {
return
}
}
// Exit if closed.
// TODO: Wrap this check in a lock with the apply itself.
if !s.opened() {
continue
}
// Process message.
var err error
switch m.Type {
case writeRawSeriesMessageType:
err = s.applyWriteRawSeries(m)
case createDataNodeMessageType:
err = s.applyCreateDataNode(m)
case deleteDataNodeMessageType:
err = s.applyDeleteDataNode(m)
case createDatabaseMessageType:
err = s.applyCreateDatabase(m)
case deleteDatabaseMessageType:
err = s.applyDeleteDatabase(m)
case createUserMessageType:
err = s.applyCreateUser(m)
case updateUserMessageType:
err = s.applyUpdateUser(m)
case deleteUserMessageType:
err = s.applyDeleteUser(m)
case createRetentionPolicyMessageType:
err = s.applyCreateRetentionPolicy(m)
case updateRetentionPolicyMessageType:
err = s.applyUpdateRetentionPolicy(m)
case deleteRetentionPolicyMessageType:
err = s.applyDeleteRetentionPolicy(m)
case createShardGroupIfNotExistsMessageType:
err = s.applyCreateShardGroupIfNotExists(m)
case deleteShardGroupMessageType:
err = s.applyDeleteShardGroup(m)
case setDefaultRetentionPolicyMessageType:
err = s.applySetDefaultRetentionPolicy(m)
case createFieldsIfNotExistsMessageType:
err = s.applyCreateFieldsIfNotExist(m)
case createSeriesIfNotExistsMessageType:
err = s.applyCreateSeriesIfNotExists(m)
case setPrivilegeMessageType:
err = s.applySetPrivilege(m)
}
// Sync high water mark and errors.
s.mu.Lock()
s.index = m.Index
if err != nil {
s.errors[m.Index] = err
}
s.mu.Unlock()
}
}
// Result represents a resultset returned from a single statement.
type Result struct {
Rows []*influxql.Row
Err error
}
// MarshalJSON encodes the result into JSON.
func (r *Result) MarshalJSON() ([]byte, error) {
// Define a struct that outputs "error" as a string.
var o struct {
Rows []*influxql.Row `json:"rows,omitempty"`
Err string `json:"error,omitempty"`
}
// Copy fields to output struct.
o.Rows = r.Rows
if r.Err != nil {
o.Err = r.Err.Error()
}
return json.Marshal(&o)
}
// UnmarshalJSON decodes the data into the Result struct
func (r *Result) UnmarshalJSON(b []byte) error {
var o struct {
Rows []*influxql.Row `json:"rows,omitempty"`
Err string `json:"error,omitempty"`
}
err := json.Unmarshal(b, &o)
if err != nil {
return err
}
r.Rows = o.Rows
if o.Err != "" {
r.Err = errors.New(o.Err)
}
return nil
}
// Results represents a list of statement results.
type Results struct {
Results []*Result
Err error
}
// MarshalJSON encodes a Results struct into JSON.
func (r Results) MarshalJSON() ([]byte, error) {
// Define a struct that outputs "error" as a string.
var o struct {
Results []*Result `json:"results,omitempty"`
Err string `json:"error,omitempty"`
}
// Copy fields to output struct.
o.Results = r.Results
if r.Err != nil {
o.Err = r.Err.Error()
}
return json.Marshal(&o)
}
// UnmarshalJSON decodes the data into the Results struct
func (r *Results) UnmarshalJSON(b []byte) error {
var o struct {
Results []*Result `json:"results,omitempty"`
Err string `json:"error,omitempty"`
}
err := json.Unmarshal(b, &o)
if err != nil {
return err
}
r.Results = o.Results
if o.Err != "" {
r.Err = errors.New(o.Err)
}
return nil
}
// Error returns the first error from any statement.
// Returns nil if no errors occurred on any statements.
func (r *Results) Error() error {
if r.Err != nil {
return r.Err
}
for _, rr := range r.Results {
if rr.Err != nil {
return rr.Err
}
}
return nil
}
// MessagingClient represents the client used to receive messages from brokers.
type MessagingClient interface {
// Publishes a message to the broker.
Publish(m *messaging.Message) (index uint64, err error)
// Creates a new replica with a given ID on the broker.
CreateReplica(replicaID uint64) error
// Deletes an existing replica with a given ID from the broker.
DeleteReplica(replicaID uint64) error
// Creates a subscription for a replica to a topic.
Subscribe(replicaID, topicID uint64) error
// Removes a subscription from the replica for a topic.
Unsubscribe(replicaID, topicID uint64) error
// The streaming channel for all subscribed messages.
C() <-chan *messaging.Message
}
// DataNode represents a data node in the cluster.
type DataNode struct {
ID uint64
URL *url.URL
}
// newDataNode returns an instance of DataNode.
func newDataNode() *DataNode { return &DataNode{} }
type dataNodes []*DataNode
func (p dataNodes) Len() int { return len(p) }
func (p dataNodes) Less(i, j int) bool { return p[i].ID < p[j].ID }
func (p dataNodes) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// Authorize checks whether user u may execute query q on database.
// database can be "" for queries that do not require a database.
// If u is nil, an authorization error is returned.
func (s *Server) Authorize(u *User, q *influxql.Query, database string) error {
const authErrLogFmt = "unauthorized request | user: %q | query: %q | database %q\n"
if u == nil {
s.Logger.Printf(authErrLogFmt, "", q.String(), database)
return ErrAuthorize{text: "no user provided"}
}
// Cluster admins can do anything.
if u.Admin {
return nil
}
// Check each statement in the query.
for _, stmt := range q.Statements {
// Get the privileges required to execute the statement.
privs := stmt.RequiredPrivileges()
// Make sure the user has each privilege required to execute
// the statement.
for _, p := range privs {
// Use the db name specified by the statement or the db
// name passed by the caller if one wasn't specified by
// the statement.
dbname := p.Name
if dbname == "" {
dbname = database
}
// Check if user has required privilege.
if !u.Authorize(p.Privilege, dbname) {
var msg string
if dbname == "" {
msg = "requires cluster admin"
} else {
msg = fmt.Sprintf("requires %s privilege on %s", p.Privilege.String(), dbname)
}
s.Logger.Printf(authErrLogFmt, u.Name, q.String(), database)
return ErrAuthorize{
text: fmt.Sprintf("%s not authorized to execute '%s'. %s", u.Name, stmt.String(), msg),
}
}
}
}
return nil
}
// BcryptCost is the cost used when generating password hashes with bcrypt.
// This setting is lowered during testing to improve test suite performance.
var BcryptCost = 10
// User represents a user account on the system.
// It can be given read/write permissions to individual databases.
type User struct {
Name string `json:"name"`
Hash string `json:"hash"`
Privileges map[string]influxql.Privilege `json:"privileges"` // db name to privilege
Admin bool `json:"admin,omitempty"`
}
// Authenticate returns nil if the password matches the user's password.
// Returns an error if the password was incorrect.
func (u *User) Authenticate(password string) error {
return bcrypt.CompareHashAndPassword([]byte(u.Hash), []byte(password))
}
// Authorize returns true if the user is authorized and false if not.
func (u *User) Authorize(privilege influxql.Privilege, database string) bool {
p, ok := u.Privileges[database]
return (ok && p >= privilege) || (u.Admin)
}
// users represents a list of users, sortable by name.
type users []*User
func (p users) Len() int { return len(p) }
func (p users) Less(i, j int) bool { return p[i].Name < p[j].Name }
func (p users) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// Matcher can match either a regular expression or a plain string.
type Matcher struct {
IsRegex bool
Name string
}
// Matches returns true if the name passed in matches this Matcher.
func (m *Matcher) Matches(name string) bool {
if m.IsRegex {
matches, _ := regexp.MatchString(m.Name, name)
return matches
}
return m.Name == name
}
// HashPassword generates a cryptographically secure hash for password.
// Returns an error if the password is invalid or a hash cannot be generated.
func HashPassword(password string) ([]byte, error) {
// The second argument is the hashing cost: a higher cost is slower to compute,
// which makes the resulting hashes harder to brute force.
return bcrypt.GenerateFromPassword([]byte(password), BcryptCost)
}
// ContinuousQuery represents a query that exists on the server and processes
// each incoming event.
type ContinuousQuery struct {
ID uint32
Query string
// TODO: ParsedQuery *parser.SelectQuery
}
// copyURL returns a copy of the URL.
func copyURL(u *url.URL) *url.URL {
other := &url.URL{}
*other = *u
return other
}