feat: add maximum age to replication queues (#23206)

Co-authored-by: Sam Arnold <sarnold@influxdata.com>
pull/23232/head
Dane Strandboge 2022-03-25 13:06:05 -05:00 committed by GitHub
parent 89916ec98a
commit 359fcc46b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 278 additions and 205 deletions

View File

@ -6,7 +6,6 @@ import (
nethttp "net/http"
"net/http/httptest"
"net/http/httputil"
"strconv"
"sync"
"testing"
"time"
@ -184,12 +183,12 @@ csv.from(csv: csvData) |> to(bucket: %q)
// Format string to be used as a flux query to get data from a bucket.
qs := `from(bucket:%q) |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z)`
// Data that should be in a bucket which received all of the testPoints1.
// Data that should be in a bucket which received all the testPoints1.
exp1 := `,result,table,_start,_stop,_time,_value,_field,_measurement,k` + "\r\n" +
`,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,100,f,m,v1` + "\r\n" +
`,_result,1,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,200,f,m,v2` + "\r\n\r\n"
// Data that should be in a bucket which received all of the points from testPoints1 and testPoints2.
// Data that should be in a bucket which received all the points from testPoints1 and testPoints2.
exp2 := `,result,table,_start,_stop,_time,_value,_field,_measurement,k` + "\r\n" +
`,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,100,f,m,v1` + "\r\n" +
`,_result,1,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,200,f,m,v2` + "\r\n" +
@ -210,24 +209,13 @@ csv.from(csv: csvData) |> to(bucket: %q)
remote2BucketName := "remote2"
// Create a proxy for use in testing. This will proxy requests to the server, and also decrement the waitGroup to
// allow for synchronization. The server also returns an error on every other request to verify that remote write
// retries work correctly.
// allow for synchronization.
var wg sync.WaitGroup
var mu sync.Mutex
serverShouldErr := true
proxyHandler := httputil.NewSingleHostReverseProxy(l.URL())
proxy := httptest.NewServer(nethttp.HandlerFunc(func(w nethttp.ResponseWriter, r *nethttp.Request) {
mu.Lock()
defer mu.Unlock()
if serverShouldErr {
serverShouldErr = false
w.Header().Set("Retry-After", strconv.Itoa(0)) // writer will use a minimal retry wait time
w.WriteHeader(nethttp.StatusTooManyRequests)
return
}
serverShouldErr = true
proxyHandler.ServeHTTP(w, r)
wg.Done()
}))
@ -266,6 +254,7 @@ csv.from(csv: csvData) |> to(bucket: %q)
LocalBucketID: l.Bucket.ID.String(),
RemoteBucketID: remote1Bucket.ID.String(),
MaxQueueSizeBytes: influxdb.DefaultReplicationMaxQueueSizeBytes,
MaxAgeSeconds: influxdb.DefaultReplicationMaxAge,
}
_, err = client.ReplicationsApi.PostReplication(ctx).ReplicationCreationRequest(replicationCreateReq).Execute()
@ -292,6 +281,7 @@ csv.from(csv: csvData) |> to(bucket: %q)
LocalBucketID: l.Bucket.ID.String(),
RemoteBucketID: remote2Bucket.ID.String(),
MaxQueueSizeBytes: influxdb.DefaultReplicationMaxQueueSizeBytes,
MaxAgeSeconds: influxdb.DefaultReplicationMaxAge,
}
_, err = client.ReplicationsApi.PostReplication(ctx).ReplicationCreationRequest(replicationCreateReq).Execute()
require.NoError(t, err)
@ -301,7 +291,7 @@ csv.from(csv: csvData) |> to(bucket: %q)
l.FluxQueryOrFail(t, l.Org, l.Auth.Token, fmt.Sprintf(testPoints2, l.Bucket.Name))
wg.Wait()
// All of the data should be in the local bucket and first replicated bucket. Only part of the data should be in the
// All the data should be in the local bucket and first replicated bucket. Only part of the data should be in the
// second replicated bucket.
require.Equal(t, exp2, l.FluxQueryOrFail(t, l.Org, l.Auth.Token, fmt.Sprintf(qs, localBucketName)))
require.Equal(t, exp2, l.FluxQueryOrFail(t, l.Org, l.Auth.Token, fmt.Sprintf(qs, remote1BucketName)))

2
go.mod
View File

@ -89,7 +89,7 @@ require (
require (
github.com/apache/arrow/go/v7 v7.0.0
github.com/influxdata/influx-cli/v2 v2.2.1-0.20211129214229-4c0fae3a4c0d
github.com/influxdata/influx-cli/v2 v2.2.1-0.20220318222112-88ba3464cd07
)
require (

4
go.sum
View File

@ -509,8 +509,8 @@ github.com/influxdata/gosnowflake v1.6.9 h1:BhE39Mmh8bC+Rvd4QQsP2gHypfeYIH1wqW1A
github.com/influxdata/gosnowflake v1.6.9/go.mod h1:9W/BvCXOKx2gJtQ+jdi1Vudev9t9/UDOEHnlJZ/y1nU=
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0fXO4ZE/lFGIE84G6rIV5SJN3P3sjIXAP1a8eU=
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69/go.mod h1:pwymjR6SrP3gD3pRj9RJwdl1j5s3doEEV8gS4X9qSzA=
github.com/influxdata/influx-cli/v2 v2.2.1-0.20211129214229-4c0fae3a4c0d h1:An2Su6JpQwYTmONvndYkkjxtfAE5w04rUyH1kf/tWcg=
github.com/influxdata/influx-cli/v2 v2.2.1-0.20211129214229-4c0fae3a4c0d/go.mod h1:p1X8Ga67SzLC35qmwvTCmWXdpZOTHSWWMXJ0zwRTW50=
github.com/influxdata/influx-cli/v2 v2.2.1-0.20220318222112-88ba3464cd07 h1:qfj5kTFYg5KhePA0A5BzFV6zxW0yLYF5K0O7t3drqn8=
github.com/influxdata/influx-cli/v2 v2.2.1-0.20220318222112-88ba3464cd07/go.mod h1:p1X8Ga67SzLC35qmwvTCmWXdpZOTHSWWMXJ0zwRTW50=
github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040 h1:MBLCfcSsUyFPDJp6T7EoHp/Ph3Jkrm4EuUKLD2rUWHg=
github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040/go.mod h1:vLNHdxTJkIf2mSLvGrpj8TCcISApPoXkaxP8g9uRlW8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=

View File

@ -10,6 +10,7 @@ import (
const (
MinReplicationMaxQueueSizeBytes int64 = 33554430 // 32 MiB
DefaultReplicationMaxQueueSizeBytes = 2 * MinReplicationMaxQueueSizeBytes
DefaultReplicationMaxAge int64 = 604800 // 1 week, in seconds
)
var ErrMaxQueueSizeTooSmall = errors.Error{
@ -31,6 +32,7 @@ type Replication struct {
LatestResponseCode *int32 `json:"latestResponseCode,omitempty" db:"latest_response_code"`
LatestErrorMessage *string `json:"latestErrorMessage,omitempty" db:"latest_error_message"`
DropNonRetryableData bool `json:"dropNonRetryableData" db:"drop_non_retryable_data"`
MaxAgeSeconds int64 `json:"maxAgeSeconds" db:"max_age_seconds"`
}
// ReplicationListFilter is a selection filter for listing replications.
@ -49,6 +51,7 @@ type Replications struct {
// TrackedReplication defines a replication stream which is currently being tracked via sqlite.
type TrackedReplication struct {
MaxQueueSizeBytes int64
MaxAgeSeconds int64
OrgID platform.ID
LocalBucketID platform.ID
}
@ -64,6 +67,7 @@ type CreateReplicationRequest struct {
RemoteBucketID platform.ID `json:"remoteBucketID"`
MaxQueueSizeBytes int64 `json:"maxQueueSizeBytes,omitempty"`
DropNonRetryableData bool `json:"dropNonRetryableData,omitempty"`
MaxAgeSeconds int64 `json:"maxAgeSeconds,omitempty"`
}
func (r *CreateReplicationRequest) OK() error {
@ -82,6 +86,7 @@ type UpdateReplicationRequest struct {
RemoteBucketID *platform.ID `json:"remoteBucketID,omitempty"`
MaxQueueSizeBytes *int64 `json:"maxQueueSizeBytes,omitempty"`
DropNonRetryableData *bool `json:"dropNonRetryableData,omitempty"`
MaxAgeSeconds *int64 `json:"maxAgeSeconds,omitempty"`
}
func (r *UpdateReplicationRequest) OK() error {

View File

@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"path/filepath"
"sync"
@ -19,10 +20,12 @@ import (
const (
scannerAdvanceInterval = 10 * time.Second
purgeInterval = 60 * time.Second
defaultMaxAge = 168 * time.Hour / time.Second
)
type remoteWriter interface {
Write([]byte) error
Write(data []byte, attempt int) (time.Duration, bool, error)
}
type replicationQueue struct {
@ -36,6 +39,8 @@ type replicationQueue struct {
logger *zap.Logger
metrics *metrics.ReplicationsMetrics
remoteWriter remoteWriter
failedWrites int
maxAge time.Duration
}
type durableQueueManager struct {
@ -67,7 +72,7 @@ func NewDurableQueueManager(log *zap.Logger, queuePath string, metrics *metrics.
}
// InitializeQueue creates and opens a new durable queue which is associated with a replication stream.
func (qm *durableQueueManager) InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64, orgID platform.ID, localBucketID platform.ID) error {
func (qm *durableQueueManager) InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64, orgID platform.ID, localBucketID platform.ID, maxAge int64) error {
qm.mutex.Lock()
defer qm.mutex.Unlock()
@ -107,7 +112,7 @@ func (qm *durableQueueManager) InitializeQueue(replicationID platform.ID, maxQue
}
// Map new durable queue and scanner to its corresponding replication stream via replication ID
rq := qm.newReplicationQueue(replicationID, orgID, localBucketID, newQueue)
rq := qm.newReplicationQueue(replicationID, orgID, localBucketID, newQueue, maxAge)
qm.replicationQueues[replicationID] = rq
rq.Open()
@ -131,6 +136,24 @@ func (rq *replicationQueue) Close() error {
func (rq *replicationQueue) run() {
defer rq.wg.Done()
retry := time.NewTimer(math.MaxInt64)
purgeTicker := time.NewTicker(purgeInterval)
sendWrite := func() {
for {
waitForRetry, shouldRetry := rq.SendWrite()
if shouldRetry && waitForRetry == 0 {
continue
}
if shouldRetry {
if !retry.Stop() {
<-retry.C
}
retry.Reset(waitForRetry)
}
break
}
}
for {
select {
@ -144,7 +167,12 @@ func (rq *replicationQueue) run() {
// that rq.SendWrite will be called again in this situation and not leave data in the queue. Outside of this
// specific scenario, the buffer might result in an extra call to rq.SendWrite that will immediately return on
// EOF.
for rq.SendWrite() {
sendWrite()
case <-retry.C:
sendWrite()
case <-purgeTicker.C:
if rq.maxAge != 0 {
rq.queue.PurgeOlderThan(time.Now().Add(-rq.maxAge))
}
}
}
@ -152,8 +180,7 @@ func (rq *replicationQueue) run() {
// SendWrite processes data enqueued into the durablequeue.Queue.
// SendWrite is responsible for processing all data in the queue at the time of calling.
// Network errors will be handled by the remote writer.
func (rq *replicationQueue) SendWrite() bool {
func (rq *replicationQueue) SendWrite() (waitForRetry time.Duration, shouldRetry bool) {
// Any error in creating the scanner should exit the loop in run()
// Either it is io.EOF indicating no data, or some other failure in making
// the Scanner object that we don't know how to handle.
@ -162,7 +189,7 @@ func (rq *replicationQueue) SendWrite() bool {
if !errors.Is(err, io.EOF) {
rq.logger.Error("Error creating replications queue scanner", zap.Error(err))
}
return false
return 0, false
}
advanceScanner := func() error {
@ -183,34 +210,38 @@ func (rq *replicationQueue) SendWrite() bool {
if err := scan.Err(); err != nil {
if errors.Is(err, io.EOF) {
// An io.EOF error here indicates that there is no more data left to process, and is an expected error.
return false
return 0, false
}
// Any other error here indicates a problem reading the data from the queue, so we log the error and drop the data
// with a call to scan.Advance() later.
rq.logger.Info("Segment read error.", zap.Error(scan.Err()))
}
if err = rq.remoteWriter.Write(scan.Bytes()); err != nil {
// An error here indicates an unhandleable remote write error. The scanner will not be advanced.
rq.logger.Error("Error in replication stream", zap.Error(err))
return false
if waitForRetry, shouldRetry, err := rq.remoteWriter.Write(scan.Bytes(), rq.failedWrites); err != nil {
rq.failedWrites++
// We failed the remote write. Do not advance the scanner
rq.logger.Error("Error in replication stream", zap.Error(err), zap.Int("retries", rq.failedWrites))
return waitForRetry, shouldRetry
}
// a successful write resets the number of failed write attempts to zero
rq.failedWrites = 0
// Advance the scanner periodically to prevent extended runs of local writes without updating the underlying queue
// position.
select {
case <-ticker.C:
if err := advanceScanner(); err != nil {
return false
return 0, false
}
default:
}
}
if err := advanceScanner(); err != nil {
return false
return 0, false
}
return true
return 0, true
}
// DeleteQueue deletes a durable queue and its associated data on disk.
@ -309,7 +340,7 @@ func (qm *durableQueueManager) StartReplicationQueues(trackedReplications map[pl
errOccurred = true
continue
} else {
qm.replicationQueues[id] = qm.newReplicationQueue(id, repl.OrgID, repl.LocalBucketID, queue)
qm.replicationQueues[id] = qm.newReplicationQueue(id, repl.OrgID, repl.LocalBucketID, queue, repl.MaxAgeSeconds)
qm.replicationQueues[id].Open()
qm.logger.Info("Opened replication stream", zap.String("id", id.String()), zap.String("path", queue.Dir()))
}
@ -403,9 +434,16 @@ func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byt
return nil
}
func (qm *durableQueueManager) newReplicationQueue(id platform.ID, orgID platform.ID, localBucketID platform.ID, queue *durablequeue.Queue) *replicationQueue {
func (qm *durableQueueManager) newReplicationQueue(id platform.ID, orgID platform.ID, localBucketID platform.ID, queue *durablequeue.Queue, maxAge int64) *replicationQueue {
logger := qm.logger.With(zap.String("replication_id", id.String()))
done := make(chan struct{})
// check for max age minimum
var maxAgeTime time.Duration
if maxAge < 0 {
maxAgeTime = defaultMaxAge
} else {
maxAgeTime = time.Duration(maxAge)
}
return &replicationQueue{
id: id,
@ -417,6 +455,7 @@ func (qm *durableQueueManager) newReplicationQueue(id platform.ID, orgID platfor
logger: logger,
metrics: qm.metrics,
remoteWriter: remotewrite.NewWriter(id, qm.configStore, qm.metrics, logger, done),
maxAge: maxAgeTime,
}
}

View File

@ -37,7 +37,7 @@ func TestCreateNewQueueDirExists(t *testing.T) {
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1)
err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id1.String()))
@ -79,26 +79,20 @@ func TestEnqueueScan(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.name == "multiple points with unsuccessful write" {
t.Skip("Fix this test when https://github.com/influxdata/influxdb/issues/23109 is fixed")
}
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
// Create new queue
err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1)
err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0)
require.NoError(t, err)
rq := qm.replicationQueues[id1]
var writeCounter sync.WaitGroup
rq.remoteWriter = getTestRemoteWriterSequenced(t, tt.testData, tt.writeFuncReturn, &writeCounter)
rq.remoteWriter = getTestRemoteWriterSequenced(t, tt.testData, tt.writeFuncReturn, nil)
// Enqueue the data
for _, dat := range tt.testData {
writeCounter.Add(1)
err = qm.EnqueueData(id1, []byte(dat), 1)
require.NoError(t, err)
}
writeCounter.Wait()
// Check queue position
closeRq(rq)
@ -125,11 +119,11 @@ func TestCreateNewQueueDuplicateID(t *testing.T) {
defer os.RemoveAll(filepath.Dir(queuePath))
// Create a valid new queue
err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1)
err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0)
require.NoError(t, err)
// Try to initialize another queue with the same replication ID
err = qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1)
err = qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0)
require.EqualError(t, err, "durable queue already exists for replication ID \"0000000000000001\"")
}
@ -140,7 +134,7 @@ func TestDeleteQueueDirRemoved(t *testing.T) {
defer os.RemoveAll(filepath.Dir(queuePath))
// Create a valid new queue
err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1)
err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id1.String()))
@ -179,7 +173,7 @@ func TestStartReplicationQueue(t *testing.T) {
defer os.RemoveAll(filepath.Dir(queuePath))
// Create new queue
err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1)
err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id1.String()))
@ -187,6 +181,7 @@ func TestStartReplicationQueue(t *testing.T) {
trackedReplications := make(map[platform.ID]*influxdb.TrackedReplication)
trackedReplications[id1] = &influxdb.TrackedReplication{
MaxQueueSizeBytes: maxQueueSizeBytes,
MaxAgeSeconds: 0,
OrgID: orgID1,
LocalBucketID: localBucketID1,
}
@ -213,7 +208,7 @@ func TestStartReplicationQueuePartialDelete(t *testing.T) {
defer os.RemoveAll(filepath.Dir(queuePath))
// Create new queue
err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1)
err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id1.String()))
@ -241,12 +236,12 @@ func TestStartReplicationQueuesMultiple(t *testing.T) {
defer os.RemoveAll(filepath.Dir(queuePath))
// Create queue1
err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1)
err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id1.String()))
// Create queue2
err = qm.InitializeQueue(id2, maxQueueSizeBytes, orgID2, localBucketID2)
err = qm.InitializeQueue(id2, maxQueueSizeBytes, orgID2, localBucketID2, 0)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id2.String()))
@ -254,11 +249,13 @@ func TestStartReplicationQueuesMultiple(t *testing.T) {
trackedReplications := make(map[platform.ID]*influxdb.TrackedReplication)
trackedReplications[id1] = &influxdb.TrackedReplication{
MaxQueueSizeBytes: maxQueueSizeBytes,
MaxAgeSeconds: 0,
OrgID: orgID1,
LocalBucketID: localBucketID1,
}
trackedReplications[id2] = &influxdb.TrackedReplication{
MaxQueueSizeBytes: maxQueueSizeBytes,
MaxAgeSeconds: 0,
OrgID: orgID2,
LocalBucketID: localBucketID2,
}
@ -292,12 +289,12 @@ func TestStartReplicationQueuesMultipleWithPartialDelete(t *testing.T) {
defer os.RemoveAll(filepath.Dir(queuePath))
// Create queue1
err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1)
err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id1.String()))
// Create queue2
err = qm.InitializeQueue(id2, maxQueueSizeBytes, orgID2, localBucketID2)
err = qm.InitializeQueue(id2, maxQueueSizeBytes, orgID2, localBucketID2, 0)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id2.String()))
@ -305,6 +302,7 @@ func TestStartReplicationQueuesMultipleWithPartialDelete(t *testing.T) {
trackedReplications := make(map[platform.ID]*influxdb.TrackedReplication)
trackedReplications[id1] = &influxdb.TrackedReplication{
MaxQueueSizeBytes: maxQueueSizeBytes,
MaxAgeSeconds: 0,
OrgID: orgID1,
LocalBucketID: localBucketID1,
}
@ -355,27 +353,31 @@ func shutdown(t *testing.T, qm *durableQueueManager) {
}
type testRemoteWriter struct {
writeFn func([]byte) error
writeFn func([]byte, int) (time.Duration, bool, error)
}
func (tw *testRemoteWriter) Write(data []byte) error {
return tw.writeFn(data)
func (tw *testRemoteWriter) Write(data []byte, attempt int) (time.Duration, bool, error) {
return tw.writeFn(data, attempt)
}
func getTestRemoteWriterSequenced(t *testing.T, expected []string, returning error, wg *sync.WaitGroup) remoteWriter {
t.Helper()
count := 0
writeFn := func(b []byte) error {
writeFn := func(b []byte, attempt int) (time.Duration, bool, error) {
if count >= len(expected) {
t.Fatalf("count larger than expected len, %d > %d", count, len(expected))
}
require.Equal(t, expected[count], string(b))
count++
if wg != nil {
wg.Done()
}
return returning
// only progress the "pointer" if the data is successful
// enqueueing with a returned error means the same first point is retried
if returning == nil {
count++
}
return time.Second, true, returning
}
writer := &testRemoteWriter{}
@ -389,9 +391,9 @@ func getTestRemoteWriter(t *testing.T, expected string) remoteWriter {
t.Helper()
writer := &testRemoteWriter{
writeFn: func(b []byte) error {
writeFn: func(b []byte, i int) (time.Duration, bool, error) {
require.Equal(t, expected, string(b))
return nil
return time.Second, true, nil
},
}
@ -408,7 +410,7 @@ func TestEnqueueData(t *testing.T) {
logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, queuePath, metrics.NewReplicationsMetrics(), replicationsMock.NewMockHttpConfigStore(nil))
require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1))
require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0))
require.DirExists(t, filepath.Join(queuePath, id1.String()))
sizes, err := qm.CurrentQueueSizes([]platform.ID{id1})
@ -440,7 +442,7 @@ func TestEnqueueData_WithMetrics(t *testing.T) {
path, qm := initQueueManager(t)
defer os.RemoveAll(path)
require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1))
require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0))
require.DirExists(t, filepath.Join(path, id1.String()))
// close the scanner goroutine to specifically test EnqueueData()
@ -482,7 +484,7 @@ func TestEnqueueData_EnqueueFailure(t *testing.T) {
path, qm := initQueueManager(t)
defer os.RemoveAll(path)
require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1))
require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0))
require.DirExists(t, filepath.Join(path, id1.String()))
rq, ok := qm.replicationQueues[id1]
@ -515,7 +517,7 @@ func TestGoroutineReceives(t *testing.T) {
path, qm := initQueueManager(t)
defer os.RemoveAll(path)
require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1))
require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0))
require.DirExists(t, filepath.Join(path, id1.String()))
rq, ok := qm.replicationQueues[id1]
@ -538,7 +540,7 @@ func TestGoroutineCloses(t *testing.T) {
path, qm := initQueueManager(t)
defer os.RemoveAll(path)
require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1))
require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0))
require.DirExists(t, filepath.Join(path, id1.String()))
rq, ok := qm.replicationQueues[id1]
@ -565,13 +567,13 @@ func TestGetReplications(t *testing.T) {
defer os.RemoveAll(path)
// Initialize 3 queues (2nd and 3rd share the same orgID and localBucket)
require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1))
require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0))
require.DirExists(t, filepath.Join(path, id1.String()))
require.NoError(t, qm.InitializeQueue(id2, maxQueueSizeBytes, orgID2, localBucketID2))
require.NoError(t, qm.InitializeQueue(id2, maxQueueSizeBytes, orgID2, localBucketID2, 0))
require.DirExists(t, filepath.Join(path, id1.String()))
require.NoError(t, qm.InitializeQueue(id3, maxQueueSizeBytes, orgID2, localBucketID2))
require.NoError(t, qm.InitializeQueue(id3, maxQueueSizeBytes, orgID2, localBucketID2, 0))
require.DirExists(t, filepath.Join(path, id1.String()))
// Should return one matching replication queue (repl ID 1)

View File

@ -49,7 +49,8 @@ func (s *Store) Unlock() {
func (s *Store) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) {
q := sq.Select(
"id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id",
"max_queue_size_bytes", "latest_response_code", "latest_error_message", "drop_non_retryable_data").
"max_queue_size_bytes", "latest_response_code", "latest_error_message", "drop_non_retryable_data",
"max_age_seconds").
From("replications")
if filter.OrgID.Valid() {
@ -91,10 +92,11 @@ func (s *Store) CreateReplication(ctx context.Context, newID platform.ID, reques
"remote_bucket_id": request.RemoteBucketID,
"max_queue_size_bytes": request.MaxQueueSizeBytes,
"drop_non_retryable_data": request.DropNonRetryableData,
"max_age_seconds": request.MaxAgeSeconds,
"created_at": "datetime('now')",
"updated_at": "datetime('now')",
}).
Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, drop_non_retryable_data")
Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, drop_non_retryable_data, max_age_seconds")
query, args, err := q.ToSql()
if err != nil {
@ -117,7 +119,8 @@ func (s *Store) CreateReplication(ctx context.Context, newID platform.ID, reques
func (s *Store) GetReplication(ctx context.Context, id platform.ID) (*influxdb.Replication, error) {
q := sq.Select(
"id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id",
"max_queue_size_bytes", "latest_response_code", "latest_error_message", "drop_non_retryable_data").
"max_queue_size_bytes", "latest_response_code", "latest_error_message", "drop_non_retryable_data",
"max_age_seconds").
From("replications").
Where(sq.Eq{"id": id})
@ -158,9 +161,12 @@ func (s *Store) UpdateReplication(ctx context.Context, id platform.ID, request i
if request.DropNonRetryableData != nil {
updates["drop_non_retryable_data"] = *request.DropNonRetryableData
}
if request.MaxAgeSeconds != nil {
updates["max_age_seconds"] = *request.MaxAgeSeconds
}
q := sq.Update("replications").SetMap(updates).Where(sq.Eq{"id": id}).
Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, drop_non_retryable_data")
Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, drop_non_retryable_data, max_age_seconds")
query, args, err := q.ToSql()
if err != nil {

View File

@ -29,6 +29,7 @@ var (
LocalBucketID: platform.ID(1000),
RemoteBucketID: platform.ID(99999),
MaxQueueSizeBytes: 3 * influxdb.DefaultReplicationMaxQueueSizeBytes,
MaxAgeSeconds: 0,
}
createReq = influxdb.CreateReplicationRequest{
OrgID: replication.OrgID,
@ -38,6 +39,7 @@ var (
LocalBucketID: replication.LocalBucketID,
RemoteBucketID: replication.RemoteBucketID,
MaxQueueSizeBytes: replication.MaxQueueSizeBytes,
MaxAgeSeconds: replication.MaxAgeSeconds,
}
httpConfig = influxdb.ReplicationHTTPConfig{
RemoteURL: fmt.Sprintf("http://%s.cloud", replication.RemoteID),
@ -63,6 +65,7 @@ var (
RemoteBucketID: replication.RemoteBucketID,
MaxQueueSizeBytes: *updateReq.MaxQueueSizeBytes,
DropNonRetryableData: true,
MaxAgeSeconds: replication.MaxAgeSeconds,
}
)

View File

@ -9,7 +9,7 @@ import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
"github.com/influxdata/influxdb/v2"
influxdb "github.com/influxdata/influxdb/v2"
platform "github.com/influxdata/influxdb/v2/kit/platform"
)

View File

@ -107,17 +107,17 @@ func (mr *MockDurableQueueManagerMockRecorder) GetReplications(arg0, arg1 interf
}
// InitializeQueue mocks base method.
func (m *MockDurableQueueManager) InitializeQueue(arg0 platform.ID, arg1 int64, arg2, arg3 platform.ID) error {
func (m *MockDurableQueueManager) InitializeQueue(arg0 platform.ID, arg1 int64, arg2, arg3 platform.ID, arg4 int64) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "InitializeQueue", arg0, arg1, arg2, arg3)
ret := m.ctrl.Call(m, "InitializeQueue", arg0, arg1, arg2, arg3, arg4)
ret0, _ := ret[0].(error)
return ret0
}
// InitializeQueue indicates an expected call of InitializeQueue.
func (mr *MockDurableQueueManagerMockRecorder) InitializeQueue(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
func (mr *MockDurableQueueManagerMockRecorder) InitializeQueue(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InitializeQueue", reflect.TypeOf((*MockDurableQueueManager)(nil).InitializeQueue), arg0, arg1, arg2, arg3)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InitializeQueue", reflect.TypeOf((*MockDurableQueueManager)(nil).InitializeQueue), arg0, arg1, arg2, arg3, arg4)
}
// StartReplicationQueues mocks base method.

View File

@ -9,6 +9,7 @@ import (
"net/url"
"runtime"
"strconv"
"sync"
"time"
"github.com/influxdata/influx-cli/v2/api"
@ -85,68 +86,68 @@ func NewWriter(replicationID platform.ID, store HttpConfigStore, metrics *metric
}
}
func (w *writer) Write(data []byte) error {
// Clean up the cancellation goroutine on a successful write.
writeDone := make(chan struct{})
defer func() {
select {
case writeDone <- struct{}{}: // send signal if the cancellation goroutine is still waiting to receive
default:
}
}()
func (w *writer) Write(data []byte, attempts int) (backoff time.Duration, shouldRetry bool, err error) {
cancelOnce := &sync.Once{}
// Cancel any outstanding HTTP requests if the replicationQueue is closed.
ctx, cancel := context.WithCancel(context.Background())
defer func() {
cancelOnce.Do(cancel)
}()
go func() {
select {
case <-w.done:
cancel()
case <-writeDone:
cancelOnce.Do(cancel)
case <-ctx.Done():
// context is cancelled already
}
}()
attempts := 0
for {
// Get the most recent config on every attempt, in case the user has updated the config to correct errors.
conf, err := w.configStore.GetFullHTTPConfig(ctx, w.replicationID)
if err != nil {
return err
return w.backoff(attempts), true, err
}
res, err := PostWrite(ctx, conf, data, w.clientTimeout)
res, msg, ok := normalizeResponse(res, err)
res, postWriteErr := PostWrite(ctx, conf, data, w.clientTimeout)
res, msg, ok := normalizeResponse(res, postWriteErr)
if !ok {
// Can't retry - bail out.
return err
// bail out
return w.backoff(attempts), true, postWriteErr
}
// Update metrics and most recent error diagnostic information.
if err := w.configStore.UpdateResponseInfo(ctx, w.replicationID, res.StatusCode, msg); err != nil {
return err
// TODO: We shouldn't fail/retry a successful remote write for not successfully writing to the config store
// we should log instead of returning, like:
// w.logger.Warn("failed to update config store with latest remote write response info", zap.Error(err))
// Unfortunately this will mess up a lot of tests that are using UpdateResponseInfo failures as a proxy for
// write failures.
return w.backoff(attempts), true, err
}
if err == nil {
if postWriteErr == nil {
// Successful write
w.metrics.RemoteWriteSent(w.replicationID, len(data))
w.logger.Debug("remote write successful", zap.Int("attempt", attempts), zap.Int("bytes", len(data)))
return nil
return 0, true, nil
}
w.metrics.RemoteWriteError(w.replicationID, res.StatusCode)
w.logger.Debug("remote write error", zap.Int("attempt", attempts), zap.String("error message", "msg"), zap.Int("status code", res.StatusCode))
attempts++
var waitTime time.Duration
hasSetWaitTime := false
switch res.StatusCode {
case http.StatusBadRequest:
if conf.DropNonRetryableData {
w.logger.Debug("dropped data", zap.Int("bytes", len(data)))
var errBody []byte
res.Body.Read(errBody)
w.logger.Warn("dropped data", zap.Int("bytes", len(data)), zap.String("reason", string(errBody)))
w.metrics.RemoteWriteDropped(w.replicationID, len(data))
return nil
return 0, false, nil
}
case http.StatusTooManyRequests:
headerTime := w.waitTimeFromHeader(res)
@ -159,18 +160,12 @@ func (w *writer) Write(data []byte) error {
if !hasSetWaitTime {
waitTime = w.backoff(attempts)
}
w.logger.Debug("waiting to retry", zap.Duration("wait time", waitTime))
select {
case <-w.waitFunc(waitTime):
case <-ctx.Done():
return ctx.Err()
}
}
return waitTime, true, postWriteErr
}
// normalizeReponse returns a guaranteed non-nil value for *http.Response, and an extracted error message string for use
// in logging. The returned bool indicates that the response is retryable - false means that the write request should be
// normalizeResponse returns a guaranteed non-nil value for *http.Response, and an extracted error message string for use
// in logging. The returned bool indicates if the response is a time-out - false means that the write request should be
// aborted due to a malformed request.
func normalizeResponse(r *http.Response, err error) (*http.Response, string, bool) {
var errMsg string

View File

@ -72,7 +72,9 @@ func TestWrite(t *testing.T) {
w, configStore, _ := testWriter(t)
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(nil, wantErr)
require.Equal(t, wantErr, w.Write([]byte{}))
_, shouldRetry, actualErr := w.Write([]byte{}, 1)
require.Equal(t, wantErr, actualErr)
require.True(t, shouldRetry)
})
t.Run("nil response from PostWrite", func(t *testing.T) {
@ -83,7 +85,9 @@ func TestWrite(t *testing.T) {
w, configStore, _ := testWriter(t)
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil)
require.Error(t, w.Write([]byte{}))
_, shouldRetry, actualErr := w.Write([]byte{}, 1)
require.Error(t, actualErr)
require.True(t, shouldRetry)
})
t.Run("immediate good response", func(t *testing.T) {
@ -98,7 +102,9 @@ func TestWrite(t *testing.T) {
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil)
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusNoContent, "").Return(nil)
require.NoError(t, w.Write(testData))
_, shouldRetry, actualErr := w.Write(testData, 0)
require.NoError(t, actualErr)
require.True(t, shouldRetry)
})
t.Run("error updating response info", func(t *testing.T) {
@ -115,22 +121,17 @@ func TestWrite(t *testing.T) {
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil)
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusNoContent, "").Return(wantErr)
require.Equal(t, wantErr, w.Write(testData))
_, shouldRetry, actualErr := w.Write(testData, 1)
require.Equal(t, wantErr, actualErr)
require.True(t, shouldRetry)
})
t.Run("bad server responses at first followed by good server responses", func(t *testing.T) {
attemptsBeforeSuccess := 3
badStatus := http.StatusInternalServerError
goodStatus := http.StatusNoContent
t.Run("bad server responses that never succeed", func(t *testing.T) {
testAttempts := 3
status := func(count int) int {
if count >= attemptsBeforeSuccess {
return goodStatus
}
return badStatus
}
svr := testServer(t, status, testData)
for _, status := range []int{http.StatusOK, http.StatusTeapot, http.StatusInternalServerError} {
t.Run(fmt.Sprintf("status code %d", status), func(t *testing.T) {
svr := testServer(t, constantStatus(status), testData)
defer svr.Close()
testConfig := &influxdb.ReplicationHTTPConfig{
@ -140,10 +141,13 @@ func TestWrite(t *testing.T) {
w, configStore, _ := testWriter(t)
w.waitFunc = instaWait()
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil).Times(attemptsBeforeSuccess + 1)
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, badStatus, invalidResponseCode(badStatus).Error()).Return(nil).Times(attemptsBeforeSuccess)
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, goodStatus, "").Return(nil)
require.NoError(t, w.Write(testData))
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil)
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, status, invalidResponseCode(status).Error()).Return(nil)
_, _, actualErr := w.Write(testData, testAttempts)
require.NotNil(t, actualErr)
require.Contains(t, actualErr.Error(), fmt.Sprintf("invalid response code %d", status))
})
}
})
t.Run("drops bad data after config is updated", func(t *testing.T) {
@ -167,7 +171,34 @@ func TestWrite(t *testing.T) {
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil).Times(testAttempts - 1)
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(updatedConfig, nil)
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusBadRequest, invalidResponseCode(http.StatusBadRequest).Error()).Return(nil).Times(testAttempts)
require.NoError(t, w.Write(testData))
for i := 1; i <= testAttempts; i++ {
_, shouldRetry, actualErr := w.Write(testData, i)
if testAttempts == i {
require.NoError(t, actualErr)
require.False(t, shouldRetry)
} else {
require.Error(t, actualErr)
require.True(t, shouldRetry)
}
}
})
t.Run("gives backoff time on write response", func(t *testing.T) {
svr := testServer(t, constantStatus(http.StatusBadRequest), testData)
defer svr.Close()
testConfig := &influxdb.ReplicationHTTPConfig{
RemoteURL: svr.URL,
}
w, configStore, _ := testWriter(t)
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil)
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusBadRequest, gomock.Any()).Return(nil)
backoff, shouldRetry, actualErr := w.Write(testData, 1)
require.Equal(t, backoff, w.backoff(1))
require.Equal(t, invalidResponseCode(http.StatusBadRequest), actualErr)
require.True(t, shouldRetry)
})
t.Run("uses wait time from response header if present", func(t *testing.T) {
@ -194,10 +225,11 @@ func TestWrite(t *testing.T) {
return instaWait()(dur)
}
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil).MinTimes(1)
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusTooManyRequests, invalidResponseCode(http.StatusTooManyRequests).Error()).Return(nil).MinTimes(1)
err := w.Write(testData)
require.ErrorIs(t, err, context.Canceled)
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil)
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusTooManyRequests, invalidResponseCode(http.StatusTooManyRequests).Error()).Return(nil)
_, shouldRetry, actualErr := w.Write(testData, 1)
require.Equal(t, invalidResponseCode(http.StatusTooManyRequests), actualErr)
require.True(t, shouldRetry)
})
t.Run("can cancel with done channel", func(t *testing.T) {
@ -208,15 +240,12 @@ func TestWrite(t *testing.T) {
RemoteURL: svr.URL,
}
w, configStore, done := testWriter(t)
w, configStore, _ := testWriter(t)
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil).MinTimes(1)
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusInternalServerError, invalidResponseCode(http.StatusInternalServerError).Error()).
DoAndReturn(func(_, _, _, _ interface{}) error {
close(done)
return nil
})
require.Equal(t, context.Canceled, w.Write(testData))
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil)
configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusInternalServerError, invalidResponseCode(http.StatusInternalServerError).Error()).Return(nil)
_, _, actualErr := w.Write(testData, 1)
require.Equal(t, invalidResponseCode(http.StatusInternalServerError), actualErr)
})
}
@ -226,21 +255,19 @@ func TestWrite_Metrics(t *testing.T) {
tests := []struct {
name string
status func(int) int
expectedErr error
data []byte
registerExpectations func(*testing.T, *replicationsMock.MockHttpConfigStore, *influxdb.ReplicationHTTPConfig)
checkMetrics func(*testing.T, *prom.Registry)
}{
{
name: "server errors",
status: func(i int) int {
arr := []int{http.StatusTeapot, http.StatusTeapot, http.StatusTeapot, http.StatusNoContent}
return arr[i]
},
status: constantStatus(http.StatusTeapot),
expectedErr: invalidResponseCode(http.StatusTeapot),
data: []byte{},
registerExpectations: func(t *testing.T, store *replicationsMock.MockHttpConfigStore, conf *influxdb.ReplicationHTTPConfig) {
store.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(conf, nil).Times(4)
store.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusTeapot, invalidResponseCode(http.StatusTeapot).Error()).Return(nil).Times(3)
store.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusNoContent, "").Return(nil).Times(1)
store.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(conf, nil)
store.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusTeapot, invalidResponseCode(http.StatusTeapot).Error()).Return(nil)
},
checkMetrics: func(t *testing.T, reg *prom.Registry) {
mfs := promtest.MustGather(t, reg)
@ -249,7 +276,6 @@ func TestWrite_Metrics(t *testing.T) {
"code": strconv.Itoa(http.StatusTeapot),
})
require.NotNil(t, errorCodes)
require.Equal(t, 3.0, errorCodes.Counter.GetValue())
},
},
{
@ -306,7 +332,8 @@ func TestWrite_Metrics(t *testing.T) {
reg.MustRegister(w.metrics.PrometheusCollectors()...)
tt.registerExpectations(t, configStore, testConfig)
require.NoError(t, w.Write(tt.data))
_, _, actualErr := w.Write(tt.data, 1)
require.Equal(t, tt.expectedErr, actualErr)
tt.checkMetrics(t, reg)
})
}

View File

@ -71,7 +71,7 @@ type BucketService interface {
}
type DurableQueueManager interface {
InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64, orgID platform.ID, localBucketID platform.ID) error
InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64, orgID platform.ID, localBucketID platform.ID, maxAge int64) error
DeleteQueue(replicationID platform.ID) error
UpdateMaxQueueSize(replicationID platform.ID, maxQueueSizeBytes int64) error
CurrentQueueSizes(ids []platform.ID) (map[platform.ID]int64, error)
@ -143,7 +143,7 @@ func (s *service) CreateReplication(ctx context.Context, request influxdb.Create
}
newID := s.idGenerator.ID()
if err := s.durableQueueManager.InitializeQueue(newID, request.MaxQueueSizeBytes, request.OrgID, request.LocalBucketID); err != nil {
if err := s.durableQueueManager.InitializeQueue(newID, request.MaxQueueSizeBytes, request.OrgID, request.LocalBucketID, request.MaxAgeSeconds); err != nil {
return nil, err
}
@ -404,6 +404,7 @@ func (s *service) Open(ctx context.Context) error {
for _, r := range trackedReplications.Replications {
trackedReplicationsMap[r.ID] = &influxdb.TrackedReplication{
MaxQueueSizeBytes: r.MaxQueueSizeBytes,
MaxAgeSeconds: r.MaxAgeSeconds,
OrgID: r.OrgID,
LocalBucketID: r.LocalBucketID,
}

View File

@ -227,7 +227,7 @@ func TestCreateReplication(t *testing.T) {
mocks.bucketSvc.EXPECT().FindBucketByID(gomock.Any(), tt.create.LocalBucketID).Return(nil, tt.bucketErr)
if tt.bucketErr == nil {
mocks.durableQueueManager.EXPECT().InitializeQueue(id1, tt.create.MaxQueueSizeBytes, tt.create.OrgID, tt.create.LocalBucketID).Return(tt.queueManagerErr)
mocks.durableQueueManager.EXPECT().InitializeQueue(id1, tt.create.MaxQueueSizeBytes, tt.create.OrgID, tt.create.LocalBucketID, tt.create.MaxAgeSeconds).Return(tt.queueManagerErr)
}
if tt.queueManagerErr == nil && tt.bucketErr == nil {
@ -817,11 +817,13 @@ func TestOpen(t *testing.T) {
replicationsMap: map[platform.ID]*influxdb.TrackedReplication{
replication1.ID: {
MaxQueueSizeBytes: replication1.MaxQueueSizeBytes,
MaxAgeSeconds: replication1.MaxAgeSeconds,
OrgID: replication1.OrgID,
LocalBucketID: replication1.LocalBucketID,
},
replication2.ID: {
MaxQueueSizeBytes: replication2.MaxQueueSizeBytes,
MaxAgeSeconds: replication2.MaxAgeSeconds,
OrgID: replication2.OrgID,
LocalBucketID: replication2.LocalBucketID,
},
@ -835,6 +837,7 @@ func TestOpen(t *testing.T) {
replicationsMap: map[platform.ID]*influxdb.TrackedReplication{
replication1.ID: {
MaxQueueSizeBytes: replication1.MaxQueueSizeBytes,
MaxAgeSeconds: replication1.MaxAgeSeconds,
OrgID: replication1.OrgID,
LocalBucketID: replication1.LocalBucketID,
},
@ -852,6 +855,7 @@ func TestOpen(t *testing.T) {
replicationsMap: map[platform.ID]*influxdb.TrackedReplication{
replication1.ID: {
MaxQueueSizeBytes: replication1.MaxQueueSizeBytes,
MaxAgeSeconds: replication1.MaxAgeSeconds,
OrgID: replication1.OrgID,
LocalBucketID: replication1.LocalBucketID,
},

View File

@ -8,6 +8,7 @@ CREATE TABLE replications
local_bucket_id VARCHAR(16) NOT NULL,
remote_bucket_id VARCHAR(16) NOT NULL,
max_queue_size_bytes INTEGER NOT NULL,
max_age_seconds INTEGER NOT NULL,
latest_response_code INTEGER,
latest_error_message TEXT,
drop_non_retryable_data BOOLEAN NOT NULL,