name = $name; } public function createItem($data) { $record = new stdClass(); $record->name = $this->name; $record->data = $data; $record->consumer_id = 0; // We cannot rely on REQUEST_TIME because many items might be created by a // single request which takes longer than 1 second. $record->created = time(); return drupal_write_record('queue', $record) !== FALSE; } public function numberOfItems() { return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField(); } public function claimItem($lease_time = 30) { if (!isset($this->consumerId)) { $this->consumerId = db_insert('queue_consumer_id') ->useDefaults(array('consumer_id')) ->execute(); } // Claim an item by updating its consumer_id and expire fields. If claim // is not successful another thread may have claimed the item in the // meantime. Therefore loop until an item is successfully claimed or we are // reasonably sure there are no unclaimed items left. while (TRUE) { $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE consumer_id = 0 AND name = :name ORDER BY created ASC', array(':name' => $this->name), 0, 1)->fetchObject(); if ($item) { // Try to mark the item as ours. We cannot rely on REQUEST_TIME // because items might be claimed by a single consumer which runs // longer than 1 second. If we continue to use REQUEST_TIME instead of // the current time(), we steal time from the lease, and will tend to // reset items before the lease should really expire. $update = db_update('queue') ->fields(array( 'consumer_id' => $this->consumerId, 'expire' => time() + $lease_time, )) ->condition('item_id', $item->item_id) ->condition('consumer_id', 0); // If there are affected rows, this update succeeded. if ($update->execute()) { $item->data = unserialize($item->data); return $item; } } else { // No items currently available to claim. return FALSE; } } } public function deleteItem($item) { db_delete('queue') ->condition('item_id', $item->item_id) ->execute(); } public function createQueue() { // All tasks are stored in a single database table (which is created when // Drupal is first installed) so there is nothing we need to do to create // a new queue. } public function deleteQueue() { db_delete('queue') ->condition('name', $this->name) ->execute(); } } /** * @} End of "defgroup queue". */