Separate repository updates from the pull daemon
Summary: Ref T4605. Currently, the PullLocal daemon is responsible for two relatively distinct things: - scheduling repository updates; and - actually updating repositories. Move the "actually updating" part into a new `bin/repository update` command, which basically runs the pull, discover, refs and mirror commands. This will let the parent process focus on scheduling in a more understandable way and update multiple repositories at once. It also makes it easier to debug and understand update behavior since the non-scheduling pipeline can be run separately. Test Plan: - Ran `update --trace` on SVN, Mercurial and Git repos. - Ran PullLocal daemon for a while without issues. Reviewers: btrahan Reviewed By: btrahan Subscribers: epriestley Maniphest Tasks: T4605 Differential Revision: https://secure.phabricator.com/D8780
This commit is contained in:
		| @@ -1964,6 +1964,7 @@ phutil_register_library_map(array( | |||||||
|     'PhabricatorRepositoryManagementMirrorWorkflow' => 'applications/repository/management/PhabricatorRepositoryManagementMirrorWorkflow.php', |     'PhabricatorRepositoryManagementMirrorWorkflow' => 'applications/repository/management/PhabricatorRepositoryManagementMirrorWorkflow.php', | ||||||
|     'PhabricatorRepositoryManagementPullWorkflow' => 'applications/repository/management/PhabricatorRepositoryManagementPullWorkflow.php', |     'PhabricatorRepositoryManagementPullWorkflow' => 'applications/repository/management/PhabricatorRepositoryManagementPullWorkflow.php', | ||||||
|     'PhabricatorRepositoryManagementRefsWorkflow' => 'applications/repository/management/PhabricatorRepositoryManagementRefsWorkflow.php', |     'PhabricatorRepositoryManagementRefsWorkflow' => 'applications/repository/management/PhabricatorRepositoryManagementRefsWorkflow.php', | ||||||
|  |     'PhabricatorRepositoryManagementUpdateWorkflow' => 'applications/repository/management/PhabricatorRepositoryManagementUpdateWorkflow.php', | ||||||
|     'PhabricatorRepositoryManagementWorkflow' => 'applications/repository/management/PhabricatorRepositoryManagementWorkflow.php', |     'PhabricatorRepositoryManagementWorkflow' => 'applications/repository/management/PhabricatorRepositoryManagementWorkflow.php', | ||||||
|     'PhabricatorRepositoryMercurialCommitChangeParserWorker' => 'applications/repository/worker/commitchangeparser/PhabricatorRepositoryMercurialCommitChangeParserWorker.php', |     'PhabricatorRepositoryMercurialCommitChangeParserWorker' => 'applications/repository/worker/commitchangeparser/PhabricatorRepositoryMercurialCommitChangeParserWorker.php', | ||||||
|     'PhabricatorRepositoryMercurialCommitMessageParserWorker' => 'applications/repository/worker/commitmessageparser/PhabricatorRepositoryMercurialCommitMessageParserWorker.php', |     'PhabricatorRepositoryMercurialCommitMessageParserWorker' => 'applications/repository/worker/commitmessageparser/PhabricatorRepositoryMercurialCommitMessageParserWorker.php', | ||||||
| @@ -4827,6 +4828,7 @@ phutil_register_library_map(array( | |||||||
|     'PhabricatorRepositoryManagementMirrorWorkflow' => 'PhabricatorRepositoryManagementWorkflow', |     'PhabricatorRepositoryManagementMirrorWorkflow' => 'PhabricatorRepositoryManagementWorkflow', | ||||||
|     'PhabricatorRepositoryManagementPullWorkflow' => 'PhabricatorRepositoryManagementWorkflow', |     'PhabricatorRepositoryManagementPullWorkflow' => 'PhabricatorRepositoryManagementWorkflow', | ||||||
|     'PhabricatorRepositoryManagementRefsWorkflow' => 'PhabricatorRepositoryManagementWorkflow', |     'PhabricatorRepositoryManagementRefsWorkflow' => 'PhabricatorRepositoryManagementWorkflow', | ||||||
|  |     'PhabricatorRepositoryManagementUpdateWorkflow' => 'PhabricatorRepositoryManagementWorkflow', | ||||||
|     'PhabricatorRepositoryManagementWorkflow' => 'PhabricatorManagementWorkflow', |     'PhabricatorRepositoryManagementWorkflow' => 'PhabricatorManagementWorkflow', | ||||||
|     'PhabricatorRepositoryMercurialCommitChangeParserWorker' => 'PhabricatorRepositoryCommitChangeParserWorker', |     'PhabricatorRepositoryMercurialCommitChangeParserWorker' => 'PhabricatorRepositoryCommitChangeParserWorker', | ||||||
|     'PhabricatorRepositoryMercurialCommitMessageParserWorker' => 'PhabricatorRepositoryCommitMessageParserWorker', |     'PhabricatorRepositoryMercurialCommitMessageParserWorker' => 'PhabricatorRepositoryCommitMessageParserWorker', | ||||||
|   | |||||||
| @@ -29,8 +29,6 @@ | |||||||
| final class PhabricatorRepositoryPullLocalDaemon | final class PhabricatorRepositoryPullLocalDaemon | ||||||
|   extends PhabricatorDaemon { |   extends PhabricatorDaemon { | ||||||
|  |  | ||||||
|   private $discoveryEngines = array(); |  | ||||||
|  |  | ||||||
|  |  | ||||||
| /* -(  Pulling Repositories  )----------------------------------------------- */ | /* -(  Pulling Repositories  )----------------------------------------------- */ | ||||||
|  |  | ||||||
| @@ -122,49 +120,28 @@ final class PhabricatorRepositoryPullLocalDaemon | |||||||
|         try { |         try { | ||||||
|           $this->log("Updating repository '{$callsign}'."); |           $this->log("Updating repository '{$callsign}'."); | ||||||
|  |  | ||||||
|           id(new PhabricatorRepositoryPullEngine()) |           $bin_dir = dirname(phutil_get_library_root('phabricator')).'/bin'; | ||||||
|             ->setRepository($repository) |           $flags = array(); | ||||||
|             ->pullRepository(); |           if ($no_discovery) { | ||||||
|  |             $flags[] = '--no-discovery'; | ||||||
|           if (!$no_discovery) { |  | ||||||
|             // TODO: It would be nice to discover only if we pulled something, |  | ||||||
|             // but this isn't totally trivial. It's slightly more complicated |  | ||||||
|             // with hosted repositories, too. |  | ||||||
|  |  | ||||||
|             $lock_name = get_class($this).':'.$callsign; |  | ||||||
|             $lock = PhabricatorGlobalLock::newLock($lock_name); |  | ||||||
|             $lock->lock(); |  | ||||||
|  |  | ||||||
|             try { |  | ||||||
|               $repository->writeStatusMessage( |  | ||||||
|                 PhabricatorRepositoryStatusMessage::TYPE_NEEDS_UPDATE, |  | ||||||
|                 null); |  | ||||||
|               $this->discoverRepository($repository); |  | ||||||
|               $this->updateRepositoryRefs($repository); |  | ||||||
|               $this->mirrorRepository($repository); |  | ||||||
|               $repository->writeStatusMessage( |  | ||||||
|                 PhabricatorRepositoryStatusMessage::TYPE_FETCH, |  | ||||||
|                 PhabricatorRepositoryStatusMessage::CODE_OKAY); |  | ||||||
|             } catch (Exception $ex) { |  | ||||||
|               $repository->writeStatusMessage( |  | ||||||
|                 PhabricatorRepositoryStatusMessage::TYPE_FETCH, |  | ||||||
|                 PhabricatorRepositoryStatusMessage::CODE_ERROR, |  | ||||||
|                 array( |  | ||||||
|                   'message' => pht( |  | ||||||
|                     'Error updating working copy: %s', $ex->getMessage()), |  | ||||||
|                 )); |  | ||||||
|               $lock->unlock(); |  | ||||||
|               throw $ex; |  | ||||||
|           } |           } | ||||||
|  |  | ||||||
|             $lock->unlock(); |           list($stdout, $stderr) = execx( | ||||||
|  |             '%s/repository update %Ls -- %s', | ||||||
|  |             $bin_dir, | ||||||
|  |             $flags, | ||||||
|  |             $callsign); | ||||||
|  |  | ||||||
|  |           if (strlen($stderr)) { | ||||||
|  |             $stderr_msg = pht( | ||||||
|  |               'Unexpected output while updating the %s repository: %s', | ||||||
|  |               $callsign, | ||||||
|  |               $stderr); | ||||||
|  |             phlog($stderr_msg); | ||||||
|           } |           } | ||||||
|  |  | ||||||
|           $sleep_for = $repository->getDetail('pull-frequency', $min_sleep); |           $sleep_for = $repository->getDetail('pull-frequency', $min_sleep); | ||||||
|           $retry_after[$id] = time() + $sleep_for; |           $retry_after[$id] = time() + $sleep_for; | ||||||
|         } catch (PhutilLockException $ex) { |  | ||||||
|           $retry_after[$id] = time() + $min_sleep; |  | ||||||
|           $this->log("Failed to acquire lock."); |  | ||||||
|         } catch (Exception $ex) { |         } catch (Exception $ex) { | ||||||
|           $retry_after[$id] = time() + $min_sleep; |           $retry_after[$id] = time() + $min_sleep; | ||||||
|  |  | ||||||
| @@ -224,85 +201,4 @@ final class PhabricatorRepositoryPullLocalDaemon | |||||||
|     return $repos; |     return $repos; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   public function discoverRepository(PhabricatorRepository $repository) { |  | ||||||
|     $refs = $this->getDiscoveryEngine($repository) |  | ||||||
|       ->discoverCommits(); |  | ||||||
|  |  | ||||||
|     $this->checkIfRepositoryIsFullyImported($repository); |  | ||||||
|  |  | ||||||
|     return (bool)count($refs); |  | ||||||
|   } |  | ||||||
|  |  | ||||||
|   private function mirrorRepository(PhabricatorRepository $repository) { |  | ||||||
|     try { |  | ||||||
|       id(new PhabricatorRepositoryMirrorEngine()) |  | ||||||
|         ->setRepository($repository) |  | ||||||
|         ->pushToMirrors(); |  | ||||||
|     } catch (Exception $ex) { |  | ||||||
|       // TODO: We should report these into the UI properly, but for |  | ||||||
|       // now just complain. These errors are much less severe than |  | ||||||
|       // pull errors. |  | ||||||
|       $proxy = new PhutilProxyException( |  | ||||||
|         pht( |  | ||||||
|           'Error while pushing "%s" repository to mirrors.', |  | ||||||
|           $repository->getCallsign()), |  | ||||||
|         $ex); |  | ||||||
|       phlog($proxy); |  | ||||||
|     } |  | ||||||
|   } |  | ||||||
|  |  | ||||||
|   private function updateRepositoryRefs(PhabricatorRepository $repository) { |  | ||||||
|     id(new PhabricatorRepositoryRefEngine()) |  | ||||||
|       ->setRepository($repository) |  | ||||||
|       ->updateRefs(); |  | ||||||
|   } |  | ||||||
|  |  | ||||||
|   private function getDiscoveryEngine(PhabricatorRepository $repository) { |  | ||||||
|     $id = $repository->getID(); |  | ||||||
|     if (empty($this->discoveryEngines[$id])) { |  | ||||||
|       $engine = id(new PhabricatorRepositoryDiscoveryEngine()) |  | ||||||
|         ->setRepository($repository) |  | ||||||
|         ->setVerbose($this->getVerbose()); |  | ||||||
|  |  | ||||||
|       $this->discoveryEngines[$id] = $engine; |  | ||||||
|     } |  | ||||||
|     return $this->discoveryEngines[$id]; |  | ||||||
|   } |  | ||||||
|  |  | ||||||
|   private function checkIfRepositoryIsFullyImported( |  | ||||||
|     PhabricatorRepository $repository) { |  | ||||||
|  |  | ||||||
|     // Check if the repository has the "Importing" flag set. We want to clear |  | ||||||
|     // the flag if we can. |  | ||||||
|     $importing = $repository->getDetail('importing'); |  | ||||||
|     if (!$importing) { |  | ||||||
|       // This repository isn't marked as "Importing", so we're done. |  | ||||||
|       return; |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     // Look for any commit which hasn't imported. |  | ||||||
|     $unparsed_commit = queryfx_one( |  | ||||||
|       $repository->establishConnection('r'), |  | ||||||
|       'SELECT * FROM %T WHERE repositoryID = %d AND (importStatus & %d) != %d |  | ||||||
|         LIMIT 1', |  | ||||||
|       id(new PhabricatorRepositoryCommit())->getTableName(), |  | ||||||
|       $repository->getID(), |  | ||||||
|       PhabricatorRepositoryCommit::IMPORTED_ALL, |  | ||||||
|       PhabricatorRepositoryCommit::IMPORTED_ALL); |  | ||||||
|     if ($unparsed_commit) { |  | ||||||
|       // We found a commit which still needs to import, so we can't clear the |  | ||||||
|       // flag. |  | ||||||
|       return; |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     // Clear the "importing" flag. |  | ||||||
|     $repository->openTransaction(); |  | ||||||
|       $repository->beginReadLocking(); |  | ||||||
|         $repository = $repository->reload(); |  | ||||||
|         $repository->setDetail('importing', false); |  | ||||||
|         $repository->save(); |  | ||||||
|       $repository->endReadLocking(); |  | ||||||
|     $repository->saveTransaction(); |  | ||||||
|   } |  | ||||||
|  |  | ||||||
| } | } | ||||||
|   | |||||||
| @@ -0,0 +1,178 @@ | |||||||
|  | <?php | ||||||
|  |  | ||||||
|  | final class PhabricatorRepositoryManagementUpdateWorkflow | ||||||
|  |   extends PhabricatorRepositoryManagementWorkflow { | ||||||
|  |  | ||||||
|  |   private $verbose; | ||||||
|  |  | ||||||
|  |   public function setVerbose($verbose) { | ||||||
|  |     $this->verbose = $verbose; | ||||||
|  |     return $this; | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |   public function getVerbose() { | ||||||
|  |     return $this->verbose; | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |   public function didConstruct() { | ||||||
|  |     $this | ||||||
|  |       ->setName('update') | ||||||
|  |       ->setExamples('**update** [options] __repository__') | ||||||
|  |       ->setSynopsis( | ||||||
|  |         pht( | ||||||
|  |           'Update __repository__, named by callsign. '. | ||||||
|  |           'This performs the __pull__, __discover__, __ref__ and __mirror__ '. | ||||||
|  |           'operations and is primarily an internal workflow.')) | ||||||
|  |       ->setArguments( | ||||||
|  |         array( | ||||||
|  |           array( | ||||||
|  |             'name'        => 'verbose', | ||||||
|  |             'help'        => 'Show additional debugging information.', | ||||||
|  |           ), | ||||||
|  |           array( | ||||||
|  |             'name'        => 'no-discovery', | ||||||
|  |             'help'        => 'Do not perform discovery.', | ||||||
|  |           ), | ||||||
|  |           array( | ||||||
|  |             'name'        => 'repos', | ||||||
|  |             'wildcard'    => true, | ||||||
|  |           ), | ||||||
|  |         )); | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |   public function execute(PhutilArgumentParser $args) { | ||||||
|  |     $this->setVerbose($args->getArg('verbose')); | ||||||
|  |  | ||||||
|  |     $repos = $this->loadRepositories($args, 'repos'); | ||||||
|  |     if (count($repos) !== 1) { | ||||||
|  |       throw new PhutilArgumentUsageException( | ||||||
|  |         pht('Specify exactly one repository to update, by callsign.')); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     $repository = head($repos); | ||||||
|  |     $callsign = $repository->getCallsign(); | ||||||
|  |  | ||||||
|  |     $no_discovery = $args->getArg('no-discovery'); | ||||||
|  |  | ||||||
|  |     id(new PhabricatorRepositoryPullEngine()) | ||||||
|  |       ->setRepository($repository) | ||||||
|  |       ->setVerbose($this->getVerbose()) | ||||||
|  |       ->pullRepository(); | ||||||
|  |  | ||||||
|  |     if ($no_discovery) { | ||||||
|  |       return; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     // TODO: It would be nice to discover only if we pulled something, but this | ||||||
|  |     // isn't totally trivial. It's slightly more complicated with hosted | ||||||
|  |     // repositories, too. | ||||||
|  |  | ||||||
|  |     $lock_name = get_class($this).':'.$callsign; | ||||||
|  |     $lock = PhabricatorGlobalLock::newLock($lock_name); | ||||||
|  |  | ||||||
|  |     $lock->lock(); | ||||||
|  |  | ||||||
|  |     try { | ||||||
|  |       $repository->writeStatusMessage( | ||||||
|  |         PhabricatorRepositoryStatusMessage::TYPE_NEEDS_UPDATE, | ||||||
|  |         null); | ||||||
|  |  | ||||||
|  |       $this->discoverRepository($repository); | ||||||
|  |  | ||||||
|  |       $this->checkIfRepositoryIsFullyImported($repository); | ||||||
|  |  | ||||||
|  |       $this->updateRepositoryRefs($repository); | ||||||
|  |  | ||||||
|  |       $this->mirrorRepository($repository); | ||||||
|  |  | ||||||
|  |       $repository->writeStatusMessage( | ||||||
|  |         PhabricatorRepositoryStatusMessage::TYPE_FETCH, | ||||||
|  |         PhabricatorRepositoryStatusMessage::CODE_OKAY); | ||||||
|  |     } catch (Exception $ex) { | ||||||
|  |       $repository->writeStatusMessage( | ||||||
|  |         PhabricatorRepositoryStatusMessage::TYPE_FETCH, | ||||||
|  |         PhabricatorRepositoryStatusMessage::CODE_ERROR, | ||||||
|  |         array( | ||||||
|  |           'message' => pht( | ||||||
|  |             'Error updating working copy: %s', $ex->getMessage()), | ||||||
|  |         )); | ||||||
|  |  | ||||||
|  |       $lock->unlock(); | ||||||
|  |       throw $ex; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     $lock->unlock(); | ||||||
|  |  | ||||||
|  |     return 0; | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |   private function discoverRepository(PhabricatorRepository $repository) { | ||||||
|  |     $refs = id(new PhabricatorRepositoryDiscoveryEngine()) | ||||||
|  |       ->setRepository($repository) | ||||||
|  |       ->setVerbose($this->getVerbose()) | ||||||
|  |       ->discoverCommits(); | ||||||
|  |  | ||||||
|  |     return (bool)count($refs); | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |   private function mirrorRepository(PhabricatorRepository $repository) { | ||||||
|  |     try { | ||||||
|  |       id(new PhabricatorRepositoryMirrorEngine()) | ||||||
|  |         ->setRepository($repository) | ||||||
|  |         ->pushToMirrors(); | ||||||
|  |     } catch (Exception $ex) { | ||||||
|  |       // TODO: We should report these into the UI properly, but for now just | ||||||
|  |       // complain. These errors are much less severe than pull errors. | ||||||
|  |       $proxy = new PhutilProxyException( | ||||||
|  |         pht( | ||||||
|  |           'Error while pushing "%s" repository to mirrors.', | ||||||
|  |           $repository->getCallsign()), | ||||||
|  |         $ex); | ||||||
|  |       phlog($proxy); | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |   private function updateRepositoryRefs(PhabricatorRepository $repository) { | ||||||
|  |     id(new PhabricatorRepositoryRefEngine()) | ||||||
|  |       ->setRepository($repository) | ||||||
|  |       ->updateRefs(); | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |   private function checkIfRepositoryIsFullyImported( | ||||||
|  |     PhabricatorRepository $repository) { | ||||||
|  |  | ||||||
|  |     // Check if the repository has the "Importing" flag set. We want to clear | ||||||
|  |     // the flag if we can. | ||||||
|  |     $importing = $repository->getDetail('importing'); | ||||||
|  |     if (!$importing) { | ||||||
|  |       // This repository isn't marked as "Importing", so we're done. | ||||||
|  |       return; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     // Look for any commit which hasn't imported. | ||||||
|  |     $unparsed_commit = queryfx_one( | ||||||
|  |       $repository->establishConnection('r'), | ||||||
|  |       'SELECT * FROM %T WHERE repositoryID = %d AND (importStatus & %d) != %d | ||||||
|  |         LIMIT 1', | ||||||
|  |       id(new PhabricatorRepositoryCommit())->getTableName(), | ||||||
|  |       $repository->getID(), | ||||||
|  |       PhabricatorRepositoryCommit::IMPORTED_ALL, | ||||||
|  |       PhabricatorRepositoryCommit::IMPORTED_ALL); | ||||||
|  |     if ($unparsed_commit) { | ||||||
|  |       // We found a commit which still needs to import, so we can't clear the | ||||||
|  |       // flag. | ||||||
|  |       return; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     // Clear the "importing" flag. | ||||||
|  |     $repository->openTransaction(); | ||||||
|  |       $repository->beginReadLocking(); | ||||||
|  |         $repository = $repository->reload(); | ||||||
|  |         $repository->setDetail('importing', false); | ||||||
|  |         $repository->save(); | ||||||
|  |       $repository->endReadLocking(); | ||||||
|  |     $repository->saveTransaction(); | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |  | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user
	 epriestley
					epriestley