Merge pull request #1695 from influxdata/feature/persist_datasource_flag-1555
Add server flag to add new InfluxDb source and Kapacitor serverpull/10616/head
commit
f988da6e7f
|
@ -9,6 +9,7 @@
|
||||||
1. [#1645](https://github.com/influxdata/chronograf/pull/1645): Add Auth0 as a supported OAuth2 provider
|
1. [#1645](https://github.com/influxdata/chronograf/pull/1645): Add Auth0 as a supported OAuth2 provider
|
||||||
1. [#1660](https://github.com/influxdata/chronograf/pull/1660): Add ability to add custom links to User menu via server CLI or ENV vars
|
1. [#1660](https://github.com/influxdata/chronograf/pull/1660): Add ability to add custom links to User menu via server CLI or ENV vars
|
||||||
1. [#1674](https://github.com/influxdata/chronograf/pull/1674): Add support for organizations in Auth0
|
1. [#1674](https://github.com/influxdata/chronograf/pull/1674): Add support for organizations in Auth0
|
||||||
|
1. [#1695](https://github.com/influxdata/chronograf/pull/1695): Add server flag for adding new InfluxDb sources with Kapacitor servers
|
||||||
|
|
||||||
### UI Improvements
|
### UI Improvements
|
||||||
1. [#1644](https://github.com/influxdata/chronograf/pull/1644): Redesign Alerts History table to have sticky headers
|
1. [#1644](https://github.com/influxdata/chronograf/pull/1644): Redesign Alerts History table to have sticky headers
|
||||||
|
|
|
@ -625,3 +625,44 @@ type LayoutStore interface {
|
||||||
// Update the dashboard in the store.
|
// Update the dashboard in the store.
|
||||||
Update(context.Context, Layout) error
|
Update(context.Context, Layout) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SourceAndKapacitor is used to parse any NewSources server flag arguments
|
||||||
|
type SourceAndKapacitor struct {
|
||||||
|
Source Source `json:"influxdb"`
|
||||||
|
Kapacitor Server `json:"kapacitor"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewSources adds sources to BoltDb idempotently by name, as well as respective kapacitors
|
||||||
|
func NewSources(ctx context.Context, sourcesStore SourcesStore, serversStore ServersStore, srcsKaps []SourceAndKapacitor, logger Logger) error {
|
||||||
|
srcs, err := sourcesStore.All(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
SourceLoop:
|
||||||
|
for _, srcKap := range srcsKaps {
|
||||||
|
for _, src := range srcs {
|
||||||
|
// If source already exists, do nothing
|
||||||
|
if src.Name == srcKap.Source.Name {
|
||||||
|
logger.
|
||||||
|
WithField("component", "server").
|
||||||
|
WithField("NewSources", src.Name).
|
||||||
|
Info("Source already exists")
|
||||||
|
continue SourceLoop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
src, err := sourcesStore.Add(ctx, srcKap.Source)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
srcKap.Kapacitor.SrcID = src.ID
|
||||||
|
_, err = serversStore.Add(ctx, srcKap.Kapacitor)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,88 @@
|
||||||
|
package chronograf_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/chronograf"
|
||||||
|
"github.com/influxdata/chronograf/mocks"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_NewSources(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
srcsKaps := []chronograf.SourceAndKapacitor{
|
||||||
|
{
|
||||||
|
Source: chronograf.Source{
|
||||||
|
Default: true,
|
||||||
|
InsecureSkipVerify: false,
|
||||||
|
MetaURL: "http://metaurl.com",
|
||||||
|
Name: "Influx 1",
|
||||||
|
Password: "pass1",
|
||||||
|
Telegraf: "telegraf",
|
||||||
|
URL: "http://localhost:8086",
|
||||||
|
Username: "user1",
|
||||||
|
},
|
||||||
|
Kapacitor: chronograf.Server{
|
||||||
|
Active: true,
|
||||||
|
Name: "Kapa 1",
|
||||||
|
URL: "http://localhost:9092",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
saboteurSrcsKaps := []chronograf.SourceAndKapacitor{
|
||||||
|
{
|
||||||
|
Source: chronograf.Source{
|
||||||
|
Name: "Influx 1",
|
||||||
|
},
|
||||||
|
Kapacitor: chronograf.Server{
|
||||||
|
Name: "Kapa Aspiring Saboteur",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
srcs := []chronograf.Source{}
|
||||||
|
srcsStore := mocks.SourcesStore{
|
||||||
|
AllF: func(ctx context.Context) ([]chronograf.Source, error) {
|
||||||
|
return srcs, nil
|
||||||
|
},
|
||||||
|
AddF: func(ctx context.Context, src chronograf.Source) (chronograf.Source, error) {
|
||||||
|
srcs = append(srcs, src)
|
||||||
|
return src, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
srvs := []chronograf.Server{}
|
||||||
|
srvsStore := mocks.ServersStore{
|
||||||
|
AddF: func(ctx context.Context, srv chronograf.Server) (chronograf.Server, error) {
|
||||||
|
srvs = append(srvs, srv)
|
||||||
|
return srv, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
err := chronograf.NewSources(ctx, &srcsStore, &srvsStore, srcsKaps, &mocks.TestLogger{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Expected no error when creating New Sources. Error:", err)
|
||||||
|
}
|
||||||
|
if len(srcs) != 1 {
|
||||||
|
t.Error("Expected one source in sourcesStore")
|
||||||
|
}
|
||||||
|
if len(srvs) != 1 {
|
||||||
|
t.Error("Expected one source in serversStore")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = chronograf.NewSources(ctx, &srcsStore, &srvsStore, saboteurSrcsKaps, &mocks.TestLogger{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Expected no error when creating New Sources. Error:", err)
|
||||||
|
}
|
||||||
|
if len(srcs) != 1 {
|
||||||
|
t.Error("Expected one source in sourcesStore")
|
||||||
|
}
|
||||||
|
if len(srvs) != 1 {
|
||||||
|
t.Error("Expected one source in serversStore")
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(srcs[0], srcsKaps[0].Source) {
|
||||||
|
t.Error("Expected source in sourceStore to remain unchanged")
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package server_test
|
package mocks
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
|
@ -0,0 +1,38 @@
|
||||||
|
package mocks
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/influxdata/chronograf"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ chronograf.ServersStore = &ServersStore{}
|
||||||
|
|
||||||
|
// ServersStore mock allows all functions to be set for testing
|
||||||
|
type ServersStore struct {
|
||||||
|
AllF func(context.Context) ([]chronograf.Server, error)
|
||||||
|
AddF func(context.Context, chronograf.Server) (chronograf.Server, error)
|
||||||
|
DeleteF func(context.Context, chronograf.Server) error
|
||||||
|
GetF func(ctx context.Context, ID int) (chronograf.Server, error)
|
||||||
|
UpdateF func(context.Context, chronograf.Server) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServersStore) All(ctx context.Context) ([]chronograf.Server, error) {
|
||||||
|
return s.AllF(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServersStore) Add(ctx context.Context, srv chronograf.Server) (chronograf.Server, error) {
|
||||||
|
return s.AddF(ctx, srv)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServersStore) Delete(ctx context.Context, srv chronograf.Server) error {
|
||||||
|
return s.DeleteF(ctx, srv)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServersStore) Get(ctx context.Context, id int) (chronograf.Server, error) {
|
||||||
|
return s.GetF(ctx, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ServersStore) Update(ctx context.Context, srv chronograf.Server) error {
|
||||||
|
return s.UpdateF(ctx, srv)
|
||||||
|
}
|
|
@ -3,6 +3,7 @@ package server
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
"encoding/json"
|
||||||
"log"
|
"log"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
|
@ -50,6 +51,8 @@ type Server struct {
|
||||||
KapacitorUsername string `long:"kapacitor-username" description:"Username of your Kapacitor instance" env:"KAPACITOR_USERNAME"`
|
KapacitorUsername string `long:"kapacitor-username" description:"Username of your Kapacitor instance" env:"KAPACITOR_USERNAME"`
|
||||||
KapacitorPassword string `long:"kapacitor-password" description:"Password of your Kapacitor instance" env:"KAPACITOR_PASSWORD"`
|
KapacitorPassword string `long:"kapacitor-password" description:"Password of your Kapacitor instance" env:"KAPACITOR_PASSWORD"`
|
||||||
|
|
||||||
|
NewSources string `long:"new-sources" description:"Config for adding a new InfluxDb source and Kapacitor server, in JSON as an array of objects, and surrounded by single quotes. E.g. --new-sources='[{\"influxdb\":{\"name\":\"Influx 1\",\"username\":\"user1\",\"password\":\"pass1\",\"url\":\"http://localhost:8086\",\"metaUrl\":\"http://metaurl.com\",\"insecureSkipVerify\":false,\"default\":true,\"telegraf\":\"telegraf\"},\"kapacitor\":{\"name\":\"Kapa 1\",\"url\":\"http://localhost:9092\",\"active\":true}}]'" env:"NEW_SOURCES"`
|
||||||
|
|
||||||
Develop bool `short:"d" long:"develop" description:"Run server in develop mode."`
|
Develop bool `short:"d" long:"develop" description:"Run server in develop mode."`
|
||||||
BoltPath string `short:"b" long:"bolt-path" description:"Full path to boltDB file (/var/lib/chronograf/chronograf-v1.db)" env:"BOLT_PATH" default:"chronograf-v1.db"`
|
BoltPath string `short:"b" long:"bolt-path" description:"Full path to boltDB file (/var/lib/chronograf/chronograf-v1.db)" env:"BOLT_PATH" default:"chronograf-v1.db"`
|
||||||
CannedPath string `short:"c" long:"canned-path" description:"Path to directory of pre-canned application layouts (/usr/share/chronograf/canned)" env:"CANNED_PATH" default:"canned"`
|
CannedPath string `short:"c" long:"canned-path" description:"Path to directory of pre-canned application layouts (/usr/share/chronograf/canned)" env:"CANNED_PATH" default:"canned"`
|
||||||
|
@ -298,6 +301,9 @@ func (s *Server) Serve(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
service := openService(ctx, s.BoltPath, layoutBuilder, sourcesBuilder, kapacitorBuilder, logger, s.useAuth())
|
service := openService(ctx, s.BoltPath, layoutBuilder, sourcesBuilder, kapacitorBuilder, logger, s.useAuth())
|
||||||
|
|
||||||
|
go processNewSources(ctx, service, s.NewSources, logger)
|
||||||
|
|
||||||
basepath = s.Basepath
|
basepath = s.Basepath
|
||||||
if basepath != "" && s.PrefixRoutes == false {
|
if basepath != "" && s.PrefixRoutes == false {
|
||||||
logger.
|
logger.
|
||||||
|
@ -431,6 +437,33 @@ func openService(ctx context.Context, boltPath string, lBuilder LayoutBuilder, s
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// processNewSources parses and persists new sources passed in via server flag
|
||||||
|
func processNewSources(ctx context.Context, service Service, newSources string, logger chronograf.Logger) error {
|
||||||
|
if newSources == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var srcsKaps []chronograf.SourceAndKapacitor
|
||||||
|
// On JSON unmarshal error, continue server process without new source and write error to log
|
||||||
|
if err := json.Unmarshal([]byte(newSources), &srcsKaps); err != nil {
|
||||||
|
logger.
|
||||||
|
WithField("component", "server").
|
||||||
|
WithField("NewSources", "invalid").
|
||||||
|
Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add any new sources and kapacitors as specified via server flag
|
||||||
|
if err := chronograf.NewSources(ctx, service.SourcesStore, service.ServersStore, srcsKaps, logger); err != nil {
|
||||||
|
// Continue with server run even if adding NewSources fails
|
||||||
|
logger.
|
||||||
|
WithField("component", "server").
|
||||||
|
WithField("NewSources", "invalid").
|
||||||
|
Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// reportUsageStats starts periodic server reporting.
|
// reportUsageStats starts periodic server reporting.
|
||||||
func reportUsageStats(bi BuildInfo, logger chronograf.Logger) {
|
func reportUsageStats(bi BuildInfo, logger chronograf.Logger) {
|
||||||
rand.Seed(time.Now().UTC().UnixNano())
|
rand.Seed(time.Now().UTC().UnixNano())
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/chronograf/mocks"
|
||||||
"github.com/influxdata/chronograf/server"
|
"github.com/influxdata/chronograf/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -138,7 +139,7 @@ func Test_Server_Prefixer_NoPrefixingWithoutFlusther(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
tl := &TestLogger{}
|
tl := &mocks.TestLogger{}
|
||||||
pfx := &server.URLPrefixer{
|
pfx := &server.URLPrefixer{
|
||||||
Prefix: "/hill",
|
Prefix: "/hill",
|
||||||
Next: backend,
|
Next: backend,
|
||||||
|
|
Loading…
Reference in New Issue