Issue #2867001 by dpi, acbramley, SpadXIII, mstrelan, smustgrave, neclimdul: Dont treat suspending of a queue as erroneous

merge-requests/3360/merge
catch 2023-02-22 09:48:13 +00:00
parent 6acfec45d5
commit ccae3c0412
5 changed files with 115 additions and 20 deletions

View File

@ -200,11 +200,13 @@ class Cron implements CronInterface {
$queue->releaseItem($item);
}
catch (SuspendQueueException $e) {
// If the worker indicates there is a problem with the whole queue,
// release the item and skip to the next queue.
// If the worker indicates the whole queue should be skipped,
// release the item and go to the next queue.
$queue->releaseItem($item);
watchdog_exception('cron', $e);
$this->logger->debug('A worker for @queue queue suspended further processing of the queue.', [
'@queue' => $queue_name,
]);
// Skip to the next queue.
continue 2;

View File

@ -6,13 +6,18 @@ use Drupal\Core\Queue\QueueWorkerBase;
/**
* @QueueWorker(
* id = "cron_queue_test_exception",
* id = \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestException::PLUGIN_ID,
* title = @Translation("Exception test"),
* cron = {"time" = 1}
* )
*/
class CronQueueTestException extends QueueWorkerBase {
/**
* The plugin ID.
*/
public const PLUGIN_ID = 'cron_queue_test_exception';
/**
* {@inheritdoc}
*/

View File

@ -7,13 +7,18 @@ use Drupal\Core\Queue\RequeueException;
/**
* @QueueWorker(
* id = "cron_queue_test_requeue_exception",
* id = \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestRequeueException::PLUGIN_ID,
* title = @Translation("RequeueException test"),
* cron = {"time" = 60}
* )
*/
class CronQueueTestRequeueException extends QueueWorkerBase {
/**
* The plugin ID.
*/
public const PLUGIN_ID = 'cron_queue_test_requeue_exception';
/**
* {@inheritdoc}
*/

View File

@ -7,18 +7,23 @@ use Drupal\Core\Queue\SuspendQueueException;
/**
* @QueueWorker(
* id = "cron_queue_test_broken_queue",
* title = @Translation("Broken queue test"),
* id = \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestSuspendQueue::PLUGIN_ID,
* title = @Translation("Suspend queue test"),
* cron = {"time" = 60}
* )
*/
class CronQueueTestBrokenQueue extends QueueWorkerBase {
class CronQueueTestSuspendQueue extends QueueWorkerBase {
/**
* The plugin ID.
*/
public const PLUGIN_ID = 'cron_queue_test_suspend';
/**
* {@inheritdoc}
*/
public function processItem($data) {
if ($data == 'crash') {
if ($data === 'suspend') {
throw new SuspendQueueException('The queue is broken.');
}
// Do nothing otherwise.

View File

@ -3,12 +3,18 @@
namespace Drupal\Tests\system\Kernel\System;
use Drupal\Core\Database\Database;
use Drupal\Core\DependencyInjection\ContainerBuilder;
use Drupal\Core\Logger\RfcLogLevel;
use Drupal\Core\Queue\DatabaseQueue;
use Drupal\Core\Queue\Memory;
use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestException;
use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestRequeueException;
use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestSuspendQueue;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\KernelTests\KernelTestBase;
use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestDatabaseDelayException;
use Prophecy\Argument;
use Psr\Log\LoggerInterface;
/**
* Tests the Cron Queue runner.
@ -47,10 +53,19 @@ class CronQueueTest extends KernelTestBase {
*/
protected $currentTime = 1000;
/**
* A logger for testing.
*
* @var \PHPUnit\Framework\MockObject\MockObject|\Psr\Log\LoggerInterface
*/
protected $logger;
/**
* {@inheritdoc}
*/
protected function setUp(): void {
// Setup logger before register() is called.
$this->logger = $this->createMock(LoggerInterface::class);
parent::setUp();
$this->connection = Database::getConnection();
@ -152,11 +167,31 @@ class CronQueueTest extends KernelTestBase {
}
/**
* Tests that exceptions thrown by workers are handled properly.
* Tests that non-queue exceptions thrown by workers are handled properly.
*
* @see \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestException
*/
public function testExceptions() {
public function testUncaughtExceptions() {
$this->logger->expects($this->atLeast(2))
->method('log')
->withConsecutive(
[
$this->equalTo(RfcLogLevel::ERROR),
$this->equalTo('%type: @message in %function (line %line of %file).'),
$this->callback(function ($args) {
return $args['@message'] === 'That is not supposed to happen.' &&
$args['exception'] instanceof \Exception;
}),
],
[
$this->equalTo(RfcLogLevel::INFO),
$this->equalTo('Cron run completed.'),
$this->anything(),
],
);
// Get the queue to test the normal Exception.
$queue = $this->container->get('queue')->get('cron_queue_test_exception');
$queue = $this->container->get('queue')->get(CronQueueTestException::PLUGIN_ID);
// Enqueue an item for processing.
$queue->createItem([$this->randomMachineName() => $this->randomMachineName()]);
@ -181,31 +216,64 @@ class CronQueueTest extends KernelTestBase {
$this->cron->run();
$this->assertEquals(2, \Drupal::state()->get('cron_queue_test_exception'));
$this->assertEquals(0, $queue->numberOfItems(), 'Item was processed and removed from the queue.');
}
/**
* Tests suspend queue exception is handled properly.
*
* @see \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestSuspendQueue
* @covers \Drupal\Core\Queue\SuspendQueueException
*/
public function testSuspendQueueException(): void {
$this->logger->expects($this->atLeast(2))
->method('log')
->withConsecutive(
[
$this->equalTo(RfcLogLevel::DEBUG),
$this->equalTo('A worker for @queue queue suspended further processing of the queue.'),
$this->callback(function ($args) {
return $args['@queue'] === CronQueueTestSuspendQueue::PLUGIN_ID;
}),
],
[
$this->equalTo(RfcLogLevel::INFO),
$this->equalTo('Cron run completed.'),
$this->anything(),
],
);
// Get the queue to test the specific SuspendQueueException.
$queue = $this->container->get('queue')->get('cron_queue_test_broken_queue');
$queue = \Drupal::queue(CronQueueTestSuspendQueue::PLUGIN_ID);
// Enqueue several item for processing.
$queue->createItem('process');
$queue->createItem('crash');
$queue->createItem('suspend');
$queue->createItem('ignored');
// Run cron; the worker for this queue should process as far as the crashing
// item.
// Run cron; the worker for this queue should process as far as the
// suspending item.
$this->cron->run();
// Only one item should have been processed.
$this->assertEquals(2, $queue->numberOfItems(), 'Failing queue stopped processing at the failing item.');
$this->assertEquals(2, $queue->numberOfItems(), 'Suspended queue stopped processing at the suspending item.');
// Check the items remaining in the queue. The item that throws the
// exception gets released by cron, so we can claim it again to check it.
$item = $queue->claimItem();
$this->assertEquals('crash', $item->data, 'Failing item remains in the queue.');
$this->assertEquals('suspend', $item->data, 'Suspending item remains in the queue.');
$item = $queue->claimItem();
$this->assertEquals('ignored', $item->data, 'Item beyond the failing item remains in the queue.');
$this->assertEquals('ignored', $item->data, 'Item beyond the suspending item remains in the queue.');
}
/**
* Tests requeue exception is handled properly.
*
* @see \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestRequeueException
* @covers \Drupal\Core\Queue\RequeueException
*/
public function testRequeueException(): void {
// Test the requeueing functionality.
$queue = $this->container->get('queue')->get('cron_queue_test_requeue_exception');
$queue = $this->container->get('queue')->get(CronQueueTestRequeueException::PLUGIN_ID);
$queue->createItem([]);
$this->cron->run();
@ -261,4 +329,14 @@ class CronQueueTest extends KernelTestBase {
$this->assertEquals(QueueWorkerManagerInterface::DEFAULT_QUEUE_CRON_TIME, $definition['cron']['time']);
}
/**
* {@inheritdoc}
*/
public function register(ContainerBuilder $container) {
parent::register($container);
$container->register('test_logger', get_class($this->logger))
->addTag('logger');
$container->set('test_logger', $this->logger);
}
}