refactor: remove replications.current_queue_size_bytes from sqlite (#22832)
Maintaining the current queue size in a SQL column would require updating the DB on every queue operation. Avoid that contention by instead looking up the current size on the in-memory durable queue struct, which is already tracked & updated as data enters & leaves the queue.pull/22835/head
parent
a7f3b67092
commit
1aac92c5ee
|
@ -189,7 +189,7 @@ func (l *Queue) Open() error {
|
|||
return l.trimHead(false)
|
||||
}
|
||||
|
||||
l.queueTotalSize.Add(l.diskUsage())
|
||||
l.queueTotalSize.Add(l.DiskUsage())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -365,8 +365,8 @@ func (l *Queue) Dir() string {
|
|||
return l.dir
|
||||
}
|
||||
|
||||
// diskUsage returns the total size on disk used by the Queue.
|
||||
func (l *Queue) diskUsage() int64 {
|
||||
// DiskUsage returns the total size on disk used by the Queue.
|
||||
func (l *Queue) DiskUsage() int64 {
|
||||
var size int64
|
||||
for _, s := range l.segments {
|
||||
size += s.diskUsage()
|
||||
|
|
|
@ -24,7 +24,7 @@ func BenchmarkQueueAppend(b *testing.B) {
|
|||
|
||||
for i := 0; i < b.N; i++ {
|
||||
if err := q.Append([]byte(fmt.Sprintf("%d", i))); err != nil {
|
||||
println(q.diskUsage())
|
||||
println(q.DiskUsage())
|
||||
b.Fatalf("Queue.Append failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -126,3 +126,20 @@ func (qm *durableQueueManager) UpdateMaxQueueSize(replicationID platform.ID, max
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CurrentQueueSizes returns the current size-on-disk for the requested set of durable queues.
|
||||
func (qm *durableQueueManager) CurrentQueueSizes(ids []platform.ID) (map[platform.ID]int64, error) {
|
||||
qm.mutex.RLock()
|
||||
defer qm.mutex.RUnlock()
|
||||
|
||||
sizes := make(map[platform.ID]int64, len(ids))
|
||||
|
||||
for _, id := range ids {
|
||||
if qm.replicationQueues[id] == nil {
|
||||
return nil, fmt.Errorf("durable queue not found for replication ID %q", id)
|
||||
}
|
||||
sizes[id] = qm.replicationQueues[id].DiskUsage()
|
||||
}
|
||||
|
||||
return sizes, nil
|
||||
}
|
||||
|
|
|
@ -34,6 +34,21 @@ func (m *MockDurableQueueManager) EXPECT() *MockDurableQueueManagerMockRecorder
|
|||
return m.recorder
|
||||
}
|
||||
|
||||
// CurrentQueueSizes mocks base method.
|
||||
func (m *MockDurableQueueManager) CurrentQueueSizes(arg0 []platform.ID) (map[platform.ID]int64, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "CurrentQueueSizes", arg0)
|
||||
ret0, _ := ret[0].(map[platform.ID]int64)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// CurrentQueueSizes indicates an expected call of CurrentQueueSizes.
|
||||
func (mr *MockDurableQueueManagerMockRecorder) CurrentQueueSizes(arg0 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CurrentQueueSizes", reflect.TypeOf((*MockDurableQueueManager)(nil).CurrentQueueSizes), arg0)
|
||||
}
|
||||
|
||||
// DeleteQueue mocks base method.
|
||||
func (m *MockDurableQueueManager) DeleteQueue(arg0 platform.ID) error {
|
||||
m.ctrl.T.Helper()
|
||||
|
|
|
@ -63,6 +63,7 @@ type DurableQueueManager interface {
|
|||
InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64) error
|
||||
DeleteQueue(replicationID platform.ID) error
|
||||
UpdateMaxQueueSize(replicationID platform.ID, maxQueueSizeBytes int64) error
|
||||
CurrentQueueSizes(ids []platform.ID) (map[platform.ID]int64, error)
|
||||
}
|
||||
|
||||
type service struct {
|
||||
|
@ -76,10 +77,8 @@ type service struct {
|
|||
|
||||
func (s service) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) {
|
||||
q := sq.Select(
|
||||
"id", "org_id", "name", "description",
|
||||
"remote_id", "local_bucket_id", "remote_bucket_id",
|
||||
"max_queue_size_bytes", "current_queue_size_bytes",
|
||||
"latest_response_code", "latest_error_message").
|
||||
"id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id",
|
||||
"max_queue_size_bytes", "latest_response_code", "latest_error_message").
|
||||
From("replications").
|
||||
Where(sq.Eq{"org_id": filter.OrgID})
|
||||
|
||||
|
@ -102,6 +101,23 @@ func (s service) ListReplications(ctx context.Context, filter influxdb.Replicati
|
|||
if err := s.store.DB.SelectContext(ctx, &rs.Replications, query, args...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(rs.Replications) == 0 {
|
||||
return &rs, nil
|
||||
}
|
||||
|
||||
ids := make([]platform.ID, len(rs.Replications))
|
||||
for i := range rs.Replications {
|
||||
ids[i] = rs.Replications[i].ID
|
||||
}
|
||||
sizes, err := s.durableQueueManager.CurrentQueueSizes(ids)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := range rs.Replications {
|
||||
rs.Replications[i].CurrentQueueSizeBytes = sizes[rs.Replications[i].ID]
|
||||
}
|
||||
|
||||
return &rs, nil
|
||||
}
|
||||
|
||||
|
@ -123,19 +139,18 @@ func (s service) CreateReplication(ctx context.Context, request influxdb.CreateR
|
|||
|
||||
q := sq.Insert("replications").
|
||||
SetMap(sq.Eq{
|
||||
"id": newID,
|
||||
"org_id": request.OrgID,
|
||||
"name": request.Name,
|
||||
"description": request.Description,
|
||||
"remote_id": request.RemoteID,
|
||||
"local_bucket_id": request.LocalBucketID,
|
||||
"remote_bucket_id": request.RemoteBucketID,
|
||||
"max_queue_size_bytes": request.MaxQueueSizeBytes,
|
||||
"current_queue_size_bytes": 0,
|
||||
"created_at": "datetime('now')",
|
||||
"updated_at": "datetime('now')",
|
||||
"id": newID,
|
||||
"org_id": request.OrgID,
|
||||
"name": request.Name,
|
||||
"description": request.Description,
|
||||
"remote_id": request.RemoteID,
|
||||
"local_bucket_id": request.LocalBucketID,
|
||||
"remote_bucket_id": request.RemoteBucketID,
|
||||
"max_queue_size_bytes": request.MaxQueueSizeBytes,
|
||||
"created_at": "datetime('now')",
|
||||
"updated_at": "datetime('now')",
|
||||
}).
|
||||
Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, current_queue_size_bytes")
|
||||
Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes")
|
||||
|
||||
cleanupQueue := func() {
|
||||
if cleanupErr := s.durableQueueManager.DeleteQueue(newID); cleanupErr != nil {
|
||||
|
@ -187,8 +202,7 @@ func (s service) GetReplication(ctx context.Context, id platform.ID) (*influxdb.
|
|||
q := sq.Select(
|
||||
"id", "org_id", "name", "description",
|
||||
"remote_id", "local_bucket_id", "remote_bucket_id",
|
||||
"max_queue_size_bytes", "current_queue_size_bytes",
|
||||
"latest_response_code", "latest_error_message").
|
||||
"max_queue_size_bytes", "latest_response_code", "latest_error_message").
|
||||
From("replications").
|
||||
Where(sq.Eq{"id": id})
|
||||
|
||||
|
@ -204,6 +218,13 @@ func (s service) GetReplication(ctx context.Context, id platform.ID) (*influxdb.
|
|||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sizes, err := s.durableQueueManager.CurrentQueueSizes([]platform.ID{r.ID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r.CurrentQueueSizeBytes = sizes[r.ID]
|
||||
|
||||
return &r, nil
|
||||
}
|
||||
|
||||
|
@ -229,7 +250,7 @@ func (s service) UpdateReplication(ctx context.Context, id platform.ID, request
|
|||
}
|
||||
|
||||
q := sq.Update("replications").SetMap(updates).Where(sq.Eq{"id": id}).
|
||||
Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, current_queue_size_bytes")
|
||||
Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes")
|
||||
|
||||
query, args, err := q.ToSql()
|
||||
if err != nil {
|
||||
|
@ -253,6 +274,13 @@ func (s service) UpdateReplication(ctx context.Context, id platform.ID, request
|
|||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
sizes, err := s.durableQueueManager.CurrentQueueSizes([]platform.ID{r.ID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r.CurrentQueueSizeBytes = sizes[r.ID]
|
||||
|
||||
return &r, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,10 @@ import (
|
|||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
//go:generate go run github.com/golang/mock/mockgen -package mock -destination ./mock/validator.go github.com/influxdata/influxdb/v2/replications ReplicationValidator
|
||||
//go:generate go run github.com/golang/mock/mockgen -package mock -destination ./mock/bucket_service.go github.com/influxdata/influxdb/v2/replications BucketService
|
||||
//go:generate go run github.com/golang/mock/mockgen -package mock -destination ./mock/queue_management.go github.com/influxdata/influxdb/v2/replications DurableQueueManager
|
||||
|
||||
var (
|
||||
ctx = context.Background()
|
||||
initID = platform.ID(1)
|
||||
|
@ -99,6 +103,8 @@ func TestCreateAndGetReplication(t *testing.T) {
|
|||
require.Equal(t, replication, *created)
|
||||
|
||||
// Read the created replication and assert it matches the creation response.
|
||||
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID}).
|
||||
Return(map[platform.ID]int64{initID: replication.CurrentQueueSizeBytes}, nil)
|
||||
got, err = svc.GetReplication(ctx, initID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, replication, *got)
|
||||
|
@ -249,6 +255,8 @@ func TestUpdateAndGetReplication(t *testing.T) {
|
|||
|
||||
// Update the replication.
|
||||
mocks.durableQueueManager.EXPECT().UpdateMaxQueueSize(initID, *updateReq.MaxQueueSizeBytes)
|
||||
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID}).
|
||||
Return(map[platform.ID]int64{initID: replication.CurrentQueueSizeBytes}, nil)
|
||||
updated, err = svc.UpdateReplication(ctx, initID, updateReq)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, updatedReplication, *updated)
|
||||
|
@ -279,6 +287,8 @@ func TestUpdateMissingRemote(t *testing.T) {
|
|||
require.Nil(t, updated)
|
||||
|
||||
// Make sure nothing changed in the DB.
|
||||
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID}).
|
||||
Return(map[platform.ID]int64{initID: replication.CurrentQueueSizeBytes}, nil)
|
||||
got, err := svc.GetReplication(ctx, initID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, replication, *got)
|
||||
|
@ -303,6 +313,8 @@ func TestUpdateNoop(t *testing.T) {
|
|||
require.Equal(t, replication, *created)
|
||||
|
||||
// Send a no-op update, assert nothing changed.
|
||||
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID}).
|
||||
Return(map[platform.ID]int64{initID: replication.CurrentQueueSizeBytes}, nil)
|
||||
updated, err := svc.UpdateReplication(ctx, initID, influxdb.UpdateReplicationRequest{})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, replication, *updated)
|
||||
|
@ -334,6 +346,8 @@ func TestValidateUpdatedReplicationWithoutPersisting(t *testing.T) {
|
|||
fmt.Sprintf("remote %q not found", *updateReq.RemoteID))
|
||||
|
||||
// Make sure nothing changed in the DB.
|
||||
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID}).
|
||||
Return(map[platform.ID]int64{initID: replication.CurrentQueueSizeBytes}, nil)
|
||||
got, err := svc.GetReplication(ctx, initID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, replication, *got)
|
||||
|
@ -365,6 +379,8 @@ func TestValidateUpdatedReplicationWithoutPersisting(t *testing.T) {
|
|||
require.Contains(t, svc.ValidateUpdatedReplication(ctx, initID, updateReq).Error(), fakeErr.Error())
|
||||
|
||||
// Make sure nothing changed in the DB.
|
||||
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID}).
|
||||
Return(map[platform.ID]int64{initID: replication.CurrentQueueSizeBytes}, nil)
|
||||
got, err := svc.GetReplication(ctx, initID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, replication, *got)
|
||||
|
@ -395,6 +411,8 @@ func TestValidateUpdatedReplicationWithoutPersisting(t *testing.T) {
|
|||
require.NoError(t, svc.ValidateUpdatedReplication(ctx, initID, updateReq))
|
||||
|
||||
// Make sure nothing changed in the DB.
|
||||
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID}).
|
||||
Return(map[platform.ID]int64{initID: replication.CurrentQueueSizeBytes}, nil)
|
||||
got, err := svc.GetReplication(ctx, initID)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, replication, *got)
|
||||
|
@ -457,6 +475,8 @@ func TestDeleteReplications(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID, initID + 1, initID + 2}).
|
||||
Return(map[platform.ID]int64{initID: 0, initID + 1: 0, initID + 2: 0}, nil)
|
||||
listed, err := svc.ListReplications(ctx, influxdb.ReplicationListFilter{OrgID: replication.OrgID})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, listed.Replications, 3)
|
||||
|
@ -466,6 +486,8 @@ func TestDeleteReplications(t *testing.T) {
|
|||
require.NoError(t, svc.DeleteBucketReplications(ctx, createReq.LocalBucketID))
|
||||
|
||||
// Ensure they were deleted.
|
||||
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID + 1}).
|
||||
Return(map[platform.ID]int64{initID + 1: 0}, nil)
|
||||
listed, err = svc.ListReplications(ctx, influxdb.ReplicationListFilter{OrgID: replication.OrgID})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, listed.Replications, 1)
|
||||
|
@ -505,6 +527,8 @@ func TestListReplications(t *testing.T) {
|
|||
defer clean(t)
|
||||
allRepls := setup(t, svc, mocks)
|
||||
|
||||
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID, initID + 1, initID + 2}).
|
||||
Return(map[platform.ID]int64{initID: 0, initID + 1: 0, initID + 2: 0}, nil)
|
||||
listed, err := svc.ListReplications(ctx, influxdb.ReplicationListFilter{OrgID: createReq.OrgID})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, influxdb.Replications{Replications: allRepls}, *listed)
|
||||
|
@ -517,6 +541,8 @@ func TestListReplications(t *testing.T) {
|
|||
defer clean(t)
|
||||
allRepls := setup(t, svc, mocks)
|
||||
|
||||
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID + 1}).
|
||||
Return(map[platform.ID]int64{initID + 1: 0}, nil)
|
||||
listed, err := svc.ListReplications(ctx, influxdb.ReplicationListFilter{
|
||||
OrgID: createReq.OrgID,
|
||||
Name: &createReq2.Name,
|
||||
|
@ -532,6 +558,8 @@ func TestListReplications(t *testing.T) {
|
|||
defer clean(t)
|
||||
allRepls := setup(t, svc, mocks)
|
||||
|
||||
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID, initID + 1}).
|
||||
Return(map[platform.ID]int64{initID: 0, initID + 1: 0}, nil)
|
||||
listed, err := svc.ListReplications(ctx, influxdb.ReplicationListFilter{
|
||||
OrgID: createReq.OrgID,
|
||||
RemoteID: &createReq.RemoteID,
|
||||
|
@ -547,6 +575,8 @@ func TestListReplications(t *testing.T) {
|
|||
defer clean(t)
|
||||
allRepls := setup(t, svc, mocks)
|
||||
|
||||
mocks.durableQueueManager.EXPECT().CurrentQueueSizes([]platform.ID{initID, initID + 2}).
|
||||
Return(map[platform.ID]int64{initID: 0, initID + 2: 0}, nil)
|
||||
listed, err := svc.ListReplications(ctx, influxdb.ReplicationListFilter{
|
||||
OrgID: createReq.OrgID,
|
||||
LocalBucketID: &createReq.LocalBucketID,
|
||||
|
|
|
@ -8,7 +8,6 @@ CREATE TABLE replications
|
|||
local_bucket_id VARCHAR(16) NOT NULL,
|
||||
remote_bucket_id VARCHAR(16) NOT NULL,
|
||||
max_queue_size_bytes INTEGER NOT NULL,
|
||||
current_queue_size_bytes INTEGER NOT NULL,
|
||||
latest_response_code INTEGER,
|
||||
latest_error_message TEXT,
|
||||
created_at TIMESTAMP NOT NULL,
|
||||
|
|
Loading…
Reference in New Issue