influxdb/services/meta/client.go

1022 lines
22 KiB
Go
Raw Normal View History

2015-12-23 15:48:25 +00:00
package meta
import (
"bytes"
2016-01-04 22:37:20 +00:00
crand "crypto/rand"
"crypto/sha256"
2016-01-06 15:11:59 +00:00
"errors"
"fmt"
2016-01-04 22:37:20 +00:00
"io"
"io/ioutil"
"log"
2016-01-07 22:52:53 +00:00
"math/rand"
"net/http"
"os"
"path/filepath"
2016-02-04 15:12:52 +00:00
"sort"
"sync"
2015-12-23 15:48:25 +00:00
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/influxql"
"golang.org/x/crypto/bcrypt"
2015-12-23 15:48:25 +00:00
)
const (
// errSleep is the time to sleep after we've failed on every metaserver
// before making another pass
errSleep = time.Second
// maxRetries is the maximum number of attemps to make before returning
// a failure to the caller
maxRetries = 10
2016-01-04 22:37:20 +00:00
// SaltBytes is the number of bytes used for salts
SaltBytes = 32
metaFile = "meta.db"
)
2016-01-11 01:43:05 +00:00
var (
// ErrServiceUnavailable is returned when the meta service is unavailable.
ErrServiceUnavailable = errors.New("meta service unavailable")
2016-01-19 20:49:32 +00:00
// ErrService is returned when the meta service returns an error.
ErrService = errors.New("meta service error")
2016-01-11 01:43:05 +00:00
)
2015-12-31 23:37:27 +00:00
// Client is used to execute commands on and read data from
// a meta service cluster.
2015-12-23 15:48:25 +00:00
type Client struct {
logger *log.Logger
mu sync.RWMutex
closing chan struct{}
changed chan struct{}
cacheData *Data
2016-01-04 22:37:20 +00:00
// Authentication cache.
authCache map[string]authUser
path string
retentionAutoCreate bool
}
2016-01-04 22:37:20 +00:00
type authUser struct {
bhash string
salt []byte
hash []byte
}
2015-12-31 23:37:27 +00:00
// NewClient returns a new *Client.
func NewClient(config *Config) *Client {
return &Client{
cacheData: &Data{
ClusterID: uint64(uint64(rand.Int63())),
Index: 1,
},
closing: make(chan struct{}),
changed: make(chan struct{}),
logger: log.New(os.Stderr, "[metaclient] ", log.LstdFlags),
authCache: make(map[string]authUser, 0),
path: config.Dir,
retentionAutoCreate: config.RetentionAutoCreate,
}
}
2015-12-31 23:37:27 +00:00
// Open a connection to a meta service cluster.
func (c *Client) Open() error {
c.mu.Lock()
defer c.mu.Unlock()
// Try to load from disk
if err := c.Load(); err != nil {
return err
}
// If this is a brand new instance, persist to disk immediatly.
if c.cacheData.Index == 1 {
2016-03-14 15:51:12 +00:00
if err := snapshot(c.path, c.cacheData); err != nil {
return err
}
}
return nil
2015-12-23 15:48:25 +00:00
}
2015-12-31 23:37:27 +00:00
// Close the meta service cluster connection.
func (c *Client) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
if t, ok := http.DefaultTransport.(*http.Transport); ok {
t.CloseIdleConnections()
}
select {
case <-c.closing:
return nil
default:
close(c.closing)
}
return nil
2015-12-23 15:48:25 +00:00
}
2016-01-09 04:29:48 +00:00
// AcquireLease attempts to acquire the specified lease.
// TODO corylanou remove this for single node
func (c *Client) AcquireLease(name string) (*Lease, error) {
l := Lease{
Name: name,
Expiration: time.Now().Add(DefaultLeaseDuration),
2016-01-09 04:29:48 +00:00
}
return &l, nil
2016-01-09 04:29:48 +00:00
}
func (c *Client) data() *Data {
c.mu.RLock()
defer c.mu.RUnlock()
data := c.cacheData.Clone()
return data
}
2015-12-31 23:37:27 +00:00
// ClusterID returns the ID of the cluster it's connected to.
2016-01-07 22:52:53 +00:00
func (c *Client) ClusterID() uint64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.cacheData.ClusterID
2015-12-23 15:48:25 +00:00
}
// Database returns info for the requested database.
2015-12-23 15:48:25 +00:00
func (c *Client) Database(name string) (*DatabaseInfo, error) {
c.mu.RLock()
data := c.cacheData.Clone()
c.mu.RUnlock()
for _, d := range data.Databases {
if d.Name == name {
return &d, nil
}
}
return nil, influxdb.ErrDatabaseNotFound(name)
2015-12-23 15:48:25 +00:00
}
// Databases returns a list of all database infos.
2015-12-23 15:48:25 +00:00
func (c *Client) Databases() ([]DatabaseInfo, error) {
c.mu.RLock()
data := c.cacheData.Clone()
c.mu.RUnlock()
dbs := data.Databases
if dbs == nil {
return []DatabaseInfo{}, nil
}
return dbs, nil
2015-12-23 15:48:25 +00:00
}
// CreateDatabase creates a database or returns it if it already exists
2015-12-31 23:37:27 +00:00
func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
if db := data.Database(name); db != nil {
return db, nil
}
if err := data.CreateDatabase(name); err != nil {
return nil, err
}
// create default retention policy
if c.retentionAutoCreate {
if err := data.CreateRetentionPolicy(name, &RetentionPolicyInfo{
Name: "default",
ReplicaN: 1,
}); err != nil {
return nil, err
}
if err := data.SetDefaultRetentionPolicy(name, "default"); err != nil {
return nil, err
}
}
db := data.Database(name)
2016-03-14 15:51:12 +00:00
if err := c.commit(data); err != nil {
return nil, err
}
return db, nil
}
// CreateDatabaseWithRetentionPolicy creates a database with the specified retention policy.
2015-12-31 23:37:27 +00:00
func (c *Client) CreateDatabaseWithRetentionPolicy(name string, rpi *RetentionPolicyInfo) (*DatabaseInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
if rpi.Duration < MinRetentionPolicyDuration && rpi.Duration != 0 {
return nil, ErrRetentionPolicyDurationTooLow
}
if db := data.Database(name); db != nil {
// Check if the retention policy already exists. If it does and matches
// the desired retention policy, exit with no error.
if rp := db.RetentionPolicy(rpi.Name); rp != nil {
if rp.ReplicaN != rpi.ReplicaN || rp.Duration != rpi.Duration || rp.ShardGroupDuration != rpi.ShardGroupDuration {
return nil, ErrRetentionPolicyConflict
}
return db, nil
}
}
if err := data.CreateDatabase(name); err != nil {
return nil, err
}
if err := data.CreateRetentionPolicy(name, rpi); err != nil {
return nil, err
}
if err := data.SetDefaultRetentionPolicy(name, rpi.Name); err != nil {
return nil, err
}
db := data.Database(name)
2016-03-14 15:51:12 +00:00
if err := c.commit(data); err != nil {
return nil, err
}
return db, nil
}
// DropDatabase deletes a database.
func (c *Client) DropDatabase(name string) error {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
if err := data.DropDatabase(name); err != nil {
return err
}
2016-03-14 15:51:12 +00:00
if err := c.commit(data); err != nil {
return err
}
return nil
}
// CreateRetentionPolicy creates a retention policy on the specified database.
2015-12-31 23:37:27 +00:00
func (c *Client) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo) (*RetentionPolicyInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
if rpi.Duration < MinRetentionPolicyDuration && rpi.Duration != 0 {
return nil, ErrRetentionPolicyDurationTooLow
}
if err := data.CreateRetentionPolicy(database, rpi); err != nil {
return nil, err
}
rp, err := data.RetentionPolicy(database, rpi.Name)
if err != nil {
return nil, err
}
2016-03-14 15:51:12 +00:00
if err := c.commit(data); err != nil {
return nil, err
}
return rp, nil
2015-12-23 15:48:25 +00:00
}
// RetentionPolicy returns the requested retention policy info.
func (c *Client) RetentionPolicy(database, name string) (rpi *RetentionPolicyInfo, err error) {
c.mu.RLock()
data := c.cacheData.Clone()
c.mu.RUnlock()
db := data.Database(database)
if db == nil {
return nil, influxdb.ErrDatabaseNotFound(database)
}
return db.RetentionPolicy(name), nil
2015-12-23 15:48:25 +00:00
}
// DropRetentionPolicy drops a retention policy from a database.
2015-12-31 23:37:27 +00:00
func (c *Client) DropRetentionPolicy(database, name string) error {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
if err := data.DropRetentionPolicy(database, name); err != nil {
return err
}
2016-03-14 15:51:12 +00:00
if err := c.commit(data); err != nil {
return err
}
return nil
2015-12-23 15:48:25 +00:00
}
2015-12-31 23:37:27 +00:00
// SetDefaultRetentionPolicy sets a database's default retention policy.
2015-12-23 15:48:25 +00:00
func (c *Client) SetDefaultRetentionPolicy(database, name string) error {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
if err := data.SetDefaultRetentionPolicy(database, name); err != nil {
return err
}
2016-03-14 15:51:12 +00:00
if err := c.commit(data); err != nil {
return err
}
return nil
2015-12-23 15:48:25 +00:00
}
2015-12-31 23:37:27 +00:00
// UpdateRetentionPolicy updates a retention policy.
func (c *Client) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
if err := data.UpdateRetentionPolicy(database, name, rpu); err != nil {
return err
}
2016-03-14 15:51:12 +00:00
if err := c.commit(data); err != nil {
return err
}
return nil
2015-12-23 15:48:25 +00:00
}
2016-01-04 22:37:20 +00:00
func (c *Client) Users() []UserInfo {
c.mu.RLock()
data := c.cacheData.Clone()
c.mu.RUnlock()
users := data.Users
2016-01-04 22:37:20 +00:00
if users == nil {
2016-01-04 22:37:20 +00:00
return []UserInfo{}
}
return users
2015-12-23 15:48:25 +00:00
}
func (c *Client) User(name string) (*UserInfo, error) {
c.mu.RLock()
data := c.cacheData.Clone()
c.mu.RUnlock()
for _, u := range data.Users {
2016-01-04 22:37:20 +00:00
if u.Name == name {
return &u, nil
}
}
return nil, ErrUserNotFound
}
// bcryptCost is the cost associated with generating password with bcrypt.
2016-01-04 22:37:20 +00:00
// This setting is lowered during testing to improve test suite performance.
var bcryptCost = bcrypt.DefaultCost
2016-01-04 22:37:20 +00:00
// hashWithSalt returns a salted hash of password using salt
func (c *Client) hashWithSalt(salt []byte, password string) []byte {
2016-01-04 22:37:20 +00:00
hasher := sha256.New()
hasher.Write(salt)
hasher.Write([]byte(password))
return hasher.Sum(nil)
2016-01-04 22:37:20 +00:00
}
// saltedHash returns a salt and salted hash of password
func (c *Client) saltedHash(password string) (salt, hash []byte, err error) {
salt = make([]byte, SaltBytes)
if _, err := io.ReadFull(crand.Reader, salt); err != nil {
return nil, nil, err
2016-01-04 22:37:20 +00:00
}
return salt, c.hashWithSalt(salt, password), nil
2015-12-23 15:48:25 +00:00
}
func (c *Client) CreateUser(name, password string, admin bool) (*UserInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
2016-03-14 22:40:25 +00:00
// See if the user already exists.
if u := data.User(name); u != nil {
if err := bcrypt.CompareHashAndPassword([]byte(u.Hash), []byte(password)); err != nil || u.Admin != admin {
return nil, ErrUserExists
}
return u, nil
}
2016-01-04 22:37:20 +00:00
// Hash the password before serializing it.
hash, err := bcrypt.GenerateFromPassword([]byte(password), bcryptCost)
2016-01-04 22:37:20 +00:00
if err != nil {
return nil, err
}
if err := data.CreateUser(name, string(hash), admin); err != nil {
2016-01-04 22:37:20 +00:00
return nil, err
}
u := data.User(name)
2016-03-14 15:51:12 +00:00
if err := c.commit(data); err != nil {
return nil, err
}
return u, nil
}
func (c *Client) UpdateUser(name, password string) error {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
2016-01-04 22:37:20 +00:00
// Hash the password before serializing it.
hash, err := bcrypt.GenerateFromPassword([]byte(password), bcryptCost)
2016-01-04 22:37:20 +00:00
if err != nil {
return err
}
if err := data.UpdateUser(name, string(hash)); err != nil {
return nil
}
delete(c.authCache, name)
2016-03-14 15:51:12 +00:00
if err := c.commit(data); err != nil {
return err
}
return nil
}
func (c *Client) DropUser(name string) error {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
if err := data.DropUser(name); err != nil {
return err
}
2016-03-14 15:51:12 +00:00
if err := c.commit(data); err != nil {
return err
}
return nil
}
func (c *Client) SetPrivilege(username, database string, p influxql.Privilege) error {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
if err := data.SetPrivilege(username, database, p); err != nil {
return err
}
2016-03-14 15:51:12 +00:00
if err := c.commit(data); err != nil {
return err
}
return nil
}
func (c *Client) SetAdminPrivilege(username string, admin bool) error {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
if err := data.SetAdminPrivilege(username, admin); err != nil {
return err
}
2016-03-14 15:51:12 +00:00
if err := c.commit(data); err != nil {
return err
}
return nil
}
func (c *Client) UserPrivileges(username string) (map[string]influxql.Privilege, error) {
c.mu.RLock()
data := c.cacheData.Clone()
c.mu.RUnlock()
p, err := data.UserPrivileges(username)
2016-01-04 22:37:20 +00:00
if err != nil {
return nil, err
}
return p, nil
}
func (c *Client) UserPrivilege(username, database string) (*influxql.Privilege, error) {
c.mu.RLock()
data := c.cacheData.Clone()
c.mu.RUnlock()
p, err := data.UserPrivilege(username, database)
2016-01-04 22:37:20 +00:00
if err != nil {
return nil, err
}
return p, nil
}
2016-01-04 22:37:20 +00:00
func (c *Client) AdminUserExists() bool {
c.mu.RLock()
data := c.cacheData.Clone()
c.mu.RUnlock()
for _, u := range data.Users {
2016-01-04 22:37:20 +00:00
if u.Admin {
return true
}
}
return false
2015-12-23 15:48:25 +00:00
}
func (c *Client) Authenticate(username, password string) (*UserInfo, error) {
c.mu.RLock()
defer c.mu.RUnlock()
data := c.cacheData.Clone()
2016-01-04 22:37:20 +00:00
// Find user.
userInfo := data.User(username)
if userInfo == nil {
2016-01-04 22:37:20 +00:00
return nil, ErrUserNotFound
}
// Check the local auth cache first.
if au, ok := c.authCache[username]; ok {
// verify the password using the cached salt and hash
if bytes.Equal(c.hashWithSalt(au.salt, password), au.hash) {
return userInfo, nil
2016-01-04 22:37:20 +00:00
}
// fall through to requiring a full bcrypt hash for invalid passwords
2016-01-04 22:37:20 +00:00
}
// Compare password with user hash.
if err := bcrypt.CompareHashAndPassword([]byte(userInfo.Hash), []byte(password)); err != nil {
2016-01-04 22:37:20 +00:00
return nil, ErrAuthenticate
}
// generate a salt and hash of the password for the cache
salt, hashed, err := c.saltedHash(password)
if err != nil {
return nil, err
}
c.authCache[username] = authUser{salt: salt, hash: hashed, bhash: userInfo.Hash}
2016-01-04 22:37:20 +00:00
return userInfo, nil
2015-12-23 15:48:25 +00:00
}
2016-01-04 22:37:20 +00:00
func (c *Client) UserCount() int {
c.mu.RLock()
defer c.mu.RUnlock()
return len(c.cacheData.Users)
2015-12-23 15:48:25 +00:00
}
2016-02-04 15:12:52 +00:00
// ShardIDs returns a list of all shard ids.
func (c *Client) ShardIDs() []uint64 {
c.mu.RLock()
defer c.mu.RUnlock()
2016-02-04 15:12:52 +00:00
var a []uint64
for _, dbi := range c.cacheData.Databases {
2016-02-04 15:12:52 +00:00
for _, rpi := range dbi.RetentionPolicies {
for _, sgi := range rpi.ShardGroups {
for _, si := range sgi.Shards {
a = append(a, si.ID)
}
}
}
}
sort.Sort(uint64Slice(a))
return a
}
2016-01-06 15:11:59 +00:00
// ShardGroupsByTimeRange returns a list of all shard groups on a database and policy that may contain data
// for the specified time range. Shard groups are sorted by start time.
2015-12-23 15:48:25 +00:00
func (c *Client) ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []ShardGroupInfo, err error) {
c.mu.RLock()
defer c.mu.RUnlock()
2016-01-06 15:11:59 +00:00
// Find retention policy.
rpi, err := c.cacheData.RetentionPolicy(database, policy)
2016-01-06 15:11:59 +00:00
if err != nil {
return nil, err
} else if rpi == nil {
return nil, influxdb.ErrRetentionPolicyNotFound(policy)
}
groups := make([]ShardGroupInfo, 0, len(rpi.ShardGroups))
for _, g := range rpi.ShardGroups {
if g.Deleted() || !g.Overlaps(min, max) {
continue
}
groups = append(groups, g)
}
return groups, nil
2015-12-23 15:48:25 +00:00
}
// ShardsByTimeRange returns a slice of shards that may contain data in the time range.
func (c *Client) ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []ShardInfo, err error) {
m := make(map[*ShardInfo]struct{})
2015-11-04 21:06:06 +00:00
for _, src := range sources {
mm, ok := src.(*influxql.Measurement)
if !ok {
return nil, fmt.Errorf("invalid source type: %#v", src)
}
groups, err := c.ShardGroupsByTimeRange(mm.Database, mm.RetentionPolicy, tmin, tmax)
if err != nil {
return nil, err
}
for _, g := range groups {
for i := range g.Shards {
m[&g.Shards[i]] = struct{}{}
2015-11-04 21:06:06 +00:00
}
}
}
a = make([]ShardInfo, 0, len(m))
for sh := range m {
a = append(a, *sh)
2015-11-04 21:06:06 +00:00
}
return a, nil
}
2016-03-11 15:53:15 +00:00
// DropShard deletes a shard by ID.
func (c *Client) DropShard(id uint64) error {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
data.DropShard(id)
return c.commit(data)
}
2016-01-06 15:11:59 +00:00
// CreateShardGroup creates a shard group on a database and policy for a given timestamp.
func (c *Client) CreateShardGroup(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
sgi, err := createShardGroup(data, database, policy, timestamp)
if err != nil {
return nil, err
}
2016-03-14 15:51:12 +00:00
if err := c.commit(data); err != nil {
return nil, err
}
return sgi, nil
}
func createShardGroup(data *Data, database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
if sg, _ := data.ShardGroupByTimestamp(database, policy, timestamp); sg != nil {
return sg, nil
2016-01-06 15:11:59 +00:00
}
if err := data.CreateShardGroup(database, policy, timestamp); err != nil {
2016-01-06 15:11:59 +00:00
return nil, err
}
rpi, err := data.RetentionPolicy(database, policy)
2016-01-06 15:11:59 +00:00
if err != nil {
return nil, err
} else if rpi == nil {
return nil, errors.New("retention policy deleted after shard group created")
}
sgi := rpi.ShardGroupByTimestamp(timestamp)
return sgi, nil
2015-12-23 15:48:25 +00:00
}
2016-01-06 15:11:59 +00:00
// DeleteShardGroup removes a shard group from a database and retention policy by id.
2015-12-23 15:48:25 +00:00
func (c *Client) DeleteShardGroup(database, policy string, id uint64) error {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
if err := data.DeleteShardGroup(database, policy, id); err != nil {
return err
2016-01-06 15:11:59 +00:00
}
2016-03-14 15:51:12 +00:00
if err := c.commit(data); err != nil {
return err
}
return nil
2015-12-23 15:48:25 +00:00
}
2016-01-06 15:11:59 +00:00
// PrecreateShardGroups creates shard groups whose endtime is before the 'to' time passed in, but
// is yet to expire before 'from'. This is to avoid the need for these shards to be created when data
// for the corresponding time range arrives. Shard creation involves Raft consensus, and precreation
// avoids taking the hit at write-time.
2015-12-23 15:48:25 +00:00
func (c *Client) PrecreateShardGroups(from, to time.Time) error {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
for _, di := range data.Databases {
2016-01-06 15:11:59 +00:00
for _, rp := range di.RetentionPolicies {
if len(rp.ShardGroups) == 0 {
// No data was ever written to this group, or all groups have been deleted.
continue
}
g := rp.ShardGroups[len(rp.ShardGroups)-1] // Get the last group in time.
if !g.Deleted() && g.EndTime.Before(to) && g.EndTime.After(from) {
// Group is not deleted, will end before the future time, but is still yet to expire.
// This last check is important, so the system doesn't create shards groups wholly
// in the past.
// Create successive shard group.
nextShardGroupTime := g.EndTime.Add(1 * time.Nanosecond)
if newGroup, err := createShardGroup(data, di.Name, rp.Name, nextShardGroupTime); err != nil {
2016-01-06 15:11:59 +00:00
c.logger.Printf("failed to precreate successive shard group for group %d: %s", g.ID, err.Error())
} else {
c.logger.Printf("new shard group %d successfully precreated for database %s, retention policy %s", newGroup.ID, di.Name, rp.Name)
}
}
}
}
2016-03-14 15:51:12 +00:00
if err := c.commit(data); err != nil {
return err
}
2015-12-23 15:48:25 +00:00
return nil
}
2016-01-06 15:11:59 +00:00
// ShardOwner returns the owning shard group info for a specific shard.
func (c *Client) ShardOwner(shardID uint64) (database, policy string, sgi *ShardGroupInfo) {
c.mu.RLock()
data := c.cacheData.Clone()
c.mu.RUnlock()
for _, dbi := range data.Databases {
2016-01-06 15:11:59 +00:00
for _, rpi := range dbi.RetentionPolicies {
for _, g := range rpi.ShardGroups {
if g.Deleted() {
continue
}
for _, sh := range g.Shards {
if sh.ID == shardID {
database = dbi.Name
policy = rpi.Name
sgi = &g
return
}
}
}
}
}
return
2015-12-23 15:48:25 +00:00
}
func (c *Client) CreateContinuousQuery(database, name, query string) error {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
if err := data.CreateContinuousQuery(database, name, query); err != nil {
return err
}
2016-03-14 15:51:12 +00:00
if err := c.commit(data); err != nil {
return err
}
return nil
}
2016-01-25 04:56:06 +00:00
func (c *Client) DropContinuousQuery(database, name string) error {
c.mu.Lock()
defer c.mu.Unlock()
2016-01-25 04:56:06 +00:00
data := c.cacheData.Clone()
if err := data.DropContinuousQuery(database, name); err != nil {
return nil
}
2016-03-14 15:51:12 +00:00
if err := c.commit(data); err != nil {
return err
}
return nil
}
func (c *Client) CreateSubscription(database, rp, name, mode string, destinations []string) error {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
2016-01-09 04:29:48 +00:00
if err := data.CreateSubscription(database, rp, name, mode, destinations); err != nil {
return err
2016-01-09 04:29:48 +00:00
}
2016-03-14 15:51:12 +00:00
if err := c.commit(data); err != nil {
return err
}
return nil
}
func (c *Client) DropSubscription(database, rp, name string) error {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
if err := data.DropSubscription(database, rp, name); err != nil {
return err
}
2016-03-14 15:51:12 +00:00
if err := c.commit(data); err != nil {
return err
}
return nil
}
func (c *Client) SetData(data *Data) error {
c.mu.Lock()
// reset the index so the commit will fire a change event
c.cacheData.Index = 0
2015-12-23 15:48:25 +00:00
// increment the index to force the changed channel to fire
d := data.Clone()
d.Index++
2016-03-14 15:51:12 +00:00
if err := c.commit(d); err != nil {
return err
}
2015-12-23 15:48:25 +00:00
c.mu.Unlock()
return nil
}
// WaitForDataChanged will return a channel that will get closed when
// the metastore data has changed
func (c *Client) WaitForDataChanged() chan struct{} {
c.mu.RLock()
defer c.mu.RUnlock()
return c.changed
}
// commit assumes it is under a full lock
2016-03-14 15:51:12 +00:00
func (c *Client) commit(data *Data) error {
data.Index++
2016-03-14 15:51:12 +00:00
// try to write to disk before updating in memory
if err := snapshot(c.path, data); err != nil {
return err
}
// update in memory
c.cacheData = data
2016-03-14 15:51:12 +00:00
// close channels to signal changes
close(c.changed)
c.changed = make(chan struct{})
2016-03-14 15:51:12 +00:00
return nil
}
2015-12-23 15:48:25 +00:00
func (c *Client) MarshalBinary() ([]byte, error) {
c.mu.RLock()
defer c.mu.RUnlock()
return c.cacheData.MarshalBinary()
}
func (c *Client) SetLogger(l *log.Logger) {
c.mu.Lock()
defer c.mu.Unlock()
c.logger = l
2015-12-23 15:48:25 +00:00
}
func (c *Client) updateAuthCache() {
// copy cached user info for still-present users
newCache := make(map[string]authUser, len(c.authCache))
for _, userInfo := range c.cacheData.Users {
if cached, ok := c.authCache[userInfo.Name]; ok {
if cached.bhash == userInfo.Hash {
newCache[userInfo.Name] = cached
}
}
}
c.authCache = newCache
}
2016-03-14 15:51:12 +00:00
// snapshot will save the current meta data to disk
func snapshot(path string, data *Data) error {
file := filepath.Join(path, metaFile)
tmpFile := file + "tmp"
2016-02-10 18:16:54 +00:00
f, err := os.Create(tmpFile)
if err != nil {
return err
}
defer f.Close()
2016-02-10 18:16:54 +00:00
2016-03-14 15:51:12 +00:00
var d []byte
if b, err := data.MarshalBinary(); err != nil {
return err
} else {
2016-03-14 15:51:12 +00:00
d = b
}
2016-02-10 18:16:54 +00:00
2016-03-14 15:51:12 +00:00
if _, err := f.Write(d); err != nil {
return err
2016-02-10 18:16:54 +00:00
}
if err = f.Close(); nil != err {
return err
2016-02-10 18:16:54 +00:00
}
return renameFile(tmpFile, file)
2016-02-10 18:16:54 +00:00
}
// Load will save the current meta data from disk
func (c *Client) Load() error {
file := filepath.Join(c.path, metaFile)
f, err := os.Open(file)
if err != nil {
if os.IsNotExist(err) {
return nil
2016-02-10 18:16:54 +00:00
}
return err
2016-02-10 18:16:54 +00:00
}
defer f.Close()
2016-02-10 18:16:54 +00:00
data, err := ioutil.ReadAll(f)
if err != nil {
return err
}
if err := c.cacheData.UnmarshalBinary(data); err != nil {
return err
}
return nil
}
2016-02-04 15:12:52 +00:00
type errCommand struct {
msg string
}
func (e errCommand) Error() string {
return e.msg
}
2016-02-04 15:12:52 +00:00
type uint64Slice []uint64
func (a uint64Slice) Len() int { return len(a) }
func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }