Issue #3230541 by cliddell, jday, yogeshmpawar, neclimdul, cmlara, Charlie ChX Negyesi: Queue items only reserved by cron for 1 second

(cherry picked from commit 97d9a57d4e0ad45c4185b275b7e896d8c8fe87aa)
merge-requests/2216/head
Alex Pott 2022-05-03 13:13:44 +01:00
parent 5d068a48dd
commit c88351cfa0
No known key found for this signature in database
GPG Key ID: BDA67E7EE836E5CE
6 changed files with 109 additions and 10 deletions

View File

@ -174,10 +174,10 @@ class Cron implements CronInterface {
$this->queueFactory->get($queue_name)->createQueue();
$queue_worker = $this->queueManager->createInstance($queue_name);
$end = time() + ($info['cron']['time'] ?? 15);
$end = $this->time->getCurrentTime() + $info['cron']['time'];
$queue = $this->queueFactory->get($queue_name);
$lease_time = isset($info['cron']['time']) ?: NULL;
while (time() < $end && ($item = $queue->claimItem($lease_time))) {
$lease_time = $info['cron']['time'];
while ($this->time->getCurrentTime() < $end && ($item = $queue->claimItem($lease_time))) {
try {
$queue_worker->processItem($item->data);
$queue->deleteItem($item);

View File

@ -241,7 +241,7 @@ class DatabaseQueue implements ReliableQueueInterface, QueueGarbageCollectionInt
try {
// Clean up the queue for failed batches.
$this->connection->delete(static::TABLE_NAME)
->condition('created', REQUEST_TIME - 864000, '<')
->condition('created', \Drupal::time()->getRequestTime() - 864000, '<')
->condition('name', 'drupal_batch:%', 'LIKE')
->execute();
@ -252,7 +252,7 @@ class DatabaseQueue implements ReliableQueueInterface, QueueGarbageCollectionInt
'expire' => 0,
])
->condition('expire', 0, '<>')
->condition('expire', REQUEST_TIME, '<')
->condition('expire', \Drupal::time()->getRequestTime(), '<')
->execute();
}
catch (\Exception $e) {

View File

@ -40,11 +40,16 @@ class QueueWorkerManager extends DefaultPluginManager implements QueueWorkerMana
public function processDefinition(&$definition, $plugin_id) {
parent::processDefinition($definition, $plugin_id);
// Assign a default time if a cron is specified.
// Safeguard to ensure the default lease time is used in the case of a
// malformed queue worker annotation where cron is specified without a time,
// or an invalid time is provided.
//
// @see \Drupal\Core\Cron::processQueues()
if (isset($definition['cron'])) {
$definition['cron'] += [
'time' => 15,
];
$time = $definition['cron']['time'] ?? 0;
if ($time <= 0) {
$definition['cron']['time'] = self::DEFAULT_QUEUE_CRON_TIME;
}
}
}

View File

@ -9,4 +9,11 @@ use Drupal\Component\Plugin\PluginManagerInterface;
*/
interface QueueWorkerManagerInterface extends PluginManagerInterface {
/**
* The default time duration in seconds spent calling a queue worker.
*
* @var int
*/
public const DEFAULT_QUEUE_CRON_TIME = 15;
}

View File

@ -0,0 +1,27 @@
<?php
namespace Drupal\cron_queue_test\Plugin\QueueWorker;
use Drupal\Core\Queue\QueueWorkerBase;
/**
* @QueueWorker(
* id = "cron_queue_test_lease_time",
* title = @Translation("Lease time test"),
* cron = {"time" = 100}
* )
*/
class CronQueueTestLeaseTime extends QueueWorkerBase {
/**
* {@inheritdoc}
*/
public function processItem($data) {
$state = \Drupal::state();
$count = $state->get('cron_queue_test_lease_time', 0);
$count++;
$state->set('cron_queue_test_lease_time', $count);
throw new \Exception('Leave me queued and leased!');
}
}

View File

@ -5,6 +5,7 @@ namespace Drupal\Tests\system\Kernel\System;
use Drupal\Core\Database\Database;
use Drupal\Core\Queue\DatabaseQueue;
use Drupal\Core\Queue\Memory;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\KernelTests\KernelTestBase;
use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestDatabaseDelayException;
use Prophecy\Argument;
@ -120,6 +121,36 @@ class CronQueueTest extends KernelTestBase {
$this->assertEquals($this->currentTime + $memory_lease_time, reset($memory_queue_internal)->expire);
}
/**
* Tests that leases are expiring correctly, also within the same request.
*/
public function testLeaseTime() {
$queue = $this->container->get('queue')->get('cron_queue_test_lease_time');
$queue->createItem([$this->randomMachineName() => $this->randomMachineName()]);
// Run initial queue job and ensure lease time variable is initialized.
$this->cron->run();
static::assertEquals(1, \Drupal::state()->get('cron_queue_test_lease_time'));
// Ensure the same queue job is not picked up due to the extended lease.
$this->cron->run();
static::assertEquals(1, \Drupal::state()->get('cron_queue_test_lease_time'));
// Set the expiration time to 3 seconds ago, so the lease should
// automatically expire.
\Drupal::database()
->update(DatabaseQueue::TABLE_NAME)
->fields(['expire' => $this->currentTime - 3])
->execute();
// The queue job should now be picked back up since it's lease has expired,
// and the state variable should be consequently incremented.
$this->cron->run();
static::assertEquals(2, \Drupal::state()->get('cron_queue_test_lease_time'));
// Ensure the same queue job is not picked up again due to the extended
// lease.
$this->cron->run();
static::assertEquals(2, \Drupal::state()->get('cron_queue_test_lease_time'));
}
/**
* Tests that exceptions thrown by workers are handled properly.
*/
@ -145,7 +176,7 @@ class CronQueueTest extends KernelTestBase {
// @see \Drupal\Core\Cron::processQueues()
$this->connection->update('queue')
->condition('name', 'cron_queue_test_exception')
->fields(['expire' => REQUEST_TIME - 1])
->fields(['expire' => \Drupal::time()->getRequestTime() - 1])
->execute();
$this->cron->run();
$this->assertEquals(2, \Drupal::state()->get('cron_queue_test_exception'));
@ -201,4 +232,33 @@ class CronQueueTest extends KernelTestBase {
static::assertFalse($queue->releaseItem($item));
}
/**
* Test safeguard against invalid annotations in QueueWorkerManager.
*/
public function testQueueWorkerManagerSafeguard(): void {
$queue_worker_manager = $this->container->get('plugin.manager.queue_worker');
$plugin_id = 'test_plugin_id';
// Ensure if no cron annotation is provided, none is added.
$definition = [];
$queue_worker_manager->processDefinition($definition, $plugin_id);
$this->assertArrayNotHasKey('cron', $definition);
// Ensure if an empty cron annotation is provided, the default lease time is
// added.
$definition = ['cron' => []];
$queue_worker_manager->processDefinition($definition, $plugin_id);
$this->assertArrayHasKey('time', $definition['cron']);
$this->assertEquals(QueueWorkerManagerInterface::DEFAULT_QUEUE_CRON_TIME, $definition['cron']['time']);
// Ensure if an invalid lease time (less-than 1 second) is provided, it is
// overridden with the default lease time.
$definition = ['cron' => ['time' => 0]];
$queue_worker_manager->processDefinition($definition, $plugin_id);
$this->assertEquals(QueueWorkerManagerInterface::DEFAULT_QUEUE_CRON_TIME, $definition['cron']['time']);
$definition = ['cron' => ['time' => -1]];
$queue_worker_manager->processDefinition($definition, $plugin_id);
$this->assertEquals(QueueWorkerManagerInterface::DEFAULT_QUEUE_CRON_TIME, $definition['cron']['time']);
}
}