Reject invalid subscription urls

The url must have a scheme of udp,http,https and a port number.
CREATE SUBSCRIPTION will fail if there are invalid destinations.

Additionally Service.createSubscription fail invalid destinations are detected.

Fixes #7615
pull/7673/head
Allen Petersen 2016-12-05 22:57:35 -08:00
parent 6f027ac60e
commit da9941b9a9
5 changed files with 82 additions and 23 deletions

View File

@ -20,13 +20,14 @@ The stress tool `influx_stress` will be removed in a subsequent release. We reco
- [#7621](https://github.com/influxdata/influxdb/issues/7621): Expand string and boolean fields when using a wildcard with `sample()`. - [#7621](https://github.com/influxdata/influxdb/issues/7621): Expand string and boolean fields when using a wildcard with `sample()`.
- [#7616](https://github.com/influxdata/influxdb/pull/7616): Fix chuid argument order in init script @ccasey - [#7616](https://github.com/influxdata/influxdb/pull/7616): Fix chuid argument order in init script @ccasey
- [#7656](https://github.com/influxdata/influxdb/issues/7656): Fix cross-platform backup/restore - [#7656](https://github.com/influxdata/influxdb/issues/7656): Fix cross-platform backup/restore @allenpetersen
- [#7650](https://github.com/influxdata/influxdb/issues/7650): Ensures that all user privileges associated with a database are removed when the database is dropped. - [#7650](https://github.com/influxdata/influxdb/issues/7650): Ensures that all user privileges associated with a database are removed when the database is dropped.
- [#7659](https://github.com/influxdata/influxdb/issues/7659): Fix CLI import bug when using self-signed SSL certificates. - [#7659](https://github.com/influxdata/influxdb/issues/7659): Fix CLI import bug when using self-signed SSL certificates.
- [#7698](https://github.com/influxdata/influxdb/pull/7698): CLI was caching db/rp for insert into statements. - [#7698](https://github.com/influxdata/influxdb/pull/7698): CLI was caching db/rp for insert into statements.
- [#6527](https://github.com/influxdata/influxdb/issues/6527): 0.12.2 Influx CLI client PRECISION returns "Unknown precision.... - [#6527](https://github.com/influxdata/influxdb/issues/6527): 0.12.2 Influx CLI client PRECISION returns "Unknown precision....
- [#7396](https://github.com/influxdata/influxdb/issues/7396): CLI should use spaces for alignment, not tabs. - [#7396](https://github.com/influxdata/influxdb/issues/7396): CLI should use spaces for alignment, not tabs.
- [#7615](https://github.com/influxdata/influxdb/issues/7615): Reject invalid subscription urls @allenpetersen
## v1.1.1 [unreleased] ## v1.1.1 [unreleased]

View File

@ -8,6 +8,7 @@ import (
"path" "path"
"reflect" "reflect"
"runtime" "runtime"
"strings"
"testing" "testing"
"time" "time"
@ -754,7 +755,8 @@ func TestMetaClient_Subscriptions_Create(t *testing.T) {
} }
// Re-create a subscription // Re-create a subscription
if err := c.CreateSubscription("db0", "autogen", "sub0", "ALL", []string{"udp://example.com:9090"}); err == nil || err.Error() != `subscription already exists` { err := c.CreateSubscription("db0", "autogen", "sub0", "ALL", []string{"udp://example.com:9090"})
if err == nil || err.Error() != `subscription already exists` {
t.Fatalf("unexpected error: %s", err) t.Fatalf("unexpected error: %s", err)
} }
@ -762,6 +764,28 @@ func TestMetaClient_Subscriptions_Create(t *testing.T) {
if err := c.CreateSubscription("db0", "autogen", "sub1", "ALL", []string{"udp://example.com:6060"}); err != nil { if err := c.CreateSubscription("db0", "autogen", "sub1", "ALL", []string{"udp://example.com:6060"}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// Create a subscription with invalid scheme
err = c.CreateSubscription("db0", "autogen", "sub2", "ALL", []string{"bad://example.com:9191"})
if err == nil || !strings.HasPrefix(err.Error(), "invalid subscription URL") {
t.Fatalf("unexpected error: %s", err)
}
// Create a subscription without port number
err = c.CreateSubscription("db0", "autogen", "sub2", "ALL", []string{"udp://example.com"})
if err == nil || !strings.HasPrefix(err.Error(), "invalid subscription URL") {
t.Fatalf("unexpected error: %s", err)
}
// Create an HTTP subscription.
if err := c.CreateSubscription("db0", "autogen", "sub3", "ALL", []string{"http://example.com:9092"}); err != nil {
t.Fatal(err)
}
// Create an HTTPS subscription.
if err := c.CreateSubscription("db0", "autogen", "sub4", "ALL", []string{"https://example.com:9092"}); err != nil {
t.Fatal(err)
}
} }
func TestMetaClient_Subscriptions_Drop(t *testing.T) { func TestMetaClient_Subscriptions_Drop(t *testing.T) {

View File

@ -3,6 +3,8 @@ package meta
import ( import (
"errors" "errors"
"fmt" "fmt"
"net"
"net/url"
"sort" "sort"
"strings" "strings"
"sync" "sync"
@ -470,8 +472,33 @@ func (data *Data) DropContinuousQuery(database, name string) error {
return ErrContinuousQueryNotFound return ErrContinuousQueryNotFound
} }
// validateURL returns an error if the URL does not have a port or uses a scheme other than UDP or TCP.
func validateURL(input string) error {
u, err := url.Parse(input)
if err != nil {
return ErrInvalidSubscriptionURL(input)
}
if u.Scheme != "udp" && u.Scheme != "http" && u.Scheme != "https" {
return ErrInvalidSubscriptionURL(input)
}
_, port, err := net.SplitHostPort(u.Host)
if err != nil || port == "" {
return ErrInvalidSubscriptionURL(input)
}
return nil
}
// CreateSubscription adds a named subscription to a database and retention policy. // CreateSubscription adds a named subscription to a database and retention policy.
func (data *Data) CreateSubscription(database, rp, name, mode string, destinations []string) error { func (data *Data) CreateSubscription(database, rp, name, mode string, destinations []string) error {
for _, d := range destinations {
if err := validateURL(d); err != nil {
return err
}
}
rpi, err := data.RetentionPolicy(database, rp) rpi, err := data.RetentionPolicy(database, rp)
if err != nil { if err != nil {
return err return err

View File

@ -93,6 +93,11 @@ var (
ErrSubscriptionNotFound = errors.New("subscription not found") ErrSubscriptionNotFound = errors.New("subscription not found")
) )
// ErrInvalidSubscriptionURL is returned when the destication url is invalid.
func ErrInvalidSubscriptionURL(url string) error {
return fmt.Errorf("invalid subscription URL: %s", url)
}
var ( var (
// ErrUserExists is returned when creating an already existing user. // ErrUserExists is returned when creating an already existing user.
ErrUserExists = errors.New("user already exists") ErrUserExists = errors.New("user already exists")

View File

@ -19,6 +19,7 @@ import (
// Statistics for the Subscriber service. // Statistics for the Subscriber service.
const ( const (
statCreateFailures = "createFailures"
statPointsWritten = "pointsWritten" statPointsWritten = "pointsWritten"
statWriteFailures = "writeFailures" statWriteFailures = "writeFailures"
) )
@ -123,8 +124,9 @@ func (s *Service) SetLogOutput(w io.Writer) {
// Statistics maintains the statistics for the subscriber service. // Statistics maintains the statistics for the subscriber service.
type Statistics struct { type Statistics struct {
WriteFailures int64 CreateFailures int64
PointsWritten int64 PointsWritten int64
WriteFailures int64
} }
// Statistics returns statistics for periodic monitoring. // Statistics returns statistics for periodic monitoring.
@ -133,6 +135,7 @@ func (s *Service) Statistics(tags map[string]string) []models.Statistic {
Name: "subscriber", Name: "subscriber",
Tags: tags, Tags: tags,
Values: map[string]interface{}{ Values: map[string]interface{}{
statCreateFailures: atomic.LoadInt64(&s.stats.CreateFailures),
statPointsWritten: atomic.LoadInt64(&s.stats.PointsWritten), statPointsWritten: atomic.LoadInt64(&s.stats.PointsWritten),
statWriteFailures: atomic.LoadInt64(&s.stats.WriteFailures), statWriteFailures: atomic.LoadInt64(&s.stats.WriteFailures),
}, },
@ -183,20 +186,22 @@ func (s *Service) createSubscription(se subEntry, mode string, destinations []st
default: default:
return nil, fmt.Errorf("unknown balance mode %q", mode) return nil, fmt.Errorf("unknown balance mode %q", mode)
} }
writers := make([]PointsWriter, len(destinations)) writers := make([]PointsWriter, 0, len(destinations))
stats := make([]writerStats, len(writers)) stats := make([]writerStats, 0, len(destinations))
for i, dest := range destinations { // add only valid destinations
for _, dest := range destinations {
u, err := url.Parse(dest) u, err := url.Parse(dest)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("Failed to parse subscription url: %s", dest)
} }
w, err := s.NewPointsWriter(*u) w, err := s.NewPointsWriter(*u)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("Failed to create PointsWriter for subscription url: %s", dest)
} }
writers[i] = w writers = append(writers, w)
stats[i].dest = dest stats = append(stats, writerStats{dest: dest})
} }
return &balancewriter{ return &balancewriter{
bm: bm, bm: bm,
writers: writers, writers: writers,
@ -224,10 +229,7 @@ func (s *Service) run() {
for { for {
select { select {
case <-s.update: case <-s.update:
err := s.updateSubs(&wg) s.updateSubs(&wg)
if err != nil {
s.Logger.Println("failed to update subscriptions:", err)
}
case p, ok := <-s.points: case p, ok := <-s.points:
if !ok { if !ok {
// Close out all chanWriters // Close out all chanWriters
@ -260,7 +262,7 @@ func (s *Service) close(wg *sync.WaitGroup) {
s.subs = nil s.subs = nil
} }
func (s *Service) updateSubs(wg *sync.WaitGroup) error { func (s *Service) updateSubs(wg *sync.WaitGroup) {
s.subMu.Lock() s.subMu.Lock()
defer s.subMu.Unlock() defer s.subMu.Unlock()
@ -285,7 +287,9 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) error {
} }
sub, err := s.createSubscription(se, si.Mode, si.Destinations) sub, err := s.createSubscription(se, si.Mode, si.Destinations)
if err != nil { if err != nil {
return err atomic.AddInt64(&s.stats.CreateFailures, 1)
s.Logger.Println(err)
continue
} }
cw := chanWriter{ cw := chanWriter{
writeRequests: make(chan *coordinator.WritePointsRequest, s.conf.WriteBufferSize), writeRequests: make(chan *coordinator.WritePointsRequest, s.conf.WriteBufferSize),
@ -318,8 +322,6 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) error {
s.Logger.Println("deleted old subscription for", se.db, se.rp) s.Logger.Println("deleted old subscription for", se.db, se.rp)
} }
} }
return nil
} }
// Creates a PointsWriter from the given URL // Creates a PointsWriter from the given URL