influxdb/services/meta/client.go

1047 lines
23 KiB
Go

package meta
import (
"bytes"
crand "crypto/rand"
"crypto/sha256"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"math/rand"
"net/http"
"os"
"path/filepath"
"sort"
"sync"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/influxql"
"golang.org/x/crypto/bcrypt"
)
const (
// SaltBytes is the number of bytes used for salts
// (saltedHash draws this many random bytes for each auth-cache salt).
SaltBytes = 32
// metaFile is the name of the metadata file inside the client's directory;
// snapshot writes it and Load reads it.
metaFile = "meta.db"
)
var (
// ErrServiceUnavailable is returned when the meta service is unavailable.
// NOTE(review): nothing in this file returns it; it may be used elsewhere in
// the package or kept only for API compatibility.
ErrServiceUnavailable = errors.New("meta service unavailable")
// ErrService is returned when the meta service returns an error.
// NOTE(review): likewise not returned anywhere in this file.
ErrService = errors.New("meta service error")
)
// Client is used to execute commands on and read data from
// a meta service cluster.
//
// In this implementation the metadata is held locally in cacheData and
// persisted to a file under path. Mutating methods clone cacheData, change
// the clone, and install it through commit while holding mu for writing.
type Client struct {
// logger receives informational messages (PrecreateShardGroups logs here).
logger *log.Logger
// mu protects the mutable state below (cacheData, changed, authCache, logger).
mu sync.RWMutex
// closing is closed (once) by Close.
closing chan struct{}
// changed is closed and replaced by commit after every successful change;
// WaitForDataChanged hands it out.
changed chan struct{}
// cacheData is the current metadata; commit swaps in a new value.
cacheData *Data
// Authentication cache.
// authCache maps a username to a salted SHA-256 of the password that last
// passed the bcrypt check in Authenticate.
authCache map[string]authUser
// path is the directory holding metaFile (taken from Config.Dir).
path string
// retentionAutoCreate makes CreateDatabase add a default retention policy.
retentionAutoCreate bool
}
// authUser is an authentication-cache entry, created by Authenticate after a
// password passed the bcrypt check.
type authUser struct {
// bhash is the user's bcrypt hash at the moment the entry was cached;
// updateAuthCache discards the entry once the stored hash differs.
bhash string
// salt is the random salt produced by saltedHash.
salt []byte
// hash is hashWithSalt(salt, password) for the verified password.
hash []byte
}
// NewClient returns a new *Client configured from config.
//
// The client starts with in-memory metadata holding a pseudo-random cluster
// ID (from math/rand) and index 1, an empty authentication cache, and a
// logger that discards output until SetLogOutput is called. Nothing touches
// the disk here; Open does that.
func NewClient(config *Config) *Client {
	initial := &Data{
		ClusterID: uint64(rand.Int63()),
		Index:     1,
	}
	c := &Client{
		cacheData:           initial,
		closing:             make(chan struct{}),
		changed:             make(chan struct{}),
		logger:              log.New(ioutil.Discard, "[metaclient] ", log.LstdFlags),
		authCache:           map[string]authUser{},
		path:                config.Dir,
		retentionAutoCreate: config.RetentionAutoCreate,
	}
	return c
}
// Open loads previously persisted metadata from disk into the cache. If the
// cache index is 1 afterwards (as it is for a brand-new instance), the
// current metadata is written to disk immediately.
func (c *Client) Open() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Pull in any on-disk state first.
	if err := c.Load(); err != nil {
		return err
	}

	// Anything other than a brand-new instance is already persisted.
	if c.cacheData.Index != 1 {
		return nil
	}
	return snapshot(c.path, c.cacheData)
}
// Close releases idle connections held by http.DefaultTransport and closes
// the client's closing channel. The channel is only ever closed once, so
// repeated calls are safe; Close always returns nil.
func (c *Client) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if transport, ok := http.DefaultTransport.(*http.Transport); ok {
		transport.CloseIdleConnections()
	}

	select {
	case <-c.closing:
		// Already closed by an earlier call.
	default:
		close(c.closing)
	}
	return nil
}
// AcquireLease always succeeds: it returns a new Lease for name expiring
// DefaultLeaseDuration from now, without consulting any other node.
// TODO corylanou remove this for single node
func (c *Client) AcquireLease(name string) (*Lease, error) {
	expires := time.Now().Add(DefaultLeaseDuration)
	return &Lease{Name: name, Expiration: expires}, nil
}
// data returns a private copy (via Data.Clone) of the cached metadata, taken
// under the read lock, so callers may inspect it without holding c.mu.
func (c *Client) data() *Data {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.Clone()
}
// ClusterID reports the cluster identifier stored in the cached metadata.
func (c *Client) ClusterID() uint64 {
	c.mu.RLock()
	id := c.cacheData.ClusterID
	c.mu.RUnlock()
	return id
}
// Database looks up the database called name in a copy of the cached
// metadata. It returns a pointer to that copy's info, or nil when no
// database has that name.
func (c *Client) Database(name string) *DatabaseInfo {
	dbs := c.data().Databases
	for i := range dbs {
		if dbs[i].Name == name {
			return &dbs[i]
		}
	}
	return nil
}
// Databases returns every database in a copy of the cached metadata. The
// result is never nil: an empty slice is returned when there are none.
func (c *Client) Databases() []DatabaseInfo {
	if dbs := c.data().Databases; dbs != nil {
		return dbs
	}
	return []DatabaseInfo{}
}
// CreateDatabase creates the database called name and returns its info. If
// the database already exists, its current info is returned and nothing is
// committed.
//
// When retention-policy auto-creation is enabled, a RetentionPolicyInfo
// with only ReplicaN=1 set is added to the new database, and
// SetDefaultRetentionPolicy is called with an empty policy name.
// NOTE(review): the resulting policy name is chosen inside Data — confirm there.
func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	next := c.cacheData.Clone()

	// Idempotent: hand back an existing database untouched.
	if existing := next.Database(name); existing != nil {
		return existing, nil
	}

	if err := next.CreateDatabase(name); err != nil {
		return nil, err
	}

	if c.retentionAutoCreate {
		// create default retention policy
		defaultRP := &RetentionPolicyInfo{ReplicaN: 1}
		if err := next.CreateRetentionPolicy(name, defaultRP); err != nil {
			return nil, err
		}
		if err := next.SetDefaultRetentionPolicy(name, ""); err != nil {
			return nil, err
		}
	}

	created := next.Database(name)
	if err := c.commit(next); err != nil {
		return nil, err
	}
	return created, nil
}
// CreateDatabaseWithRetentionPolicy creates database name together with the
// retention policy rpi, and makes rpi the database's default policy.
//
// A non-zero rpi.Duration below MinRetentionPolicyDuration is rejected with
// ErrRetentionPolicyDurationTooLow. If the database already has a policy
// named rpi.Name, nothing is committed. The existing database info is
// returned when ReplicaN, Duration and ShardGroupDuration all match, and
// ErrRetentionPolicyConflict is returned otherwise. In that case
// rpi.ShardGroupDuration is normalised in place before the comparison.
func (c *Client) CreateDatabaseWithRetentionPolicy(name string, rpi *RetentionPolicyInfo) (*DatabaseInfo, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	next := c.cacheData.Clone()

	if rpi.Duration != 0 && rpi.Duration < MinRetentionPolicyDuration {
		return nil, ErrRetentionPolicyDurationTooLow
	}

	if existingDB := next.Database(name); existingDB != nil {
		if existingRP := existingDB.RetentionPolicy(rpi.Name); existingRP != nil {
			// Normalise ShardDuration before comparing to the existing policy.
			rpi.ShardGroupDuration = normalisedShardDuration(rpi.ShardGroupDuration, rpi.Duration)
			identical := existingRP.ReplicaN == rpi.ReplicaN &&
				existingRP.Duration == rpi.Duration &&
				existingRP.ShardGroupDuration == rpi.ShardGroupDuration
			if !identical {
				return nil, ErrRetentionPolicyConflict
			}
			return existingDB, nil
		}
	}

	if err := next.CreateDatabase(name); err != nil {
		return nil, err
	}
	if err := next.CreateRetentionPolicy(name, rpi); err != nil {
		return nil, err
	}
	if err := next.SetDefaultRetentionPolicy(name, rpi.Name); err != nil {
		return nil, err
	}

	created := next.Database(name)
	if err := c.commit(next); err != nil {
		return nil, err
	}
	return created, nil
}
// DropDatabase removes the named database and commits the change. Errors
// from Data.DropDatabase or from commit are returned unchanged.
func (c *Client) DropDatabase(name string) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	next := c.cacheData.Clone()
	if err := next.DropDatabase(name); err != nil {
		return err
	}
	return c.commit(next)
}
// CreateRetentionPolicy adds rpi to database and returns the policy as it is
// stored in the updated metadata. A non-zero rpi.Duration shorter than
// MinRetentionPolicyDuration is rejected with ErrRetentionPolicyDurationTooLow.
func (c *Client) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo) (*RetentionPolicyInfo, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	next := c.cacheData.Clone()

	if rpi.Duration != 0 && rpi.Duration < MinRetentionPolicyDuration {
		return nil, ErrRetentionPolicyDurationTooLow
	}

	if err := next.CreateRetentionPolicy(database, rpi); err != nil {
		return nil, err
	}

	stored, err := next.RetentionPolicy(database, rpi.Name)
	if err != nil {
		return nil, err
	}

	if err = c.commit(next); err != nil {
		return nil, err
	}
	return stored, nil
}
// RetentionPolicy returns the retention policy called name in database.
//
// It returns influxdb.ErrDatabaseNotFound when the database does not exist.
// Otherwise it returns whatever DatabaseInfo.RetentionPolicy yields, which
// other code in this file treats as nil when no such policy exists.
func (c *Client) RetentionPolicy(database, name string) (rpi *RetentionPolicyInfo, err error) {
	db := c.data().Database(database)
	if db == nil {
		return nil, influxdb.ErrDatabaseNotFound(database)
	}
	return db.RetentionPolicy(name), nil
}
// DropRetentionPolicy removes retention policy name from database and
// commits the change. Errors from Data.DropRetentionPolicy or from commit
// are returned as-is.
func (c *Client) DropRetentionPolicy(database, name string) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	next := c.cacheData.Clone()
	if err := next.DropRetentionPolicy(database, name); err != nil {
		return err
	}
	return c.commit(next)
}
// SetDefaultRetentionPolicy makes policy name the default for database and
// commits the change. Validation is left to Data.SetDefaultRetentionPolicy.
func (c *Client) SetDefaultRetentionPolicy(database, name string) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	next := c.cacheData.Clone()
	if err := next.SetDefaultRetentionPolicy(database, name); err != nil {
		return err
	}
	return c.commit(next)
}
// UpdateRetentionPolicy applies rpu to retention policy name in database and
// commits the result. Errors from Data.UpdateRetentionPolicy or from commit
// are returned unchanged.
func (c *Client) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	next := c.cacheData.Clone()
	if err := next.UpdateRetentionPolicy(database, name, rpu); err != nil {
		return err
	}
	return c.commit(next)
}
// Users returns every user in a copy of the cached metadata, or an empty
// (non-nil) slice when there are none.
func (c *Client) Users() []UserInfo {
	users := c.data().Users
	if users == nil {
		users = []UserInfo{}
	}
	return users
}
// User returns a copy of the named user's info, or ErrUserNotFound when no
// user has that name.
func (c *Client) User(name string) (*UserInfo, error) {
	users := c.data().Users
	for i := range users {
		if users[i].Name != name {
			continue
		}
		return &users[i], nil
	}
	return nil, ErrUserNotFound
}
// bcryptCost is the cost associated with generating password with bcrypt.
// It is the work factor passed to bcrypt.GenerateFromPassword by CreateUser
// and UpdateUser.
// This setting is lowered during testing to improve test suite performance.
var bcryptCost = bcrypt.DefaultCost
// hashWithSalt returns SHA-256(salt || password). Authenticate uses it for
// the in-memory auth cache; it is not the persisted (bcrypt) password hash.
func (c *Client) hashWithSalt(salt []byte, password string) []byte {
	input := make([]byte, 0, len(salt)+len(password))
	input = append(input, salt...)
	input = append(input, password...)
	digest := sha256.Sum256(input)
	return digest[:]
}
// saltedHash draws SaltBytes bytes from crypto/rand as a fresh salt and
// returns that salt together with hashWithSalt(salt, password). If the
// random source fails, both slices are nil and the error is returned.
func (c *Client) saltedHash(password string) (salt, hash []byte, err error) {
	salt = make([]byte, SaltBytes)
	if _, err = io.ReadFull(crand.Reader, salt); err != nil {
		return nil, nil, err
	}
	hash = c.hashWithSalt(salt, password)
	return salt, hash, nil
}
// CreateUser adds a user whose password is stored as a bcrypt hash, with the
// given admin flag, and returns the stored user info.
//
// If a user called name already exists, nothing is committed. The existing
// user is returned when password matches its stored hash and its admin flag
// equals admin; otherwise ErrUserExists is returned.
func (c *Client) CreateUser(name, password string, admin bool) (*UserInfo, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	next := c.cacheData.Clone()

	if existing := next.User(name); existing != nil {
		wrongPassword := bcrypt.CompareHashAndPassword([]byte(existing.Hash), []byte(password)) != nil
		if wrongPassword || existing.Admin != admin {
			return nil, ErrUserExists
		}
		return existing, nil
	}

	// Only the bcrypt hash is ever serialized, never the plain password.
	hashed, err := bcrypt.GenerateFromPassword([]byte(password), bcryptCost)
	if err != nil {
		return nil, err
	}
	if err = next.CreateUser(name, string(hashed), admin); err != nil {
		return nil, err
	}

	created := next.User(name)
	if err = c.commit(next); err != nil {
		return nil, err
	}
	return created, nil
}
// UpdateUser replaces the password of user name with a bcrypt hash of
// password. It also drops the user's auth-cache entry so the old password is
// no longer accepted through the cache.
//
// Errors from hashing, from Data.UpdateUser (presumably including an unknown
// user — confirm in data.go), and from persisting the change are all
// returned to the caller.
func (c *Client) UpdateUser(name, password string) error {
	// Hash before taking the lock: bcrypt is deliberately slow and does not
	// depend on any client state, so there is no reason to block readers.
	hash, err := bcrypt.GenerateFromPassword([]byte(password), bcryptCost)
	if err != nil {
		return err
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	data := c.cacheData.Clone()
	if err := data.UpdateUser(name, string(hash)); err != nil {
		// Report the failure; returning nil here would tell the caller the
		// password was changed when it was not.
		return err
	}

	delete(c.authCache, name)

	if err := c.commit(data); err != nil {
		return err
	}
	return nil
}
// DropUser removes the named user and commits the change. Errors from
// Data.DropUser or from commit are returned unchanged.
func (c *Client) DropUser(name string) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	next := c.cacheData.Clone()
	if err := next.DropUser(name); err != nil {
		return err
	}
	return c.commit(next)
}
// SetPrivilege grants privilege p on database to username and commits the
// change. Errors from Data.SetPrivilege or from commit are returned as-is.
func (c *Client) SetPrivilege(username, database string, p influxql.Privilege) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	next := c.cacheData.Clone()
	if err := next.SetPrivilege(username, database, p); err != nil {
		return err
	}
	return c.commit(next)
}
// SetAdminPrivilege sets or clears the admin flag of username and commits
// the change. Errors from Data.SetAdminPrivilege or from commit are returned
// as-is.
func (c *Client) SetAdminPrivilege(username string, admin bool) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	next := c.cacheData.Clone()
	if err := next.SetAdminPrivilege(username, admin); err != nil {
		return err
	}
	return c.commit(next)
}
// UserPrivileges returns username's per-database privileges, as reported by
// Data.UserPrivileges on a copy of the cached metadata. On error the map is
// always nil.
func (c *Client) UserPrivileges(username string) (map[string]influxql.Privilege, error) {
	privs, err := c.data().UserPrivileges(username)
	if err != nil {
		return nil, err
	}
	return privs, nil
}
// UserPrivilege returns username's privilege on database, as reported by
// Data.UserPrivilege on a copy of the cached metadata. On error the returned
// pointer is always nil.
func (c *Client) UserPrivilege(username, database string) (*influxql.Privilege, error) {
	priv, err := c.data().UserPrivilege(username, database)
	if err != nil {
		return nil, err
	}
	return priv, nil
}
// AdminUserExists reports whether at least one user in the cached metadata
// has the admin flag set.
func (c *Client) AdminUserExists() bool {
	users := c.data().Users
	for i := range users {
		if users[i].Admin {
			return true
		}
	}
	return false
}
// Authenticate checks password for username and returns the user's info on
// success. It returns ErrUserNotFound for an unknown user and ErrAuthenticate
// for a wrong password.
//
// After a successful bcrypt comparison, a salted SHA-256 of the password is
// stored in authCache so later logins can skip bcrypt. A cache entry is only
// trusted while its recorded bcrypt hash (bhash) equals the user's current
// hash. Without that check, an entry written by a call that raced with a
// password change (or with a drop and re-create of the user) would keep
// accepting the old password.
func (c *Client) Authenticate(username, password string) (*UserInfo, error) {
	c.mu.RLock()
	userInfo := c.cacheData.User(username)
	au, cached := c.authCache[username]
	c.mu.RUnlock()

	if userInfo == nil {
		return nil, ErrUserNotFound
	}

	// Fast path: only honour a cache entry minted from the current hash.
	if cached && au.bhash == userInfo.Hash {
		if bytes.Equal(c.hashWithSalt(au.salt, password), au.hash) {
			return userInfo, nil
		}
		// fall through to requiring a full bcrypt hash for invalid passwords
	}

	// Compare password with user hash.
	if err := bcrypt.CompareHashAndPassword([]byte(userInfo.Hash), []byte(password)); err != nil {
		return nil, ErrAuthenticate
	}

	// generate a salt and hash of the password for the cache
	salt, hashed, err := c.saltedHash(password)
	if err != nil {
		return nil, err
	}

	c.mu.Lock()
	c.authCache[username] = authUser{salt: salt, hash: hashed, bhash: userInfo.Hash}
	c.mu.Unlock()
	return userInfo, nil
}
// UserCount returns the number of users in the cached metadata.
func (c *Client) UserCount() int {
	c.mu.RLock()
	n := len(c.cacheData.Users)
	c.mu.RUnlock()
	return n
}
// ShardIDs returns the ID of every shard in every shard group of every
// retention policy, sorted ascending. Groups marked deleted are not filtered
// out here. The result is nil when there are no shards.
func (c *Client) ShardIDs() []uint64 {
	c.mu.RLock()
	defer c.mu.RUnlock()

	var ids []uint64
	for i := range c.cacheData.Databases {
		for _, rp := range c.cacheData.Databases[i].RetentionPolicies {
			for _, group := range rp.ShardGroups {
				for _, shard := range group.Shards {
					ids = append(ids, shard.ID)
				}
			}
		}
	}
	sort.Sort(uint64Slice(ids))
	return ids
}
// ShardGroupsByTimeRange returns the non-deleted shard groups of
// database/policy that overlap [min, max] and so may contain data for that
// range.
//
// Groups come back in the order stored in the retention policy. No sorting
// happens here; NOTE(review): that order is presumably by start time. An
// error from the lookup is returned as-is, and a missing policy yields
// influxdb.ErrRetentionPolicyNotFound.
func (c *Client) ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []ShardGroupInfo, err error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	rpi, err := c.cacheData.RetentionPolicy(database, policy)
	switch {
	case err != nil:
		return nil, err
	case rpi == nil:
		return nil, influxdb.ErrRetentionPolicyNotFound(policy)
	}

	a = make([]ShardGroupInfo, 0, len(rpi.ShardGroups))
	for _, g := range rpi.ShardGroups {
		if !g.Deleted() && g.Overlaps(min, max) {
			a = append(a, g)
		}
	}
	return a, nil
}
// ShardsByTimeRange returns the shards whose groups may contain data between
// tmin and tmax, across all sources. Every source must be an
// *influxql.Measurement, otherwise an "invalid source type" error is
// returned.
//
// Each shard appears once, at its first occurrence. Shards are listed
// source by source, in the order ShardGroupsByTimeRange returns their
// groups.
func (c *Client) ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []ShardInfo, err error) {
	seen := make(map[uint64]struct{})
	for _, src := range sources {
		m, ok := src.(*influxql.Measurement)
		if !ok {
			return nil, fmt.Errorf("invalid source type: %#v", src)
		}

		groups, gerr := c.ShardGroupsByTimeRange(m.Database, m.RetentionPolicy, tmin, tmax)
		if gerr != nil {
			return nil, gerr
		}

		for _, g := range groups {
			for _, sh := range g.Shards {
				if _, dup := seen[sh.ID]; dup {
					continue
				}
				seen[sh.ID] = struct{}{}
				a = append(a, sh)
			}
		}
	}
	return a, nil
}
// DropShard removes the shard with the given ID and commits the change. Any
// result of Data.DropShard is not checked here; only a commit failure is
// reported.
func (c *Client) DropShard(id uint64) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	next := c.cacheData.Clone()
	next.DropShard(id)
	if err := c.commit(next); err != nil {
		return err
	}
	return nil
}
// CreateShardGroup returns the shard group of database/policy covering
// timestamp, creating and committing it if none exists yet.
//
// The existing-group lookup is done first under the read lock, which is the
// common case. It is repeated under the write lock because another writer
// may have created the group between releasing one lock and taking the
// other.
func (c *Client) CreateShardGroup(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
	c.mu.RLock()
	existing, _ := c.cacheData.ShardGroupByTimestamp(database, policy, timestamp)
	c.mu.RUnlock()
	if existing != nil {
		return existing, nil
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	next := c.cacheData.Clone()
	if existing, _ = next.ShardGroupByTimestamp(database, policy, timestamp); existing != nil {
		return existing, nil
	}

	created, err := createShardGroup(next, database, policy, timestamp)
	if err != nil {
		return nil, err
	}
	if err = c.commit(next); err != nil {
		return nil, err
	}
	return created, nil
}
// createShardGroup adds a shard group covering timestamp to database/policy
// in data (modifying data in place) and returns the new group.
//
// Callers are responsible for checking for an existing group first; if one
// is found anyway, ErrShardGroupExists is returned.
func createShardGroup(data *Data, database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
	if existing, _ := data.ShardGroupByTimestamp(database, policy, timestamp); existing != nil {
		return nil, ErrShardGroupExists
	}
	if err := data.CreateShardGroup(database, policy, timestamp); err != nil {
		return nil, err
	}

	rpi, err := data.RetentionPolicy(database, policy)
	switch {
	case err != nil:
		return nil, err
	case rpi == nil:
		return nil, errors.New("retention policy deleted after shard group created")
	default:
		return rpi.ShardGroupByTimestamp(timestamp), nil
	}
}
// DeleteShardGroup removes shard group id from database/policy and commits
// the change. Errors from Data.DeleteShardGroup or from commit are returned
// unchanged.
func (c *Client) DeleteShardGroup(database, policy string, id uint64) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	next := c.cacheData.Clone()
	if err := next.DeleteShardGroup(database, policy, id); err != nil {
		return err
	}
	return c.commit(next)
}
// PrecreateShardGroups creates, for each retention policy, the shard group
// that follows the policy's last group. This happens when that last group is
// not deleted and its end time lies strictly between from and to. Creating
// the group ahead of time avoids doing so on the write path.
//
// Per-policy failures are logged and skipped. The metadata is committed only
// if at least one group was created, and only a commit error is returned.
func (c *Client) PrecreateShardGroups(from, to time.Time) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	next := c.cacheData.Clone()
	modified := false

	for _, db := range next.Databases {
		for _, rp := range db.RetentionPolicies {
			count := len(rp.ShardGroups)
			if count == 0 {
				// No data was ever written to this group, or all groups have been deleted.
				continue
			}

			last := rp.ShardGroups[count-1] // Get the last group in time.

			// Skip deleted groups and groups ending outside (from, to). The
			// lower bound stops groups from being created wholly in the past.
			if last.Deleted() || !last.EndTime.Before(to) || !last.EndTime.After(from) {
				continue
			}

			successorStart := last.EndTime.Add(time.Nanosecond)

			if sg, _ := next.ShardGroupByTimestamp(db.Name, rp.Name, successorStart); sg != nil {
				c.logger.Printf("shard group %d exists for database %s, retention policy %s", sg.ID, db.Name, rp.Name)
				continue
			}

			created, err := createShardGroup(next, db.Name, rp.Name, successorStart)
			if err != nil {
				c.logger.Printf("failed to precreate successive shard group for group %d: %s", last.ID, err.Error())
				continue
			}
			modified = true
			c.logger.Printf("new shard group %d successfully precreated for database %s, retention policy %s", created.ID, db.Name, rp.Name)
		}
	}

	if !modified {
		return nil
	}
	return c.commit(next)
}
// ShardOwner finds the non-deleted shard group containing shardID and
// returns its database name, retention policy name and a copy of the group.
// All three results are zero values when no such group exists.
func (c *Client) ShardOwner(shardID uint64) (database, policy string, sgi *ShardGroupInfo) {
	data := c.data()
	for _, db := range data.Databases {
		for _, rp := range db.RetentionPolicies {
			for _, group := range rp.ShardGroups {
				if group.Deleted() {
					continue
				}
				for _, shard := range group.Shards {
					if shard.ID != shardID {
						continue
					}
					owner := group
					return db.Name, rp.Name, &owner
				}
			}
		}
	}
	return "", "", nil
}
// CreateContinuousQuery registers continuous query name with text query on
// database and commits the change. Errors from Data.CreateContinuousQuery
// or from commit are returned unchanged.
func (c *Client) CreateContinuousQuery(database, name, query string) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	next := c.cacheData.Clone()
	if err := next.CreateContinuousQuery(database, name, query); err != nil {
		return err
	}
	return c.commit(next)
}
// DropContinuousQuery removes continuous query name from database and
// commits the change.
//
// Errors from Data.DropContinuousQuery are returned to the caller, matching
// every other mutator in this client. Returning nil would report a drop
// that never happened.
func (c *Client) DropContinuousQuery(database, name string) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	data := c.cacheData.Clone()
	if err := data.DropContinuousQuery(database, name); err != nil {
		return err
	}
	if err := c.commit(data); err != nil {
		return err
	}
	return nil
}
// CreateSubscription adds subscription name (with the given mode and
// destinations) to database/rp and commits the change. Errors from
// Data.CreateSubscription or from commit are returned unchanged.
func (c *Client) CreateSubscription(database, rp, name, mode string, destinations []string) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	next := c.cacheData.Clone()
	if err := next.CreateSubscription(database, rp, name, mode, destinations); err != nil {
		return err
	}
	return c.commit(next)
}
// DropSubscription removes subscription name from database/rp and commits
// the change. Errors from Data.DropSubscription or from commit are returned
// unchanged.
func (c *Client) DropSubscription(database, rp, name string) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	next := c.cacheData.Clone()
	if err := next.DropSubscription(database, rp, name); err != nil {
		return err
	}
	return c.commit(next)
}
// SetData replaces the cached metadata with a copy of data and persists it.
// The caller's data is not modified. The stored index ends up two greater
// than data.Index: one increment here and one in commit.
//
// commit unconditionally closes the changed channel, so no reset of the
// current cache index is needed to force a change notification. Leaving the
// current cache untouched also means a failed commit leaves it intact. The
// lock is released on every path via defer, including when commit fails.
func (c *Client) SetData(data *Data) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// increment the index on our private copy before committing it
	d := data.Clone()
	d.Index++
	return c.commit(d)
}
// Data returns a copy of the cached metadata by value.
func (c *Client) Data() Data {
	cp := c.data()
	return *cp
}
// WaitForDataChanged returns the channel that commit will close on the next
// successful metadata change. commit installs a fresh channel each time, so
// callers must call this again to wait for a later change.
func (c *Client) WaitForDataChanged() chan struct{} {
	c.mu.RLock()
	ch := c.changed
	c.mu.RUnlock()
	return ch
}
// commit persists data and, only if that succeeds, installs it as the cached
// metadata. Specifically, it:
//   - increments data.Index;
//   - writes data to disk via snapshot (returning any error, with the cache
//     left unchanged);
//   - swaps data into cacheData;
//   - prunes the authentication cache against the new user list;
//   - closes the current changed channel, waking WaitForDataChanged callers,
//     and replaces it with a fresh one.
//
// commit assumes it is under a full lock (c.mu held for writing).
func (c *Client) commit(data *Data) error {
	data.Index++

	// try to write to disk before updating in memory
	if err := snapshot(c.path, data); err != nil {
		return err
	}

	// update in memory
	c.cacheData = data

	// Drop cached credentials for users that no longer exist or whose bcrypt
	// hash changed (drop/re-create, SetData, password change). Otherwise a
	// stale entry could keep accepting an old password.
	c.updateAuthCache()

	// close channels to signal changes
	close(c.changed)
	c.changed = make(chan struct{})
	return nil
}
// MarshalBinary encodes the cached metadata with Data.MarshalBinary while
// holding the read lock.
func (c *Client) MarshalBinary() ([]byte, error) {
	c.mu.RLock()
	encoded, err := c.cacheData.MarshalBinary()
	c.mu.RUnlock()
	return encoded, err
}
// SetLogOutput sets the writer to which all logs are written, using the
// "[metaclient] " prefix and standard flags. It must not be called after
// Open is called.
func (c *Client) SetLogOutput(w io.Writer) {
	logger := log.New(w, "[metaclient] ", log.LstdFlags)
	c.mu.Lock()
	c.logger = logger
	c.mu.Unlock()
}
// updateAuthCache rebuilds authCache, keeping only entries for users still
// present in cacheData whose bcrypt hash is unchanged since the entry was
// cached. It replaces c.authCache, so callers are expected to hold c.mu for
// writing.
func (c *Client) updateAuthCache() {
	kept := make(map[string]authUser, len(c.authCache))
	for _, u := range c.cacheData.Users {
		entry, ok := c.authCache[u.Name]
		if !ok || entry.bhash != u.Hash {
			continue
		}
		kept[u.Name] = entry
	}
	c.authCache = kept
}
// snapshot writes data's binary encoding to metaFile inside path.
//
// The bytes are first written and fsynced to a temporary file in the same
// directory (metaFile + "tmp"). That file is closed and then moved over the
// real file with renameFile.
func snapshot(path string, data *Data) error {
	target := filepath.Join(path, metaFile)
	tmp := target + "tmp"

	f, err := os.Create(tmp)
	if err != nil {
		return err
	}
	defer f.Close()

	encoded, err := data.MarshalBinary()
	if err != nil {
		return err
	}
	if _, err = f.Write(encoded); err != nil {
		return err
	}
	if err = f.Sync(); err != nil {
		return err
	}
	// Close before renaming; the original code does this to support Windows.
	if err = f.Close(); err != nil {
		return err
	}
	return renameFile(tmp, target)
}
// Load reads the metadata file (metaFile inside c.path) and decodes it into
// c.cacheData. A missing file is not an error: the cache is left as it is.
// Any other read or decode error is returned.
//
// Load does not take c.mu itself; Open calls it with the lock held.
func (c *Client) Load() error {
	raw, err := ioutil.ReadFile(filepath.Join(c.path, metaFile))
	if err != nil {
		if os.IsNotExist(err) {
			return nil
		}
		return err
	}
	return c.cacheData.UnmarshalBinary(raw)
}
// uint64Slice implements sort.Interface, ordering uint64 values ascending.
type uint64Slice []uint64

// Len returns the number of elements.
func (a uint64Slice) Len() int { return len(a) }

// Swap exchanges the elements at positions i and j.
func (a uint64Slice) Swap(i, j int) {
	tmp := a[i]
	a[i] = a[j]
	a[j] = tmp
}

// Less reports whether the element at i is smaller than the element at j.
func (a uint64Slice) Less(i, j int) bool { return a[j] > a[i] }