From 0e5b14fa5e406d004018f9659e064e9b4ab1777a Mon Sep 17 00:00:00 2001
From: William Baker
Date: Mon, 13 Dec 2021 11:02:38 -0600
Subject: [PATCH] chore: increase replications batch size limits (#22983)

---
 replications/service.go      |  22 +++++--
 replications/service_test.go | 116 +++++++++++++++++++++--------------
 2 files changed, 88 insertions(+), 50 deletions(-)

diff --git a/replications/service.go b/replications/service.go
index af669031f1..5823185d0a 100644
--- a/replications/service.go
+++ b/replications/service.go
@@ -21,9 +21,14 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
-// This is the same batch size limit used by the influx write command
-// https://github.com/influxdata/influx-cli/blob/a408c02bd462946ac6ebdedf6f62f5e3d81c1f6f/clients/write/buffer_batcher.go#L14
-const maxRemoteWriteBatchSize = 500000
+// InfluxDB docs suggest a batch size of 5000 lines for optimal write performance.
+// https://docs.influxdata.com/influxdb/v2.1/write-data/best-practices/optimize-writes/
+const maxRemoteWritePointSize = 5000
+
+// Uncompressed size (bytes) is used as a secondary limit to prevent network issues and stay below cloud maximum payload
+// limitations. 2.5 MB is about 50% of the limit on a basic cloud plan.
+// https://docs.influxdata.com/influxdb/cloud/account-management/pricing-plans/#data-limits
+const maxRemoteWriteBatchSize = 2500000
 
 func errLocalBucketNotFound(id platform.ID, cause error) error {
 	return &ierrors.Error{
@@ -51,6 +56,7 @@ func NewService(sqlStore *sqlite.SqlStore, bktSvc BucketService, localWriter sto
 			store,
 		),
 		maxRemoteWriteBatchSize: maxRemoteWriteBatchSize,
+		maxRemoteWritePointSize: maxRemoteWritePointSize,
 	}, metrs
 }
 
@@ -96,6 +102,7 @@ type service struct {
 	localWriter             storage.PointsWriter
 	log                     *zap.Logger
 	maxRemoteWriteBatchSize int
+	maxRemoteWritePointSize int
 }
 
 func (s service) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) {
@@ -337,9 +344,9 @@ func (s service) WritePoints(ctx context.Context, orgID platform.ID, bucketID pl
 	gzw := gzip.NewWriter(batches[0].data)
 
 	// Iterate through points and compress in batches
-	for _, p := range points {
+	for count, p := range points {
 		// If current point will cause this batch to exceed max size, start a new batch for it first
-		if currentBatchSize+p.StringSize() > s.maxRemoteWriteBatchSize {
+		if s.startNewBatch(currentBatchSize, p.StringSize(), count) {
 			batches = append(batches, &batch{
 				data:      &bytes.Buffer{},
 				numPoints: 0,
@@ -416,3 +423,8 @@ func (s service) Close() error {
 	}
 	return nil
 }
+
+func (s service) startNewBatch(currentSize, nextSize, pointCount int) bool {
+	return currentSize+nextSize > s.maxRemoteWriteBatchSize ||
+		pointCount > 0 && pointCount%s.maxRemoteWritePointSize == 0
+}
diff --git a/replications/service_test.go b/replications/service_test.go
index d3bf988147..b073f068aa 100644
--- a/replications/service_test.go
+++ b/replications/service_test.go
@@ -689,23 +689,46 @@ disk,host=C value=1.3 1000000000`)
 func TestWritePointsBatches(t *testing.T) {
 	t.Parallel()
 
-	svc, mocks := newTestService(t)
-
-	// Set batch size to smaller size for testing (should result in 3 batches sized 93, 93, and 63 - total size 249)
-	svc.maxRemoteWriteBatchSize = 100
-
-	// Define metadata for two replications
-	list := &influxdb.Replications{
-		Replications: []influxdb.Replication{replication1, replication2},
+	tests := []struct {
+		name    string
+		setupFn func(*testing.T, *service)
+	}{
+		{
+			name: "batch bytes size",
+			setupFn: func(t *testing.T, svc *service) {
+				t.Helper()
+				// Set batch size to smaller size for testing (should result in 3 batches sized 93, 93, and 63 - total size 249)
+				svc.maxRemoteWriteBatchSize = 100
+			},
+		},
+		{
+			name: "batch point size",
+			setupFn: func(t *testing.T, svc *service) {
+				t.Helper()
+				// Set point size to smaller size for testing (should result in 3 batches with 3 points, 3 points, and 2 points)
+				svc.maxRemoteWritePointSize = 3
+			},
+		},
 	}
 
-	mocks.serviceStore.EXPECT().ListReplications(gomock.Any(), influxdb.ReplicationListFilter{
-		OrgID:         orgID,
-		LocalBucketID: &id1,
-	}).Return(list, nil)
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			svc, mocks := newTestService(t)
 
-	// Define some points of line protocol, parse string --> []Point
-	points, err := models.ParsePointsString(`
+			tt.setupFn(t, svc)
+
+			// Define metadata for two replications
+			list := &influxdb.Replications{
+				Replications: []influxdb.Replication{replication1, replication2},
+			}
+
+			mocks.serviceStore.EXPECT().ListReplications(gomock.Any(), influxdb.ReplicationListFilter{
+				OrgID:         orgID,
+				LocalBucketID: &id1,
+			}).Return(list, nil)
+
+			// Define some points of line protocol, parse string --> []Point
+			points, err := models.ParsePointsString(`
 cpu,host=0 value=1.1 6000000000
 cpu,host=A value=1.2 2000000000
 cpu,host=A value=1.3 3000000000
@@ -714,42 +737,44 @@ cpu,host=B value=1.3 5000000000
 cpu,host=C value=1.3 1000000000
 mem,host=C value=1.3 1000000000
 disk,host=C value=1.3 1000000000`)
-	require.NoError(t, err)
+			require.NoError(t, err)
 
-	// Points should successfully write to local TSM.
-	mocks.pointWriter.EXPECT().WritePoints(gomock.Any(), orgID, id1, points).Return(nil)
+			// Points should successfully write to local TSM.
+			mocks.pointWriter.EXPECT().WritePoints(gomock.Any(), orgID, id1, points).Return(nil)
 
-	// Points should successfully be enqueued in the 2 replications associated with the local bucket.
-	for _, id := range []platform.ID{replication1.ID, replication2.ID} {
-		// Check batch 1
-		mocks.durableQueueManager.EXPECT().
-			EnqueueData(id, gomock.Any(), 3).
-			DoAndReturn(func(_ platform.ID, data []byte, numPoints int) error {
-				require.Equal(t, 3, numPoints)
-				checkCompressedData(t, data, points[:3])
-				return nil
-			})
+			// Points should successfully be enqueued in the 2 replications associated with the local bucket.
+			for _, id := range []platform.ID{replication1.ID, replication2.ID} {
+				// Check batch 1
+				mocks.durableQueueManager.EXPECT().
+					EnqueueData(id, gomock.Any(), 3).
+					DoAndReturn(func(_ platform.ID, data []byte, numPoints int) error {
+						require.Equal(t, 3, numPoints)
+						checkCompressedData(t, data, points[:3])
+						return nil
+					})
 
-		// Check batch 2
-		mocks.durableQueueManager.EXPECT().
-			EnqueueData(id, gomock.Any(), 3).
-			DoAndReturn(func(_ platform.ID, data []byte, numPoints int) error {
-				require.Equal(t, 3, numPoints)
-				checkCompressedData(t, data, points[3:6])
-				return nil
-			})
+				// Check batch 2
+				mocks.durableQueueManager.EXPECT().
+					EnqueueData(id, gomock.Any(), 3).
+					DoAndReturn(func(_ platform.ID, data []byte, numPoints int) error {
+						require.Equal(t, 3, numPoints)
+						checkCompressedData(t, data, points[3:6])
+						return nil
+					})
 
-		// Check batch 3
-		mocks.durableQueueManager.EXPECT().
-			EnqueueData(id, gomock.Any(), 2).
-			DoAndReturn(func(_ platform.ID, data []byte, numPoints int) error {
-				require.Equal(t, 2, numPoints)
-				checkCompressedData(t, data, points[6:])
-				return nil
-			})
+				// Check batch 3
+				mocks.durableQueueManager.EXPECT().
+					EnqueueData(id, gomock.Any(), 2).
+					DoAndReturn(func(_ platform.ID, data []byte, numPoints int) error {
+						require.Equal(t, 2, numPoints)
+						checkCompressedData(t, data, points[6:])
+						return nil
+					})
+			}
+
+			require.NoError(t, svc.WritePoints(ctx, orgID, id1, points))
+		})
 	}
-
-	require.NoError(t, svc.WritePoints(ctx, orgID, id1, points))
 }
 
 func TestWritePoints_LocalFailure(t *testing.T) {
@@ -897,6 +922,7 @@ func newTestService(t *testing.T) (*service, mocks) {
 		durableQueueManager:     mocks.durableQueueManager,
 		localWriter:             mocks.pointWriter,
 		maxRemoteWriteBatchSize: maxRemoteWriteBatchSize,
+		maxRemoteWritePointSize: maxRemoteWritePointSize,
 	}
 
 	return &svc, mocks
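The sketch below is not part of the commit above; it is a minimal, self-contained illustration of how the two limits introduced by this patch interact when splitting a stream of points into batches: a batch is closed either when the next point would push it over the uncompressed byte limit, or when it already holds a full multiple of the per-batch point limit. The package wrapper, the constants maxBatchBytes and maxBatchPoints, and the sample line sizes are illustrative stand-ins, not the replications service's real types or values.

package main

import "fmt"

// Illustrative stand-ins for the patch's maxRemoteWriteBatchSize (2.5 MB) and
// maxRemoteWritePointSize (5000), shrunk so the split is visible in a small run.
const (
	maxBatchBytes  = 1000
	maxBatchPoints = 3
)

// startNewBatch mirrors the rule added in replications/service.go: start a new
// batch when the next point would exceed the byte limit, or when the running
// point count has reached a multiple of the per-batch point limit.
func startNewBatch(currentSize, nextSize, pointCount int) bool {
	return currentSize+nextSize > maxBatchBytes ||
		pointCount > 0 && pointCount%maxBatchPoints == 0
}

func main() {
	// Pretend uncompressed sizes (bytes) of 8 points, mirroring the 8 lines of
	// line protocol used by TestWritePointsBatches.
	lineSizes := []int{30, 30, 30, 30, 30, 30, 30, 30}

	batches := [][]int{{}}
	currentBatchSize := 0
	for count, size := range lineSizes {
		if startNewBatch(currentBatchSize, size, count) {
			batches = append(batches, []int{})
			currentBatchSize = 0
		}
		last := len(batches) - 1
		batches[last] = append(batches[last], size)
		currentBatchSize += size
	}

	// With the point limit binding here, the 8 points split 3/3/2, the same
	// batch shape asserted by the test table above.
	fmt.Println(batches) // [[30 30 30] [30 30 30] [30 30]]
}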