Daemons - add "objectPHID" to task tables.

Summary: Ref T5402. This more or less "fixes" it but there's probably some polish to do?

Test Plan:
stopped and started daemons. error logs look good.

ran bin/storage upgrade.  noted that `adjust` added the appropriate indices for active and archive task.

Reviewers: epriestley

Reviewed By: epriestley

Subscribers: Korvin, epriestley

Maniphest Tasks: T5402

Differential Revision: https://secure.phabricator.com/D11044
This commit is contained in:
Bob Trahan
2014-12-23 16:30:05 -08:00
parent a4474a4975
commit 9219645287
12 changed files with 56 additions and 13 deletions

View File

@@ -0,0 +1,5 @@
ALTER TABLE {$NAMESPACE}_worker.worker_activetask
ADD objectPHID VARBINARY(64);
ALTER TABLE {$NAMESPACE}_worker.worker_archivetask
ADD objectPHID VARBINARY(64);

View File

@@ -188,7 +188,9 @@ final class DiffusionCommitHookEngine extends Phobject {
'emailPHIDs' => array_values($this->emailPHIDs), 'emailPHIDs' => array_values($this->emailPHIDs),
'info' => $this->loadCommitInfoForWorker($all_updates), 'info' => $this->loadCommitInfoForWorker($all_updates),
), ),
PhabricatorWorker::PRIORITY_ALERTS); array(
'priority' => PhabricatorWorker::PRIORITY_ALERTS,
));
} }
return 0; return 0;

View File

@@ -50,7 +50,9 @@ final class PhabricatorMailManagementResendWorkflow
$mailer_task = PhabricatorWorker::scheduleTask( $mailer_task = PhabricatorWorker::scheduleTask(
'PhabricatorMetaMTAWorker', 'PhabricatorMetaMTAWorker',
$message->getID(), $message->getID(),
PhabricatorWorker::PRIORITY_ALERTS); array(
'priority' => PhabricatorWorker::PRIORITY_ALERTS,
));
$console->writeOut( $console->writeOut(
"Queued message #%d for resend.\n", "Queued message #%d for resend.\n",

View File

@@ -342,7 +342,9 @@ final class PhabricatorMetaMTAMail extends PhabricatorMetaMTADAO {
$mailer_task = PhabricatorWorker::scheduleTask( $mailer_task = PhabricatorWorker::scheduleTask(
'PhabricatorMetaMTAWorker', 'PhabricatorMetaMTAWorker',
$this->getID(), $this->getID(),
PhabricatorWorker::PRIORITY_ALERTS); array(
'priority' => PhabricatorWorker::PRIORITY_ALERTS,
));
$this->saveTransaction(); $this->saveTransaction();

View File

@@ -8,7 +8,9 @@ final class PhabricatorSearchIndexer {
array( array(
'documentPHID' => $phid, 'documentPHID' => $phid,
), ),
PhabricatorWorker::PRIORITY_IMPORT); array(
'priority' => PhabricatorWorker::PRIORITY_IMPORT,
));
} }
public function indexDocumentByPHID($phid) { public function indexDocumentByPHID($phid) {

View File

@@ -94,16 +94,19 @@ abstract class PhabricatorWorker {
final public static function scheduleTask( final public static function scheduleTask(
$task_class, $task_class,
$data, $data,
$priority = null) { $options = array()) {
$priority = idx($options, 'priority');
if ($priority === null) { if ($priority === null) {
$priority = self::PRIORITY_DEFAULT; $priority = self::PRIORITY_DEFAULT;
} }
$object_phid = idx($options, 'objectPHID');
$task = id(new PhabricatorWorkerActiveTask()) $task = id(new PhabricatorWorkerActiveTask())
->setTaskClass($task_class) ->setTaskClass($task_class)
->setData($data) ->setData($data)
->setPriority($priority); ->setPriority($priority)
->setObjectPHID($object_phid);
if (self::$runAllTasksInProcess) { if (self::$runAllTasksInProcess) {
// Do the work in-process. // Do the work in-process.
@@ -114,7 +117,8 @@ abstract class PhabricatorWorker {
$worker->doWork(); $worker->doWork();
foreach ($worker->getQueuedTasks() as $queued_task) { foreach ($worker->getQueuedTasks() as $queued_task) {
list($queued_class, $queued_data, $queued_priority) = $queued_task; list($queued_class, $queued_data, $queued_priority) = $queued_task;
self::scheduleTask($queued_class, $queued_data, $queued_priority); $queued_options = array('priority' => $queued_priority);
self::scheduleTask($queued_class, $queued_data, $queued_options);
} }
break; break;
} catch (PhabricatorWorkerYieldException $ex) { } catch (PhabricatorWorkerYieldException $ex) {

View File

@@ -206,7 +206,7 @@ final class PhabricatorWorkerTestCase extends PhabricatorTestCase {
return PhabricatorWorker::scheduleTask( return PhabricatorWorker::scheduleTask(
'PhabricatorTestWorker', 'PhabricatorTestWorker',
$data, $data,
$priority); array('priority' => $priority));
} }
} }

View File

@@ -9,6 +9,7 @@ final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
const PHASE_EXPIRED = 'expired'; const PHASE_EXPIRED = 'expired';
private $ids; private $ids;
private $objectPHIDs;
private $limit; private $limit;
private $skipLease; private $skipLease;
@@ -39,6 +40,11 @@ final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
return $this; return $this;
} }
public function withObjectPHIDs(array $phids) {
$this->objectPHIDs = $phids;
return $this;
}
public function setLimit($limit) { public function setLimit($limit) {
$this->limit = $limit; $this->limit = $limit;
return $this; return $this;
@@ -175,6 +181,10 @@ final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
$where[] = qsprintf($conn_w, 'id IN (%Ld)', $this->ids); $where[] = qsprintf($conn_w, 'id IN (%Ld)', $this->ids);
} }
if ($this->objectPHIDs !== null) {
$where[] = qsprintf($conn_w, 'objectPHID IN (%Ls)', $this->objectPHIDs);
}
return $this->formatWhereClause($where); return $this->formatWhereClause($where);
} }

View File

@@ -33,7 +33,7 @@ final class PhabricatorWorkerActiveTask extends PhabricatorWorkerTask {
'leaseOwner_2' => array( 'leaseOwner_2' => array(
'columns' => array('leaseOwner', 'priority', 'id'), 'columns' => array('leaseOwner', 'priority', 'id'),
), ),
), ) + $parent[self::CONFIG_KEY_SCHEMA],
); );
$config[self::CONFIG_COLUMN_SCHEMA] = array( $config[self::CONFIG_COLUMN_SCHEMA] = array(
@@ -197,7 +197,12 @@ final class PhabricatorWorkerActiveTask extends PhabricatorWorkerTask {
if ($did_succeed) { if ($did_succeed) {
foreach ($worker->getQueuedTasks() as $task) { foreach ($worker->getQueuedTasks() as $task) {
list($class, $data) = $task; list($class, $data) = $task;
PhabricatorWorker::scheduleTask($class, $data, $this->getPriority()); PhabricatorWorker::scheduleTask(
$class,
$data,
array(
'priority' => $this->getPriority(),
));
} }
} }

View File

@@ -10,11 +10,13 @@ final class PhabricatorWorkerArchiveTask extends PhabricatorWorkerTask {
protected $result; protected $result;
public function getConfiguration() { public function getConfiguration() {
$parent = parent::getConfiguration();
$config = array( $config = array(
// We manage the IDs in this table; they are allocated in the ActiveTask // We manage the IDs in this table; they are allocated in the ActiveTask
// table and moved here without alteration. // table and moved here without alteration.
self::CONFIG_IDS => self::IDS_MANUAL, self::CONFIG_IDS => self::IDS_MANUAL,
) + parent::getConfiguration(); ) + $parent;
$config[self::CONFIG_COLUMN_SCHEMA] = array( $config[self::CONFIG_COLUMN_SCHEMA] = array(
@@ -29,7 +31,7 @@ final class PhabricatorWorkerArchiveTask extends PhabricatorWorkerTask {
'leaseOwner' => array( 'leaseOwner' => array(
'columns' => array('leaseOwner', 'priority', 'id'), 'columns' => array('leaseOwner', 'priority', 'id'),
), ),
); ) + $parent[self::CONFIG_KEY_SCHEMA];
return $config; return $config;
} }

View File

@@ -10,6 +10,7 @@ abstract class PhabricatorWorkerTask extends PhabricatorWorkerDAO {
protected $failureCount; protected $failureCount;
protected $dataID; protected $dataID;
protected $priority; protected $priority;
protected $objectPHID;
private $data; private $data;
private $executionException; private $executionException;
@@ -23,6 +24,12 @@ abstract class PhabricatorWorkerTask extends PhabricatorWorkerDAO {
'failureCount' => 'uint32', 'failureCount' => 'uint32',
'failureTime' => 'epoch?', 'failureTime' => 'epoch?',
'priority' => 'uint32', 'priority' => 'uint32',
'objectPHID' => 'phid?',
),
self::CONFIG_KEY_SCHEMA => array(
'key_object' => array(
'columns' => array('objectPHID'),
),
), ),
) + parent::getConfiguration(); ) + parent::getConfiguration();
} }

View File

@@ -81,6 +81,8 @@ abstract class PhabricatorSMSImplementationAdapter {
'toNumbers' => $to_numbers, 'toNumbers' => $to_numbers,
'body' => $body, 'body' => $body,
), ),
PhabricatorWorker::PRIORITY_ALERTS); array(
'priority' => PhabricatorWorker::PRIORITY_ALERTS,
));
} }
} }