185 lines
5.3 KiB
Go
185 lines
5.3 KiB
Go
package kv
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
|
|
"github.com/influxdata/chronograf"
|
|
"github.com/influxdata/chronograf/id"
|
|
"github.com/influxdata/chronograf/mocks"
|
|
)
|
|
|
|
var _ chronograf.KVClient = (*Service)(nil)
|
|
|
|
var (
|
|
cellBucket = []byte("cellsv2")
|
|
configBucket = []byte("ConfigV1")
|
|
dashboardsBucket = []byte("Dashoard") // keep spelling for backwards compat
|
|
mappingsBucket = []byte("MappingsV1")
|
|
organizationConfigBucket = []byte("OrganizationConfigV1")
|
|
organizationsBucket = []byte("OrganizationsV1")
|
|
serversBucket = []byte("Servers")
|
|
sourcesBucket = []byte("Sources")
|
|
usersBucket = []byte("UsersV2")
|
|
)
|
|
|
|
// Store is an interface for a generic key value store. It is modeled after
|
|
// the boltdb database struct.
|
|
type Store interface {
|
|
// View opens up a transaction that will not write to any data. Implementing interfaces
|
|
// should take care to ensure that all view transactions do not mutate any data.
|
|
View(context.Context, func(Tx) error) error
|
|
// Update opens up a transaction that will mutate data.
|
|
Update(context.Context, func(Tx) error) error
|
|
// Close closes the connection to the db.
|
|
Close() error
|
|
}
|
|
|
|
// Tx is a transaction in the store.
|
|
type Tx interface {
|
|
// Bucket creates and returns bucket, b.
|
|
Bucket(b []byte) Bucket
|
|
// CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
|
|
// Returns an error if the bucket name is blank, or if the bucket name is too long.
|
|
// The bucket instance is only valid for the lifetime of the transaction.
|
|
CreateBucketIfNotExists(b []byte) (Bucket, error)
|
|
}
|
|
|
|
// Bucket is the abstraction used to perform get/put/delete/get-many operations
|
|
// in a key value store.
|
|
type Bucket interface {
|
|
// Get returns a key within this bucket. Errors if key does not exist.
|
|
Get(key []byte) ([]byte, error)
|
|
// Put should error if the transaction it was called in is not writable.
|
|
Put(key, value []byte) error
|
|
// Delete should error if the transaction it was called in is not writable.
|
|
Delete(key []byte) error
|
|
// NextSequence returns a unique id for the bucket.
|
|
NextSequence() (uint64, error)
|
|
// ForEach executes a function for each key/value pair in a bucket.
|
|
// If the provided function returns an error then the iteration is stopped and
|
|
// the error is returned to the caller. The provided function must not modify
|
|
// the bucket; this will result in undefined behavior.
|
|
ForEach(fn func(k, v []byte) error) error
|
|
}
|
|
|
|
// Service is the struct that chronograf services are implemented on.
|
|
type Service struct {
|
|
kv Store
|
|
log chronograf.Logger
|
|
}
|
|
|
|
// Option to change behavior of Open()
|
|
type Option func(s *Service) error
|
|
|
|
// WithLogger allows setting the logger on the kv service.
|
|
func WithLogger(logger chronograf.Logger) Option {
|
|
return func(s *Service) error {
|
|
s.log = logger
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// NewService returns an instance of a Service.
|
|
func NewService(ctx context.Context, kv Store, opts ...Option) (*Service, error) {
|
|
s := &Service{
|
|
log: mocks.NewLogger(),
|
|
kv: kv,
|
|
}
|
|
|
|
for i := range opts {
|
|
if err := opts[i](s); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if err := s.kv.Update(ctx, func(tx Tx) error {
|
|
return s.initialize(ctx, tx)
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return s, s.OrganizationsStore().CreateDefault(ctx)
|
|
}
|
|
|
|
// Close closes the service's kv store.
|
|
func (s *Service) Close() error {
|
|
return s.kv.Close()
|
|
}
|
|
|
|
func (s *Service) initialize(ctx context.Context, tx Tx) error {
|
|
buckets := [][]byte{
|
|
cellBucket,
|
|
configBucket,
|
|
dashboardsBucket,
|
|
mappingsBucket,
|
|
organizationConfigBucket,
|
|
organizationsBucket,
|
|
serversBucket,
|
|
sourcesBucket,
|
|
usersBucket,
|
|
}
|
|
|
|
for i := range buckets {
|
|
if _, err := tx.CreateBucketIfNotExists(buckets[i]); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// itob returns an 8-byte big endian representation of v.
|
|
func itob(v int) []byte {
|
|
b := make([]byte, 8)
|
|
binary.BigEndian.PutUint64(b, uint64(v))
|
|
return b
|
|
}
|
|
|
|
// u64tob returns an 8-byte big endian representation of v.
|
|
func u64tob(v uint64) []byte {
|
|
b := make([]byte, 8)
|
|
binary.BigEndian.PutUint64(b, v)
|
|
return b
|
|
}
|
|
|
|
// ConfigStore returns a chronograf.ConfigStore.
|
|
func (s *Service) ConfigStore() chronograf.ConfigStore {
|
|
return &configStore{client: s}
|
|
}
|
|
|
|
// DashboardsStore returns a chronograf.DashboardsStore.
|
|
func (s *Service) DashboardsStore() chronograf.DashboardsStore {
|
|
return &dashboardsStore{client: s, IDs: &id.UUID{}}
|
|
}
|
|
|
|
// MappingsStore returns a chronograf.MappingsStore.
|
|
func (s *Service) MappingsStore() chronograf.MappingsStore {
|
|
return &mappingsStore{client: s}
|
|
}
|
|
|
|
// OrganizationConfigStore returns a chronograf.OrganizationConfigStore.
|
|
func (s *Service) OrganizationConfigStore() chronograf.OrganizationConfigStore {
|
|
return &organizationConfigStore{client: s}
|
|
}
|
|
|
|
// OrganizationsStore returns a chronograf.OrganizationsStore.
|
|
func (s *Service) OrganizationsStore() chronograf.OrganizationsStore {
|
|
return &organizationsStore{client: s}
|
|
}
|
|
|
|
// ServersStore returns a chronograf.ServersStore.
|
|
func (s *Service) ServersStore() chronograf.ServersStore {
|
|
return &serversStore{client: s}
|
|
}
|
|
|
|
// SourcesStore returns a chronograf.SourcesStore.
|
|
func (s *Service) SourcesStore() chronograf.SourcesStore {
|
|
return &sourcesStore{client: s}
|
|
}
|
|
|
|
// UsersStore returns a chronograf.UsersStore.
|
|
func (s *Service) UsersStore() chronograf.UsersStore {
|
|
return &usersStore{client: s}
|
|
}
|