feat: replication remote writes do not block local writes (#22956)
* feat: replication remote writes do not block local writes

branch pull/22958/head
parent f05d0136f1
commit 3460f1cc52
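Before the diff, the shape of the change in one place: a local write appends to the durable replication queue and then signals the queue's worker with a non-blocking send on a channel that has a one-slot buffer, so the write path never waits on the (possibly retrying) remote writer. The sketch below is a minimal, runnable illustration of that pattern only; enqueue, worker, and the package-level state are illustrative stand-ins, not influxdb types, and the actual change is in the hunks that follow.

package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	mu    sync.Mutex
	queue []string
	// wakeup has capacity 1: a signal sent while the worker is busy is retained,
	// so the worker always rescans the queue after the most recent enqueue.
	wakeup = make(chan struct{}, 1)
)

// enqueue stands in for the local write path: append to the queue, then signal
// the worker without ever waiting on it.
func enqueue(data string) {
	mu.Lock()
	queue = append(queue, data)
	mu.Unlock()
	select { // non-blocking send, mirroring EnqueueData in the diff below
	case wakeup <- struct{}{}:
	default:
	}
}

// worker stands in for replicationQueue.run: on each wakeup, drain whatever is queued.
func worker(done <-chan struct{}) {
	for {
		select {
		case <-done:
			return
		case <-wakeup:
			for {
				mu.Lock()
				if len(queue) == 0 {
					mu.Unlock()
					break
				}
				next := queue[0]
				queue = queue[1:]
				mu.Unlock()
				fmt.Println("replicating:", next)
				time.Sleep(10 * time.Millisecond) // stands in for a slow or retrying remote write
			}
		}
	}
}

func main() {
	done := make(chan struct{})
	go worker(done)
	for i := 0; i < 3; i++ {
		enqueue(fmt.Sprintf("point-%d", i)) // returns immediately even while the worker is busy
	}
	time.Sleep(200 * time.Millisecond) // give the worker time to drain before exiting
	close(done)
}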
@@ -8,6 +8,7 @@ import (
	"strconv"
	"sync"
	"testing"
	"time"

	"github.com/influxdata/influx-cli/v2/api"
	"github.com/influxdata/influxdb/v2"
@@ -308,3 +309,74 @@ func TestReplicationStreamEndToEnd(t *testing.T) {
	require.Equal(t, exp2, l.FluxQueryOrFail(t, l.Org, l.Auth.Token, fmt.Sprintf(qs, remote1BucketName)))
	require.Equal(t, exp3, l.FluxQueryOrFail(t, l.Org, l.Auth.Token, fmt.Sprintf(qs, remote2BucketName)))
}

func TestReplicationsLocalWriteBlocking(t *testing.T) {
	l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) {
		o.FeatureFlags = map[string]string{feature.ReplicationStreamBackend().Key(): "true"}
	})
	defer l.ShutdownOrFail(t, ctx)
	client := l.APIClient(t)

	// Server that only returns an error will cause the remote write to retry on loop.
	svr := httptest.NewServer(nethttp.HandlerFunc(func(w nethttp.ResponseWriter, r *nethttp.Request) {
		w.WriteHeader(nethttp.StatusInternalServerError)
	}))
	defer svr.Close()

	// Create a "remote" connection to the blocking server
	remote, err := client.RemoteConnectionsApi.PostRemoteConnection(ctx).
		RemoteConnectionCreationRequest(api.RemoteConnectionCreationRequest{
			Name:             "self",
			OrgID:            l.Org.ID.String(),
			RemoteURL:        svr.URL,
			RemoteAPIToken:   "foo",
			RemoteOrgID:      l.Org.ID.String(),
			AllowInsecureTLS: false,
		}).Execute()
	require.NoError(t, err)

	createReq := api.ReplicationCreationRequest{
		Name:              "test",
		OrgID:             l.Org.ID.String(),
		RemoteID:          remote.Id,
		LocalBucketID:     l.Bucket.ID.String(),
		RemoteBucketID:    l.Bucket.ID.String(),
		MaxQueueSizeBytes: influxdb.DefaultReplicationMaxQueueSizeBytes,
	}

	// Create the replication
	_, err = client.ReplicationsApi.PostReplication(ctx).ReplicationCreationRequest(createReq).Execute()
	require.NoError(t, err)

	p := `m,k=v1 f=100i 946684800000000000`

	// Do a write; the remote writer will block forever
	l.WritePointsOrFail(t, p)

	// Do some more writes; these should not be blocked locally, although the remote writer will be.
	var wg sync.WaitGroup
	for idx := 0; idx < 3; idx++ {
		wg.Add(1)
		go func() {
			l.WritePointsOrFail(t, p)
			wg.Done()
		}()
	}

	writesAreDone := make(chan struct{})

	go func() {
		wg.Wait()
		// If local writes don't block, this will quickly send on the writesAreDone channel to prevent the test from timing
		// out.
		writesAreDone <- struct{}{}
	}()

	// Test timeout
	delay := 5 * time.Second
	select {
	case <-time.After(delay):
		t.Fatalf("test timed out after %s - writing was blocked by remote writer", delay)
	case <-writesAreDone:
	}
}
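The test above uses a standard Go idiom for asserting that calls do not block: run them in goroutines, close a channel once they all return, and race that against time.After. A hypothetical generic helper (not part of this commit) shows the same shape; it assumes only the testing, sync, and time packages already imported by the file.

// requireCompletesWithin is illustrative only. It fails the test if any of the
// given calls is still blocked after the timeout.
func requireCompletesWithin(t *testing.T, timeout time.Duration, fns ...func()) {
	t.Helper()
	var wg sync.WaitGroup
	for _, fn := range fns {
		wg.Add(1)
		go func(f func()) {
			defer wg.Done()
			f()
		}(fn)
	}
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
	case <-time.After(timeout):
		t.Fatalf("calls did not complete within %s", timeout)
	}
}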
@@ -130,6 +130,13 @@ func (rq *replicationQueue) run() {
		case <-rq.done: // end the goroutine when done is messaged
			return
		case <-rq.receive: // run the scanner on data append
			// Receive channel has a buffer to prevent a potential race condition where rq.SendWrite has reached EOF and will
			// return false, but data is queued after evaluating the scanner and before the loop is ready to select on the
			// receive channel again. This would result in data remaining unprocessed in the queue until the next send on the
			// receive channel since the send to the receive channel in qm.EnqueueData is non-blocking. The buffer ensures
			// that rq.SendWrite will be called again in this situation and not leave data in the queue. Outside of this
			// specific scenario, the buffer might result in an extra call to rq.SendWrite that will immediately return on
			// EOF.
			for rq.SendWrite() {
			}
		}
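The capacity-1 buffer that the comment above motivates can be seen in isolation: a non-blocking send to an unbuffered channel is dropped whenever the worker is not already parked in its select, while a one-slot buffer holds that wakeup until the worker comes back around. A small deterministic sketch (illustrative, not influxdb code):

package main

import "fmt"

func main() {
	for _, capacity := range []int{0, 1} {
		receive := make(chan struct{}, capacity)
		// No goroutine is currently waiting on receive (the worker is "busy"),
		// so try a non-blocking signal, as qm.EnqueueData does.
		select {
		case receive <- struct{}{}:
		default:
		}
		// Later the worker returns to its select. Was the wakeup retained?
		select {
		case <-receive:
			fmt.Printf("capacity %d: wakeup delivered\n", capacity)
		default:
			fmt.Printf("capacity %d: wakeup lost\n", capacity)
		}
	}
}
// Prints "capacity 0: wakeup lost", then "capacity 1: wakeup delivered".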
@@ -138,8 +145,7 @@ func (rq *replicationQueue) run() {

// SendWrite processes data enqueued into the durablequeue.Queue.
// SendWrite is responsible for processing all data in the queue at the time of calling.
// Retryable errors should be handled and retried in the dp function.
// Unprocessable data should be dropped in the dp function.
// Network errors will be handled by the remote writer.
func (rq *replicationQueue) SendWrite() bool {
	// Any error in creating the scanner should exit the loop in run()
	// Either it is io.EOF indicating no data, or some other failure in making
@@ -163,12 +169,12 @@ func (rq *replicationQueue) SendWrite() bool {
		// drop the data with a call to scan.Advance() later.
		if scan.Err() != nil {
			rq.logger.Info("Segment read error.", zap.Error(scan.Err()))
			// TODO: Add metrics collection for dropped data here.
			break
		}

		// An error here indicates an unhandlable error. Data is not corrupt, and
		// the remote write is not retryable. A potential example of an error here
		// is an authentication error with the remote host.
		// the remote write is not retryable.
		// TODO: Propagate context if needed to allow for graceful shutdowns, see https://github.com/influxdata/influxdb/issues/22944
		if err = rq.remoteWriter.Write(context.Background(), scan.Bytes()); err != nil {
			rq.logger.Error("Error in replication stream", zap.Error(err))
@@ -363,7 +369,12 @@ func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byt
	// Update metrics for this replication queue when adding data to the queue.
	qm.metrics.EnqueueData(replicationID, len(data), numPoints, rq.queue.TotalBytes())

	qm.replicationQueues[replicationID].receive <- struct{}{}
	// Send to the replication receive channel if it is not full to activate the queue processing. If the receive channel
	// is full, don't block further writes and return.
	select {
	case qm.replicationQueues[replicationID].receive <- struct{}{}:
	default:
	}

	return nil
}
@@ -375,7 +386,7 @@ func (qm *durableQueueManager) newReplicationQueue(id platform.ID, queue *durabl
		id:           id,
		queue:        queue,
		done:         make(chan struct{}),
		receive:      make(chan struct{}),
		receive:      make(chan struct{}, 1),
		logger:       logger,
		metrics:      qm.metrics,
		remoteWriter: remotewrite.NewWriter(id, qm.configStore, qm.metrics, logger),