diff --git a/replications/internal/queue_management.go b/replications/internal/queue_management.go index 96589e1b49..c0e40ee778 100644 --- a/replications/internal/queue_management.go +++ b/replications/internal/queue_management.go @@ -21,7 +21,7 @@ import ( const ( scannerAdvanceInterval = 10 * time.Second purgeInterval = 60 * time.Second - defaultMaxAge = 168 * time.Hour / time.Second + defaultMaxAge = 7 * 24 * time.Hour // 1 week ) type remoteWriter interface { @@ -72,7 +72,7 @@ func NewDurableQueueManager(log *zap.Logger, queuePath string, metrics *metrics. } // InitializeQueue creates and opens a new durable queue which is associated with a replication stream. -func (qm *durableQueueManager) InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64, orgID platform.ID, localBucketID platform.ID, maxAge int64) error { +func (qm *durableQueueManager) InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64, orgID platform.ID, localBucketID platform.ID, maxAgeSeconds int64) error { qm.mutex.Lock() defer qm.mutex.Unlock() @@ -112,7 +112,7 @@ func (qm *durableQueueManager) InitializeQueue(replicationID platform.ID, maxQue } // Map new durable queue and scanner to its corresponding replication stream via replication ID - rq := qm.newReplicationQueue(replicationID, orgID, localBucketID, newQueue, maxAge) + rq := qm.newReplicationQueue(replicationID, orgID, localBucketID, newQueue, maxAgeSeconds) qm.replicationQueues[replicationID] = rq rq.Open() @@ -439,15 +439,15 @@ func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byt return nil } -func (qm *durableQueueManager) newReplicationQueue(id platform.ID, orgID platform.ID, localBucketID platform.ID, queue *durablequeue.Queue, maxAge int64) *replicationQueue { +func (qm *durableQueueManager) newReplicationQueue(id platform.ID, orgID platform.ID, localBucketID platform.ID, queue *durablequeue.Queue, maxAgeSeconds int64) *replicationQueue { logger := qm.logger.With(zap.String("replication_id", id.String())) done := make(chan struct{}) // check for max age minimum var maxAgeTime time.Duration - if maxAge < 0 { + if maxAgeSeconds < 0 { maxAgeTime = defaultMaxAge } else { - maxAgeTime = time.Duration(maxAge) + maxAgeTime = time.Duration(maxAgeSeconds) * time.Second } return &replicationQueue{