From 1aac92c5ee0df44eb559717c3a99028bc5d28b73 Mon Sep 17 00:00:00 2001 From: Daniel Moran Date: Fri, 5 Nov 2021 14:35:12 -0400 Subject: [PATCH] refactor: remove replications.current_queue_size_bytes from sqlite (#22832) Maintaining the current queue size in a SQL column would require updating the DB on every queue operation. Avoid that contention by instead looking up the current size on the in-memory durable queue struct, which is already tracked & updated as data enters & leaves the queue. --- pkg/durablequeue/queue.go | 6 +- pkg/durablequeue/queue_test.go | 2 +- replications/internal/queue_management.go | 17 +++++ replications/mock/queue_management.go | 15 +++++ replications/service.go | 66 +++++++++++++------ replications/service_test.go | 30 +++++++++ .../0005_create_replications_table.up.sql | 1 - 7 files changed, 113 insertions(+), 24 deletions(-) diff --git a/pkg/durablequeue/queue.go b/pkg/durablequeue/queue.go index 9de352c765..f13faacd3c 100644 --- a/pkg/durablequeue/queue.go +++ b/pkg/durablequeue/queue.go @@ -189,7 +189,7 @@ func (l *Queue) Open() error { return l.trimHead(false) } - l.queueTotalSize.Add(l.diskUsage()) + l.queueTotalSize.Add(l.DiskUsage()) return nil } @@ -365,8 +365,8 @@ func (l *Queue) Dir() string { return l.dir } -// diskUsage returns the total size on disk used by the Queue. -func (l *Queue) diskUsage() int64 { +// DiskUsage returns the total size on disk used by the Queue. +func (l *Queue) DiskUsage() int64 { var size int64 for _, s := range l.segments { size += s.diskUsage() diff --git a/pkg/durablequeue/queue_test.go b/pkg/durablequeue/queue_test.go index e0a802d8d5..88d1b2516c 100644 --- a/pkg/durablequeue/queue_test.go +++ b/pkg/durablequeue/queue_test.go @@ -24,7 +24,7 @@ func BenchmarkQueueAppend(b *testing.B) { for i := 0; i < b.N; i++ { if err := q.Append([]byte(fmt.Sprintf("%d", i))); err != nil { - println(q.diskUsage()) + println(q.DiskUsage()) b.Fatalf("Queue.Append failed: %v", err) } } diff --git a/replications/internal/queue_management.go b/replications/internal/queue_management.go index 2f1751b0ac..0c49d933b9 100644 --- a/replications/internal/queue_management.go +++ b/replications/internal/queue_management.go @@ -126,3 +126,20 @@ func (qm *durableQueueManager) UpdateMaxQueueSize(replicationID platform.ID, max return nil } + +// CurrentQueueSizes returns the current size-on-disk for the requested set of durable queues. +func (qm *durableQueueManager) CurrentQueueSizes(ids []platform.ID) (map[platform.ID]int64, error) { + qm.mutex.RLock() + defer qm.mutex.RUnlock() + + sizes := make(map[platform.ID]int64, len(ids)) + + for _, id := range ids { + if qm.replicationQueues[id] == nil { + return nil, fmt.Errorf("durable queue not found for replication ID %q", id) + } + sizes[id] = qm.replicationQueues[id].DiskUsage() + } + + return sizes, nil +} diff --git a/replications/mock/queue_management.go b/replications/mock/queue_management.go index dc29d0b891..dcb838060b 100644 --- a/replications/mock/queue_management.go +++ b/replications/mock/queue_management.go @@ -34,6 +34,21 @@ func (m *MockDurableQueueManager) EXPECT() *MockDurableQueueManagerMockRecorder return m.recorder } +// CurrentQueueSizes mocks base method. +func (m *MockDurableQueueManager) CurrentQueueSizes(arg0 []platform.ID) (map[platform.ID]int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CurrentQueueSizes", arg0) + ret0, _ := ret[0].(map[platform.ID]int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CurrentQueueSizes indicates an expected call of CurrentQueueSizes. +func (mr *MockDurableQueueManagerMockRecorder) CurrentQueueSizes(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CurrentQueueSizes", reflect.TypeOf((*MockDurableQueueManager)(nil).CurrentQueueSizes), arg0) +} + // DeleteQueue mocks base method. func (m *MockDurableQueueManager) DeleteQueue(arg0 platform.ID) error { m.ctrl.T.Helper() diff --git a/replications/service.go b/replications/service.go index 38cabc6d44..315897e21d 100644 --- a/replications/service.go +++ b/replications/service.go @@ -63,6 +63,7 @@ type DurableQueueManager interface { InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64) error DeleteQueue(replicationID platform.ID) error UpdateMaxQueueSize(replicationID platform.ID, maxQueueSizeBytes int64) error + CurrentQueueSizes(ids []platform.ID) (map[platform.ID]int64, error) } type service struct { @@ -76,10 +77,8 @@ type service struct { func (s service) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) { q := sq.Select( - "id", "org_id", "name", "description", - "remote_id", "local_bucket_id", "remote_bucket_id", - "max_queue_size_bytes", "current_queue_size_bytes", - "latest_response_code", "latest_error_message"). + "id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id", + "max_queue_size_bytes", "latest_response_code", "latest_error_message"). From("replications"). Where(sq.Eq{"org_id": filter.OrgID}) @@ -102,6 +101,23 @@ func (s service) ListReplications(ctx context.Context, filter influxdb.Replicati if err := s.store.DB.SelectContext(ctx, &rs.Replications, query, args...); err != nil { return nil, err } + + if len(rs.Replications) == 0 { + return &rs, nil + } + + ids := make([]platform.ID, len(rs.Replications)) + for i := range rs.Replications { + ids[i] = rs.Replications[i].ID + } + sizes, err := s.durableQueueManager.CurrentQueueSizes(ids) + if err != nil { + return nil, err + } + for i := range rs.Replications { + rs.Replications[i].CurrentQueueSizeBytes = sizes[rs.Replications[i].ID] + } + return &rs, nil } @@ -123,19 +139,18 @@ func (s service) CreateReplication(ctx context.Context, request influxdb.CreateR q := sq.Insert("replications"). SetMap(sq.Eq{ - "id": newID, - "org_id": request.OrgID, - "name": request.Name, - "description": request.Description, - "remote_id": request.RemoteID, - "local_bucket_id": request.LocalBucketID, - "remote_bucket_id": request.RemoteBucketID, - "max_queue_size_bytes": request.MaxQueueSizeBytes, - "current_queue_size_bytes": 0, - "created_at": "datetime('now')", - "updated_at": "datetime('now')", + "id": newID, + "org_id": request.OrgID, + "name": request.Name, + "description": request.Description, + "remote_id": request.RemoteID, + "local_bucket_id": request.LocalBucketID, + "remote_bucket_id": request.RemoteBucketID, + "max_queue_size_bytes": request.MaxQueueSizeBytes, + "created_at": "datetime('now')", + "updated_at": "datetime('now')", }). - Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, current_queue_size_bytes") + Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes") cleanupQueue := func() { if cleanupErr := s.durableQueueManager.DeleteQueue(newID); cleanupErr != nil { @@ -187,8 +202,7 @@ func (s service) GetReplication(ctx context.Context, id platform.ID) (*influxdb. q := sq.Select( "id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id", - "max_queue_size_bytes", "current_queue_size_bytes", - "latest_response_code", "latest_error_message"). + "max_queue_size_bytes", "latest_response_code", "latest_error_message"). From("replications"). Where(sq.Eq{"id": id}) @@ -204,6 +218,13 @@ func (s service) GetReplication(ctx context.Context, id platform.ID) (*influxdb. } return nil, err } + + sizes, err := s.durableQueueManager.CurrentQueueSizes([]platform.ID{r.ID}) + if err != nil { + return nil, err + } + r.CurrentQueueSizeBytes = sizes[r.ID] + return &r, nil } @@ -229,7 +250,7 @@ func (s service) UpdateReplication(ctx context.Context, id platform.ID, request } q := sq.Update("replications").SetMap(updates).Where(sq.Eq{"id": id}). - Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, current_queue_size_bytes") + Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes") query, args, err := q.ToSql() if err != nil { @@ -253,6 +274,13 @@ func (s service) UpdateReplication(ctx context.Context, id platform.ID, request return nil, err } } + + sizes, err := s.durableQueueManager.CurrentQueueSizes([]platform.ID{r.ID}) + if err != nil { + return nil, err + } + r.CurrentQueueSizeBytes = sizes[r.ID] + return &r, nil } diff --git a/replications/service_test.go b/replications/service_test.go index 91934cd39a..1aea802f4f 100644 --- a/replications/service_test.go +++ b/replications/service_test.go @@ -19,6 +19,10 @@ import ( "go.uber.org/zap/zaptest" ) +//go:generate go run github.com/golang/mock/mockgen -package mock -destination ./mock/validator.go github.com/influxdata/influxdb/v2/replications ReplicationValidator +//go:generate go run github.com/golang/mock/mockgen -package mock -destination ./mock/bucket_service.go github.com/influxdata/influxdb/v2/replications BucketService +//go:generate go run github.com/golang/mock/mockgen -package mock -destination ./mock/queue_management.go github.com/influxdata/influxdb/v2/replications DurableQueueManager + var ( ctx = context.Background() initID = platform.ID(1) @@ -99,6 +103,8 @@ func TestCreateAndGetReplication(t *testing.T) { require.Equal(t, replication, *created) // Read the created replication and assert it matches the creation response. + mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID}). + Return(map[platform.ID]int64{initID: replication.CurrentQueueSizeBytes}, nil) got, err = svc.GetReplication(ctx, initID) require.NoError(t, err) require.Equal(t, replication, *got) @@ -249,6 +255,8 @@ func TestUpdateAndGetReplication(t *testing.T) { // Update the replication. mocks.durableQueueManager.EXPECT().UpdateMaxQueueSize(initID, *updateReq.MaxQueueSizeBytes) + mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID}). + Return(map[platform.ID]int64{initID: replication.CurrentQueueSizeBytes}, nil) updated, err = svc.UpdateReplication(ctx, initID, updateReq) require.NoError(t, err) require.Equal(t, updatedReplication, *updated) @@ -279,6 +287,8 @@ func TestUpdateMissingRemote(t *testing.T) { require.Nil(t, updated) // Make sure nothing changed in the DB. + mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID}). + Return(map[platform.ID]int64{initID: replication.CurrentQueueSizeBytes}, nil) got, err := svc.GetReplication(ctx, initID) require.NoError(t, err) require.Equal(t, replication, *got) @@ -303,6 +313,8 @@ func TestUpdateNoop(t *testing.T) { require.Equal(t, replication, *created) // Send a no-op update, assert nothing changed. + mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID}). + Return(map[platform.ID]int64{initID: replication.CurrentQueueSizeBytes}, nil) updated, err := svc.UpdateReplication(ctx, initID, influxdb.UpdateReplicationRequest{}) require.NoError(t, err) require.Equal(t, replication, *updated) @@ -334,6 +346,8 @@ func TestValidateUpdatedReplicationWithoutPersisting(t *testing.T) { fmt.Sprintf("remote %q not found", *updateReq.RemoteID)) // Make sure nothing changed in the DB. + mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID}). + Return(map[platform.ID]int64{initID: replication.CurrentQueueSizeBytes}, nil) got, err := svc.GetReplication(ctx, initID) require.NoError(t, err) require.Equal(t, replication, *got) @@ -365,6 +379,8 @@ func TestValidateUpdatedReplicationWithoutPersisting(t *testing.T) { require.Contains(t, svc.ValidateUpdatedReplication(ctx, initID, updateReq).Error(), fakeErr.Error()) // Make sure nothing changed in the DB. + mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID}). + Return(map[platform.ID]int64{initID: replication.CurrentQueueSizeBytes}, nil) got, err := svc.GetReplication(ctx, initID) require.NoError(t, err) require.Equal(t, replication, *got) @@ -395,6 +411,8 @@ func TestValidateUpdatedReplicationWithoutPersisting(t *testing.T) { require.NoError(t, svc.ValidateUpdatedReplication(ctx, initID, updateReq)) // Make sure nothing changed in the DB. + mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID}). + Return(map[platform.ID]int64{initID: replication.CurrentQueueSizeBytes}, nil) got, err := svc.GetReplication(ctx, initID) require.NoError(t, err) require.Equal(t, replication, *got) @@ -457,6 +475,8 @@ func TestDeleteReplications(t *testing.T) { require.NoError(t, err) } + mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID, initID + 1, initID + 2}). + Return(map[platform.ID]int64{initID: 0, initID + 1: 0, initID + 2: 0}, nil) listed, err := svc.ListReplications(ctx, influxdb.ReplicationListFilter{OrgID: replication.OrgID}) require.NoError(t, err) require.Len(t, listed.Replications, 3) @@ -466,6 +486,8 @@ func TestDeleteReplications(t *testing.T) { require.NoError(t, svc.DeleteBucketReplications(ctx, createReq.LocalBucketID)) // Ensure they were deleted. + mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID + 1}). + Return(map[platform.ID]int64{initID + 1: 0}, nil) listed, err = svc.ListReplications(ctx, influxdb.ReplicationListFilter{OrgID: replication.OrgID}) require.NoError(t, err) require.Len(t, listed.Replications, 1) @@ -505,6 +527,8 @@ func TestListReplications(t *testing.T) { defer clean(t) allRepls := setup(t, svc, mocks) + mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID, initID + 1, initID + 2}). + Return(map[platform.ID]int64{initID: 0, initID + 1: 0, initID + 2: 0}, nil) listed, err := svc.ListReplications(ctx, influxdb.ReplicationListFilter{OrgID: createReq.OrgID}) require.NoError(t, err) require.Equal(t, influxdb.Replications{Replications: allRepls}, *listed) @@ -517,6 +541,8 @@ func TestListReplications(t *testing.T) { defer clean(t) allRepls := setup(t, svc, mocks) + mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID + 1}). + Return(map[platform.ID]int64{initID + 1: 0}, nil) listed, err := svc.ListReplications(ctx, influxdb.ReplicationListFilter{ OrgID: createReq.OrgID, Name: &createReq2.Name, @@ -532,6 +558,8 @@ func TestListReplications(t *testing.T) { defer clean(t) allRepls := setup(t, svc, mocks) + mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID, initID + 1}). + Return(map[platform.ID]int64{initID: 0, initID + 1: 0}, nil) listed, err := svc.ListReplications(ctx, influxdb.ReplicationListFilter{ OrgID: createReq.OrgID, RemoteID: &createReq.RemoteID, @@ -547,6 +575,8 @@ func TestListReplications(t *testing.T) { defer clean(t) allRepls := setup(t, svc, mocks) + mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID, initID + 2}). + Return(map[platform.ID]int64{initID: 0, initID + 2: 0}, nil) listed, err := svc.ListReplications(ctx, influxdb.ReplicationListFilter{ OrgID: createReq.OrgID, LocalBucketID: &createReq.LocalBucketID, diff --git a/sqlite/migrations/0005_create_replications_table.up.sql b/sqlite/migrations/0005_create_replications_table.up.sql index ee0cd0f535..bed21fcc57 100644 --- a/sqlite/migrations/0005_create_replications_table.up.sql +++ b/sqlite/migrations/0005_create_replications_table.up.sql @@ -8,7 +8,6 @@ CREATE TABLE replications local_bucket_id VARCHAR(16) NOT NULL, remote_bucket_id VARCHAR(16) NOT NULL, max_queue_size_bytes INTEGER NOT NULL, - current_queue_size_bytes INTEGER NOT NULL, latest_response_code INTEGER, latest_error_message TEXT, created_at TIMESTAMP NOT NULL,