Move task leasing to a dedicated query
Summary: This simplifies the fairly thorny logic of leasing tasks a bit. I'm planning to introduce another callsite shortly for Drydock. Test Plan: Ran `bin/phd debug taskmaster`, observed sensible queries and correct operation. Reviewers: btrahan Reviewed By: btrahan CC: aran Maniphest Tasks: T2015 Differential Revision: https://secure.phabricator.com/D3855
This commit is contained in:
		@@ -1143,6 +1143,7 @@ phutil_register_library_map(array(
 | 
			
		||||
    'PhabricatorWorkerActiveTask' => 'infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php',
 | 
			
		||||
    'PhabricatorWorkerArchiveTask' => 'infrastructure/daemon/workers/storage/PhabricatorWorkerArchiveTask.php',
 | 
			
		||||
    'PhabricatorWorkerDAO' => 'infrastructure/daemon/workers/storage/PhabricatorWorkerDAO.php',
 | 
			
		||||
    'PhabricatorWorkerLeaseQuery' => 'infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php',
 | 
			
		||||
    'PhabricatorWorkerTask' => 'infrastructure/daemon/workers/storage/PhabricatorWorkerTask.php',
 | 
			
		||||
    'PhabricatorWorkerTaskData' => 'infrastructure/daemon/workers/storage/PhabricatorWorkerTaskData.php',
 | 
			
		||||
    'PhabricatorWorkerTaskDetailController' => 'applications/daemon/controller/PhabricatorWorkerTaskDetailController.php',
 | 
			
		||||
@@ -2305,6 +2306,7 @@ phutil_register_library_map(array(
 | 
			
		||||
    'PhabricatorWorkerActiveTask' => 'PhabricatorWorkerTask',
 | 
			
		||||
    'PhabricatorWorkerArchiveTask' => 'PhabricatorWorkerTask',
 | 
			
		||||
    'PhabricatorWorkerDAO' => 'PhabricatorLiskDAO',
 | 
			
		||||
    'PhabricatorWorkerLeaseQuery' => 'PhabricatorQuery',
 | 
			
		||||
    'PhabricatorWorkerTask' => 'PhabricatorWorkerDAO',
 | 
			
		||||
    'PhabricatorWorkerTaskData' => 'PhabricatorWorkerDAO',
 | 
			
		||||
    'PhabricatorWorkerTaskDetailController' => 'PhabricatorDaemonController',
 | 
			
		||||
 
 | 
			
		||||
@@ -19,59 +19,13 @@
 | 
			
		||||
final class PhabricatorTaskmasterDaemon extends PhabricatorDaemon {
 | 
			
		||||
 | 
			
		||||
  public function run() {
 | 
			
		||||
    $lease_ownership_name = $this->getLeaseOwnershipName();
 | 
			
		||||
 | 
			
		||||
    $task_table = new PhabricatorWorkerActiveTask();
 | 
			
		||||
    $taskdata_table = new PhabricatorWorkerTaskData();
 | 
			
		||||
 | 
			
		||||
    $sleep = 0;
 | 
			
		||||
    do {
 | 
			
		||||
      $this->log('Dequeuing a task...');
 | 
			
		||||
 | 
			
		||||
      $conn_w = $task_table->establishConnection('w');
 | 
			
		||||
      queryfx(
 | 
			
		||||
        $conn_w,
 | 
			
		||||
        'UPDATE %T SET leaseOwner = %s, leaseExpires = UNIX_TIMESTAMP() + 15
 | 
			
		||||
          WHERE leaseOwner IS NULL LIMIT 1',
 | 
			
		||||
          $task_table->getTableName(),
 | 
			
		||||
          $lease_ownership_name);
 | 
			
		||||
      $rows = $conn_w->getAffectedRows();
 | 
			
		||||
 | 
			
		||||
      if (!$rows) {
 | 
			
		||||
        $this->log('No unleased tasks. Dequeuing an expired lease...');
 | 
			
		||||
        queryfx(
 | 
			
		||||
          $conn_w,
 | 
			
		||||
          'UPDATE %T SET leaseOwner = %s, leaseExpires = UNIX_TIMESTAMP() + 15
 | 
			
		||||
            WHERE leaseExpires < UNIX_TIMESTAMP() LIMIT 1',
 | 
			
		||||
          $task_table->getTableName(),
 | 
			
		||||
          $lease_ownership_name);
 | 
			
		||||
        $rows = $conn_w->getAffectedRows();
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if ($rows) {
 | 
			
		||||
        $data = queryfx_all(
 | 
			
		||||
          $conn_w,
 | 
			
		||||
          'SELECT task.*, taskdata.data _taskData, UNIX_TIMESTAMP() _serverTime
 | 
			
		||||
            FROM %T task LEFT JOIN %T taskdata
 | 
			
		||||
              ON taskdata.id = task.dataID
 | 
			
		||||
            WHERE leaseOwner = %s AND leaseExpires > UNIX_TIMESTAMP()
 | 
			
		||||
            LIMIT 1',
 | 
			
		||||
          $task_table->getTableName(),
 | 
			
		||||
          $taskdata_table->getTableName(),
 | 
			
		||||
          $lease_ownership_name);
 | 
			
		||||
        $tasks = $task_table->loadAllFromArray($data);
 | 
			
		||||
        $tasks = mpull($tasks, null, 'getID');
 | 
			
		||||
 | 
			
		||||
        $task_data = array();
 | 
			
		||||
        foreach ($data as $row) {
 | 
			
		||||
          $tasks[$row['id']]->setServerTime($row['_serverTime']);
 | 
			
		||||
          if ($row['_taskData']) {
 | 
			
		||||
            $task_data[$row['id']] = json_decode($row['_taskData'], true);
 | 
			
		||||
          } else {
 | 
			
		||||
            $task_data[$row['id']] = null;
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      $tasks = id(new PhabricatorWorkerLeaseQuery())
 | 
			
		||||
        ->setLimit(1)
 | 
			
		||||
        ->execute();
 | 
			
		||||
 | 
			
		||||
      if ($tasks) {
 | 
			
		||||
        foreach ($tasks as $task) {
 | 
			
		||||
          $id = $task->getID();
 | 
			
		||||
          $class = $task->getTaskClass();
 | 
			
		||||
@@ -84,7 +38,7 @@ final class PhabricatorTaskmasterDaemon extends PhabricatorDaemon {
 | 
			
		||||
          // TODO: We should detect if we acquired a task with an excessive
 | 
			
		||||
          // failure count and fail it permanently.
 | 
			
		||||
 | 
			
		||||
          $data = idx($task_data, $task->getID());
 | 
			
		||||
          $data = $task->getData();
 | 
			
		||||
          try {
 | 
			
		||||
            if (!class_exists($class) ||
 | 
			
		||||
                !is_subclass_of($class, 'PhabricatorWorker')) {
 | 
			
		||||
@@ -124,12 +78,4 @@ final class PhabricatorTaskmasterDaemon extends PhabricatorDaemon {
 | 
			
		||||
    } while (true);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private function getLeaseOwnershipName() {
 | 
			
		||||
    static $name = null;
 | 
			
		||||
    if ($name === null) {
 | 
			
		||||
      $name = getmypid().':'.time().':'.php_uname('n');
 | 
			
		||||
    }
 | 
			
		||||
    return $name;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -0,0 +1,154 @@
 | 
			
		||||
<?php
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
 * Copyright 2012 Facebook, Inc.
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *   http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Select and lease tasks from the worker task queue.
 | 
			
		||||
 *
 | 
			
		||||
 * @group worker
 | 
			
		||||
 */
 | 
			
		||||
final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
 | 
			
		||||
 | 
			
		||||
  const PHASE_UNLEASED = 'unleased';
 | 
			
		||||
  const PHASE_EXPIRED  = 'expired';
 | 
			
		||||
 | 
			
		||||
  private $ids;
 | 
			
		||||
  private $limit;
 | 
			
		||||
 | 
			
		||||
  public function withIDs(array $ids) {
 | 
			
		||||
    $this->ids = $ids;
 | 
			
		||||
    return $this;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public function setLimit($limit) {
 | 
			
		||||
    $this->limit = $limit;
 | 
			
		||||
    return $this;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public function execute() {
 | 
			
		||||
    if (!$this->limit) {
 | 
			
		||||
      throw new Exception("You must setLimit() when leasing tasks.");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    $task_table = new PhabricatorWorkerActiveTask();
 | 
			
		||||
    $taskdata_table = new PhabricatorWorkerTaskData();
 | 
			
		||||
    $lease_ownership_name = $this->getLeaseOwnershipName();
 | 
			
		||||
 | 
			
		||||
    $conn_w = $task_table->establishConnection('w');
 | 
			
		||||
 | 
			
		||||
    // Try to satisfy the request from new, unleased tasks first. If we don't
 | 
			
		||||
    // find enough tasks, try tasks with expired leases (i.e., tasks which have
 | 
			
		||||
    // previously failed).
 | 
			
		||||
 | 
			
		||||
    $phases = array(
 | 
			
		||||
      self::PHASE_UNLEASED,
 | 
			
		||||
      self::PHASE_EXPIRED,
 | 
			
		||||
    );
 | 
			
		||||
    $limit = $this->limit;
 | 
			
		||||
 | 
			
		||||
    $leased = 0;
 | 
			
		||||
    foreach ($phases as $phase) {
 | 
			
		||||
      queryfx(
 | 
			
		||||
        $conn_w,
 | 
			
		||||
        'UPDATE %T SET leaseOwner = %s, leaseExpires = UNIX_TIMESTAMP() + 15
 | 
			
		||||
          %Q %Q %Q',
 | 
			
		||||
        $task_table->getTableName(),
 | 
			
		||||
        $lease_ownership_name,
 | 
			
		||||
        $this->buildWhereClause($conn_w, $phase),
 | 
			
		||||
        $this->buildOrderClause($conn_w),
 | 
			
		||||
        $this->buildLimitClause($conn_w, $limit - $leased));
 | 
			
		||||
 | 
			
		||||
      $leased += $conn_w->getAffectedRows();
 | 
			
		||||
      if ($leased == $limit) {
 | 
			
		||||
        break;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (!$leased) {
 | 
			
		||||
      return array();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    $data = queryfx_all(
 | 
			
		||||
      $conn_w,
 | 
			
		||||
      'SELECT task.*, taskdata.data _taskData, UNIX_TIMESTAMP() _serverTime
 | 
			
		||||
        FROM %T task LEFT JOIN %T taskdata
 | 
			
		||||
          ON taskdata.id = task.dataID
 | 
			
		||||
        WHERE leaseOwner = %s AND leaseExpires > UNIX_TIMESTAMP()
 | 
			
		||||
        %Q %Q',
 | 
			
		||||
      $task_table->getTableName(),
 | 
			
		||||
      $taskdata_table->getTableName(),
 | 
			
		||||
      $lease_ownership_name,
 | 
			
		||||
      $this->buildOrderClause($conn_w),
 | 
			
		||||
      $this->buildLimitClause($conn_w, $limit));
 | 
			
		||||
 | 
			
		||||
    $tasks = $task_table->loadAllFromArray($data);
 | 
			
		||||
    $tasks = mpull($tasks, null, 'getID');
 | 
			
		||||
 | 
			
		||||
    foreach ($data as $row) {
 | 
			
		||||
      $tasks[$row['id']]->setServerTime($row['_serverTime']);
 | 
			
		||||
      if ($row['_taskData']) {
 | 
			
		||||
        $task_data = json_decode($row['_taskData'], true);
 | 
			
		||||
      } else {
 | 
			
		||||
        $task_data = null;
 | 
			
		||||
      }
 | 
			
		||||
      $tasks[$row['id']]->setData($task_data);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return $tasks;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private function buildWhereClause(AphrontDatabaseConnection $conn_w, $phase) {
 | 
			
		||||
    $where = array();
 | 
			
		||||
 | 
			
		||||
    switch ($phase) {
 | 
			
		||||
      case self::PHASE_UNLEASED:
 | 
			
		||||
        $where[] = 'leaseOwner IS NULL';
 | 
			
		||||
        break;
 | 
			
		||||
      case self::PHASE_Expired:
 | 
			
		||||
        $where[] = 'leaseExpires < UNIX_TIMESTAMP()';
 | 
			
		||||
        break;
 | 
			
		||||
      default:
 | 
			
		||||
        throw new Exception("Unknown phase '{$phase}'!");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if ($this->ids) {
 | 
			
		||||
      $where[] = qsprintf(
 | 
			
		||||
        $conn_w,
 | 
			
		||||
        'id IN (%Ld)',
 | 
			
		||||
        $this->ids);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return $this->formatWhereClause($where);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private function buildOrderClause(AphrontDatabaseConnection $conn_w) {
 | 
			
		||||
    return qsprintf($conn_w, 'ORDER BY id ASC');
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private function buildLimitClause(AphrontDatabaseConnection $conn_w, $limit) {
 | 
			
		||||
    return qsprintf($conn_w, 'LIMIT %d', $limit);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private function getLeaseOwnershipName() {
 | 
			
		||||
    static $name = null;
 | 
			
		||||
    if ($name === null) {
 | 
			
		||||
      $name = getmypid().':'.time().':'.php_uname('n');
 | 
			
		||||
    }
 | 
			
		||||
    return $name;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user