diff --git a/modules/system/system.install b/modules/system/system.install index 77ef11498ae..4febc30e8d9 100644 --- a/modules/system/system.install +++ b/modules/system/system.install @@ -1226,12 +1226,6 @@ function system_schema() { 'default' => '', 'description' => 'The queue name.', ), - 'consumer_id' => array( - 'type' => 'int', - 'not null' => TRUE, - 'default' => 0, - 'description' => 'The ID of the dequeuing consumer.', - ), 'data' => array( 'type' => 'text', 'not null' => FALSE, @@ -1254,23 +1248,11 @@ function system_schema() { ), 'primary key' => array('item_id'), 'indexes' => array( - 'consumer_queue' => array('consumer_id', 'name', 'created'), - 'consumer_expire' => array('consumer_id', 'expire'), + 'name_created' => array('name', 'created'), + 'expire' => array('expire'), ), ); - $schema['queue_consumer_id'] = array( - 'description' => 'Stores queue consumer IDs, used to auto-increment the consumer ID so that a unique consumer ID is used.', - 'fields' => array( - 'consumer_id' => array( - 'type' => 'serial', - 'not null' => TRUE, - 'description' => 'Primary Key: Unique consumer ID used to make sure only one consumer gets one item.', - ), - ), - 'primary key' => array('consumer_id'), - ); - $schema['registry'] = array( 'description' => "Each record is a function, class, or interface name and the file it is in.", 'fields' => array( @@ -2253,12 +2235,6 @@ function system_update_7022() { 'default' => '', 'description' => 'The queue name.', ), - 'consumer_id' => array( - 'type' => 'int', - 'not null' => TRUE, - 'default' => 0, - 'description' => 'The ID of the dequeuing consumer.', - ), 'data' => array( 'type' => 'text', 'not null' => FALSE, @@ -2281,25 +2257,12 @@ function system_update_7022() { ), 'primary key' => array('item_id'), 'indexes' => array( - 'consumer_queue' => array('consumer_id', 'name', 'created'), - 'consumer_expire' => array('consumer_id', 'expire'), + 'name_created' => array('name', 'created'), + 'expire' => array('expire'), ), ); - $schema['queue_consumer_id'] = array( - 'description' => 'Stores queue consumer IDs, used to auto-incrament the consumer ID so that a unique consumer ID is used.', - 'fields' => array( - 'consumer_id' => array( - 'type' => 'serial', - 'not null' => TRUE, - 'description' => 'Primary Key: Unique consumer ID used to make sure only one consumer gets one item.', - ), - ), - 'primary key' => array('consumer_id'), - ); - db_create_table('queue', $schema['queue']); - db_create_table('queue_consumer_id', $schema['queue_consumer_id']); } /** diff --git a/modules/system/system.module b/modules/system/system.module index 23db12b5602..9b6a90bd4b5 100644 --- a/modules/system/system.module +++ b/modules/system/system.module @@ -2426,7 +2426,6 @@ function system_cron() { // not used, this will simply be a no-op. db_update('queue') ->fields(array( - 'consumer_id' => 0, 'expire' => 0, )) ->condition('expire', REQUEST_TIME, '<') diff --git a/modules/system/system.queue.inc b/modules/system/system.queue.inc index 2132012fe49..c384f581de9 100644 --- a/modules/system/system.queue.inc +++ b/modules/system/system.queue.inc @@ -182,7 +182,6 @@ class SystemQueue implements DrupalQueueInterface { $record = new stdClass(); $record->name = $this->name; $record->data = $data; - $record->consumer_id = 0; // We cannot rely on REQUEST_TIME because many items might be created by a // single request which takes longer than 1 second. $record->created = time(); @@ -194,30 +193,25 @@ class SystemQueue implements DrupalQueueInterface { } public function claimItem($lease_time = 30) { - if (!isset($this->consumerId)) { - $this->consumerId = db_insert('queue_consumer_id') - ->useDefaults(array('consumer_id')) - ->execute(); - } - // Claim an item by updating its consumer_id and expire fields. If claim - // is not successful another thread may have claimed the item in the - // meantime. Therefore loop until an item is successfully claimed or we are - // reasonably sure there are no unclaimed items left. + // Claim an item by updating its expire fields. If claim is not successful + // another thread may have claimed the item in the meantime. Therefore loop + // until an item is successfully claimed or we are reasonably sure there + // are no unclaimed items left. while (TRUE) { - $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE consumer_id = 0 AND name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject(); + $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject(); if ($item) { - // Try to mark the item as ours. We cannot rely on REQUEST_TIME - // because items might be claimed by a single consumer which runs - // longer than 1 second. If we continue to use REQUEST_TIME instead of - // the current time(), we steal time from the lease, and will tend to - // reset items before the lease should really expire. + // Try to update the item. Only one thread can succeed in UPDATEing the + // same row. We cannot rely on REQUEST_TIME because items might be + // claimed by a single consumer which runs longer than 1 second. If we + // continue to use REQUEST_TIME instead of the current time(), we steal + // time from the lease, and will tend to reset items before the lease + // should really expire. $update = db_update('queue') ->fields(array( - 'consumer_id' => $this->consumerId, 'expire' => time() + $lease_time, )) ->condition('item_id', $item->item_id) - ->condition('consumer_id', 0); + ->condition('expire', 0); // If there are affected rows, this update succeeded. if ($update->execute()) { $item->data = unserialize($item->data);