Merge pull request #403 from influxdata/bugfix/tr-singular-default-source
Make API guarantee one default source at all timespull/10616/head
commit
6484509b56
184
bolt/sources.go
184
bolt/sources.go
|
@ -21,14 +21,9 @@ type SourcesStore struct {
|
|||
func (s *SourcesStore) All(ctx context.Context) ([]chronograf.Source, error) {
|
||||
var srcs []chronograf.Source
|
||||
if err := s.client.db.View(func(tx *bolt.Tx) error {
|
||||
if err := tx.Bucket(SourcesBucket).ForEach(func(k, v []byte) error {
|
||||
var src chronograf.Source
|
||||
if err := internal.UnmarshalSource(v, &src); err != nil {
|
||||
return err
|
||||
}
|
||||
srcs = append(srcs, src)
|
||||
return nil
|
||||
}); err != nil {
|
||||
var err error
|
||||
srcs, err = s.all(ctx, tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
@ -42,20 +37,16 @@ func (s *SourcesStore) All(ctx context.Context) ([]chronograf.Source, error) {
|
|||
|
||||
// Add creates a new Source in the SourceStore.
|
||||
func (s *SourcesStore) Add(ctx context.Context, src chronograf.Source) (chronograf.Source, error) {
|
||||
if err := s.client.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(SourcesBucket)
|
||||
seq, err := b.NextSequence()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
src.ID = int(seq)
|
||||
|
||||
if v, err := internal.MarshalSource(src); err != nil {
|
||||
return err
|
||||
} else if err := b.Put(itob(src.ID), v); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
// force first source added to be default
|
||||
if srcs, err := s.All(ctx); err != nil {
|
||||
return chronograf.Source{}, err
|
||||
} else if len(srcs) == 0 {
|
||||
src.Default = true
|
||||
}
|
||||
|
||||
if err := s.client.db.Update(func(tx *bolt.Tx) error {
|
||||
return s.add(ctx, &src, tx)
|
||||
}); err != nil {
|
||||
return chronograf.Source{}, err
|
||||
}
|
||||
|
@ -66,10 +57,10 @@ func (s *SourcesStore) Add(ctx context.Context, src chronograf.Source) (chronogr
|
|||
// Delete removes the Source from the SourcesStore
|
||||
func (s *SourcesStore) Delete(ctx context.Context, src chronograf.Source) error {
|
||||
if err := s.client.db.Update(func(tx *bolt.Tx) error {
|
||||
if err := tx.Bucket(SourcesBucket).Delete(itob(src.ID)); err != nil {
|
||||
if err := s.setRandomDefault(ctx, src, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return s.delete(ctx, src, tx)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -81,9 +72,9 @@ func (s *SourcesStore) Delete(ctx context.Context, src chronograf.Source) error
|
|||
func (s *SourcesStore) Get(ctx context.Context, id int) (chronograf.Source, error) {
|
||||
var src chronograf.Source
|
||||
if err := s.client.db.View(func(tx *bolt.Tx) error {
|
||||
if v := tx.Bucket(SourcesBucket).Get(itob(id)); v == nil {
|
||||
return chronograf.ErrSourceNotFound
|
||||
} else if err := internal.UnmarshalSource(v, &src); err != nil {
|
||||
var err error
|
||||
src, err = s.get(ctx, id, tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
@ -97,21 +88,138 @@ func (s *SourcesStore) Get(ctx context.Context, id int) (chronograf.Source, erro
|
|||
// Update a Source
|
||||
func (s *SourcesStore) Update(ctx context.Context, src chronograf.Source) error {
|
||||
if err := s.client.db.Update(func(tx *bolt.Tx) error {
|
||||
// Get an existing soource with the same ID.
|
||||
b := tx.Bucket(SourcesBucket)
|
||||
if v := b.Get(itob(src.ID)); v == nil {
|
||||
return chronograf.ErrSourceNotFound
|
||||
}
|
||||
|
||||
if v, err := internal.MarshalSource(src); err != nil {
|
||||
return err
|
||||
} else if err := b.Put(itob(src.ID), v); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return s.update(ctx, src, tx)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SourcesStore) all(ctx context.Context, tx *bolt.Tx) ([]chronograf.Source, error) {
|
||||
var srcs []chronograf.Source
|
||||
if err := tx.Bucket(SourcesBucket).ForEach(func(k, v []byte) error {
|
||||
var src chronograf.Source
|
||||
if err := internal.UnmarshalSource(v, &src); err != nil {
|
||||
return err
|
||||
}
|
||||
srcs = append(srcs, src)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return srcs, err
|
||||
}
|
||||
return srcs, nil
|
||||
}
|
||||
|
||||
func (s *SourcesStore) add(ctx context.Context, src *chronograf.Source, tx *bolt.Tx) error {
|
||||
b := tx.Bucket(SourcesBucket)
|
||||
seq, err := b.NextSequence()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
src.ID = int(seq)
|
||||
|
||||
if src.Default {
|
||||
if err := s.resetDefaultSource(ctx, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if v, err := internal.MarshalSource(*src); err != nil {
|
||||
return err
|
||||
} else if err := b.Put(itob(src.ID), v); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SourcesStore) delete(ctx context.Context, src chronograf.Source, tx *bolt.Tx) error {
|
||||
if err := tx.Bucket(SourcesBucket).Delete(itob(src.ID)); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SourcesStore) get(ctx context.Context, id int, tx *bolt.Tx) (chronograf.Source, error) {
|
||||
var src chronograf.Source
|
||||
if v := tx.Bucket(SourcesBucket).Get(itob(id)); v == nil {
|
||||
return src, chronograf.ErrSourceNotFound
|
||||
} else if err := internal.UnmarshalSource(v, &src); err != nil {
|
||||
return src, err
|
||||
}
|
||||
return src, nil
|
||||
}
|
||||
|
||||
func (s *SourcesStore) update(ctx context.Context, src chronograf.Source, tx *bolt.Tx) error {
|
||||
// Get an existing soource with the same ID.
|
||||
b := tx.Bucket(SourcesBucket)
|
||||
if v := b.Get(itob(src.ID)); v == nil {
|
||||
return chronograf.ErrSourceNotFound
|
||||
}
|
||||
|
||||
if src.Default {
|
||||
if err := s.resetDefaultSource(ctx, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if v, err := internal.MarshalSource(src); err != nil {
|
||||
return err
|
||||
} else if err := b.Put(itob(src.ID), v); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// resetDefaultSource unsets the Default flag on all sources
|
||||
func (s *SourcesStore) resetDefaultSource(ctx context.Context, tx *bolt.Tx) error {
|
||||
b := tx.Bucket(SourcesBucket)
|
||||
srcs, err := s.all(ctx, tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, other := range srcs {
|
||||
if other.Default {
|
||||
other.Default = false
|
||||
if v, err := internal.MarshalSource(other); err != nil {
|
||||
return err
|
||||
} else if err := b.Put(itob(other.ID), v); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// setRandomDefault will locate a source other than the provided
|
||||
// chronograf.Source and set it as the default source. If no other sources are
|
||||
// available, the provided source will be set to the default source if is not
|
||||
// already. It assumes that the provided chronograf.Source has been persisted.
|
||||
func (s *SourcesStore) setRandomDefault(ctx context.Context, src chronograf.Source, tx *bolt.Tx) error {
|
||||
// Check if requested source is the current default
|
||||
if target, err := s.get(ctx, src.ID, tx); err != nil {
|
||||
return err
|
||||
} else if target.Default {
|
||||
// Locate another source to be the new default
|
||||
if srcs, err := s.all(ctx, tx); err != nil {
|
||||
return err
|
||||
} else {
|
||||
var other *chronograf.Source
|
||||
for idx, _ := range srcs {
|
||||
other = &srcs[idx]
|
||||
// avoid selecting the source we're about to delete as the new default
|
||||
if other.ID != target.ID {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// set the other to be the default
|
||||
other.Default = true
|
||||
if err := s.update(ctx, *other, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/influxdata/chronograf"
|
||||
"github.com/influxdata/chronograf/bolt"
|
||||
)
|
||||
|
||||
// Ensure an SourceStore can store, retrieve, update, and delete sources.
|
||||
|
@ -54,11 +55,8 @@ func TestSourceStore(t *testing.T) {
|
|||
// Update source.
|
||||
srcs[0].Username = "calvinklein"
|
||||
srcs[1].Name = "Enchantment Under the Sea Dance"
|
||||
if err := s.Update(nil, srcs[0]); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if err := s.Update(nil, srcs[1]); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
mustUpdateSource(t, s, srcs[0])
|
||||
mustUpdateSource(t, s, srcs[1])
|
||||
|
||||
// Confirm sources have updated.
|
||||
if src, err := s.Get(nil, srcs[0].ID); err != nil {
|
||||
|
@ -72,6 +70,44 @@ func TestSourceStore(t *testing.T) {
|
|||
t.Fatalf("source 1 update error: got %v, expected %v", src.Name, "Enchantment Under the Sea Dance")
|
||||
}
|
||||
|
||||
// Attempt to make two default sources
|
||||
srcs[0].Default = true
|
||||
srcs[1].Default = true
|
||||
mustUpdateSource(t, s, srcs[0])
|
||||
mustUpdateSource(t, s, srcs[1])
|
||||
|
||||
if actual, err := s.Get(nil, srcs[0].ID); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if actual.Default == true {
|
||||
t.Fatal("Able to set two default sources when only one should be permitted")
|
||||
}
|
||||
|
||||
// Attempt to add a new default source
|
||||
srcs = append(srcs, chronograf.Source{
|
||||
Name: "Biff Tannen",
|
||||
Type: "influx",
|
||||
Username: "HELLO",
|
||||
Password: "MCFLY",
|
||||
URL: "anybody.in.there.local",
|
||||
Default: true,
|
||||
})
|
||||
|
||||
srcs[2] = mustAddSource(t, s, srcs[2])
|
||||
if srcs, err := s.All(nil); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
defaults := 0
|
||||
for _, src := range srcs {
|
||||
if src.Default {
|
||||
defaults++
|
||||
}
|
||||
}
|
||||
|
||||
if defaults != 1 {
|
||||
t.Fatal("Able to add more than one default source")
|
||||
}
|
||||
}
|
||||
|
||||
// Delete an source.
|
||||
if err := s.Delete(nil, srcs[0]); err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -82,6 +118,11 @@ func TestSourceStore(t *testing.T) {
|
|||
t.Fatalf("source delete error: got %v, expected %v", err, chronograf.ErrSourceNotFound)
|
||||
}
|
||||
|
||||
// Delete the other source we created
|
||||
if err := s.Delete(nil, srcs[2]); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if bsrcs, err := s.All(nil); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if len(bsrcs) != 1 {
|
||||
|
@ -89,4 +130,41 @@ func TestSourceStore(t *testing.T) {
|
|||
} else if !reflect.DeepEqual(bsrcs[0], srcs[1]) {
|
||||
t.Fatalf("After delete All returned incorrect source; got %v, expected %v", bsrcs[0], srcs[1])
|
||||
}
|
||||
|
||||
// Delete the final source
|
||||
if err := s.Delete(nil, srcs[1]); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Try to add one source as a non-default and ensure that it becomes a
|
||||
// default
|
||||
src := mustAddSource(t, s, chronograf.Source{
|
||||
Name: "Biff Tannen",
|
||||
Type: "influx",
|
||||
Username: "HELLO",
|
||||
Password: "MCFLY",
|
||||
URL: "anybody.in.there.local",
|
||||
Default: false,
|
||||
})
|
||||
|
||||
if actual, err := s.Get(nil, src.ID); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if !actual.Default {
|
||||
t.Fatal("Expected first source added to be default but wasn't")
|
||||
}
|
||||
}
|
||||
|
||||
func mustUpdateSource(t *testing.T, s *bolt.SourcesStore, src chronograf.Source) {
|
||||
if err := s.Update(nil, src); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func mustAddSource(t *testing.T, s *bolt.SourcesStore, src chronograf.Source) chronograf.Source {
|
||||
if src, err := s.Add(nil, src); err != nil {
|
||||
t.Fatal(err)
|
||||
return src
|
||||
} else {
|
||||
return src
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue