From 359fcc46b564eb8ed8839032990e6604c652d8e5 Mon Sep 17 00:00:00 2001 From: Dane Strandboge Date: Fri, 25 Mar 2022 13:06:05 -0500 Subject: [PATCH] feat: add maximum age to replication queues (#23206) Co-authored-by: Sam Arnold --- cmd/influxd/launcher/replication_test.go | 22 +-- go.mod | 2 +- go.sum | 4 +- replication.go | 5 + replications/internal/queue_management.go | 73 +++++++--- .../internal/queue_management_test.go | 70 ++++----- replications/internal/store.go | 14 +- replications/internal/store_test.go | 3 + replications/mock/http_config_store.go | 2 +- replications/mock/queue_management.go | 8 +- replications/remotewrite/writer.go | 137 +++++++++--------- replications/remotewrite/writer_test.go | 131 ++++++++++------- replications/service.go | 5 +- replications/service_test.go | 6 +- .../0005_create_replications_table.up.sql | 1 + 15 files changed, 278 insertions(+), 205 deletions(-) diff --git a/cmd/influxd/launcher/replication_test.go b/cmd/influxd/launcher/replication_test.go index d6ddee47ec..d1b23d1533 100644 --- a/cmd/influxd/launcher/replication_test.go +++ b/cmd/influxd/launcher/replication_test.go @@ -6,7 +6,6 @@ import ( nethttp "net/http" "net/http/httptest" "net/http/httputil" - "strconv" "sync" "testing" "time" @@ -184,12 +183,12 @@ csv.from(csv: csvData) |> to(bucket: %q) // Format string to be used as a flux query to get data from a bucket. qs := `from(bucket:%q) |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z)` - // Data that should be in a bucket which received all of the testPoints1. + // Data that should be in a bucket which received all the testPoints1. exp1 := `,result,table,_start,_stop,_time,_value,_field,_measurement,k` + "\r\n" + `,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,100,f,m,v1` + "\r\n" + `,_result,1,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,200,f,m,v2` + "\r\n\r\n" - // Data that should be in a bucket which received all of the points from testPoints1 and testPoints2. + // Data that should be in a bucket which received all the points from testPoints1 and testPoints2. exp2 := `,result,table,_start,_stop,_time,_value,_field,_measurement,k` + "\r\n" + `,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,100,f,m,v1` + "\r\n" + `,_result,1,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,200,f,m,v2` + "\r\n" + @@ -210,24 +209,13 @@ csv.from(csv: csvData) |> to(bucket: %q) remote2BucketName := "remote2" // Create a proxy for use in testing. This will proxy requests to the server, and also decrement the waitGroup to - // allow for synchronization. The server also returns an error on every other request to verify that remote write - // retries work correctly. + // allow for synchronization. 
var wg sync.WaitGroup var mu sync.Mutex - serverShouldErr := true proxyHandler := httputil.NewSingleHostReverseProxy(l.URL()) proxy := httptest.NewServer(nethttp.HandlerFunc(func(w nethttp.ResponseWriter, r *nethttp.Request) { mu.Lock() defer mu.Unlock() - - if serverShouldErr { - serverShouldErr = false - w.Header().Set("Retry-After", strconv.Itoa(0)) // writer will use a minimal retry wait time - w.WriteHeader(nethttp.StatusTooManyRequests) - return - } - - serverShouldErr = true proxyHandler.ServeHTTP(w, r) wg.Done() })) @@ -266,6 +254,7 @@ csv.from(csv: csvData) |> to(bucket: %q) LocalBucketID: l.Bucket.ID.String(), RemoteBucketID: remote1Bucket.ID.String(), MaxQueueSizeBytes: influxdb.DefaultReplicationMaxQueueSizeBytes, + MaxAgeSeconds: influxdb.DefaultReplicationMaxAge, } _, err = client.ReplicationsApi.PostReplication(ctx).ReplicationCreationRequest(replicationCreateReq).Execute() @@ -292,6 +281,7 @@ csv.from(csv: csvData) |> to(bucket: %q) LocalBucketID: l.Bucket.ID.String(), RemoteBucketID: remote2Bucket.ID.String(), MaxQueueSizeBytes: influxdb.DefaultReplicationMaxQueueSizeBytes, + MaxAgeSeconds: influxdb.DefaultReplicationMaxAge, } _, err = client.ReplicationsApi.PostReplication(ctx).ReplicationCreationRequest(replicationCreateReq).Execute() require.NoError(t, err) @@ -301,7 +291,7 @@ csv.from(csv: csvData) |> to(bucket: %q) l.FluxQueryOrFail(t, l.Org, l.Auth.Token, fmt.Sprintf(testPoints2, l.Bucket.Name)) wg.Wait() - // All of the data should be in the local bucket and first replicated bucket. Only part of the data should be in the + // All the data should be in the local bucket and first replicated bucket. Only part of the data should be in the // second replicated bucket. require.Equal(t, exp2, l.FluxQueryOrFail(t, l.Org, l.Auth.Token, fmt.Sprintf(qs, localBucketName))) require.Equal(t, exp2, l.FluxQueryOrFail(t, l.Org, l.Auth.Token, fmt.Sprintf(qs, remote1BucketName))) diff --git a/go.mod b/go.mod index d880cca1e4..9c3df4826a 100644 --- a/go.mod +++ b/go.mod @@ -89,7 +89,7 @@ require ( require ( github.com/apache/arrow/go/v7 v7.0.0 - github.com/influxdata/influx-cli/v2 v2.2.1-0.20211129214229-4c0fae3a4c0d + github.com/influxdata/influx-cli/v2 v2.2.1-0.20220318222112-88ba3464cd07 ) require ( diff --git a/go.sum b/go.sum index 6917e0ab14..c58dd1079f 100644 --- a/go.sum +++ b/go.sum @@ -509,8 +509,8 @@ github.com/influxdata/gosnowflake v1.6.9 h1:BhE39Mmh8bC+Rvd4QQsP2gHypfeYIH1wqW1A github.com/influxdata/gosnowflake v1.6.9/go.mod h1:9W/BvCXOKx2gJtQ+jdi1Vudev9t9/UDOEHnlJZ/y1nU= github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0fXO4ZE/lFGIE84G6rIV5SJN3P3sjIXAP1a8eU= github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69/go.mod h1:pwymjR6SrP3gD3pRj9RJwdl1j5s3doEEV8gS4X9qSzA= -github.com/influxdata/influx-cli/v2 v2.2.1-0.20211129214229-4c0fae3a4c0d h1:An2Su6JpQwYTmONvndYkkjxtfAE5w04rUyH1kf/tWcg= -github.com/influxdata/influx-cli/v2 v2.2.1-0.20211129214229-4c0fae3a4c0d/go.mod h1:p1X8Ga67SzLC35qmwvTCmWXdpZOTHSWWMXJ0zwRTW50= +github.com/influxdata/influx-cli/v2 v2.2.1-0.20220318222112-88ba3464cd07 h1:qfj5kTFYg5KhePA0A5BzFV6zxW0yLYF5K0O7t3drqn8= +github.com/influxdata/influx-cli/v2 v2.2.1-0.20220318222112-88ba3464cd07/go.mod h1:p1X8Ga67SzLC35qmwvTCmWXdpZOTHSWWMXJ0zwRTW50= github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040 h1:MBLCfcSsUyFPDJp6T7EoHp/Ph3Jkrm4EuUKLD2rUWHg= github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040/go.mod h1:vLNHdxTJkIf2mSLvGrpj8TCcISApPoXkaxP8g9uRlW8= 
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= diff --git a/replication.go b/replication.go index 7c6b98f6ac..e1523ed7b9 100644 --- a/replication.go +++ b/replication.go @@ -10,6 +10,7 @@ import ( const ( MinReplicationMaxQueueSizeBytes int64 = 33554430 // 32 MiB DefaultReplicationMaxQueueSizeBytes = 2 * MinReplicationMaxQueueSizeBytes + DefaultReplicationMaxAge int64 = 604800 // 1 week, in seconds ) var ErrMaxQueueSizeTooSmall = errors.Error{ @@ -31,6 +32,7 @@ type Replication struct { LatestResponseCode *int32 `json:"latestResponseCode,omitempty" db:"latest_response_code"` LatestErrorMessage *string `json:"latestErrorMessage,omitempty" db:"latest_error_message"` DropNonRetryableData bool `json:"dropNonRetryableData" db:"drop_non_retryable_data"` + MaxAgeSeconds int64 `json:"maxAgeSeconds" db:"max_age_seconds"` } // ReplicationListFilter is a selection filter for listing replications. @@ -49,6 +51,7 @@ type Replications struct { // TrackedReplication defines a replication stream which is currently being tracked via sqlite. type TrackedReplication struct { MaxQueueSizeBytes int64 + MaxAgeSeconds int64 OrgID platform.ID LocalBucketID platform.ID } @@ -64,6 +67,7 @@ type CreateReplicationRequest struct { RemoteBucketID platform.ID `json:"remoteBucketID"` MaxQueueSizeBytes int64 `json:"maxQueueSizeBytes,omitempty"` DropNonRetryableData bool `json:"dropNonRetryableData,omitempty"` + MaxAgeSeconds int64 `json:"maxAgeSeconds,omitempty"` } func (r *CreateReplicationRequest) OK() error { @@ -82,6 +86,7 @@ type UpdateReplicationRequest struct { RemoteBucketID *platform.ID `json:"remoteBucketID,omitempty"` MaxQueueSizeBytes *int64 `json:"maxQueueSizeBytes,omitempty"` DropNonRetryableData *bool `json:"dropNonRetryableData,omitempty"` + MaxAgeSeconds *int64 `json:"maxAgeSeconds,omitempty"` } func (r *UpdateReplicationRequest) OK() error { diff --git a/replications/internal/queue_management.go b/replications/internal/queue_management.go index 3b23240bc1..6ca388f563 100644 --- a/replications/internal/queue_management.go +++ b/replications/internal/queue_management.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "math" "os" "path/filepath" "sync" @@ -19,10 +20,12 @@ import ( const ( scannerAdvanceInterval = 10 * time.Second + purgeInterval = 60 * time.Second + defaultMaxAge = 168 * time.Hour / time.Second ) type remoteWriter interface { - Write([]byte) error + Write(data []byte, attempt int) (time.Duration, bool, error) } type replicationQueue struct { @@ -36,6 +39,8 @@ type replicationQueue struct { logger *zap.Logger metrics *metrics.ReplicationsMetrics remoteWriter remoteWriter + failedWrites int + maxAge time.Duration } type durableQueueManager struct { @@ -67,7 +72,7 @@ func NewDurableQueueManager(log *zap.Logger, queuePath string, metrics *metrics. } // InitializeQueue creates and opens a new durable queue which is associated with a replication stream. 
-func (qm *durableQueueManager) InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64, orgID platform.ID, localBucketID platform.ID) error {
+func (qm *durableQueueManager) InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64, orgID platform.ID, localBucketID platform.ID, maxAge int64) error {
 	qm.mutex.Lock()
 	defer qm.mutex.Unlock()
 
@@ -107,7 +112,7 @@ func (qm *durableQueueManager) InitializeQueue(replicationID platform.ID, maxQue
 	}
 
 	// Map new durable queue and scanner to its corresponding replication stream via replication ID
-	rq := qm.newReplicationQueue(replicationID, orgID, localBucketID, newQueue)
+	rq := qm.newReplicationQueue(replicationID, orgID, localBucketID, newQueue, maxAge)
 	qm.replicationQueues[replicationID] = rq
 	rq.Open()
 
@@ -131,6 +136,24 @@ func (rq *replicationQueue) Close() error {
 func (rq *replicationQueue) run() {
 	defer rq.wg.Done()
 
+	retry := time.NewTimer(math.MaxInt64)
+	purgeTicker := time.NewTicker(purgeInterval)
+
+	sendWrite := func() {
+		for {
+			waitForRetry, shouldRetry := rq.SendWrite()
+			if shouldRetry && waitForRetry == 0 {
+				continue
+			}
+			if shouldRetry {
+				// Drain the timer channel without blocking: if the timer already fired and the
+				// select below consumed its value, Stop returns false and a bare receive on
+				// retry.C would deadlock this goroutine.
+				if !retry.Stop() {
+					select {
+					case <-retry.C:
+					default:
+					}
+				}
+				retry.Reset(waitForRetry)
+			}
+			break
+		}
+	}
 
 	for {
 		select {
@@ -144,7 +167,12 @@
 			// that rq.SendWrite will be called again in this situation and not leave data in the queue. Outside of this
 			// specific scenario, the buffer might result in an extra call to rq.SendWrite that will immediately return on
 			// EOF.
-			for rq.SendWrite() {
+			sendWrite()
+		case <-retry.C:
+			sendWrite()
+		case <-purgeTicker.C:
+			if rq.maxAge != 0 {
+				rq.queue.PurgeOlderThan(time.Now().Add(-rq.maxAge))
 			}
 		}
 	}
@@ -152,8 +180,7 @@
 // SendWrite processes data enqueued into the durablequeue.Queue.
 // SendWrite is responsible for processing all data in the queue at the time of calling.
-// Network errors will be handled by the remote writer.
-func (rq *replicationQueue) SendWrite() bool {
+func (rq *replicationQueue) SendWrite() (waitForRetry time.Duration, shouldRetry bool) {
 	// Any error in creating the scanner should exit the loop in run()
 	// Either it is io.EOF indicating no data, or some other failure in making
 	// the Scanner object that we don't know how to handle.
@@ -162,7 +189,7 @@
 		if !errors.Is(err, io.EOF) {
 			rq.logger.Error("Error creating replications queue scanner", zap.Error(err))
 		}
-		return false
+		return 0, false
 	}
 
 	advanceScanner := func() error {
@@ -183,34 +210,38 @@
 	if err := scan.Err(); err != nil {
 		if errors.Is(err, io.EOF) {
 			// An io.EOF error here indicates that there is no more data left to process, and is an expected error.
-			return false
+			return 0, false
 		}
 		// Any other error here indicates a problem reading the data from the queue, so we log the error and drop the data
 		// with a call to scan.Advance() later.
 		rq.logger.Info("Segment read error.", zap.Error(scan.Err()))
 	}
 
-	if err = rq.remoteWriter.Write(scan.Bytes()); err != nil {
-		// An error here indicates an unhandleable remote write error. The scanner will not be advanced.
-		rq.logger.Error("Error in replication stream", zap.Error(err))
-		return false
+	if waitForRetry, shouldRetry, err := rq.remoteWriter.Write(scan.Bytes(), rq.failedWrites); err != nil {
+		rq.failedWrites++
+		// We failed the remote write. Do not advance the scanner.
+		rq.logger.Error("Error in replication stream", zap.Error(err), zap.Int("retries", rq.failedWrites))
+		return waitForRetry, shouldRetry
 	}
 
+	// A successful write resets the number of failed write attempts to zero.
+	rq.failedWrites = 0
+
 	// Advance the scanner periodically to prevent extended runs of local writes without updating the underlying queue
 	// position.
 	select {
 	case <-ticker.C:
 		if err := advanceScanner(); err != nil {
-			return false
+			return 0, false
 		}
 	default:
 	}
 	}
 
 	if err := advanceScanner(); err != nil {
-		return false
+		return 0, false
 	}
-	return true
+	return 0, true
 }
 
 // DeleteQueue deletes a durable queue and its associated data on disk.
@@ -309,7 +340,7 @@ func (qm *durableQueueManager) StartReplicationQueues(trackedReplications map[pl
 			errOccurred = true
 			continue
 		} else {
-			qm.replicationQueues[id] = qm.newReplicationQueue(id, repl.OrgID, repl.LocalBucketID, queue)
+			qm.replicationQueues[id] = qm.newReplicationQueue(id, repl.OrgID, repl.LocalBucketID, queue, repl.MaxAgeSeconds)
 			qm.replicationQueues[id].Open()
 			qm.logger.Info("Opened replication stream", zap.String("id", id.String()), zap.String("path", queue.Dir()))
 		}
@@ -403,9 +434,16 @@ func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byt
 	return nil
 }
 
-func (qm *durableQueueManager) newReplicationQueue(id platform.ID, orgID platform.ID, localBucketID platform.ID, queue *durablequeue.Queue) *replicationQueue {
+func (qm *durableQueueManager) newReplicationQueue(id platform.ID, orgID platform.ID, localBucketID platform.ID, queue *durablequeue.Queue, maxAge int64) *replicationQueue {
 	logger := qm.logger.With(zap.String("replication_id", id.String()))
 	done := make(chan struct{})
+	// A negative maxAge falls back to the default retention period. maxAge is
+	// expressed in seconds, so convert it to a time.Duration before storing it.
+	var maxAgeTime time.Duration
+	if maxAge < 0 {
+		maxAgeTime = defaultMaxAge * time.Second // defaultMaxAge is a count of seconds
+	} else {
+		maxAgeTime = time.Duration(maxAge) * time.Second
+	}
 
 	return &replicationQueue{
 		id: id,
@@ -417,6 +455,7 @@ func (qm *durableQueueManager) newReplicationQueue(id platform.ID, orgID platfor
 		logger:       logger,
 		metrics:      qm.metrics,
 		remoteWriter: remotewrite.NewWriter(id, qm.configStore, qm.metrics, logger, done),
+		maxAge:       maxAgeTime,
 	}
 }
 
diff --git a/replications/internal/queue_management_test.go b/replications/internal/queue_management_test.go
index 49e46afdaf..3b476bcbfa 100644
--- a/replications/internal/queue_management_test.go
+++ b/replications/internal/queue_management_test.go
@@ -37,7 +37,7 @@ func TestCreateNewQueueDirExists(t *testing.T) {
 	queuePath, qm := initQueueManager(t)
 	defer os.RemoveAll(filepath.Dir(queuePath))
 
-	err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1)
+	err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0)
 	require.NoError(t, err)
 
 	require.DirExists(t, filepath.Join(queuePath, id1.String()))
@@ -79,26 +79,20 @@ func TestEnqueueScan(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if tt.name == "multiple points with unsuccessful write" {
-				t.Skip("Fix this test when https://github.com/influxdata/influxdb/issues/23109 is fixed")
-			}
 			queuePath, qm := initQueueManager(t)
 			defer os.RemoveAll(filepath.Dir(queuePath))
 
 			// Create new queue
-			err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1)
+			err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0)
 			require.NoError(t, err)
 			rq := qm.replicationQueues[id1]
-			var writeCounter sync.WaitGroup
-			rq.remoteWriter = getTestRemoteWriterSequenced(t, tt.testData, tt.writeFuncReturn, &writeCounter)
+			rq.remoteWriter =
getTestRemoteWriterSequenced(t, tt.testData, tt.writeFuncReturn, nil) // Enqueue the data for _, dat := range tt.testData { - writeCounter.Add(1) err = qm.EnqueueData(id1, []byte(dat), 1) require.NoError(t, err) } - writeCounter.Wait() // Check queue position closeRq(rq) @@ -125,11 +119,11 @@ func TestCreateNewQueueDuplicateID(t *testing.T) { defer os.RemoveAll(filepath.Dir(queuePath)) // Create a valid new queue - err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1) + err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0) require.NoError(t, err) // Try to initialize another queue with the same replication ID - err = qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1) + err = qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0) require.EqualError(t, err, "durable queue already exists for replication ID \"0000000000000001\"") } @@ -140,7 +134,7 @@ func TestDeleteQueueDirRemoved(t *testing.T) { defer os.RemoveAll(filepath.Dir(queuePath)) // Create a valid new queue - err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1) + err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0) require.NoError(t, err) require.DirExists(t, filepath.Join(queuePath, id1.String())) @@ -179,7 +173,7 @@ func TestStartReplicationQueue(t *testing.T) { defer os.RemoveAll(filepath.Dir(queuePath)) // Create new queue - err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1) + err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0) require.NoError(t, err) require.DirExists(t, filepath.Join(queuePath, id1.String())) @@ -187,6 +181,7 @@ func TestStartReplicationQueue(t *testing.T) { trackedReplications := make(map[platform.ID]*influxdb.TrackedReplication) trackedReplications[id1] = &influxdb.TrackedReplication{ MaxQueueSizeBytes: maxQueueSizeBytes, + MaxAgeSeconds: 0, OrgID: orgID1, LocalBucketID: localBucketID1, } @@ -213,7 +208,7 @@ func TestStartReplicationQueuePartialDelete(t *testing.T) { defer os.RemoveAll(filepath.Dir(queuePath)) // Create new queue - err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1) + err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0) require.NoError(t, err) require.DirExists(t, filepath.Join(queuePath, id1.String())) @@ -241,12 +236,12 @@ func TestStartReplicationQueuesMultiple(t *testing.T) { defer os.RemoveAll(filepath.Dir(queuePath)) // Create queue1 - err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1) + err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0) require.NoError(t, err) require.DirExists(t, filepath.Join(queuePath, id1.String())) // Create queue2 - err = qm.InitializeQueue(id2, maxQueueSizeBytes, orgID2, localBucketID2) + err = qm.InitializeQueue(id2, maxQueueSizeBytes, orgID2, localBucketID2, 0) require.NoError(t, err) require.DirExists(t, filepath.Join(queuePath, id2.String())) @@ -254,11 +249,13 @@ func TestStartReplicationQueuesMultiple(t *testing.T) { trackedReplications := make(map[platform.ID]*influxdb.TrackedReplication) trackedReplications[id1] = &influxdb.TrackedReplication{ MaxQueueSizeBytes: maxQueueSizeBytes, + MaxAgeSeconds: 0, OrgID: orgID1, LocalBucketID: localBucketID1, } trackedReplications[id2] = &influxdb.TrackedReplication{ MaxQueueSizeBytes: maxQueueSizeBytes, + MaxAgeSeconds: 0, OrgID: orgID2, LocalBucketID: localBucketID2, } @@ -292,12 +289,12 @@ func TestStartReplicationQueuesMultipleWithPartialDelete(t 
*testing.T) { defer os.RemoveAll(filepath.Dir(queuePath)) // Create queue1 - err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1) + err := qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0) require.NoError(t, err) require.DirExists(t, filepath.Join(queuePath, id1.String())) // Create queue2 - err = qm.InitializeQueue(id2, maxQueueSizeBytes, orgID2, localBucketID2) + err = qm.InitializeQueue(id2, maxQueueSizeBytes, orgID2, localBucketID2, 0) require.NoError(t, err) require.DirExists(t, filepath.Join(queuePath, id2.String())) @@ -305,6 +302,7 @@ func TestStartReplicationQueuesMultipleWithPartialDelete(t *testing.T) { trackedReplications := make(map[platform.ID]*influxdb.TrackedReplication) trackedReplications[id1] = &influxdb.TrackedReplication{ MaxQueueSizeBytes: maxQueueSizeBytes, + MaxAgeSeconds: 0, OrgID: orgID1, LocalBucketID: localBucketID1, } @@ -355,27 +353,31 @@ func shutdown(t *testing.T, qm *durableQueueManager) { } type testRemoteWriter struct { - writeFn func([]byte) error + writeFn func([]byte, int) (time.Duration, bool, error) } -func (tw *testRemoteWriter) Write(data []byte) error { - return tw.writeFn(data) +func (tw *testRemoteWriter) Write(data []byte, attempt int) (time.Duration, bool, error) { + return tw.writeFn(data, attempt) } func getTestRemoteWriterSequenced(t *testing.T, expected []string, returning error, wg *sync.WaitGroup) remoteWriter { t.Helper() count := 0 - writeFn := func(b []byte) error { + writeFn := func(b []byte, attempt int) (time.Duration, bool, error) { if count >= len(expected) { t.Fatalf("count larger than expected len, %d > %d", count, len(expected)) } require.Equal(t, expected[count], string(b)) - count++ if wg != nil { wg.Done() } - return returning + // only progress the "pointer" if the data is successful + // enqueueing with a returned error means the same first point is retried + if returning == nil { + count++ + } + return time.Second, true, returning } writer := &testRemoteWriter{} @@ -389,9 +391,9 @@ func getTestRemoteWriter(t *testing.T, expected string) remoteWriter { t.Helper() writer := &testRemoteWriter{ - writeFn: func(b []byte) error { + writeFn: func(b []byte, i int) (time.Duration, bool, error) { require.Equal(t, expected, string(b)) - return nil + return time.Second, true, nil }, } @@ -408,7 +410,7 @@ func TestEnqueueData(t *testing.T) { logger := zaptest.NewLogger(t) qm := NewDurableQueueManager(logger, queuePath, metrics.NewReplicationsMetrics(), replicationsMock.NewMockHttpConfigStore(nil)) - require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1)) + require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0)) require.DirExists(t, filepath.Join(queuePath, id1.String())) sizes, err := qm.CurrentQueueSizes([]platform.ID{id1}) @@ -440,7 +442,7 @@ func TestEnqueueData_WithMetrics(t *testing.T) { path, qm := initQueueManager(t) defer os.RemoveAll(path) - require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1)) + require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0)) require.DirExists(t, filepath.Join(path, id1.String())) // close the scanner goroutine to specifically test EnqueueData() @@ -482,7 +484,7 @@ func TestEnqueueData_EnqueueFailure(t *testing.T) { path, qm := initQueueManager(t) defer os.RemoveAll(path) - require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1)) + require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, 
localBucketID1, 0)) require.DirExists(t, filepath.Join(path, id1.String())) rq, ok := qm.replicationQueues[id1] @@ -515,7 +517,7 @@ func TestGoroutineReceives(t *testing.T) { path, qm := initQueueManager(t) defer os.RemoveAll(path) - require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1)) + require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0)) require.DirExists(t, filepath.Join(path, id1.String())) rq, ok := qm.replicationQueues[id1] @@ -538,7 +540,7 @@ func TestGoroutineCloses(t *testing.T) { path, qm := initQueueManager(t) defer os.RemoveAll(path) - require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1)) + require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0)) require.DirExists(t, filepath.Join(path, id1.String())) rq, ok := qm.replicationQueues[id1] @@ -565,13 +567,13 @@ func TestGetReplications(t *testing.T) { defer os.RemoveAll(path) // Initialize 3 queues (2nd and 3rd share the same orgID and localBucket) - require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1)) + require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes, orgID1, localBucketID1, 0)) require.DirExists(t, filepath.Join(path, id1.String())) - require.NoError(t, qm.InitializeQueue(id2, maxQueueSizeBytes, orgID2, localBucketID2)) + require.NoError(t, qm.InitializeQueue(id2, maxQueueSizeBytes, orgID2, localBucketID2, 0)) require.DirExists(t, filepath.Join(path, id1.String())) - require.NoError(t, qm.InitializeQueue(id3, maxQueueSizeBytes, orgID2, localBucketID2)) + require.NoError(t, qm.InitializeQueue(id3, maxQueueSizeBytes, orgID2, localBucketID2, 0)) require.DirExists(t, filepath.Join(path, id1.String())) // Should return one matching replication queue (repl ID 1) diff --git a/replications/internal/store.go b/replications/internal/store.go index ea7ae98b58..77c946767d 100644 --- a/replications/internal/store.go +++ b/replications/internal/store.go @@ -49,7 +49,8 @@ func (s *Store) Unlock() { func (s *Store) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) { q := sq.Select( "id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id", - "max_queue_size_bytes", "latest_response_code", "latest_error_message", "drop_non_retryable_data"). + "max_queue_size_bytes", "latest_response_code", "latest_error_message", "drop_non_retryable_data", + "max_age_seconds"). From("replications") if filter.OrgID.Valid() { @@ -91,10 +92,11 @@ func (s *Store) CreateReplication(ctx context.Context, newID platform.ID, reques "remote_bucket_id": request.RemoteBucketID, "max_queue_size_bytes": request.MaxQueueSizeBytes, "drop_non_retryable_data": request.DropNonRetryableData, + "max_age_seconds": request.MaxAgeSeconds, "created_at": "datetime('now')", "updated_at": "datetime('now')", }). 
- Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, drop_non_retryable_data") + Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, drop_non_retryable_data, max_age_seconds") query, args, err := q.ToSql() if err != nil { @@ -117,7 +119,8 @@ func (s *Store) CreateReplication(ctx context.Context, newID platform.ID, reques func (s *Store) GetReplication(ctx context.Context, id platform.ID) (*influxdb.Replication, error) { q := sq.Select( "id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id", - "max_queue_size_bytes", "latest_response_code", "latest_error_message", "drop_non_retryable_data"). + "max_queue_size_bytes", "latest_response_code", "latest_error_message", "drop_non_retryable_data", + "max_age_seconds"). From("replications"). Where(sq.Eq{"id": id}) @@ -158,9 +161,12 @@ func (s *Store) UpdateReplication(ctx context.Context, id platform.ID, request i if request.DropNonRetryableData != nil { updates["drop_non_retryable_data"] = *request.DropNonRetryableData } + if request.MaxAgeSeconds != nil { + updates["max_age_seconds"] = *request.MaxAgeSeconds + } q := sq.Update("replications").SetMap(updates).Where(sq.Eq{"id": id}). - Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, drop_non_retryable_data") + Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, drop_non_retryable_data, max_age_seconds") query, args, err := q.ToSql() if err != nil { diff --git a/replications/internal/store_test.go b/replications/internal/store_test.go index 35d37834aa..ba106242e7 100644 --- a/replications/internal/store_test.go +++ b/replications/internal/store_test.go @@ -29,6 +29,7 @@ var ( LocalBucketID: platform.ID(1000), RemoteBucketID: platform.ID(99999), MaxQueueSizeBytes: 3 * influxdb.DefaultReplicationMaxQueueSizeBytes, + MaxAgeSeconds: 0, } createReq = influxdb.CreateReplicationRequest{ OrgID: replication.OrgID, @@ -38,6 +39,7 @@ var ( LocalBucketID: replication.LocalBucketID, RemoteBucketID: replication.RemoteBucketID, MaxQueueSizeBytes: replication.MaxQueueSizeBytes, + MaxAgeSeconds: replication.MaxAgeSeconds, } httpConfig = influxdb.ReplicationHTTPConfig{ RemoteURL: fmt.Sprintf("http://%s.cloud", replication.RemoteID), @@ -63,6 +65,7 @@ var ( RemoteBucketID: replication.RemoteBucketID, MaxQueueSizeBytes: *updateReq.MaxQueueSizeBytes, DropNonRetryableData: true, + MaxAgeSeconds: replication.MaxAgeSeconds, } ) diff --git a/replications/mock/http_config_store.go b/replications/mock/http_config_store.go index 2f12f35866..e3f8537982 100644 --- a/replications/mock/http_config_store.go +++ b/replications/mock/http_config_store.go @@ -9,7 +9,7 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" - "github.com/influxdata/influxdb/v2" + influxdb "github.com/influxdata/influxdb/v2" platform "github.com/influxdata/influxdb/v2/kit/platform" ) diff --git a/replications/mock/queue_management.go b/replications/mock/queue_management.go index 00850fa04e..583aa1f4fd 100644 --- a/replications/mock/queue_management.go +++ b/replications/mock/queue_management.go @@ -107,17 +107,17 @@ func (mr *MockDurableQueueManagerMockRecorder) GetReplications(arg0, arg1 interf } // InitializeQueue mocks base method. 
-func (m *MockDurableQueueManager) InitializeQueue(arg0 platform.ID, arg1 int64, arg2, arg3 platform.ID) error { +func (m *MockDurableQueueManager) InitializeQueue(arg0 platform.ID, arg1 int64, arg2, arg3 platform.ID, arg4 int64) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InitializeQueue", arg0, arg1, arg2, arg3) + ret := m.ctrl.Call(m, "InitializeQueue", arg0, arg1, arg2, arg3, arg4) ret0, _ := ret[0].(error) return ret0 } // InitializeQueue indicates an expected call of InitializeQueue. -func (mr *MockDurableQueueManagerMockRecorder) InitializeQueue(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +func (mr *MockDurableQueueManagerMockRecorder) InitializeQueue(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InitializeQueue", reflect.TypeOf((*MockDurableQueueManager)(nil).InitializeQueue), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InitializeQueue", reflect.TypeOf((*MockDurableQueueManager)(nil).InitializeQueue), arg0, arg1, arg2, arg3, arg4) } // StartReplicationQueues mocks base method. diff --git a/replications/remotewrite/writer.go b/replications/remotewrite/writer.go index 0200b8469d..ac61bc589f 100644 --- a/replications/remotewrite/writer.go +++ b/replications/remotewrite/writer.go @@ -9,6 +9,7 @@ import ( "net/url" "runtime" "strconv" + "sync" "time" "github.com/influxdata/influx-cli/v2/api" @@ -85,92 +86,86 @@ func NewWriter(replicationID platform.ID, store HttpConfigStore, metrics *metric } } -func (w *writer) Write(data []byte) error { - // Clean up the cancellation goroutine on a successful write. - writeDone := make(chan struct{}) - defer func() { - select { - case writeDone <- struct{}{}: // send signal if the cancellation goroutine is still waiting to receive - default: - } - }() - +func (w *writer) Write(data []byte, attempts int) (backoff time.Duration, shouldRetry bool, err error) { + cancelOnce := &sync.Once{} // Cancel any outstanding HTTP requests if the replicationQueue is closed. ctx, cancel := context.WithCancel(context.Background()) + + defer func() { + cancelOnce.Do(cancel) + }() + go func() { select { case <-w.done: - cancel() - case <-writeDone: + cancelOnce.Do(cancel) + case <-ctx.Done(): + // context is cancelled already } }() - attempts := 0 + // Get the most recent config on every attempt, in case the user has updated the config to correct errors. + conf, err := w.configStore.GetFullHTTPConfig(ctx, w.replicationID) + if err != nil { + return w.backoff(attempts), true, err + } - for { + res, postWriteErr := PostWrite(ctx, conf, data, w.clientTimeout) + res, msg, ok := normalizeResponse(res, postWriteErr) + if !ok { + // bail out + return w.backoff(attempts), true, postWriteErr + } - // Get the most recent config on every attempt, in case the user has updated the config to correct errors. - conf, err := w.configStore.GetFullHTTPConfig(ctx, w.replicationID) - if err != nil { - return err + // Update metrics and most recent error diagnostic information. 
+	if err := w.configStore.UpdateResponseInfo(ctx, w.replicationID, res.StatusCode, msg); err != nil {
+		// TODO: We shouldn't fail/retry a successful remote write for not successfully writing to the config store
+		// we should log instead of returning, like:
+		// w.logger.Warn("failed to update config store with latest remote write response info", zap.Error(err))
+		// Unfortunately this will mess up a lot of tests that are using UpdateResponseInfo failures as a proxy for
+		// write failures.
+		return w.backoff(attempts), true, err
+	}
+
+	if postWriteErr == nil {
+		// Successful write
+		w.metrics.RemoteWriteSent(w.replicationID, len(data))
+		w.logger.Debug("remote write successful", zap.Int("attempt", attempts), zap.Int("bytes", len(data)))
+		return 0, true, nil
+	}
+
+	w.metrics.RemoteWriteError(w.replicationID, res.StatusCode)
+	w.logger.Debug("remote write error", zap.Int("attempt", attempts), zap.String("error message", msg), zap.Int("status code", res.StatusCode))
+
+	var waitTime time.Duration
+	hasSetWaitTime := false
+
+	switch res.StatusCode {
+	case http.StatusBadRequest:
+		if conf.DropNonRetryableData {
+			errBody := make([]byte, 512)
+			n, _ := res.Body.Read(errBody)
+			w.logger.Warn("dropped data", zap.Int("bytes", len(data)), zap.String("reason", string(errBody[:n])))
+			w.metrics.RemoteWriteDropped(w.replicationID, len(data))
+			return 0, false, nil
 		}
-
-		res, err := PostWrite(ctx, conf, data, w.clientTimeout)
-		res, msg, ok := normalizeResponse(res, err)
-		if !ok {
-			// Can't retry - bail out.
-			return err
-		}
-
-		// Update metrics and most recent error diagnostic information.
-		if err := w.configStore.UpdateResponseInfo(ctx, w.replicationID, res.StatusCode, msg); err != nil {
-			return err
-		}
-
-		if err == nil {
-			// Successful write
-			w.metrics.RemoteWriteSent(w.replicationID, len(data))
-			w.logger.Debug("remote write successful", zap.Int("attempt", attempts), zap.Int("bytes", len(data)))
-			return nil
-		}
-
-		w.metrics.RemoteWriteError(w.replicationID, res.StatusCode)
-		w.logger.Debug("remote write error", zap.Int("attempt", attempts), zap.String("error message", "msg"), zap.Int("status code", res.StatusCode))
-
-		attempts++
-		var waitTime time.Duration
-		hasSetWaitTime := false
-
-		switch res.StatusCode {
-		case http.StatusBadRequest:
-			if conf.DropNonRetryableData {
-				w.logger.Debug("dropped data", zap.Int("bytes", len(data)))
-				w.metrics.RemoteWriteDropped(w.replicationID, len(data))
-				return nil
-			}
-		case http.StatusTooManyRequests:
-			headerTime := w.waitTimeFromHeader(res)
-			if headerTime != 0 {
-				waitTime = headerTime
-				hasSetWaitTime = true
-			}
-		}
-
-		if !hasSetWaitTime {
-			waitTime = w.backoff(attempts)
-		}
-		w.logger.Debug("waiting to retry", zap.Duration("wait time", waitTime))
-
-		select {
-		case <-w.waitFunc(waitTime):
-		case <-ctx.Done():
-			return ctx.Err()
+	case http.StatusTooManyRequests:
+		headerTime := w.waitTimeFromHeader(res)
+		if headerTime != 0 {
+			waitTime = headerTime
+			hasSetWaitTime = true
 		}
 	}
+
+	if !hasSetWaitTime {
+		waitTime = w.backoff(attempts)
+	}
+
+	return waitTime, true, postWriteErr
 }
 
-// normalizeReponse returns a guaranteed non-nil value for *http.Response, and an extracted error message string for use
-// in logging. The returned bool indicates that the response is retryable - false means that the write request should be
+// normalizeResponse returns a guaranteed non-nil value for *http.Response, and an extracted error message string for use
+// in logging.
The returned bool indicates if the response is a time-out - false means that the write request should be // aborted due to a malformed request. func normalizeResponse(r *http.Response, err error) (*http.Response, string, bool) { var errMsg string diff --git a/replications/remotewrite/writer_test.go b/replications/remotewrite/writer_test.go index 06e56d162e..ba6a64e5a5 100644 --- a/replications/remotewrite/writer_test.go +++ b/replications/remotewrite/writer_test.go @@ -72,7 +72,9 @@ func TestWrite(t *testing.T) { w, configStore, _ := testWriter(t) configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(nil, wantErr) - require.Equal(t, wantErr, w.Write([]byte{})) + _, shouldRetry, actualErr := w.Write([]byte{}, 1) + require.Equal(t, wantErr, actualErr) + require.True(t, shouldRetry) }) t.Run("nil response from PostWrite", func(t *testing.T) { @@ -83,7 +85,9 @@ func TestWrite(t *testing.T) { w, configStore, _ := testWriter(t) configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil) - require.Error(t, w.Write([]byte{})) + _, shouldRetry, actualErr := w.Write([]byte{}, 1) + require.Error(t, actualErr) + require.True(t, shouldRetry) }) t.Run("immediate good response", func(t *testing.T) { @@ -98,7 +102,9 @@ func TestWrite(t *testing.T) { configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil) configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusNoContent, "").Return(nil) - require.NoError(t, w.Write(testData)) + _, shouldRetry, actualErr := w.Write(testData, 0) + require.NoError(t, actualErr) + require.True(t, shouldRetry) }) t.Run("error updating response info", func(t *testing.T) { @@ -115,35 +121,33 @@ func TestWrite(t *testing.T) { configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil) configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusNoContent, "").Return(wantErr) - require.Equal(t, wantErr, w.Write(testData)) + _, shouldRetry, actualErr := w.Write(testData, 1) + require.Equal(t, wantErr, actualErr) + require.True(t, shouldRetry) }) - t.Run("bad server responses at first followed by good server responses", func(t *testing.T) { - attemptsBeforeSuccess := 3 - badStatus := http.StatusInternalServerError - goodStatus := http.StatusNoContent + t.Run("bad server responses that never succeed", func(t *testing.T) { + testAttempts := 3 - status := func(count int) int { - if count >= attemptsBeforeSuccess { - return goodStatus - } - return badStatus + for _, status := range []int{http.StatusOK, http.StatusTeapot, http.StatusInternalServerError} { + t.Run(fmt.Sprintf("status code %d", status), func(t *testing.T) { + svr := testServer(t, constantStatus(status), testData) + defer svr.Close() + + testConfig := &influxdb.ReplicationHTTPConfig{ + RemoteURL: svr.URL, + } + + w, configStore, _ := testWriter(t) + w.waitFunc = instaWait() + + configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil) + configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, status, invalidResponseCode(status).Error()).Return(nil) + _, _, actualErr := w.Write(testData, testAttempts) + require.NotNil(t, actualErr) + require.Contains(t, actualErr.Error(), fmt.Sprintf("invalid response code %d", status)) + }) } - - svr := testServer(t, status, testData) - defer svr.Close() - - testConfig := &influxdb.ReplicationHTTPConfig{ - RemoteURL: svr.URL, - } - - w, configStore, _ := testWriter(t) - w.waitFunc = instaWait() - - 
configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil).Times(attemptsBeforeSuccess + 1) - configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, badStatus, invalidResponseCode(badStatus).Error()).Return(nil).Times(attemptsBeforeSuccess) - configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, goodStatus, "").Return(nil) - require.NoError(t, w.Write(testData)) }) t.Run("drops bad data after config is updated", func(t *testing.T) { @@ -167,7 +171,34 @@ func TestWrite(t *testing.T) { configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil).Times(testAttempts - 1) configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(updatedConfig, nil) configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusBadRequest, invalidResponseCode(http.StatusBadRequest).Error()).Return(nil).Times(testAttempts) - require.NoError(t, w.Write(testData)) + for i := 1; i <= testAttempts; i++ { + _, shouldRetry, actualErr := w.Write(testData, i) + if testAttempts == i { + require.NoError(t, actualErr) + require.False(t, shouldRetry) + } else { + require.Error(t, actualErr) + require.True(t, shouldRetry) + } + } + }) + + t.Run("gives backoff time on write response", func(t *testing.T) { + svr := testServer(t, constantStatus(http.StatusBadRequest), testData) + defer svr.Close() + + testConfig := &influxdb.ReplicationHTTPConfig{ + RemoteURL: svr.URL, + } + + w, configStore, _ := testWriter(t) + + configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil) + configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusBadRequest, gomock.Any()).Return(nil) + backoff, shouldRetry, actualErr := w.Write(testData, 1) + require.Equal(t, backoff, w.backoff(1)) + require.Equal(t, invalidResponseCode(http.StatusBadRequest), actualErr) + require.True(t, shouldRetry) }) t.Run("uses wait time from response header if present", func(t *testing.T) { @@ -194,10 +225,11 @@ func TestWrite(t *testing.T) { return instaWait()(dur) } - configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil).MinTimes(1) - configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusTooManyRequests, invalidResponseCode(http.StatusTooManyRequests).Error()).Return(nil).MinTimes(1) - err := w.Write(testData) - require.ErrorIs(t, err, context.Canceled) + configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil) + configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusTooManyRequests, invalidResponseCode(http.StatusTooManyRequests).Error()).Return(nil) + _, shouldRetry, actualErr := w.Write(testData, 1) + require.Equal(t, invalidResponseCode(http.StatusTooManyRequests), actualErr) + require.True(t, shouldRetry) }) t.Run("can cancel with done channel", func(t *testing.T) { @@ -208,15 +240,12 @@ func TestWrite(t *testing.T) { RemoteURL: svr.URL, } - w, configStore, done := testWriter(t) + w, configStore, _ := testWriter(t) - configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil).MinTimes(1) - configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusInternalServerError, invalidResponseCode(http.StatusInternalServerError).Error()). 
- DoAndReturn(func(_, _, _, _ interface{}) error { - close(done) - return nil - }) - require.Equal(t, context.Canceled, w.Write(testData)) + configStore.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(testConfig, nil) + configStore.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusInternalServerError, invalidResponseCode(http.StatusInternalServerError).Error()).Return(nil) + _, _, actualErr := w.Write(testData, 1) + require.Equal(t, invalidResponseCode(http.StatusInternalServerError), actualErr) }) } @@ -226,21 +255,19 @@ func TestWrite_Metrics(t *testing.T) { tests := []struct { name string status func(int) int + expectedErr error data []byte registerExpectations func(*testing.T, *replicationsMock.MockHttpConfigStore, *influxdb.ReplicationHTTPConfig) checkMetrics func(*testing.T, *prom.Registry) }{ { - name: "server errors", - status: func(i int) int { - arr := []int{http.StatusTeapot, http.StatusTeapot, http.StatusTeapot, http.StatusNoContent} - return arr[i] - }, - data: []byte{}, + name: "server errors", + status: constantStatus(http.StatusTeapot), + expectedErr: invalidResponseCode(http.StatusTeapot), + data: []byte{}, registerExpectations: func(t *testing.T, store *replicationsMock.MockHttpConfigStore, conf *influxdb.ReplicationHTTPConfig) { - store.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(conf, nil).Times(4) - store.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusTeapot, invalidResponseCode(http.StatusTeapot).Error()).Return(nil).Times(3) - store.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusNoContent, "").Return(nil).Times(1) + store.EXPECT().GetFullHTTPConfig(gomock.Any(), testID).Return(conf, nil) + store.EXPECT().UpdateResponseInfo(gomock.Any(), testID, http.StatusTeapot, invalidResponseCode(http.StatusTeapot).Error()).Return(nil) }, checkMetrics: func(t *testing.T, reg *prom.Registry) { mfs := promtest.MustGather(t, reg) @@ -249,7 +276,6 @@ func TestWrite_Metrics(t *testing.T) { "code": strconv.Itoa(http.StatusTeapot), }) require.NotNil(t, errorCodes) - require.Equal(t, 3.0, errorCodes.Counter.GetValue()) }, }, { @@ -306,7 +332,8 @@ func TestWrite_Metrics(t *testing.T) { reg.MustRegister(w.metrics.PrometheusCollectors()...) 
tt.registerExpectations(t, configStore, testConfig) - require.NoError(t, w.Write(tt.data)) + _, _, actualErr := w.Write(tt.data, 1) + require.Equal(t, tt.expectedErr, actualErr) tt.checkMetrics(t, reg) }) } diff --git a/replications/service.go b/replications/service.go index 6b27ef751c..1aa3f65120 100644 --- a/replications/service.go +++ b/replications/service.go @@ -71,7 +71,7 @@ type BucketService interface { } type DurableQueueManager interface { - InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64, orgID platform.ID, localBucketID platform.ID) error + InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64, orgID platform.ID, localBucketID platform.ID, maxAge int64) error DeleteQueue(replicationID platform.ID) error UpdateMaxQueueSize(replicationID platform.ID, maxQueueSizeBytes int64) error CurrentQueueSizes(ids []platform.ID) (map[platform.ID]int64, error) @@ -143,7 +143,7 @@ func (s *service) CreateReplication(ctx context.Context, request influxdb.Create } newID := s.idGenerator.ID() - if err := s.durableQueueManager.InitializeQueue(newID, request.MaxQueueSizeBytes, request.OrgID, request.LocalBucketID); err != nil { + if err := s.durableQueueManager.InitializeQueue(newID, request.MaxQueueSizeBytes, request.OrgID, request.LocalBucketID, request.MaxAgeSeconds); err != nil { return nil, err } @@ -404,6 +404,7 @@ func (s *service) Open(ctx context.Context) error { for _, r := range trackedReplications.Replications { trackedReplicationsMap[r.ID] = &influxdb.TrackedReplication{ MaxQueueSizeBytes: r.MaxQueueSizeBytes, + MaxAgeSeconds: r.MaxAgeSeconds, OrgID: r.OrgID, LocalBucketID: r.LocalBucketID, } diff --git a/replications/service_test.go b/replications/service_test.go index 72b585a40a..18a9751082 100644 --- a/replications/service_test.go +++ b/replications/service_test.go @@ -227,7 +227,7 @@ func TestCreateReplication(t *testing.T) { mocks.bucketSvc.EXPECT().FindBucketByID(gomock.Any(), tt.create.LocalBucketID).Return(nil, tt.bucketErr) if tt.bucketErr == nil { - mocks.durableQueueManager.EXPECT().InitializeQueue(id1, tt.create.MaxQueueSizeBytes, tt.create.OrgID, tt.create.LocalBucketID).Return(tt.queueManagerErr) + mocks.durableQueueManager.EXPECT().InitializeQueue(id1, tt.create.MaxQueueSizeBytes, tt.create.OrgID, tt.create.LocalBucketID, tt.create.MaxAgeSeconds).Return(tt.queueManagerErr) } if tt.queueManagerErr == nil && tt.bucketErr == nil { @@ -817,11 +817,13 @@ func TestOpen(t *testing.T) { replicationsMap: map[platform.ID]*influxdb.TrackedReplication{ replication1.ID: { MaxQueueSizeBytes: replication1.MaxQueueSizeBytes, + MaxAgeSeconds: replication1.MaxAgeSeconds, OrgID: replication1.OrgID, LocalBucketID: replication1.LocalBucketID, }, replication2.ID: { MaxQueueSizeBytes: replication2.MaxQueueSizeBytes, + MaxAgeSeconds: replication2.MaxAgeSeconds, OrgID: replication2.OrgID, LocalBucketID: replication2.LocalBucketID, }, @@ -835,6 +837,7 @@ func TestOpen(t *testing.T) { replicationsMap: map[platform.ID]*influxdb.TrackedReplication{ replication1.ID: { MaxQueueSizeBytes: replication1.MaxQueueSizeBytes, + MaxAgeSeconds: replication1.MaxAgeSeconds, OrgID: replication1.OrgID, LocalBucketID: replication1.LocalBucketID, }, @@ -852,6 +855,7 @@ func TestOpen(t *testing.T) { replicationsMap: map[platform.ID]*influxdb.TrackedReplication{ replication1.ID: { MaxQueueSizeBytes: replication1.MaxQueueSizeBytes, + MaxAgeSeconds: replication1.MaxAgeSeconds, OrgID: replication1.OrgID, LocalBucketID: replication1.LocalBucketID, }, diff --git 
a/sqlite/migrations/0005_create_replications_table.up.sql b/sqlite/migrations/0005_create_replications_table.up.sql index 04a140e327..e6d278b95e 100644 --- a/sqlite/migrations/0005_create_replications_table.up.sql +++ b/sqlite/migrations/0005_create_replications_table.up.sql @@ -8,6 +8,7 @@ CREATE TABLE replications local_bucket_id VARCHAR(16) NOT NULL, remote_bucket_id VARCHAR(16) NOT NULL, max_queue_size_bytes INTEGER NOT NULL, + max_age_seconds INTEGER NOT NULL, latest_response_code INTEGER, latest_error_message TEXT, drop_non_retryable_data BOOLEAN NOT NULL,
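
The sketch below is not part of the patch; it is a minimal, self-contained illustration of the retry contract the patch introduces. remoteWriter.Write now returns a suggested backoff and a shouldRetry flag instead of sleeping and looping internally, and the queue goroutine owns a single retry timer plus a ticker that periodically purges entries older than the configured max age. All names here (writeFunc, send, the literal durations) are illustrative stand-ins, not identifiers from the patch.

package main

import (
	"errors"
	"fmt"
	"math"
	"time"
)

// writeFunc stands in for the reworked remoteWriter contract: Write reports a
// suggested backoff and whether a retry is worthwhile instead of sleeping and
// looping internally.
type writeFunc func(data []byte, attempt int) (backoff time.Duration, shouldRetry bool, err error)

func main() {
	remaining := 2 // fail the first two attempts, then succeed
	write := writeFunc(func(data []byte, attempt int) (time.Duration, bool, error) {
		if remaining > 0 {
			remaining--
			// Grow the wait with the attempt count, loosely like w.backoff.
			return time.Duration(attempt+1) * 50 * time.Millisecond, true, errors.New("remote unavailable")
		}
		return 0, true, nil
	})

	// One parked retry timer and one purge ticker, both owned by the queue
	// goroutine, mirroring replicationQueue.run. (The patch also stops and
	// drains the timer before each Reset; omitted here for brevity.)
	retry := time.NewTimer(math.MaxInt64)
	purgeTicker := time.NewTicker(75 * time.Millisecond)
	defer purgeTicker.Stop()

	maxAge := time.Second // stand-in for a configured MaxAgeSeconds value
	failedWrites := 0     // mirrors rq.failedWrites

	send := func() bool {
		backoff, shouldRetry, err := write([]byte("m f=1"), failedWrites)
		if err == nil {
			failedWrites = 0 // a successful write resets the failure counter
			return true
		}
		failedWrites++
		if shouldRetry {
			retry.Reset(backoff) // schedule a wake-up instead of blocking
		}
		return false
	}

	if send() {
		return
	}
	for {
		select {
		case <-retry.C:
			if send() {
				fmt.Println("write succeeded; the queue scanner would now advance")
				return
			}
		case <-purgeTicker.C:
			// The real queue calls PurgeOlderThan with this cutoff.
			fmt.Println("purge cutoff:", time.Now().Add(-maxAge).Format(time.RFC3339))
		}
	}
}

Keeping all waiting inside the queue goroutine's select is what lets the patch cancel a pending retry by closing the done channel, rather than leaking a sleeping goroutine per in-flight write.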