feat: remote write function for replications (#22942)
* feat: remote write function for replications * chore: implement UpdateResponseInfo store method * chore: only set gzip heading for non-empty requests * fix: address review feedback
pull/22949/head
parent
5ce164f849
commit
9873ccd657
3
go.mod
3
go.mod
|
@ -99,7 +99,7 @@ require (
|
|||
labix.org/v2/mgo v0.0.0-20140701140051-000000000287 // indirect
|
||||
)
|
||||
|
||||
require github.com/influxdata/influx-cli/v2 v2.1.1-0.20211007122339-c4a5a13c8ee3
|
||||
require github.com/influxdata/influx-cli/v2 v2.2.1-0.20211129214229-4c0fae3a4c0d
|
||||
|
||||
require (
|
||||
cloud.google.com/go v0.82.0 // indirect
|
||||
|
@ -137,6 +137,7 @@ require (
|
|||
github.com/bonitoo-io/go-sql-bigquery v0.3.4-1.4.0 // indirect
|
||||
github.com/c-bata/go-prompt v0.2.2 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.1.1 // indirect
|
||||
github.com/daixiang0/gci v0.2.8 // indirect
|
||||
github.com/deepmap/oapi-codegen v1.6.0 // indirect
|
||||
github.com/denisenkom/go-mssqldb v0.10.0 // indirect
|
||||
github.com/dimchansky/utfbom v1.1.0 // indirect
|
||||
|
|
5
go.sum
5
go.sum
|
@ -180,6 +180,7 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma
|
|||
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
|
||||
github.com/daixiang0/gci v0.2.8 h1:1mrIGMBQsBu0P7j7m1M8Lb+ZeZxsZL+jyGX4YoMJJpg=
|
||||
github.com/daixiang0/gci v0.2.8/go.mod h1:+4dZ7TISfSmqfAGv59ePaHfNzgGtIkHAhhdKggP1JAc=
|
||||
github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
|
@ -432,6 +433,10 @@ github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0f
|
|||
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69/go.mod h1:pwymjR6SrP3gD3pRj9RJwdl1j5s3doEEV8gS4X9qSzA=
|
||||
github.com/influxdata/influx-cli/v2 v2.1.1-0.20211007122339-c4a5a13c8ee3 h1:DJFtOP/Gji5K6iut794K1pTKPd9SqM9J+Cb7vXgsnq0=
|
||||
github.com/influxdata/influx-cli/v2 v2.1.1-0.20211007122339-c4a5a13c8ee3/go.mod h1:piIN/dAOSRqdZZc2sHO7CORuWUQ0UXdNrjugF3cEr8k=
|
||||
github.com/influxdata/influx-cli/v2 v2.2.1-0.20211129214229-4c0fae3a4c0d h1:An2Su6JpQwYTmONvndYkkjxtfAE5w04rUyH1kf/tWcg=
|
||||
github.com/influxdata/influx-cli/v2 v2.2.1-0.20211129214229-4c0fae3a4c0d/go.mod h1:p1X8Ga67SzLC35qmwvTCmWXdpZOTHSWWMXJ0zwRTW50=
|
||||
github.com/influxdata/influx-cli/v2 v2.2.1 h1:K4kzXqPwfe0Qv3eY0TSiI9LEplwFGWiAKi4VfKy8KFs=
|
||||
github.com/influxdata/influx-cli/v2 v2.2.1/go.mod h1:p1X8Ga67SzLC35qmwvTCmWXdpZOTHSWWMXJ0zwRTW50=
|
||||
github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040 h1:MBLCfcSsUyFPDJp6T7EoHp/Ph3Jkrm4EuUKLD2rUWHg=
|
||||
github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040/go.mod h1:vLNHdxTJkIf2mSLvGrpj8TCcISApPoXkaxP8g9uRlW8=
|
||||
github.com/influxdata/influxql v1.1.1-0.20211004132434-7e7d61973256 h1:8io3jjCJ0j9NFvq3/m/rMrDiEILpsfOqWDPItUt/078=
|
||||
|
|
|
@ -88,3 +88,14 @@ func (r *UpdateReplicationRequest) OK() error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReplicationHTTPConfig contains all info needed by a client to make HTTP requests against the
|
||||
// remote bucket targeted by a replication.
// The db tags name the result columns each field is scanned from by the replications store.
|
||||
type ReplicationHTTPConfig struct {
|
||||
RemoteURL string `db:"remote_url"` // URL of the remote host (remotes table)
|
||||
RemoteToken string `db:"remote_api_token"` // API token used against the remote (remotes table)
|
||||
RemoteOrgID platform.ID `db:"remote_org_id"` // organization ID on the remote (remotes table)
|
||||
AllowInsecureTLS bool `db:"allow_insecure_tls"` // NOTE(review): presumably skips TLS certificate verification — confirm
|
||||
RemoteBucketID platform.ID `db:"remote_bucket_id"` // target bucket ID on the remote (replications table)
|
||||
DropNonRetryableData bool `db:"drop_non_retryable_data"` // replications table; NOTE(review): presumably drops data rejected with a non-retryable error — confirm against the writer
|
||||
}
|
||||
|
|
|
@ -1,13 +0,0 @@
|
|||
package internal
|
||||
|
||||
import "github.com/influxdata/influxdb/v2/kit/platform"
|
||||
|
||||
// ReplicationHTTPConfig contains all info needed by a client to make HTTP requests against the
|
||||
// remote bucket targeted by a replication.
|
||||
type ReplicationHTTPConfig struct {
|
||||
RemoteURL string `db:"remote_url"`
|
||||
RemoteToken string `db:"remote_api_token"`
|
||||
RemoteOrgID platform.ID `db:"remote_org_id"`
|
||||
AllowInsecureTLS bool `db:"allow_insecure_tls"`
|
||||
RemoteBucketID platform.ID `db:"remote_bucket_id"`
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
@ -11,18 +12,23 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
"github.com/influxdata/influxdb/v2/pkg/durablequeue"
|
||||
"github.com/influxdata/influxdb/v2/replications/metrics"
|
||||
"github.com/influxdata/influxdb/v2/replications/remotewrite"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// remoteWriter is the sink a replicationQueue sends queued bytes to: SendWrite passes it
// the bytes of the current scanner entry. It is an interface so tests can substitute a fake.
type remoteWriter interface {
|
||||
Write(context.Context, []byte) error
|
||||
}
|
||||
|
||||
type replicationQueue struct {
|
||||
id platform.ID
|
||||
queue *durablequeue.Queue
|
||||
wg sync.WaitGroup
|
||||
done chan struct{}
|
||||
receive chan struct{}
|
||||
logger *zap.Logger
|
||||
metrics *metrics.ReplicationsMetrics
|
||||
writeFunc func([]byte) error
|
||||
id platform.ID
|
||||
queue *durablequeue.Queue
|
||||
wg sync.WaitGroup
|
||||
done chan struct{}
|
||||
receive chan struct{}
|
||||
logger *zap.Logger
|
||||
metrics *metrics.ReplicationsMetrics
|
||||
remoteWriter remoteWriter
|
||||
}
|
||||
|
||||
type durableQueueManager struct {
|
||||
|
@ -31,7 +37,7 @@ type durableQueueManager struct {
|
|||
queuePath string
|
||||
mutex sync.RWMutex
|
||||
metrics *metrics.ReplicationsMetrics
|
||||
writeFunc func([]byte) error
|
||||
configStore remotewrite.HttpConfigStore
|
||||
}
|
||||
|
||||
var errStartup = errors.New("startup tasks for replications durable queue management failed, see server logs for details")
|
||||
|
@ -39,7 +45,7 @@ var errShutdown = errors.New("shutdown tasks for replications durable queues fai
|
|||
|
||||
// NewDurableQueueManager creates a new durableQueueManager struct, for managing durable queues associated with
|
||||
// replication streams.
|
||||
func NewDurableQueueManager(log *zap.Logger, queuePath string, metrics *metrics.ReplicationsMetrics, writeFunc func([]byte) error) *durableQueueManager {
|
||||
func NewDurableQueueManager(log *zap.Logger, queuePath string, metrics *metrics.ReplicationsMetrics, configStore remotewrite.HttpConfigStore) *durableQueueManager {
|
||||
replicationQueues := make(map[platform.ID]*replicationQueue)
|
||||
|
||||
os.MkdirAll(queuePath, 0777)
|
||||
|
@ -49,7 +55,7 @@ func NewDurableQueueManager(log *zap.Logger, queuePath string, metrics *metrics.
|
|||
logger: log,
|
||||
queuePath: queuePath,
|
||||
metrics: metrics,
|
||||
writeFunc: writeFunc,
|
||||
configStore: configStore,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -116,13 +122,6 @@ func (rq *replicationQueue) Close() error {
|
|||
return rq.queue.Close()
|
||||
}
|
||||
|
||||
// WriteFunc is currently a placeholder for the "default" behavior
|
||||
// of the queue scanner sending data from the durable queue to a remote host.
|
||||
func WriteFunc(b []byte) error {
|
||||
// TODO: Add metrics updates for BytesSent, BytesDropped, and ErrorCodes
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rq *replicationQueue) run() {
|
||||
defer rq.wg.Done()
|
||||
|
||||
|
@ -131,7 +130,7 @@ func (rq *replicationQueue) run() {
|
|||
case <-rq.done: // end the goroutine when done is messaged
|
||||
return
|
||||
case <-rq.receive: // run the scanner on data append
|
||||
for rq.SendWrite(rq.writeFunc) {
|
||||
for rq.SendWrite() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -141,7 +140,7 @@ func (rq *replicationQueue) run() {
|
|||
// SendWrite is responsible for processing all data in the queue at the time of calling.
|
||||
// Retryable errors should be handled and retried in the dp function.
|
||||
// Unprocessable data should be dropped in the dp function.
|
||||
func (rq *replicationQueue) SendWrite(dp func([]byte) error) bool {
|
||||
func (rq *replicationQueue) SendWrite() bool {
|
||||
// Any error in creating the scanner should exit the loop in run()
|
||||
// Either it is io.EOF indicating no data, or some other failure in making
|
||||
// the Scanner object that we don't know how to handle.
|
||||
|
@ -170,7 +169,8 @@ func (rq *replicationQueue) SendWrite(dp func([]byte) error) bool {
|
|||
// An error here indicates an unhandlable error. Data is not corrupt, and
|
||||
// the remote write is not retryable. A potential example of an error here
|
||||
// is an authentication error with the remote host.
|
||||
if err = dp(scan.Bytes()); err != nil {
|
||||
// TODO: Propagate context if needed to allow for graceful shutdowns, see https://github.com/influxdata/influxdb/issues/22944
|
||||
if err = rq.remoteWriter.Write(context.Background(), scan.Bytes()); err != nil {
|
||||
rq.logger.Error("Error in replication stream", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
@ -369,13 +369,15 @@ func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byt
|
|||
}
|
||||
|
||||
func (qm *durableQueueManager) newReplicationQueue(id platform.ID, queue *durablequeue.Queue) *replicationQueue {
|
||||
logger := qm.logger.With(zap.String("replication_id", id.String()))
|
||||
|
||||
return &replicationQueue{
|
||||
id: id,
|
||||
queue: queue,
|
||||
done: make(chan struct{}),
|
||||
receive: make(chan struct{}),
|
||||
logger: qm.logger.With(zap.String("replication_id", id.String())),
|
||||
metrics: qm.metrics,
|
||||
writeFunc: qm.writeFunc,
|
||||
id: id,
|
||||
queue: queue,
|
||||
done: make(chan struct{}),
|
||||
receive: make(chan struct{}),
|
||||
logger: logger,
|
||||
metrics: qm.metrics,
|
||||
remoteWriter: remotewrite.NewWriter(id, qm.configStore, qm.metrics, logger),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
@ -11,6 +14,7 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/kit/prom"
|
||||
"github.com/influxdata/influxdb/v2/kit/prom/promtest"
|
||||
"github.com/influxdata/influxdb/v2/replications/metrics"
|
||||
replicationsMock "github.com/influxdata/influxdb/v2/replications/mock"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
@ -37,38 +41,69 @@ func TestCreateNewQueueDirExists(t *testing.T) {
|
|||
func TestEnqueueScan(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
queuePath, qm := initQueueManager(t)
|
||||
defer os.RemoveAll(filepath.Dir(queuePath))
|
||||
data := "weather,location=us-midwest temperature=82 1465839830100400200"
|
||||
|
||||
// Create new queue
|
||||
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
|
||||
require.NoError(t, err)
|
||||
tests := []struct {
|
||||
name string
|
||||
testData []string
|
||||
writeFuncReturn error
|
||||
}{
|
||||
{
|
||||
name: "single point with successful write",
|
||||
testData: []string{data},
|
||||
writeFuncReturn: nil,
|
||||
},
|
||||
{
|
||||
name: "multiple points with successful write",
|
||||
testData: []string{data, data, data},
|
||||
writeFuncReturn: nil,
|
||||
},
|
||||
{
|
||||
name: "single point with unsuccessful write",
|
||||
testData: []string{data},
|
||||
writeFuncReturn: errors.New("some error"),
|
||||
},
|
||||
{
|
||||
name: "multiple points with unsuccessful write",
|
||||
testData: []string{data, data, data},
|
||||
writeFuncReturn: errors.New("some error"),
|
||||
},
|
||||
}
|
||||
|
||||
// Enqueue some data
|
||||
testData := "weather,location=us-midwest temperature=82 1465839830100400200"
|
||||
qm.writeFunc = getTestWriteFunc(t, testData)
|
||||
err = qm.EnqueueData(id1, []byte(testData), 1)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
queuePath, qm := initQueueManager(t)
|
||||
defer os.RemoveAll(filepath.Dir(queuePath))
|
||||
|
||||
func TestEnqueueScanMultiple(t *testing.T) {
|
||||
t.Parallel()
|
||||
// Create new queue
|
||||
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
|
||||
require.NoError(t, err)
|
||||
rq := qm.replicationQueues[id1]
|
||||
rq.remoteWriter = getTestRemoteWriter(t, data, tt.writeFuncReturn)
|
||||
|
||||
queuePath, qm := initQueueManager(t)
|
||||
defer os.RemoveAll(filepath.Dir(queuePath))
|
||||
// Enqueue the data
|
||||
for _, dat := range tt.testData {
|
||||
err = qm.EnqueueData(id1, []byte(dat), 1)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Create new queue
|
||||
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
|
||||
require.NoError(t, err)
|
||||
// Check queue position
|
||||
close(rq.done)
|
||||
rq.wg.Wait()
|
||||
scan, err := rq.queue.NewScanner()
|
||||
|
||||
// Enqueue some data
|
||||
testData := "weather,location=us-midwest temperature=82 1465839830100400200"
|
||||
qm.writeFunc = getTestWriteFunc(t, testData)
|
||||
err = qm.EnqueueData(id1, []byte(testData), 1)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = qm.EnqueueData(id1, []byte(testData), 1)
|
||||
require.NoError(t, err)
|
||||
if tt.writeFuncReturn == nil {
|
||||
require.ErrorIs(t, io.EOF, err)
|
||||
} else {
|
||||
// Queue should not have advanced at all
|
||||
for range tt.testData {
|
||||
require.True(t, scan.Next())
|
||||
}
|
||||
// Should now be at the end of the queue
|
||||
require.False(t, scan.Next())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateNewQueueDuplicateID(t *testing.T) {
|
||||
|
@ -274,7 +309,7 @@ func initQueueManager(t *testing.T) (string, *durableQueueManager) {
|
|||
queuePath := filepath.Join(enginePath, "replicationq")
|
||||
|
||||
logger := zaptest.NewLogger(t)
|
||||
qm := NewDurableQueueManager(logger, queuePath, metrics.NewReplicationsMetrics(), WriteFunc)
|
||||
qm := NewDurableQueueManager(logger, queuePath, metrics.NewReplicationsMetrics(), replicationsMock.NewMockHttpConfigStore(nil))
|
||||
|
||||
return queuePath, qm
|
||||
}
|
||||
|
@ -291,12 +326,27 @@ func shutdown(t *testing.T, qm *durableQueueManager) {
|
|||
qm.replicationQueues = emptyMap
|
||||
}
|
||||
|
||||
func getTestWriteFunc(t *testing.T, expected string) func([]byte) error {
|
||||
// testRemoteWriter is a remoteWriter test double whose behavior is supplied by writeFn.
type testRemoteWriter struct {
|
||||
writeFn func(context.Context, []byte) error
|
||||
}
|
||||
|
||||
// Write forwards ctx and data to tw.writeFn and returns its result unchanged.
func (tw *testRemoteWriter) Write(ctx context.Context, data []byte) error {
|
||||
return tw.writeFn(ctx, data)
|
||||
}
|
||||
|
||||
func getTestRemoteWriter(t *testing.T, expected string, returning error) remoteWriter {
|
||||
t.Helper()
|
||||
return func(b []byte) error {
|
||||
|
||||
writeFn := func(ctx context.Context, b []byte) error {
|
||||
require.Equal(t, expected, string(b))
|
||||
return nil
|
||||
return returning
|
||||
}
|
||||
|
||||
writer := &testRemoteWriter{}
|
||||
|
||||
writer.writeFn = writeFn
|
||||
|
||||
return writer
|
||||
}
|
||||
|
||||
func TestEnqueueData(t *testing.T) {
|
||||
|
@ -307,7 +357,7 @@ func TestEnqueueData(t *testing.T) {
|
|||
defer os.RemoveAll(queuePath)
|
||||
|
||||
logger := zaptest.NewLogger(t)
|
||||
qm := NewDurableQueueManager(logger, queuePath, metrics.NewReplicationsMetrics(), WriteFunc)
|
||||
qm := NewDurableQueueManager(logger, queuePath, metrics.NewReplicationsMetrics(), replicationsMock.NewMockHttpConfigStore(nil))
|
||||
|
||||
require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes))
|
||||
require.DirExists(t, filepath.Join(queuePath, id1.String()))
|
||||
|
@ -376,9 +426,7 @@ func TestEnqueueData_WithMetrics(t *testing.T) {
|
|||
require.NoError(t, rq.queue.SetMaxSegmentSize(8))
|
||||
|
||||
queueSizeBefore := rq.queue.DiskUsage()
|
||||
rq.SendWrite(func(bytes []byte) error {
|
||||
return nil
|
||||
})
|
||||
rq.SendWrite()
|
||||
|
||||
// Ensure that the smaller queue disk size was reflected in the metrics.
|
||||
currentBytesQueued := getPromMetric(t, "replications_queue_current_bytes_queued", reg)
|
||||
|
|
|
@ -181,6 +181,31 @@ func (s *Store) UpdateReplication(ctx context.Context, id platform.ID, request i
|
|||
return &r, nil
|
||||
}
|
||||
|
||||
// UpdateResponseInfo sets the most recent HTTP status code and error message received for a replication remote write.
// It writes the latest_response_code and latest_error_message columns of the replications row
// whose id matches, and returns errReplicationNotFound when the query reports sql.ErrNoRows.
|
||||
func (s *Store) UpdateResponseInfo(ctx context.Context, id platform.ID, code int, message string) error {
|
||||
updates := sq.Eq{
|
||||
"latest_response_code": code,
|
||||
"latest_error_message": message,
|
||||
}
|
||||
|
||||
// RETURNING id gives the UPDATE a result row to scan, so a missing replication can be detected below.
q := sq.Update("replications").SetMap(updates).Where(sq.Eq{"id": id}).Suffix("RETURNING id")
|
||||
|
||||
query, args, err := q.ToSql()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// d only receives the returned id; its value is not used afterwards.
var d platform.ID
|
||||
if err := s.sqlStore.DB.GetContext(ctx, &d, query, args...); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return errReplicationNotFound
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteReplication deletes a replication by ID from the database. Caller is responsible for managing locks.
|
||||
func (s *Store) DeleteReplication(ctx context.Context, id platform.ID) error {
|
||||
q := sq.Delete("replications").Where(sq.Eq{"id": id}).Suffix("RETURNING id")
|
||||
|
@ -217,8 +242,8 @@ func (s *Store) DeleteBucketReplications(ctx context.Context, localBucketID plat
|
|||
return deleted, nil
|
||||
}
|
||||
|
||||
func (s *Store) GetFullHTTPConfig(ctx context.Context, id platform.ID) (*ReplicationHTTPConfig, error) {
|
||||
q := sq.Select("c.remote_url", "c.remote_api_token", "c.remote_org_id", "c.allow_insecure_tls", "r.remote_bucket_id").
|
||||
func (s *Store) GetFullHTTPConfig(ctx context.Context, id platform.ID) (*influxdb.ReplicationHTTPConfig, error) {
|
||||
q := sq.Select("c.remote_url", "c.remote_api_token", "c.remote_org_id", "c.allow_insecure_tls", "r.remote_bucket_id", "r.drop_non_retryable_data").
|
||||
From("replications r").InnerJoin("remotes c ON r.remote_id = c.id AND r.id = ?", id)
|
||||
|
||||
query, args, err := q.ToSql()
|
||||
|
@ -226,7 +251,7 @@ func (s *Store) GetFullHTTPConfig(ctx context.Context, id platform.ID) (*Replica
|
|||
return nil, err
|
||||
}
|
||||
|
||||
var rc ReplicationHTTPConfig
|
||||
var rc influxdb.ReplicationHTTPConfig
|
||||
if err := s.sqlStore.DB.GetContext(ctx, &rc, query, args...); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, errReplicationNotFound
|
||||
|
@ -236,7 +261,7 @@ func (s *Store) GetFullHTTPConfig(ctx context.Context, id platform.ID) (*Replica
|
|||
return &rc, nil
|
||||
}
|
||||
|
||||
func (s *Store) PopulateRemoteHTTPConfig(ctx context.Context, id platform.ID, target *ReplicationHTTPConfig) error {
|
||||
func (s *Store) PopulateRemoteHTTPConfig(ctx context.Context, id platform.ID, target *influxdb.ReplicationHTTPConfig) error {
|
||||
q := sq.Select("remote_url", "remote_api_token", "remote_org_id", "allow_insecure_tls").
|
||||
From("remotes").Where(sq.Eq{"id": id})
|
||||
query, args, err := q.ToSql()
|
||||
|
|
|
@ -3,6 +3,7 @@ package internal
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
sq "github.com/Masterminds/squirrel"
|
||||
|
@ -38,7 +39,7 @@ var (
|
|||
RemoteBucketID: replication.RemoteBucketID,
|
||||
MaxQueueSizeBytes: replication.MaxQueueSizeBytes,
|
||||
}
|
||||
httpConfig = ReplicationHTTPConfig{
|
||||
httpConfig = influxdb.ReplicationHTTPConfig{
|
||||
RemoteURL: fmt.Sprintf("http://%s.cloud", replication.RemoteID),
|
||||
RemoteToken: replication.RemoteID.String(),
|
||||
RemoteOrgID: platform.ID(888888),
|
||||
|
@ -131,6 +132,38 @@ func TestUpdateAndGetReplication(t *testing.T) {
|
|||
require.Equal(t, updatedReplication, *updated)
|
||||
}
|
||||
|
||||
// TestUpdateResponseInfo checks that UpdateResponseInfo returns errReplicationNotFound for an
// unknown ID and that, once the replication exists, the stored response code and error message
// are returned by GetReplication.
func TestUpdateResponseInfo(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testStore, clean := newTestStore(t)
|
||||
defer clean(t)
|
||||
|
||||
insertRemote(t, testStore, replication.RemoteID)
|
||||
insertRemote(t, testStore, updatedReplication.RemoteID)
|
||||
|
||||
testCode := http.StatusBadRequest
|
||||
testMsg := "some error message"
|
||||
|
||||
// Updating a nonexistent ID fails.
|
||||
err := testStore.UpdateResponseInfo(ctx, initID, testCode, testMsg)
|
||||
require.Equal(t, errReplicationNotFound, err)
|
||||
|
||||
// Create a replication.
|
||||
created, err := testStore.CreateReplication(ctx, initID, createReq)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, replication, *created)
|
||||
|
||||
// Update the replication response info.
|
||||
err = testStore.UpdateResponseInfo(ctx, initID, testCode, testMsg)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check the updated response code and error message.
|
||||
got, err := testStore.GetReplication(ctx, initID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int32(testCode), *got.LatestResponseCode) // the stored code is compared as int32
|
||||
require.Equal(t, testMsg, *got.LatestErrorMessage)
|
||||
}
|
||||
|
||||
func TestUpdateMissingRemote(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
@ -372,16 +405,16 @@ func TestPopulateRemoteHTTPConfig(t *testing.T) {
|
|||
testStore, clean := newTestStore(t)
|
||||
defer clean(t)
|
||||
|
||||
emptyConfig := &ReplicationHTTPConfig{}
|
||||
emptyConfig := &influxdb.ReplicationHTTPConfig{}
|
||||
|
||||
// Remote not found returns the appropriate error
|
||||
target := &ReplicationHTTPConfig{}
|
||||
target := &influxdb.ReplicationHTTPConfig{}
|
||||
err := testStore.PopulateRemoteHTTPConfig(ctx, replication.RemoteID, target)
|
||||
require.Equal(t, errRemoteNotFound(replication.RemoteID, nil), err)
|
||||
require.Equal(t, emptyConfig, target)
|
||||
|
||||
// Valid result
|
||||
want := ReplicationHTTPConfig{
|
||||
want := influxdb.ReplicationHTTPConfig{
|
||||
RemoteURL: httpConfig.RemoteURL,
|
||||
RemoteToken: httpConfig.RemoteToken,
|
||||
RemoteOrgID: httpConfig.RemoteOrgID,
|
||||
|
|
|
@ -2,13 +2,9 @@ package internal
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"runtime"
|
||||
|
||||
"github.com/influxdata/influx-cli/v2/api"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
ierrors "github.com/influxdata/influxdb/v2/kit/platform/errors"
|
||||
"github.com/influxdata/influxdb/v2/replications/remotewrite"
|
||||
)
|
||||
|
||||
func NewValidator() *noopWriteValidator {
|
||||
|
@ -19,40 +15,7 @@ func NewValidator() *noopWriteValidator {
|
|||
// to the remote host using the configured information.
|
||||
type noopWriteValidator struct{}
|
||||
|
||||
var userAgent = fmt.Sprintf(
|
||||
"influxdb-oss/%s (%s) Sha/%s Date/%s",
|
||||
influxdb.GetBuildInfo().Version,
|
||||
runtime.GOOS,
|
||||
influxdb.GetBuildInfo().Commit,
|
||||
influxdb.GetBuildInfo().Date)
|
||||
|
||||
func (s noopWriteValidator) ValidateReplication(ctx context.Context, config *ReplicationHTTPConfig) error {
|
||||
u, err := url.Parse(config.RemoteURL)
|
||||
if err != nil {
|
||||
return &ierrors.Error{
|
||||
Code: ierrors.EInvalid,
|
||||
Msg: fmt.Sprintf("host URL %q is invalid", config.RemoteURL),
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
params := api.ConfigParams{
|
||||
Host: u,
|
||||
UserAgent: userAgent,
|
||||
Token: &config.RemoteToken,
|
||||
AllowInsecureTLS: config.AllowInsecureTLS,
|
||||
}
|
||||
client := api.NewAPIClient(api.NewAPIConfig(params)).WriteApi
|
||||
|
||||
noopReq := client.PostWrite(ctx).
|
||||
Org(config.RemoteOrgID.String()).
|
||||
Bucket(config.RemoteBucketID.String()).
|
||||
Body([]byte{})
|
||||
|
||||
if err := noopReq.Execute(); err != nil {
|
||||
return &ierrors.Error{
|
||||
Code: ierrors.EInvalid,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
return nil
|
||||
func (s noopWriteValidator) ValidateReplication(ctx context.Context, config *influxdb.ReplicationHTTPConfig) error {
|
||||
_, err := remotewrite.PostWrite(ctx, config, []byte{}, remotewrite.DefaultTimeout)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestValidateReplication runs noopWriteValidator.ValidateReplication against a local test
// server that answers every request with a fixed status code. Only 204 No Content is expected
// to validate; every other status in the table, including 200 OK, must produce an error.
func TestValidateReplication(t *testing.T) {
|
||||
tests := []struct {
|
||||
status int
|
||||
valid bool
|
||||
}{
|
||||
{http.StatusNoContent, true},
|
||||
{http.StatusOK, false},
|
||||
{http.StatusBadRequest, false},
|
||||
{http.StatusTeapot, false},
|
||||
{http.StatusInternalServerError, false},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(fmt.Sprintf("status code %d", tt.status), func(t *testing.T) {
|
||||
// The handler ignores the request and replies with the status under test.
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(tt.status)
|
||||
}))
|
||||
defer svr.Close()
|
||||
|
||||
validator := noopWriteValidator{}
|
||||
|
||||
// Only RemoteURL is set; the remaining config fields keep their zero values.
config := &influxdb.ReplicationHTTPConfig{
|
||||
RemoteURL: svr.URL,
|
||||
}
|
||||
|
||||
err := validator.ValidateReplication(context.Background(), config)
|
||||
if tt.valid {
|
||||
require.NoError(t, err)
|
||||
return
|
||||
}
|
||||
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: github.com/influxdata/influxdb/v2/replications/remotewrite (interfaces: HttpConfigStore)
|
||||
|
||||
// Package mock is a generated GoMock package.
|
||||
package mock
|
||||
|
||||
import (
|
||||
context "context"
|
||||
reflect "reflect"
|
||||
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
platform "github.com/influxdata/influxdb/v2/kit/platform"
|
||||
)
|
||||
|
||||
// MockHttpConfigStore is a mock of HttpConfigStore interface.
|
||||
type MockHttpConfigStore struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockHttpConfigStoreMockRecorder
|
||||
}
|
||||
|
||||
// MockHttpConfigStoreMockRecorder is the mock recorder for MockHttpConfigStore.
|
||||
type MockHttpConfigStoreMockRecorder struct {
|
||||
mock *MockHttpConfigStore
|
||||
}
|
||||
|
||||
// NewMockHttpConfigStore creates a new mock instance.
|
||||
func NewMockHttpConfigStore(ctrl *gomock.Controller) *MockHttpConfigStore {
|
||||
mock := &MockHttpConfigStore{ctrl: ctrl}
|
||||
mock.recorder = &MockHttpConfigStoreMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||
func (m *MockHttpConfigStore) EXPECT() *MockHttpConfigStoreMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// GetFullHTTPConfig mocks base method.
|
||||
func (m *MockHttpConfigStore) GetFullHTTPConfig(arg0 context.Context, arg1 platform.ID) (*influxdb.ReplicationHTTPConfig, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "GetFullHTTPConfig", arg0, arg1)
|
||||
ret0, _ := ret[0].(*influxdb.ReplicationHTTPConfig)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// GetFullHTTPConfig indicates an expected call of GetFullHTTPConfig.
|
||||
func (mr *MockHttpConfigStoreMockRecorder) GetFullHTTPConfig(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFullHTTPConfig", reflect.TypeOf((*MockHttpConfigStore)(nil).GetFullHTTPConfig), arg0, arg1)
|
||||
}
|
||||
|
||||
// UpdateResponseInfo mocks base method.
|
||||
func (m *MockHttpConfigStore) UpdateResponseInfo(arg0 context.Context, arg1 platform.ID, arg2 int, arg3 string) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "UpdateResponseInfo", arg0, arg1, arg2, arg3)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// UpdateResponseInfo indicates an expected call of UpdateResponseInfo.
|
||||
func (mr *MockHttpConfigStoreMockRecorder) UpdateResponseInfo(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateResponseInfo", reflect.TypeOf((*MockHttpConfigStore)(nil).UpdateResponseInfo), arg0, arg1, arg2, arg3)
|
||||
}
|
|
@ -11,7 +11,6 @@ import (
|
|||
gomock "github.com/golang/mock/gomock"
|
||||
influxdb "github.com/influxdata/influxdb/v2"
|
||||
platform "github.com/influxdata/influxdb/v2/kit/platform"
|
||||
internal "github.com/influxdata/influxdb/v2/replications/internal"
|
||||
)
|
||||
|
||||
// MockServiceStore is a mock of ServiceStore interface.
|
||||
|
@ -82,10 +81,10 @@ func (mr *MockServiceStoreMockRecorder) DeleteReplication(arg0, arg1 interface{}
|
|||
}
|
||||
|
||||
// GetFullHTTPConfig mocks base method.
|
||||
func (m *MockServiceStore) GetFullHTTPConfig(arg0 context.Context, arg1 platform.ID) (*internal.ReplicationHTTPConfig, error) {
|
||||
func (m *MockServiceStore) GetFullHTTPConfig(arg0 context.Context, arg1 platform.ID) (*influxdb.ReplicationHTTPConfig, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "GetFullHTTPConfig", arg0, arg1)
|
||||
ret0, _ := ret[0].(*internal.ReplicationHTTPConfig)
|
||||
ret0, _ := ret[0].(*influxdb.ReplicationHTTPConfig)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
@ -139,7 +138,7 @@ func (mr *MockServiceStoreMockRecorder) Lock() *gomock.Call {
|
|||
}
|
||||
|
||||
// PopulateRemoteHTTPConfig mocks base method.
|
||||
func (m *MockServiceStore) PopulateRemoteHTTPConfig(arg0 context.Context, arg1 platform.ID, arg2 *internal.ReplicationHTTPConfig) error {
|
||||
func (m *MockServiceStore) PopulateRemoteHTTPConfig(arg0 context.Context, arg1 platform.ID, arg2 *influxdb.ReplicationHTTPConfig) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "PopulateRemoteHTTPConfig", arg0, arg1, arg2)
|
||||
ret0, _ := ret[0].(error)
|
||||
|
|
|
@ -9,7 +9,7 @@ import (
|
|||
reflect "reflect"
|
||||
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
internal "github.com/influxdata/influxdb/v2/replications/internal"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
)
|
||||
|
||||
// MockReplicationValidator is a mock of ReplicationValidator interface.
|
||||
|
@ -36,7 +36,7 @@ func (m *MockReplicationValidator) EXPECT() *MockReplicationValidatorMockRecorde
|
|||
}
|
||||
|
||||
// ValidateReplication mocks base method.
|
||||
func (m *MockReplicationValidator) ValidateReplication(arg0 context.Context, arg1 *internal.ReplicationHTTPConfig) error {
|
||||
func (m *MockReplicationValidator) ValidateReplication(arg0 context.Context, arg1 *influxdb.ReplicationHTTPConfig) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "ValidateReplication", arg0, arg1)
|
||||
ret0, _ := ret[0].(error)
|
||||
|
|
|
@ -0,0 +1,243 @@
|
|||
package remotewrite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influx-cli/v2/api"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
ierrors "github.com/influxdata/influxdb/v2/kit/platform/errors"
|
||||
"github.com/influxdata/influxdb/v2/replications/metrics"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Defaults used by the remote writer; NewWriter copies the backoff, attempt, and timeout
// values into each writer it builds.
const (
|
||||
retryAfterHeaderKey = "Retry-After" // HTTP header name; NOTE(review): presumably read from remote responses to choose the wait time — confirm
|
||||
maximumBackoffTime = 15 * time.Minute // default for writer.maximumBackoffTime
|
||||
maximumAttempts = 10 // After this many attempts, wait maximumBackoffTime
|
||||
DefaultTimeout = 2 * time.Minute // default for writer.clientTimeout; exported so callers of PostWrite can pass it
|
||||
)
|
||||
|
||||
var (
|
||||
// userAgent describes this build as "influxdb-oss/<version> (<GOOS>) Sha/<commit> Date/<build date>",
// filled in from influxdb.GetBuildInfo() and runtime.GOOS when the package is initialized.
userAgent = fmt.Sprintf(
|
||||
"influxdb-oss/%s (%s) Sha/%s Date/%s",
|
||||
influxdb.GetBuildInfo().Version,
|
||||
runtime.GOOS,
|
||||
influxdb.GetBuildInfo().Commit,
|
||||
influxdb.GetBuildInfo().Date)
|
||||
)
|
||||
|
||||
func invalidRemoteUrl(remoteUrl string, err error) *ierrors.Error {
|
||||
return &ierrors.Error{
|
||||
Code: ierrors.EInvalid,
|
||||
Msg: fmt.Sprintf("host URL %q is invalid", remoteUrl),
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
func invalidResponseCode(code int) *ierrors.Error {
|
||||
return &ierrors.Error{
|
||||
Code: ierrors.EInvalid,
|
||||
Msg: fmt.Sprintf("invalid response code %d, must be %d", code, http.StatusNoContent),
|
||||
}
|
||||
}
|
||||
|
||||
type HttpConfigStore interface {
|
||||
GetFullHTTPConfig(context.Context, platform.ID) (*influxdb.ReplicationHTTPConfig, error)
|
||||
UpdateResponseInfo(context.Context, platform.ID, int, string) error
|
||||
}
|
||||
|
||||
type waitFunc func(time.Duration) <-chan time.Time
|
||||
|
||||
type writer struct {
|
||||
replicationID platform.ID
|
||||
configStore HttpConfigStore
|
||||
metrics *metrics.ReplicationsMetrics
|
||||
logger *zap.Logger
|
||||
maximumBackoffTime time.Duration
|
||||
maximumAttemptsForBackoffTime int
|
||||
clientTimeout time.Duration
|
||||
maximumAttemptsBeforeErr int // used for testing, 0 for unlimited
|
||||
waitFunc waitFunc // used for testing
|
||||
}
|
||||
|
||||
func NewWriter(replicationID platform.ID, store HttpConfigStore, metrics *metrics.ReplicationsMetrics, logger *zap.Logger) *writer {
|
||||
return &writer{
|
||||
replicationID: replicationID,
|
||||
configStore: store,
|
||||
metrics: metrics,
|
||||
logger: logger,
|
||||
maximumBackoffTime: maximumBackoffTime,
|
||||
maximumAttemptsForBackoffTime: maximumAttempts,
|
||||
clientTimeout: DefaultTimeout,
|
||||
waitFunc: func(t time.Duration) <-chan time.Time {
|
||||
return time.After(t)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (w *writer) Write(ctx context.Context, data []byte) error {
|
||||
attempts := 0
|
||||
|
||||
for {
|
||||
if w.maximumAttemptsBeforeErr > 0 && attempts >= w.maximumAttemptsBeforeErr {
|
||||
return errors.New("maximum number of attempts exceeded")
|
||||
}
|
||||
|
||||
// Get the most recent config on every attempt, in case the user has updated the config to correct errors.
|
||||
conf, err := w.configStore.GetFullHTTPConfig(ctx, w.replicationID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := PostWrite(ctx, conf, data, w.clientTimeout)
|
||||
res, msg, ok := normalizeResponse(res, err)
|
||||
if !ok {
|
||||
// Can't retry - bail out.
|
||||
return err
|
||||
}
|
||||
|
||||
if err := w.configStore.UpdateResponseInfo(ctx, w.replicationID, res.StatusCode, msg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
// Successful write
|
||||
return nil
|
||||
}
|
||||
|
||||
attempts++
|
||||
var waitTime time.Duration
|
||||
hasSetWaitTime := false
|
||||
|
||||
switch res.StatusCode {
|
||||
case http.StatusBadRequest:
|
||||
if conf.DropNonRetryableData {
|
||||
w.logger.Debug(fmt.Sprintf("dropped %d bytes of data due to %d response from server", len(data), http.StatusBadRequest))
|
||||
return nil
|
||||
}
|
||||
case http.StatusTooManyRequests:
|
||||
headerTime := w.waitTimeFromHeader(res)
|
||||
if headerTime != 0 {
|
||||
waitTime = headerTime
|
||||
hasSetWaitTime = true
|
||||
}
|
||||
}
|
||||
|
||||
if !hasSetWaitTime {
|
||||
waitTime = w.backoff(attempts)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-w.waitFunc(waitTime):
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// normalizeReponse returns a guaranteed non-nil value for *http.Response, and an extracted error message string for use
|
||||
// in logging. The returned bool indicates that the response is retryable - false means that the write request should be
|
||||
// aborted due to a malformed request.
|
||||
func normalizeResponse(r *http.Response, err error) (*http.Response, string, bool) {
|
||||
var errMsg string
|
||||
if err != nil {
|
||||
errMsg = err.Error()
|
||||
}
|
||||
|
||||
if r == nil {
|
||||
if errorIsTimeout(err) {
|
||||
return &http.Response{}, errMsg, true
|
||||
}
|
||||
|
||||
return &http.Response{}, errMsg, false
|
||||
}
|
||||
|
||||
return r, errMsg, true
|
||||
}
|
||||
|
||||
// errorIsTimeout reports whether err, or any error it wraps, is a net.Error
// whose Timeout method returns true. A nil err yields false.
//
// errors.As is used instead of a direct type assertion so that a timeout
// wrapped by an intermediate error is still recognized.
func errorIsTimeout(err error) bool {
	var netErr net.Error
	return errors.As(err, &netErr) && netErr.Timeout()
}
|
||||
|
||||
func PostWrite(ctx context.Context, config *influxdb.ReplicationHTTPConfig, data []byte, timeout time.Duration) (*http.Response, error) {
|
||||
u, err := url.Parse(config.RemoteURL)
|
||||
if err != nil {
|
||||
return nil, invalidRemoteUrl(config.RemoteURL, err)
|
||||
}
|
||||
|
||||
params := api.ConfigParams{
|
||||
Host: u,
|
||||
UserAgent: userAgent,
|
||||
Token: &config.RemoteToken,
|
||||
AllowInsecureTLS: config.AllowInsecureTLS,
|
||||
}
|
||||
conf := api.NewAPIConfig(params)
|
||||
conf.HTTPClient.Timeout = timeout
|
||||
client := api.NewAPIClient(conf).WriteApi
|
||||
|
||||
req := client.PostWrite(ctx).
|
||||
Org(config.RemoteOrgID.String()).
|
||||
Bucket(config.RemoteBucketID.String()).
|
||||
Body(data)
|
||||
|
||||
// Don't set the encoding header for empty bodies, like those used for validation.
|
||||
if len(data) > 0 {
|
||||
req = req.ContentEncoding("gzip")
|
||||
}
|
||||
|
||||
res, err := req.ExecuteWithHttpInfo()
|
||||
if res == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Only a response of 204 is valid for a successful write
|
||||
if res.StatusCode != http.StatusNoContent {
|
||||
err = invalidResponseCode(res.StatusCode)
|
||||
}
|
||||
|
||||
// Must return the response so that the status code and headers can be inspected by the caller, even if the response
|
||||
// was not 204.
|
||||
return res, err
|
||||
}
|
||||
|
||||
func (w *writer) backoff(numAttempts int) time.Duration {
|
||||
if numAttempts > w.maximumAttemptsForBackoffTime {
|
||||
return w.maximumBackoffTime
|
||||
}
|
||||
|
||||
s := 0.5 * math.Pow(2, float64(numAttempts-1))
|
||||
return time.Duration(s * float64(time.Second))
|
||||
}
|
||||
|
||||
func (w *writer) waitTimeFromHeader(r *http.Response) time.Duration {
|
||||
str := r.Header.Get(retryAfterHeaderKey)
|
||||
if str == "" {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Use a minimal backoff time if the header is set to 0 for some reason, maybe due to rounding.
|
||||
if str == "0" {
|
||||
return w.backoff(1)
|
||||
}
|
||||
|
||||
rtr, err := strconv.Atoi(str)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
return time.Duration(rtr * int(time.Second))
|
||||
}
|
|
@ -0,0 +1,340 @@
|
|||
package remotewrite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||
"github.com/influxdata/influxdb/v2/replications/metrics"
|
||||
replicationsMock "github.com/influxdata/influxdb/v2/replications/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
//go:generate go run github.com/golang/mock/mockgen -package mock -destination ../mock/http_config_store.go github.com/influxdata/influxdb/v2/replications/remotewrite HttpConfigStore
|
||||
|
||||
var (
|
||||
testID = platform.ID(1)
|
||||
)
|
||||
|
||||
func testWriter(t *testing.T) (*writer, *replicationsMock.MockHttpConfigStore) {
|
||||
ctrl := gomock.NewController(t)
|
||||
configStore := replicationsMock.NewMockHttpConfigStore(ctrl)
|
||||
w := NewWriter(testID, configStore, metrics.NewReplicationsMetrics(), zaptest.NewLogger(t))
|
||||
return w, configStore
|
||||
}
|
||||
|
||||
// testServer starts an HTTP test server that answers every request with
// status and reports a test error when the request body differs from wantData.
// The caller is responsible for closing the returned server.
//
// The handler runs on a server goroutine, so mismatches are reported with
// t.Errorf. FailNow-based assertions must only be called from the goroutine
// running the test.
func testServer(t *testing.T, status int, wantData []byte) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		gotData, err := ioutil.ReadAll(r.Body)
		if err != nil {
			t.Errorf("reading request body: %v", err)
		} else if string(gotData) != string(wantData) {
			t.Errorf("request body = %q, want %q", gotData, wantData)
		}
		w.WriteHeader(status)
	}))
}
|
||||
|
||||
func instaWait() waitFunc {
|
||||
return func(t time.Duration) <-chan time.Time {
|
||||
out := make(chan time.Time)
|
||||
close(out)
|
||||
return out
|
||||
}
|
||||
}
|
||||
|
||||
func TestWrite(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testData := []byte("some data")
|
||||
|
||||
t.Run("error getting config", func(t *testing.T) {
|
||||
wantErr := errors.New("uh oh")
|
||||
|
||||
w, configStore := testWriter(t)
|
||||
|
||||
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(nil, wantErr)
|
||||
require.Equal(t, wantErr, w.Write(context.Background(), []byte{}))
|
||||
})
|
||||
|
||||
t.Run("nil response from PostWrite", func(t *testing.T) {
|
||||
testConfig := &influxdb.ReplicationHTTPConfig{
|
||||
RemoteURL: "not a good URL",
|
||||
}
|
||||
|
||||
w, configStore := testWriter(t)
|
||||
|
||||
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil)
|
||||
require.Error(t, w.Write(context.Background(), []byte{}))
|
||||
})
|
||||
|
||||
t.Run("immediate good response", func(t *testing.T) {
|
||||
svr := testServer(t, http.StatusNoContent, testData)
|
||||
defer svr.Close()
|
||||
|
||||
testConfig := &influxdb.ReplicationHTTPConfig{
|
||||
RemoteURL: svr.URL,
|
||||
}
|
||||
|
||||
w, configStore := testWriter(t)
|
||||
|
||||
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil)
|
||||
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusNoContent, "").Return(nil)
|
||||
require.NoError(t, w.Write(context.Background(), testData))
|
||||
})
|
||||
|
||||
t.Run("error updating response info", func(t *testing.T) {
|
||||
wantErr := errors.New("o no")
|
||||
|
||||
svr := testServer(t, http.StatusNoContent, testData)
|
||||
defer svr.Close()
|
||||
|
||||
testConfig := &influxdb.ReplicationHTTPConfig{
|
||||
RemoteURL: svr.URL,
|
||||
}
|
||||
|
||||
w, configStore := testWriter(t)
|
||||
|
||||
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil)
|
||||
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusNoContent, "").Return(wantErr)
|
||||
require.Equal(t, wantErr, w.Write(context.Background(), testData))
|
||||
})
|
||||
|
||||
t.Run("bad server responses that never succeed", func(t *testing.T) {
|
||||
testAttempts := 3
|
||||
|
||||
for _, status := range []int{http.StatusOK, http.StatusTeapot, http.StatusInternalServerError} {
|
||||
t.Run(fmt.Sprintf("status code %d", status), func(t *testing.T) {
|
||||
svr := testServer(t, status, testData)
|
||||
defer svr.Close()
|
||||
|
||||
testConfig := &influxdb.ReplicationHTTPConfig{
|
||||
RemoteURL: svr.URL,
|
||||
}
|
||||
|
||||
w, configStore := testWriter(t)
|
||||
w.waitFunc = instaWait()
|
||||
w.maximumAttemptsBeforeErr = testAttempts
|
||||
|
||||
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil).Times(testAttempts)
|
||||
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, status, invalidResponseCode(status).Error()).Return(nil).Times(testAttempts)
|
||||
require.Equal(t, errors.New("maximum number of attempts exceeded"), w.Write(context.Background(), testData))
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("bad server responses at first followed by good server responses", func(t *testing.T) {
|
||||
testAttempts := 10
|
||||
attemptsBeforeSuccess := 3
|
||||
serverCounter := 0
|
||||
badStatus := http.StatusInternalServerError
|
||||
goodStatus := http.StatusNoContent
|
||||
|
||||
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
serverCounter++
|
||||
gotData, err := ioutil.ReadAll(r.Body)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, testData, gotData)
|
||||
if serverCounter >= attemptsBeforeSuccess {
|
||||
w.WriteHeader(goodStatus)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(badStatus)
|
||||
}))
|
||||
defer svr.Close()
|
||||
|
||||
testConfig := &influxdb.ReplicationHTTPConfig{
|
||||
RemoteURL: svr.URL,
|
||||
}
|
||||
|
||||
w, configStore := testWriter(t)
|
||||
w.waitFunc = instaWait()
|
||||
w.maximumAttemptsBeforeErr = testAttempts
|
||||
|
||||
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil).Times(attemptsBeforeSuccess)
|
||||
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, badStatus, invalidResponseCode(badStatus).Error()).Return(nil).Times(attemptsBeforeSuccess - 1)
|
||||
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, goodStatus, "").Return(nil)
|
||||
require.NoError(t, w.Write(context.Background(), testData))
|
||||
})
|
||||
|
||||
t.Run("drops bad data after config is updated", func(t *testing.T) {
|
||||
testAttempts := 5
|
||||
|
||||
svr := testServer(t, http.StatusBadRequest, testData)
|
||||
defer svr.Close()
|
||||
|
||||
testConfig := &influxdb.ReplicationHTTPConfig{
|
||||
RemoteURL: svr.URL,
|
||||
}
|
||||
|
||||
updatedConfig := &influxdb.ReplicationHTTPConfig{
|
||||
RemoteURL: svr.URL,
|
||||
DropNonRetryableData: true,
|
||||
}
|
||||
|
||||
w, configStore := testWriter(t)
|
||||
w.waitFunc = instaWait()
|
||||
w.maximumAttemptsBeforeErr = testAttempts
|
||||
|
||||
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil).Times(testAttempts - 1)
|
||||
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(updatedConfig, nil)
|
||||
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusBadRequest, invalidResponseCode(http.StatusBadRequest).Error()).Return(nil).Times(testAttempts)
|
||||
require.NoError(t, w.Write(context.Background(), testData))
|
||||
})
|
||||
|
||||
t.Run("uses wait time from response header if present", func(t *testing.T) {
|
||||
testAttempts := 3
|
||||
numSeconds := 5
|
||||
waitTimeFromHeader := 5 * time.Second
|
||||
|
||||
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
gotData, err := ioutil.ReadAll(r.Body)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, testData, gotData)
|
||||
w.Header().Set(retryAfterHeaderKey, strconv.Itoa(numSeconds))
|
||||
w.WriteHeader(http.StatusTooManyRequests)
|
||||
}))
|
||||
defer svr.Close()
|
||||
|
||||
testConfig := &influxdb.ReplicationHTTPConfig{
|
||||
RemoteURL: svr.URL,
|
||||
}
|
||||
|
||||
w, configStore := testWriter(t)
|
||||
w.waitFunc = func(dur time.Duration) <-chan time.Time {
|
||||
require.Equal(t, waitTimeFromHeader, dur)
|
||||
out := make(chan time.Time)
|
||||
close(out)
|
||||
return out
|
||||
}
|
||||
w.maximumAttemptsBeforeErr = testAttempts
|
||||
|
||||
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil).Times(testAttempts)
|
||||
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusTooManyRequests, invalidResponseCode(http.StatusTooManyRequests).Error()).Return(nil).Times(testAttempts)
|
||||
require.Equal(t, errors.New("maximum number of attempts exceeded"), w.Write(context.Background(), testData))
|
||||
})
|
||||
|
||||
t.Run("can cancel with context", func(t *testing.T) {
|
||||
svr := testServer(t, http.StatusInternalServerError, testData)
|
||||
defer svr.Close()
|
||||
|
||||
testConfig := &influxdb.ReplicationHTTPConfig{
|
||||
RemoteURL: svr.URL,
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
w, configStore := testWriter(t)
|
||||
w.waitFunc = func(dur time.Duration) <-chan time.Time {
|
||||
cancel()
|
||||
return time.After(time.Second)
|
||||
}
|
||||
|
||||
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil)
|
||||
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusInternalServerError, invalidResponseCode(http.StatusInternalServerError).Error()).Return(nil)
|
||||
require.Equal(t, context.Canceled, w.Write(ctx, testData))
|
||||
})
|
||||
}
|
||||
|
||||
func TestPostWrite(t *testing.T) {
|
||||
testData := []byte("some data")
|
||||
|
||||
tests := []struct {
|
||||
status int
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
status: http.StatusOK,
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
status: http.StatusNoContent,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
status: http.StatusBadRequest,
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(fmt.Sprintf("status code %d", tt.status), func(t *testing.T) {
|
||||
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
recData, err := ioutil.ReadAll(r.Body)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, testData, recData)
|
||||
|
||||
w.WriteHeader(tt.status)
|
||||
}))
|
||||
defer svr.Close()
|
||||
|
||||
config := &influxdb.ReplicationHTTPConfig{
|
||||
RemoteURL: svr.URL,
|
||||
}
|
||||
|
||||
res, err := PostWrite(context.Background(), config, testData, time.Second)
|
||||
if tt.wantErr {
|
||||
require.Error(t, err)
|
||||
return
|
||||
} else {
|
||||
require.Nil(t, err)
|
||||
}
|
||||
|
||||
require.Equal(t, tt.status, res.StatusCode)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestWaitTimeFromHeader(t *testing.T) {
|
||||
w := &writer{
|
||||
maximumAttemptsForBackoffTime: maximumAttempts,
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
headerKey string
|
||||
headerVal string
|
||||
want time.Duration
|
||||
}{
|
||||
{
|
||||
headerKey: retryAfterHeaderKey,
|
||||
headerVal: "30",
|
||||
want: 30 * time.Second,
|
||||
},
|
||||
{
|
||||
headerKey: retryAfterHeaderKey,
|
||||
headerVal: "0",
|
||||
want: w.backoff(1),
|
||||
},
|
||||
{
|
||||
headerKey: retryAfterHeaderKey,
|
||||
headerVal: "not a number",
|
||||
want: 0,
|
||||
},
|
||||
{
|
||||
headerKey: "some other thing",
|
||||
headerVal: "not a number",
|
||||
want: 0,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(fmt.Sprintf("%q - %q", tt.headerKey, tt.headerVal), func(t *testing.T) {
|
||||
r := &http.Response{
|
||||
Header: http.Header{
|
||||
tt.headerKey: []string{tt.headerVal},
|
||||
},
|
||||
}
|
||||
|
||||
got := w.waitTimeFromHeader(r)
|
||||
require.Equal(t, tt.want, got)
|
||||
})
|
||||
}
|
||||
}
|
|
@ -31,9 +31,10 @@ func errLocalBucketNotFound(id platform.ID, cause error) error {
|
|||
|
||||
func NewService(sqlStore *sqlite.SqlStore, bktSvc BucketService, localWriter storage.PointsWriter, log *zap.Logger, enginePath string) (*service, *metrics.ReplicationsMetrics) {
|
||||
metrs := metrics.NewReplicationsMetrics()
|
||||
store := internal.NewStore(sqlStore)
|
||||
|
||||
return &service{
|
||||
store: internal.NewStore(sqlStore),
|
||||
store: store,
|
||||
idGenerator: snowflake.NewIDGenerator(),
|
||||
bucketService: bktSvc,
|
||||
localWriter: localWriter,
|
||||
|
@ -43,13 +44,13 @@ func NewService(sqlStore *sqlite.SqlStore, bktSvc BucketService, localWriter sto
|
|||
log,
|
||||
filepath.Join(enginePath, "replicationq"),
|
||||
metrs,
|
||||
internal.WriteFunc,
|
||||
store,
|
||||
),
|
||||
}, metrs
|
||||
}
|
||||
|
||||
type ReplicationValidator interface {
|
||||
ValidateReplication(context.Context, *internal.ReplicationHTTPConfig) error
|
||||
ValidateReplication(context.Context, *influxdb.ReplicationHTTPConfig) error
|
||||
}
|
||||
|
||||
type BucketService interface {
|
||||
|
@ -76,8 +77,8 @@ type ServiceStore interface {
|
|||
GetReplication(context.Context, platform.ID) (*influxdb.Replication, error)
|
||||
UpdateReplication(context.Context, platform.ID, influxdb.UpdateReplicationRequest) (*influxdb.Replication, error)
|
||||
DeleteReplication(context.Context, platform.ID) error
|
||||
PopulateRemoteHTTPConfig(context.Context, platform.ID, *internal.ReplicationHTTPConfig) error
|
||||
GetFullHTTPConfig(context.Context, platform.ID) (*internal.ReplicationHTTPConfig, error)
|
||||
PopulateRemoteHTTPConfig(context.Context, platform.ID, *influxdb.ReplicationHTTPConfig) error
|
||||
GetFullHTTPConfig(context.Context, platform.ID) (*influxdb.ReplicationHTTPConfig, error)
|
||||
DeleteBucketReplications(context.Context, platform.ID) ([]platform.ID, error)
|
||||
}
|
||||
|
||||
|
@ -149,7 +150,7 @@ func (s service) ValidateNewReplication(ctx context.Context, request influxdb.Cr
|
|||
return errLocalBucketNotFound(request.LocalBucketID, err)
|
||||
}
|
||||
|
||||
config := internal.ReplicationHTTPConfig{RemoteBucketID: request.RemoteBucketID}
|
||||
config := influxdb.ReplicationHTTPConfig{RemoteBucketID: request.RemoteBucketID}
|
||||
if err := s.store.PopulateRemoteHTTPConfig(ctx, request.RemoteID, &config); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -14,7 +14,6 @@ import (
|
|||
ierrors "github.com/influxdata/influxdb/v2/kit/platform/errors"
|
||||
"github.com/influxdata/influxdb/v2/mock"
|
||||
"github.com/influxdata/influxdb/v2/models"
|
||||
"github.com/influxdata/influxdb/v2/replications/internal"
|
||||
replicationsMock "github.com/influxdata/influxdb/v2/replications/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
@ -90,7 +89,7 @@ var (
|
|||
RemoteBucketID: replication1.RemoteBucketID,
|
||||
MaxQueueSizeBytes: replication1.MaxQueueSizeBytes,
|
||||
}
|
||||
httpConfig = internal.ReplicationHTTPConfig{
|
||||
httpConfig = influxdb.ReplicationHTTPConfig{
|
||||
RemoteURL: fmt.Sprintf("http://%s.cloud", replication1.RemoteID),
|
||||
RemoteToken: replication1.RemoteID.String(),
|
||||
RemoteOrgID: platform.ID(888888),
|
||||
|
@ -291,7 +290,7 @@ func TestValidateNewReplication(t *testing.T) {
|
|||
|
||||
mocks.bucketSvc.EXPECT().FindBucketByID(gomock.Any(), tt.req.LocalBucketID).Return(nil, tt.bucketErr)
|
||||
|
||||
testConfig := &internal.ReplicationHTTPConfig{RemoteBucketID: tt.req.RemoteBucketID}
|
||||
testConfig := &influxdb.ReplicationHTTPConfig{RemoteBucketID: tt.req.RemoteBucketID}
|
||||
if tt.bucketErr == nil {
|
||||
mocks.serviceStore.EXPECT().PopulateRemoteHTTPConfig(gomock.Any(), tt.req.RemoteID, testConfig).Return(tt.storeErr)
|
||||
}
|
||||
|
@ -444,7 +443,7 @@ func TestValidateUpdatedReplication(t *testing.T) {
|
|||
tests := []struct {
|
||||
name string
|
||||
request influxdb.UpdateReplicationRequest
|
||||
baseConfig *internal.ReplicationHTTPConfig
|
||||
baseConfig *influxdb.ReplicationHTTPConfig
|
||||
storeGetConfigErr error
|
||||
storePopulateConfigErr error
|
||||
validatorErr error
|
||||
|
|
Loading…
Reference in New Issue