From da9941b9a9096757d0155e58c0c6ae9c83160ab7 Mon Sep 17 00:00:00 2001
From: Allen Petersen <allen.petersen@osnexus.com>
Date: Mon, 5 Dec 2016 22:57:35 -0800
Subject: [PATCH] Reject invalid subscription urls

The url must have a scheme of udp,http,https and a port number.
CREATE SUBSCRIPTION will fail if there are invalid destinations.

Additionally Service.createSubscription fail invalid destinations are detected.

Fixes #7615
---
 CHANGELOG.md                   |  3 ++-
 services/meta/client_test.go   | 26 +++++++++++++++++++-
 services/meta/data.go          | 27 +++++++++++++++++++++
 services/meta/errors.go        |  5 ++++
 services/subscriber/service.go | 44 ++++++++++++++++++----------------
 5 files changed, 82 insertions(+), 23 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c2065521ab..db35d77884 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -20,13 +20,14 @@ The stress tool `influx_stress` will be removed in a subsequent release. We reco
 
 - [#7621](https://github.com/influxdata/influxdb/issues/7621): Expand string and boolean fields when using a wildcard with `sample()`.
 - [#7616](https://github.com/influxdata/influxdb/pull/7616): Fix chuid argument order in init script @ccasey
-- [#7656](https://github.com/influxdata/influxdb/issues/7656): Fix cross-platform backup/restore
+- [#7656](https://github.com/influxdata/influxdb/issues/7656): Fix cross-platform backup/restore @allenpetersen
 - [#7650](https://github.com/influxdata/influxdb/issues/7650): Ensures that all user privileges associated with a database are removed when the database is dropped.
 - [#7659](https://github.com/influxdata/influxdb/issues/7659): Fix CLI import bug when using self-signed SSL certificates.
 - [#7698](https://github.com/influxdata/influxdb/pull/7698): CLI was caching db/rp for insert into statements.
 - [#6527](https://github.com/influxdata/influxdb/issues/6527): 0.12.2 Influx CLI client PRECISION returns "Unknown precision....
 
 - [#7396](https://github.com/influxdata/influxdb/issues/7396): CLI should use spaces for alignment, not tabs.
+- [#7615](https://github.com/influxdata/influxdb/issues/7615): Reject invalid subscription urls @allenpetersen
 
 ## v1.1.1 [unreleased]
 
diff --git a/services/meta/client_test.go b/services/meta/client_test.go
index 65bc450ef5..e3f0cae4cf 100644
--- a/services/meta/client_test.go
+++ b/services/meta/client_test.go
@@ -8,6 +8,7 @@ import (
 	"path"
 	"reflect"
 	"runtime"
+	"strings"
 	"testing"
 	"time"
 
@@ -754,7 +755,8 @@ func TestMetaClient_Subscriptions_Create(t *testing.T) {
 	}
 
 	// Re-create a subscription
-	if err := c.CreateSubscription("db0", "autogen", "sub0", "ALL", []string{"udp://example.com:9090"}); err == nil || err.Error() != `subscription already exists` {
+	err := c.CreateSubscription("db0", "autogen", "sub0", "ALL", []string{"udp://example.com:9090"})
+	if err == nil || err.Error() != `subscription already exists` {
 		t.Fatalf("unexpected error: %s", err)
 	}
 
@@ -762,6 +764,28 @@ func TestMetaClient_Subscriptions_Create(t *testing.T) {
 	if err := c.CreateSubscription("db0", "autogen", "sub1", "ALL", []string{"udp://example.com:6060"}); err != nil {
 		t.Fatal(err)
 	}
+
+	// Create a subscription with invalid scheme
+	err = c.CreateSubscription("db0", "autogen", "sub2", "ALL", []string{"bad://example.com:9191"})
+	if err == nil || !strings.HasPrefix(err.Error(), "invalid subscription URL") {
+		t.Fatalf("unexpected error: %s", err)
+	}
+
+	// Create a subscription without port number
+	err = c.CreateSubscription("db0", "autogen", "sub2", "ALL", []string{"udp://example.com"})
+	if err == nil || !strings.HasPrefix(err.Error(), "invalid subscription URL") {
+		t.Fatalf("unexpected error: %s", err)
+	}
+
+	// Create an HTTP subscription.
+	if err := c.CreateSubscription("db0", "autogen", "sub3", "ALL", []string{"http://example.com:9092"}); err != nil {
+		t.Fatal(err)
+	}
+
+	// Create an HTTPS subscription.
+	if err := c.CreateSubscription("db0", "autogen", "sub4", "ALL", []string{"https://example.com:9092"}); err != nil {
+		t.Fatal(err)
+	}
 }
 
 func TestMetaClient_Subscriptions_Drop(t *testing.T) {
diff --git a/services/meta/data.go b/services/meta/data.go
index f079bf5bea..edfd140121 100644
--- a/services/meta/data.go
+++ b/services/meta/data.go
@@ -3,6 +3,8 @@ package meta
 import (
 	"errors"
 	"fmt"
+	"net"
+	"net/url"
 	"sort"
 	"strings"
 	"sync"
@@ -470,8 +472,33 @@ func (data *Data) DropContinuousQuery(database, name string) error {
 	return ErrContinuousQueryNotFound
 }
 
+// validateURL returns an error if the URL does not have a port or uses a scheme other than UDP or TCP.
+func validateURL(input string) error {
+	u, err := url.Parse(input)
+	if err != nil {
+		return ErrInvalidSubscriptionURL(input)
+	}
+
+	if u.Scheme != "udp" && u.Scheme != "http" && u.Scheme != "https" {
+		return ErrInvalidSubscriptionURL(input)
+	}
+
+	_, port, err := net.SplitHostPort(u.Host)
+	if err != nil || port == "" {
+		return ErrInvalidSubscriptionURL(input)
+	}
+
+	return nil
+}
+
 // CreateSubscription adds a named subscription to a database and retention policy.
 func (data *Data) CreateSubscription(database, rp, name, mode string, destinations []string) error {
+	for _, d := range destinations {
+		if err := validateURL(d); err != nil {
+			return err
+		}
+	}
+
 	rpi, err := data.RetentionPolicy(database, rp)
 	if err != nil {
 		return err
diff --git a/services/meta/errors.go b/services/meta/errors.go
index 802ea2f0b6..2a8b752fef 100644
--- a/services/meta/errors.go
+++ b/services/meta/errors.go
@@ -93,6 +93,11 @@ var (
 	ErrSubscriptionNotFound = errors.New("subscription not found")
 )
 
+// ErrInvalidSubscriptionURL is returned when the destication url is invalid.
+func ErrInvalidSubscriptionURL(url string) error {
+	return fmt.Errorf("invalid subscription URL: %s", url)
+}
+
 var (
 	// ErrUserExists is returned when creating an already existing user.
 	ErrUserExists = errors.New("user already exists")
diff --git a/services/subscriber/service.go b/services/subscriber/service.go
index c8939663e3..db051efe1f 100644
--- a/services/subscriber/service.go
+++ b/services/subscriber/service.go
@@ -19,8 +19,9 @@ import (
 
 // Statistics for the Subscriber service.
 const (
-	statPointsWritten = "pointsWritten"
-	statWriteFailures = "writeFailures"
+	statCreateFailures = "createFailures"
+	statPointsWritten  = "pointsWritten"
+	statWriteFailures  = "writeFailures"
 )
 
 // PointsWriter is an interface for writing points to a subscription destination.
@@ -123,8 +124,9 @@ func (s *Service) SetLogOutput(w io.Writer) {
 
 // Statistics maintains the statistics for the subscriber service.
 type Statistics struct {
-	WriteFailures int64
-	PointsWritten int64
+	CreateFailures int64
+	PointsWritten  int64
+	WriteFailures  int64
 }
 
 // Statistics returns statistics for periodic monitoring.
@@ -133,8 +135,9 @@ func (s *Service) Statistics(tags map[string]string) []models.Statistic {
 		Name: "subscriber",
 		Tags: tags,
 		Values: map[string]interface{}{
-			statPointsWritten: atomic.LoadInt64(&s.stats.PointsWritten),
-			statWriteFailures: atomic.LoadInt64(&s.stats.WriteFailures),
+			statCreateFailures: atomic.LoadInt64(&s.stats.CreateFailures),
+			statPointsWritten:  atomic.LoadInt64(&s.stats.PointsWritten),
+			statWriteFailures:  atomic.LoadInt64(&s.stats.WriteFailures),
 		},
 	}}
 
@@ -183,20 +186,22 @@ func (s *Service) createSubscription(se subEntry, mode string, destinations []st
 	default:
 		return nil, fmt.Errorf("unknown balance mode %q", mode)
 	}
-	writers := make([]PointsWriter, len(destinations))
-	stats := make([]writerStats, len(writers))
-	for i, dest := range destinations {
+	writers := make([]PointsWriter, 0, len(destinations))
+	stats := make([]writerStats, 0, len(destinations))
+	// add only valid destinations
+	for _, dest := range destinations {
 		u, err := url.Parse(dest)
 		if err != nil {
-			return nil, err
+			return nil, fmt.Errorf("Failed to parse subscription url: %s", dest)
 		}
 		w, err := s.NewPointsWriter(*u)
 		if err != nil {
-			return nil, err
+			return nil, fmt.Errorf("Failed to create PointsWriter for subscription url: %s", dest)
 		}
-		writers[i] = w
-		stats[i].dest = dest
+		writers = append(writers, w)
+		stats = append(stats, writerStats{dest: dest})
 	}
+
 	return &balancewriter{
 		bm:      bm,
 		writers: writers,
@@ -224,10 +229,7 @@ func (s *Service) run() {
 	for {
 		select {
 		case <-s.update:
-			err := s.updateSubs(&wg)
-			if err != nil {
-				s.Logger.Println("failed to update subscriptions:", err)
-			}
+			s.updateSubs(&wg)
 		case p, ok := <-s.points:
 			if !ok {
 				// Close out all chanWriters
@@ -260,7 +262,7 @@ func (s *Service) close(wg *sync.WaitGroup) {
 	s.subs = nil
 }
 
-func (s *Service) updateSubs(wg *sync.WaitGroup) error {
+func (s *Service) updateSubs(wg *sync.WaitGroup) {
 	s.subMu.Lock()
 	defer s.subMu.Unlock()
 
@@ -285,7 +287,9 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) error {
 				}
 				sub, err := s.createSubscription(se, si.Mode, si.Destinations)
 				if err != nil {
-					return err
+					atomic.AddInt64(&s.stats.CreateFailures, 1)
+					s.Logger.Println(err)
+					continue
 				}
 				cw := chanWriter{
 					writeRequests: make(chan *coordinator.WritePointsRequest, s.conf.WriteBufferSize),
@@ -318,8 +322,6 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) error {
 			s.Logger.Println("deleted old subscription for", se.db, se.rp)
 		}
 	}
-
-	return nil
 }
 
 // Creates a PointsWriter from the given URL