Revert "Issue #3230541 by cliddell, jday, yogeshmpawar, neclimdul, cmlara, Charlie ChX Negyesi: Queue items only reserved by cron for 1 second"
This reverts commit 0fc50fa51d
.
merge-requests/2081/head
parent
0fc50fa51d
commit
6572d37a8a
|
@ -52,15 +52,13 @@ class QueueWorker extends Plugin {
|
|||
public $title;
|
||||
|
||||
/**
|
||||
* An associative array containing optional keys.
|
||||
* An associative array containing an optional key.
|
||||
*
|
||||
* This property is optional and it does not need to be declared.
|
||||
*
|
||||
* Available keys:
|
||||
* - time (optional): How much time Drupal cron should spend on calling this
|
||||
* worker in seconds. Defaults to 15.
|
||||
* - lease_time: (optional) How long the lease is for a queue item when
|
||||
* called from cron in seconds. Defaults to 30.
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
|
|
|
@ -174,10 +174,10 @@ class Cron implements CronInterface {
|
|||
$this->queueFactory->get($queue_name)->createQueue();
|
||||
|
||||
$queue_worker = $this->queueManager->createInstance($queue_name);
|
||||
$end = $this->time->getCurrentTime() + $info['cron']['time'] ?? static::DEFAULT_QUEUE_CRON_TIME;
|
||||
$end = time() + ($info['cron']['time'] ?? 15);
|
||||
$queue = $this->queueFactory->get($queue_name);
|
||||
$lease_time = $info['cron']['lease_time'] ?? static::DEFAULT_QUEUE_CRON_LEASE_TIME;
|
||||
while ($this->time->getCurrentTime() < $end && ($item = $queue->claimItem($lease_time))) {
|
||||
$lease_time = isset($info['cron']['time']) ?: NULL;
|
||||
while (time() < $end && ($item = $queue->claimItem($lease_time))) {
|
||||
try {
|
||||
$queue_worker->processItem($item->data);
|
||||
$queue->deleteItem($item);
|
||||
|
|
|
@ -9,16 +9,6 @@ namespace Drupal\Core;
|
|||
*/
|
||||
interface CronInterface {
|
||||
|
||||
/**
|
||||
* The default time cron should execute each queue in seconds.
|
||||
*/
|
||||
public const DEFAULT_QUEUE_CRON_TIME = 15;
|
||||
|
||||
/**
|
||||
* The default lease time a queue item should get when called from cron.
|
||||
*/
|
||||
public const DEFAULT_QUEUE_CRON_LEASE_TIME = 30;
|
||||
|
||||
/**
|
||||
* Executes a cron run.
|
||||
*
|
||||
|
|
|
@ -241,7 +241,7 @@ class DatabaseQueue implements ReliableQueueInterface, QueueGarbageCollectionInt
|
|||
try {
|
||||
// Clean up the queue for failed batches.
|
||||
$this->connection->delete(static::TABLE_NAME)
|
||||
->condition('created', \Drupal::time()->getRequestTime() - 864000, '<')
|
||||
->condition('created', REQUEST_TIME - 864000, '<')
|
||||
->condition('name', 'drupal_batch:%', 'LIKE')
|
||||
->execute();
|
||||
|
||||
|
@ -252,7 +252,7 @@ class DatabaseQueue implements ReliableQueueInterface, QueueGarbageCollectionInt
|
|||
'expire' => 0,
|
||||
])
|
||||
->condition('expire', 0, '<>')
|
||||
->condition('expire', \Drupal::time()->getRequestTime(), '<')
|
||||
->condition('expire', REQUEST_TIME, '<')
|
||||
->execute();
|
||||
}
|
||||
catch (\Exception $e) {
|
||||
|
|
|
@ -34,6 +34,20 @@ class QueueWorkerManager extends DefaultPluginManager implements QueueWorkerMana
|
|||
$this->alterInfo('queue_info');
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function processDefinition(&$definition, $plugin_id) {
|
||||
parent::processDefinition($definition, $plugin_id);
|
||||
|
||||
// Assign a default time if a cron is specified.
|
||||
if (isset($definition['cron'])) {
|
||||
$definition['cron'] += [
|
||||
'time' => 15,
|
||||
];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*
|
||||
|
|
|
@ -11,10 +11,7 @@ use Drupal\Core\Queue\QueueWorkerBase;
|
|||
* @QueueWorker(
|
||||
* id = "cron_queue_test_database_delay_exception",
|
||||
* title = @Translation("Database delay exception test"),
|
||||
* cron = {
|
||||
* "time" = 1,
|
||||
* "lease_time" = 2
|
||||
* }
|
||||
* cron = {"time" = 1}
|
||||
* )
|
||||
*/
|
||||
class CronQueueTestDatabaseDelayException extends QueueWorkerBase {
|
||||
|
|
|
@ -1,30 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace Drupal\cron_queue_test\Plugin\QueueWorker;
|
||||
|
||||
use Drupal\Core\Queue\QueueWorkerBase;
|
||||
|
||||
/**
|
||||
* @QueueWorker(
|
||||
* id = "cron_queue_test_lease_time",
|
||||
* title = @Translation("Lease time test"),
|
||||
* cron = {
|
||||
* "time" = 5,
|
||||
* "lease_time" = 2
|
||||
* }
|
||||
* )
|
||||
*/
|
||||
class CronQueueTestLeaseTime extends QueueWorkerBase {
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function processItem($data) {
|
||||
$state = \Drupal::state();
|
||||
$count = $state->get('cron_queue_test_lease_time', 0);
|
||||
$count++;
|
||||
$state->set('cron_queue_test_lease_time', $count);
|
||||
throw new \Exception('Leave me queued and leased!');
|
||||
}
|
||||
|
||||
}
|
|
@ -11,10 +11,7 @@ use Drupal\Core\Queue\QueueWorkerBase;
|
|||
* @QueueWorker(
|
||||
* id = "cron_queue_test_memory_delay_exception",
|
||||
* title = @Translation("Memory delay exception test"),
|
||||
* cron = {
|
||||
* "time" = 1,
|
||||
* "lease_time" = 2
|
||||
* }
|
||||
* cron = {"time" = 1}
|
||||
* )
|
||||
*/
|
||||
class CronQueueTestMemoryDelayException extends QueueWorkerBase {
|
||||
|
|
|
@ -89,8 +89,8 @@ class CronQueueTest extends KernelTestBase {
|
|||
// Get the queue worker plugin manager.
|
||||
$manager = $this->container->get('plugin.manager.queue_worker');
|
||||
$definitions = $manager->getDefinitions();
|
||||
$this->assertNotEmpty($database_lease_time = $definitions['cron_queue_test_database_delay_exception']['cron']['lease_time']);
|
||||
$this->assertNotEmpty($memory_lease_time = $definitions['cron_queue_test_memory_delay_exception']['cron']['lease_time']);
|
||||
$this->assertNotEmpty($database_lease_time = $definitions['cron_queue_test_database_delay_exception']['cron']['time']);
|
||||
$this->assertNotEmpty($memory_lease_time = $definitions['cron_queue_test_memory_delay_exception']['cron']['time']);
|
||||
|
||||
// Create the necessary test data and run cron.
|
||||
$database->createItem('test');
|
||||
|
@ -120,30 +120,6 @@ class CronQueueTest extends KernelTestBase {
|
|||
$this->assertEquals($this->currentTime + $memory_lease_time, reset($memory_queue_internal)->expire);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that leases are expiring correctly, also within the same request.
|
||||
*/
|
||||
public function testLeaseTime() {
|
||||
$queue = $this->container->get('queue')->get('cron_queue_test_lease_time');
|
||||
$queue->createItem([$this->randomMachineName() => $this->randomMachineName()]);
|
||||
$this->cron->run();
|
||||
static::assertEquals(1, \Drupal::state()->get('cron_queue_test_lease_time'));
|
||||
$this->cron->run();
|
||||
static::assertEquals(1, \Drupal::state()->get('cron_queue_test_lease_time'));
|
||||
|
||||
// Set the expiration time to 3 seconds ago, so the lease should
|
||||
// automatically expire.
|
||||
\Drupal::database()
|
||||
->update(DatabaseQueue::TABLE_NAME)
|
||||
->fields(['expire' => $this->currentTime - 3])
|
||||
->execute();
|
||||
|
||||
$this->cron->run();
|
||||
static::assertEquals(2, \Drupal::state()->get('cron_queue_test_lease_time'));
|
||||
$this->cron->run();
|
||||
static::assertEquals(2, \Drupal::state()->get('cron_queue_test_lease_time'));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that exceptions thrown by workers are handled properly.
|
||||
*/
|
||||
|
@ -169,7 +145,7 @@ class CronQueueTest extends KernelTestBase {
|
|||
// @see \Drupal\Core\Cron::processQueues()
|
||||
$this->connection->update('queue')
|
||||
->condition('name', 'cron_queue_test_exception')
|
||||
->fields(['expire' => \Drupal::time()->getRequestTime() - 1])
|
||||
->fields(['expire' => REQUEST_TIME - 1])
|
||||
->execute();
|
||||
$this->cron->run();
|
||||
$this->assertEquals(2, \Drupal::state()->get('cron_queue_test_exception'));
|
||||
|
|
Loading…
Reference in New Issue