Move kapacitor multistore to the multistore package
parent
ce87c52fa9
commit
d4a8cbb87c
|
@ -7,9 +7,8 @@ import (
|
|||
"github.com/influxdata/chronograf"
|
||||
)
|
||||
|
||||
// Ensure KapacitorStore and MultiKapacitorStore implements chronograf.ServersStore.
|
||||
// Ensure KapacitorStore implements chronograf.ServersStore.
|
||||
var _ chronograf.ServersStore = &KapacitorStore{}
|
||||
var _ chronograf.ServersStore = &MultiKapacitorStore{}
|
||||
|
||||
// KapacitorStore implements the chronograf.ServersStore interface, and keeps
|
||||
// an in-memory Kapacitor according to startup configuration
|
||||
|
@ -55,90 +54,3 @@ func (store *KapacitorStore) Update(ctx context.Context, kap chronograf.Server)
|
|||
store.Kapacitor = &kap
|
||||
return nil
|
||||
}
|
||||
|
||||
// MultiKapacitorStore implements the chronograf.ServersStore interface, and
|
||||
// delegates to all contained KapacitorStores
|
||||
type MultiKapacitorStore struct {
|
||||
Stores []chronograf.ServersStore
|
||||
}
|
||||
|
||||
// All concatenates the Kapacitors of all contained Stores
|
||||
func (multi *MultiKapacitorStore) All(ctx context.Context) ([]chronograf.Server, error) {
|
||||
all := []chronograf.Server{}
|
||||
kapSet := map[int]struct{}{}
|
||||
|
||||
ok := false
|
||||
var err error
|
||||
for _, store := range multi.Stores {
|
||||
var kaps []chronograf.Server
|
||||
kaps, err = store.All(ctx)
|
||||
if err != nil {
|
||||
// If this Store is unable to return an array of kapacitors, skip to the
|
||||
// next Store.
|
||||
continue
|
||||
}
|
||||
ok = true // We've received a response from at least one Store
|
||||
for _, kap := range kaps {
|
||||
// Enforce that the kapacitor has a unique ID
|
||||
// If the ID has been seen before, ignore the kapacitor
|
||||
if _, okay := kapSet[kap.ID]; !okay { // We have a new kapacitor
|
||||
kapSet[kap.ID] = struct{}{} // We just care that the ID is unique
|
||||
all = append(all, kap)
|
||||
}
|
||||
}
|
||||
}
|
||||
if !ok {
|
||||
return nil, err
|
||||
}
|
||||
return all, nil
|
||||
}
|
||||
|
||||
// Add the kap to the first responsive Store
|
||||
func (multi *MultiKapacitorStore) Add(ctx context.Context, kap chronograf.Server) (chronograf.Server, error) {
|
||||
var err error
|
||||
for _, store := range multi.Stores {
|
||||
var k chronograf.Server
|
||||
k, err = store.Add(ctx, kap)
|
||||
if err == nil {
|
||||
return k, nil
|
||||
}
|
||||
}
|
||||
return chronograf.Server{}, nil
|
||||
}
|
||||
|
||||
// Delete delegates to all Stores, returns success if one Store is successful
|
||||
func (multi *MultiKapacitorStore) Delete(ctx context.Context, kap chronograf.Server) error {
|
||||
var err error
|
||||
for _, store := range multi.Stores {
|
||||
err = store.Delete(ctx, kap)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Get finds the Source by id among all contained Stores
|
||||
func (multi *MultiKapacitorStore) Get(ctx context.Context, id int) (chronograf.Server, error) {
|
||||
var err error
|
||||
for _, store := range multi.Stores {
|
||||
var k chronograf.Server
|
||||
k, err = store.Get(ctx, id)
|
||||
if err == nil {
|
||||
return k, nil
|
||||
}
|
||||
}
|
||||
return chronograf.Server{}, nil
|
||||
}
|
||||
|
||||
// Update the first responsive Store
|
||||
func (multi *MultiKapacitorStore) Update(ctx context.Context, kap chronograf.Server) error {
|
||||
var err error
|
||||
for _, store := range multi.Stores {
|
||||
err = store.Update(ctx, kap)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
|
||||
func TestInterfaceImplementation(t *testing.T) {
|
||||
var _ chronograf.ServersStore = &KapacitorStore{}
|
||||
var _ chronograf.ServersStore = &MultiKapacitorStore{}
|
||||
}
|
||||
|
||||
func TestKapacitorStoreAll(t *testing.T) {
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
package multistore
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/influxdata/chronograf"
|
||||
)
|
||||
|
||||
// Ensure KapacitorStore implements chronograf.ServersStore.
|
||||
var _ chronograf.ServersStore = &KapacitorStore{}
|
||||
|
||||
// KapacitorStore implements the chronograf.ServersStore interface, and
|
||||
// delegates to all contained KapacitorStores
|
||||
type KapacitorStore struct {
|
||||
Stores []chronograf.ServersStore
|
||||
}
|
||||
|
||||
// All concatenates the Kapacitors of all contained Stores
|
||||
func (multi *KapacitorStore) All(ctx context.Context) ([]chronograf.Server, error) {
|
||||
all := []chronograf.Server{}
|
||||
kapSet := map[int]struct{}{}
|
||||
|
||||
ok := false
|
||||
var err error
|
||||
for _, store := range multi.Stores {
|
||||
var kaps []chronograf.Server
|
||||
kaps, err = store.All(ctx)
|
||||
if err != nil {
|
||||
// If this Store is unable to return an array of kapacitors, skip to the
|
||||
// next Store.
|
||||
continue
|
||||
}
|
||||
ok = true // We've received a response from at least one Store
|
||||
for _, kap := range kaps {
|
||||
// Enforce that the kapacitor has a unique ID
|
||||
// If the ID has been seen before, ignore the kapacitor
|
||||
if _, okay := kapSet[kap.ID]; !okay { // We have a new kapacitor
|
||||
kapSet[kap.ID] = struct{}{} // We just care that the ID is unique
|
||||
all = append(all, kap)
|
||||
}
|
||||
}
|
||||
}
|
||||
if !ok {
|
||||
return nil, err
|
||||
}
|
||||
return all, nil
|
||||
}
|
||||
|
||||
// Add the kap to the first responsive Store
|
||||
func (multi *KapacitorStore) Add(ctx context.Context, kap chronograf.Server) (chronograf.Server, error) {
|
||||
var err error
|
||||
for _, store := range multi.Stores {
|
||||
var k chronograf.Server
|
||||
k, err = store.Add(ctx, kap)
|
||||
if err == nil {
|
||||
return k, nil
|
||||
}
|
||||
}
|
||||
return chronograf.Server{}, nil
|
||||
}
|
||||
|
||||
// Delete delegates to all Stores, returns success if one Store is successful
|
||||
func (multi *KapacitorStore) Delete(ctx context.Context, kap chronograf.Server) error {
|
||||
var err error
|
||||
for _, store := range multi.Stores {
|
||||
err = store.Delete(ctx, kap)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Get finds the Source by id among all contained Stores
|
||||
func (multi *KapacitorStore) Get(ctx context.Context, id int) (chronograf.Server, error) {
|
||||
var err error
|
||||
for _, store := range multi.Stores {
|
||||
var k chronograf.Server
|
||||
k, err = store.Get(ctx, id)
|
||||
if err == nil {
|
||||
return k, nil
|
||||
}
|
||||
}
|
||||
return chronograf.Server{}, nil
|
||||
}
|
||||
|
||||
// Update the first responsive Store
|
||||
func (multi *KapacitorStore) Update(ctx context.Context, kap chronograf.Server) error {
|
||||
var err error
|
||||
for _, store := range multi.Stores {
|
||||
err = store.Update(ctx, kap)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
package multistore
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/chronograf"
|
||||
)
|
||||
|
||||
func TestInterfaceImplementation(t *testing.T) {
|
||||
var _ chronograf.ServersStore = &KapacitorStore{}
|
||||
}
|
|
@ -81,7 +81,7 @@ func (fs *MultiSourceBuilder) Build(db chronograf.SourcesStore) (*memdb.MultiSou
|
|||
|
||||
// KapacitorBuilder builds a KapacitorStore
|
||||
type KapacitorBuilder interface {
|
||||
Build(chronograf.ServersStore) (*memdb.MultiKapacitorStore, error)
|
||||
Build(chronograf.ServersStore) (*multistore.KapacitorStore, error)
|
||||
}
|
||||
|
||||
// MultiKapacitorBuilder implements KapacitorBuilder
|
||||
|
@ -92,7 +92,7 @@ type MultiKapacitorBuilder struct {
|
|||
}
|
||||
|
||||
// Build will return a MultiKapacitorStore
|
||||
func (builder *MultiKapacitorBuilder) Build(db chronograf.ServersStore) (*memdb.MultiKapacitorStore, error) {
|
||||
func (builder *MultiKapacitorBuilder) Build(db chronograf.ServersStore) (*multistore.KapacitorStore, error) {
|
||||
stores := []chronograf.ServersStore{db}
|
||||
if builder.KapacitorURL != "" {
|
||||
memStore := &memdb.KapacitorStore{
|
||||
|
@ -107,7 +107,7 @@ func (builder *MultiKapacitorBuilder) Build(db chronograf.ServersStore) (*memdb.
|
|||
}
|
||||
stores = append([]chronograf.ServersStore{memStore}, stores...)
|
||||
}
|
||||
kapacitors := &memdb.MultiKapacitorStore{
|
||||
kapacitors := &multistore.KapacitorStore{
|
||||
Stores: stores,
|
||||
}
|
||||
return kapacitors, nil
|
||||
|
|
Loading…
Reference in New Issue