chore: increase replications batch size limits (#22983)

pull/22990/head
William Baker 2021-12-13 11:02:38 -06:00 committed by GitHub
parent a7a5233432
commit 0e5b14fa5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 88 additions and 50 deletions

View File

@ -21,9 +21,14 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
// This is the same batch size limit used by the influx write command // InfluxDB docs suggest a batch size of 5000 lines for optimal write performance.
// https://github.com/influxdata/influx-cli/blob/a408c02bd462946ac6ebdedf6f62f5e3d81c1f6f/clients/write/buffer_batcher.go#L14 // https://docs.influxdata.com/influxdb/v2.1/write-data/best-practices/optimize-writes/
const maxRemoteWriteBatchSize = 500000 const maxRemoteWritePointSize = 5000
// Uncompressed size (bytes) is used as a secondary limit to prevent network issues and stay below cloud maximum payload
// limitations. 2.5 MB is about 50% of the limit on a basic cloud plan.
// https://docs.influxdata.com/influxdb/cloud/account-management/pricing-plans/#data-limits
const maxRemoteWriteBatchSize = 2500000
func errLocalBucketNotFound(id platform.ID, cause error) error { func errLocalBucketNotFound(id platform.ID, cause error) error {
return &ierrors.Error{ return &ierrors.Error{
@ -51,6 +56,7 @@ func NewService(sqlStore *sqlite.SqlStore, bktSvc BucketService, localWriter sto
store, store,
), ),
maxRemoteWriteBatchSize: maxRemoteWriteBatchSize, maxRemoteWriteBatchSize: maxRemoteWriteBatchSize,
maxRemoteWritePointSize: maxRemoteWritePointSize,
}, metrs }, metrs
} }
@ -96,6 +102,7 @@ type service struct {
localWriter storage.PointsWriter localWriter storage.PointsWriter
log *zap.Logger log *zap.Logger
maxRemoteWriteBatchSize int maxRemoteWriteBatchSize int
maxRemoteWritePointSize int
} }
func (s service) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) { func (s service) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) {
@ -337,9 +344,9 @@ func (s service) WritePoints(ctx context.Context, orgID platform.ID, bucketID pl
gzw := gzip.NewWriter(batches[0].data) gzw := gzip.NewWriter(batches[0].data)
// Iterate through points and compress in batches // Iterate through points and compress in batches
for _, p := range points { for count, p := range points {
// If current point will cause this batch to exceed max size, start a new batch for it first // If current point will cause this batch to exceed max size, start a new batch for it first
if currentBatchSize+p.StringSize() > s.maxRemoteWriteBatchSize { if s.startNewBatch(currentBatchSize, p.StringSize(), count) {
batches = append(batches, &batch{ batches = append(batches, &batch{
data: &bytes.Buffer{}, data: &bytes.Buffer{},
numPoints: 0, numPoints: 0,
@ -416,3 +423,8 @@ func (s service) Close() error {
} }
return nil return nil
} }
func (s service) startNewBatch(currentSize, nextSize, pointCount int) bool {
return currentSize+nextSize > s.maxRemoteWriteBatchSize ||
pointCount > 0 && pointCount%s.maxRemoteWritePointSize == 0
}

View File

@ -689,23 +689,46 @@ disk,host=C value=1.3 1000000000`)
func TestWritePointsBatches(t *testing.T) { func TestWritePointsBatches(t *testing.T) {
t.Parallel() t.Parallel()
svc, mocks := newTestService(t) tests := []struct {
name string
// Set batch size to smaller size for testing (should result in 3 batches sized 93, 93, and 63 - total size 249) setupFn func(*testing.T, *service)
svc.maxRemoteWriteBatchSize = 100 }{
{
// Define metadata for two replications name: "batch bytes size",
list := &influxdb.Replications{ setupFn: func(t *testing.T, svc *service) {
Replications: []influxdb.Replication{replication1, replication2}, t.Helper()
// Set batch size to smaller size for testing (should result in 3 batches sized 93, 93, and 63 - total size 249)
svc.maxRemoteWriteBatchSize = 100
},
},
{
name: "batch point size",
setupFn: func(t *testing.T, svc *service) {
t.Helper()
// Set point size to smaller size for testing (should result in 3 batches with 3 points, 3 points, and 2 points)
svc.maxRemoteWritePointSize = 3
},
},
} }
mocks.serviceStore.EXPECT().ListReplications(gomock.Any(), influxdb.ReplicationListFilter{ for _, tt := range tests {
OrgID: orgID, t.Run(tt.name, func(t *testing.T) {
LocalBucketID: &id1, svc, mocks := newTestService(t)
}).Return(list, nil)
// Define some points of line protocol, parse string --> []Point tt.setupFn(t, svc)
points, err := models.ParsePointsString(`
// Define metadata for two replications
list := &influxdb.Replications{
Replications: []influxdb.Replication{replication1, replication2},
}
mocks.serviceStore.EXPECT().ListReplications(gomock.Any(), influxdb.ReplicationListFilter{
OrgID: orgID,
LocalBucketID: &id1,
}).Return(list, nil)
// Define some points of line protocol, parse string --> []Point
points, err := models.ParsePointsString(`
cpu,host=0 value=1.1 6000000000 cpu,host=0 value=1.1 6000000000
cpu,host=A value=1.2 2000000000 cpu,host=A value=1.2 2000000000
cpu,host=A value=1.3 3000000000 cpu,host=A value=1.3 3000000000
@ -714,42 +737,44 @@ cpu,host=B value=1.3 5000000000
cpu,host=C value=1.3 1000000000 cpu,host=C value=1.3 1000000000
mem,host=C value=1.3 1000000000 mem,host=C value=1.3 1000000000
disk,host=C value=1.3 1000000000`) disk,host=C value=1.3 1000000000`)
require.NoError(t, err) require.NoError(t, err)
// Points should successfully write to local TSM. // Points should successfully write to local TSM.
mocks.pointWriter.EXPECT().WritePoints(gomock.Any(), orgID, id1, points).Return(nil) mocks.pointWriter.EXPECT().WritePoints(gomock.Any(), orgID, id1, points).Return(nil)
// Points should successfully be enqueued in the 2 replications associated with the local bucket. // Points should successfully be enqueued in the 2 replications associated with the local bucket.
for _, id := range []platform.ID{replication1.ID, replication2.ID} { for _, id := range []platform.ID{replication1.ID, replication2.ID} {
// Check batch 1 // Check batch 1
mocks.durableQueueManager.EXPECT(). mocks.durableQueueManager.EXPECT().
EnqueueData(id, gomock.Any(), 3). EnqueueData(id, gomock.Any(), 3).
DoAndReturn(func(_ platform.ID, data []byte, numPoints int) error { DoAndReturn(func(_ platform.ID, data []byte, numPoints int) error {
require.Equal(t, 3, numPoints) require.Equal(t, 3, numPoints)
checkCompressedData(t, data, points[:3]) checkCompressedData(t, data, points[:3])
return nil return nil
}) })
// Check batch 2 // Check batch 2
mocks.durableQueueManager.EXPECT(). mocks.durableQueueManager.EXPECT().
EnqueueData(id, gomock.Any(), 3). EnqueueData(id, gomock.Any(), 3).
DoAndReturn(func(_ platform.ID, data []byte, numPoints int) error { DoAndReturn(func(_ platform.ID, data []byte, numPoints int) error {
require.Equal(t, 3, numPoints) require.Equal(t, 3, numPoints)
checkCompressedData(t, data, points[3:6]) checkCompressedData(t, data, points[3:6])
return nil return nil
}) })
// Check batch 3 // Check batch 3
mocks.durableQueueManager.EXPECT(). mocks.durableQueueManager.EXPECT().
EnqueueData(id, gomock.Any(), 2). EnqueueData(id, gomock.Any(), 2).
DoAndReturn(func(_ platform.ID, data []byte, numPoints int) error { DoAndReturn(func(_ platform.ID, data []byte, numPoints int) error {
require.Equal(t, 2, numPoints) require.Equal(t, 2, numPoints)
checkCompressedData(t, data, points[6:]) checkCompressedData(t, data, points[6:])
return nil return nil
}) })
}
require.NoError(t, svc.WritePoints(ctx, orgID, id1, points))
})
} }
require.NoError(t, svc.WritePoints(ctx, orgID, id1, points))
} }
func TestWritePoints_LocalFailure(t *testing.T) { func TestWritePoints_LocalFailure(t *testing.T) {
@ -897,6 +922,7 @@ func newTestService(t *testing.T) (*service, mocks) {
durableQueueManager: mocks.durableQueueManager, durableQueueManager: mocks.durableQueueManager,
localWriter: mocks.pointWriter, localWriter: mocks.pointWriter,
maxRemoteWriteBatchSize: maxRemoteWriteBatchSize, maxRemoteWriteBatchSize: maxRemoteWriteBatchSize,
maxRemoteWritePointSize: maxRemoteWritePointSize,
} }
return &svc, mocks return &svc, mocks