From c88351cfa060738103b532918ed74c5bbdbe55a3 Mon Sep 17 00:00:00 2001 From: Alex Pott Date: Tue, 3 May 2022 13:13:44 +0100 Subject: [PATCH] Issue #3230541 by cliddell, jday, yogeshmpawar, neclimdul, cmlara, Charlie ChX Negyesi: Queue items only reserved by cron for 1 second (cherry picked from commit 97d9a57d4e0ad45c4185b275b7e896d8c8fe87aa) --- core/lib/Drupal/Core/Cron.php | 6 +- core/lib/Drupal/Core/Queue/DatabaseQueue.php | 4 +- .../Drupal/Core/Queue/QueueWorkerManager.php | 13 ++-- .../Queue/QueueWorkerManagerInterface.php | 7 +++ .../QueueWorker/CronQueueTestLeaseTime.php | 27 ++++++++ .../tests/src/Kernel/System/CronQueueTest.php | 62 ++++++++++++++++++- 6 files changed, 109 insertions(+), 10 deletions(-) create mode 100644 core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestLeaseTime.php diff --git a/core/lib/Drupal/Core/Cron.php b/core/lib/Drupal/Core/Cron.php index 5464f7238d0..74e50206428 100644 --- a/core/lib/Drupal/Core/Cron.php +++ b/core/lib/Drupal/Core/Cron.php @@ -174,10 +174,10 @@ class Cron implements CronInterface { $this->queueFactory->get($queue_name)->createQueue(); $queue_worker = $this->queueManager->createInstance($queue_name); - $end = time() + ($info['cron']['time'] ?? 15); + $end = $this->time->getCurrentTime() + $info['cron']['time']; $queue = $this->queueFactory->get($queue_name); - $lease_time = isset($info['cron']['time']) ?: NULL; - while (time() < $end && ($item = $queue->claimItem($lease_time))) { + $lease_time = $info['cron']['time']; + while ($this->time->getCurrentTime() < $end && ($item = $queue->claimItem($lease_time))) { try { $queue_worker->processItem($item->data); $queue->deleteItem($item); diff --git a/core/lib/Drupal/Core/Queue/DatabaseQueue.php b/core/lib/Drupal/Core/Queue/DatabaseQueue.php index 32a87feb323..2af4360dcab 100644 --- a/core/lib/Drupal/Core/Queue/DatabaseQueue.php +++ b/core/lib/Drupal/Core/Queue/DatabaseQueue.php @@ -241,7 +241,7 @@ class DatabaseQueue implements ReliableQueueInterface, QueueGarbageCollectionInt try { // Clean up the queue for failed batches. $this->connection->delete(static::TABLE_NAME) - ->condition('created', REQUEST_TIME - 864000, '<') + ->condition('created', \Drupal::time()->getRequestTime() - 864000, '<') ->condition('name', 'drupal_batch:%', 'LIKE') ->execute(); @@ -252,7 +252,7 @@ class DatabaseQueue implements ReliableQueueInterface, QueueGarbageCollectionInt 'expire' => 0, ]) ->condition('expire', 0, '<>') - ->condition('expire', REQUEST_TIME, '<') + ->condition('expire', \Drupal::time()->getRequestTime(), '<') ->execute(); } catch (\Exception $e) { diff --git a/core/lib/Drupal/Core/Queue/QueueWorkerManager.php b/core/lib/Drupal/Core/Queue/QueueWorkerManager.php index b1aa40f51c9..818231d3bfa 100644 --- a/core/lib/Drupal/Core/Queue/QueueWorkerManager.php +++ b/core/lib/Drupal/Core/Queue/QueueWorkerManager.php @@ -40,11 +40,16 @@ class QueueWorkerManager extends DefaultPluginManager implements QueueWorkerMana public function processDefinition(&$definition, $plugin_id) { parent::processDefinition($definition, $plugin_id); - // Assign a default time if a cron is specified. + // Safeguard to ensure the default lease time is used in the case of a + // malformed queue worker annotation where cron is specified without a time, + // or an invalid time is provided. + // + // @see \Drupal\Core\Cron::processQueues() if (isset($definition['cron'])) { - $definition['cron'] += [ - 'time' => 15, - ]; + $time = $definition['cron']['time'] ?? 0; + if ($time <= 0) { + $definition['cron']['time'] = self::DEFAULT_QUEUE_CRON_TIME; + } } } diff --git a/core/lib/Drupal/Core/Queue/QueueWorkerManagerInterface.php b/core/lib/Drupal/Core/Queue/QueueWorkerManagerInterface.php index cb473b1c241..7acf4721869 100644 --- a/core/lib/Drupal/Core/Queue/QueueWorkerManagerInterface.php +++ b/core/lib/Drupal/Core/Queue/QueueWorkerManagerInterface.php @@ -9,4 +9,11 @@ use Drupal\Component\Plugin\PluginManagerInterface; */ interface QueueWorkerManagerInterface extends PluginManagerInterface { + /** + * The default time duration in seconds spent calling a queue worker. + * + * @var int + */ + public const DEFAULT_QUEUE_CRON_TIME = 15; + } diff --git a/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestLeaseTime.php b/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestLeaseTime.php new file mode 100644 index 00000000000..2027bd69a45 --- /dev/null +++ b/core/modules/system/tests/modules/cron_queue_test/src/Plugin/QueueWorker/CronQueueTestLeaseTime.php @@ -0,0 +1,27 @@ +get('cron_queue_test_lease_time', 0); + $count++; + $state->set('cron_queue_test_lease_time', $count); + throw new \Exception('Leave me queued and leased!'); + } + +} diff --git a/core/modules/system/tests/src/Kernel/System/CronQueueTest.php b/core/modules/system/tests/src/Kernel/System/CronQueueTest.php index 7eef7520602..c424552e21a 100644 --- a/core/modules/system/tests/src/Kernel/System/CronQueueTest.php +++ b/core/modules/system/tests/src/Kernel/System/CronQueueTest.php @@ -5,6 +5,7 @@ namespace Drupal\Tests\system\Kernel\System; use Drupal\Core\Database\Database; use Drupal\Core\Queue\DatabaseQueue; use Drupal\Core\Queue\Memory; +use Drupal\Core\Queue\QueueWorkerManagerInterface; use Drupal\KernelTests\KernelTestBase; use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestDatabaseDelayException; use Prophecy\Argument; @@ -120,6 +121,36 @@ class CronQueueTest extends KernelTestBase { $this->assertEquals($this->currentTime + $memory_lease_time, reset($memory_queue_internal)->expire); } + /** + * Tests that leases are expiring correctly, also within the same request. + */ + public function testLeaseTime() { + $queue = $this->container->get('queue')->get('cron_queue_test_lease_time'); + $queue->createItem([$this->randomMachineName() => $this->randomMachineName()]); + // Run initial queue job and ensure lease time variable is initialized. + $this->cron->run(); + static::assertEquals(1, \Drupal::state()->get('cron_queue_test_lease_time')); + // Ensure the same queue job is not picked up due to the extended lease. + $this->cron->run(); + static::assertEquals(1, \Drupal::state()->get('cron_queue_test_lease_time')); + + // Set the expiration time to 3 seconds ago, so the lease should + // automatically expire. + \Drupal::database() + ->update(DatabaseQueue::TABLE_NAME) + ->fields(['expire' => $this->currentTime - 3]) + ->execute(); + + // The queue job should now be picked back up since it's lease has expired, + // and the state variable should be consequently incremented. + $this->cron->run(); + static::assertEquals(2, \Drupal::state()->get('cron_queue_test_lease_time')); + // Ensure the same queue job is not picked up again due to the extended + // lease. + $this->cron->run(); + static::assertEquals(2, \Drupal::state()->get('cron_queue_test_lease_time')); + } + /** * Tests that exceptions thrown by workers are handled properly. */ @@ -145,7 +176,7 @@ class CronQueueTest extends KernelTestBase { // @see \Drupal\Core\Cron::processQueues() $this->connection->update('queue') ->condition('name', 'cron_queue_test_exception') - ->fields(['expire' => REQUEST_TIME - 1]) + ->fields(['expire' => \Drupal::time()->getRequestTime() - 1]) ->execute(); $this->cron->run(); $this->assertEquals(2, \Drupal::state()->get('cron_queue_test_exception')); @@ -201,4 +232,33 @@ class CronQueueTest extends KernelTestBase { static::assertFalse($queue->releaseItem($item)); } + /** + * Test safeguard against invalid annotations in QueueWorkerManager. + */ + public function testQueueWorkerManagerSafeguard(): void { + $queue_worker_manager = $this->container->get('plugin.manager.queue_worker'); + $plugin_id = 'test_plugin_id'; + + // Ensure if no cron annotation is provided, none is added. + $definition = []; + $queue_worker_manager->processDefinition($definition, $plugin_id); + $this->assertArrayNotHasKey('cron', $definition); + + // Ensure if an empty cron annotation is provided, the default lease time is + // added. + $definition = ['cron' => []]; + $queue_worker_manager->processDefinition($definition, $plugin_id); + $this->assertArrayHasKey('time', $definition['cron']); + $this->assertEquals(QueueWorkerManagerInterface::DEFAULT_QUEUE_CRON_TIME, $definition['cron']['time']); + + // Ensure if an invalid lease time (less-than 1 second) is provided, it is + // overridden with the default lease time. + $definition = ['cron' => ['time' => 0]]; + $queue_worker_manager->processDefinition($definition, $plugin_id); + $this->assertEquals(QueueWorkerManagerInterface::DEFAULT_QUEUE_CRON_TIME, $definition['cron']['time']); + $definition = ['cron' => ['time' => -1]]; + $queue_worker_manager->processDefinition($definition, $plugin_id); + $this->assertEquals(QueueWorkerManagerInterface::DEFAULT_QUEUE_CRON_TIME, $definition['cron']['time']); + } + }