From f5f784f4c1d03473567a8b7ce7b05bb5bb0d782a Mon Sep 17 00:00:00 2001 From: epriestley Date: Fri, 27 May 2016 06:21:19 -0700 Subject: [PATCH] Version clustered, observed repositories in a reasonable way (by largest discovered HEAD) Summary: Ref T4292. For hosted, clustered repositories we have a good way to increment the internal version of the repository: every time a user pushes something, we increment the version by 1. We don't have a great way to do this for observed/remote repositories because when we `git fetch` we might get nothing, or we might get some changes, and we can't easily tell //what// changes we got. For example, if we see that another node is at "version 97", and we do a fetch and see some changes, we don't know if we're in sync with them (i.e., also at "version 97") or ahead of them (at "version 98"). This implements a simple way to version an observed repository: - Take the head of every branch/tag. - Look them up. - Pick the biggest internal ID number. This will work //except// when branches are deleted, which could cause the version to go backward if the "biggest commit" is the one that was deleted. This should be OK, since it's rare and the effects are minor and the repository will "self-heal" on the next actual push. Test Plan: - Created an observed repository. - Ran `bin/repository update` and observed a sensible version number appear in the version table. - Pushed to the remote, did another update, saw a sensible update. - Did an update with no push, saw no effect on version number. - Toggled repository to hosted, saw the version reset. - Simulated read traffic to out-of-sync node, saw it do a remote fetch. Reviewers: chad Reviewed By: chad Maniphest Tasks: T4292 Differential Revision: https://secure.phabricator.com/D15986 --- .../DiffusionBranchQueryConduitAPIMethod.php | 5 +- .../DiffusionTagsQueryConduitAPIMethod.php | 5 +- .../diffusion/editor/DiffusionURIEditor.php | 13 ++ ...fusionRepositoryStorageManagementPanel.php | 28 ++- .../DiffusionRepositoryClusterEngine.php | 187 +++++++++++++++--- .../lowlevel/DiffusionLowLevelGitRefQuery.php | 35 ++-- .../DiffusionLowLevelResolveRefsQuery.php | 7 +- .../PhabricatorRepositoryDiscoveryEngine.php | 62 +++++- .../PhabricatorRepositoryPullEngine.php | 34 ++-- .../engine/PhabricatorRepositoryRefEngine.php | 10 +- 10 files changed, 316 insertions(+), 70 deletions(-) diff --git a/src/applications/diffusion/conduit/DiffusionBranchQueryConduitAPIMethod.php b/src/applications/diffusion/conduit/DiffusionBranchQueryConduitAPIMethod.php index 5fd440e4b2..d42c45a96b 100644 --- a/src/applications/diffusion/conduit/DiffusionBranchQueryConduitAPIMethod.php +++ b/src/applications/diffusion/conduit/DiffusionBranchQueryConduitAPIMethod.php @@ -56,7 +56,10 @@ final class DiffusionBranchQueryConduitAPIMethod } else { $refs = id(new DiffusionLowLevelGitRefQuery()) ->setRepository($repository) - ->withIsOriginBranch(true) + ->withRefTypes( + array( + PhabricatorRepositoryRefCursor::TYPE_BRANCH, + )) ->execute(); } diff --git a/src/applications/diffusion/conduit/DiffusionTagsQueryConduitAPIMethod.php b/src/applications/diffusion/conduit/DiffusionTagsQueryConduitAPIMethod.php index ddf3a2152b..3de5793289 100644 --- a/src/applications/diffusion/conduit/DiffusionTagsQueryConduitAPIMethod.php +++ b/src/applications/diffusion/conduit/DiffusionTagsQueryConduitAPIMethod.php @@ -72,7 +72,10 @@ final class DiffusionTagsQueryConduitAPIMethod $refs = id(new DiffusionLowLevelGitRefQuery()) ->setRepository($repository) - ->withIsTag(true) + ->withRefTypes( + array( + PhabricatorRepositoryRefCursor::TYPE_TAG, + )) ->execute(); $tags = array(); diff --git a/src/applications/diffusion/editor/DiffusionURIEditor.php b/src/applications/diffusion/editor/DiffusionURIEditor.php index 020275a367..c22b888ac5 100644 --- a/src/applications/diffusion/editor/DiffusionURIEditor.php +++ b/src/applications/diffusion/editor/DiffusionURIEditor.php @@ -463,6 +463,8 @@ final class DiffusionURIEditor break; } + $was_hosted = $repository->isHosted(); + if ($observe_uri) { $repository ->setHosted(false) @@ -477,6 +479,17 @@ final class DiffusionURIEditor $repository->save(); + $is_hosted = $repository->isHosted(); + + // If we've swapped the repository from hosted to observed or vice versa, + // reset all the cluster version clocks. + if ($was_hosted != $is_hosted) { + $cluster_engine = id(new DiffusionRepositoryClusterEngine()) + ->setViewer($this->getActor()) + ->setRepository($repository) + ->synchronizeWorkingCopyAfterHostingChange(); + } + return $xactions; } diff --git a/src/applications/diffusion/management/DiffusionRepositoryStorageManagementPanel.php b/src/applications/diffusion/management/DiffusionRepositoryStorageManagementPanel.php index 0c723bd9e6..9cf210632d 100644 --- a/src/applications/diffusion/management/DiffusionRepositoryStorageManagementPanel.php +++ b/src/applications/diffusion/management/DiffusionRepositoryStorageManagementPanel.php @@ -137,12 +137,28 @@ final class DiffusionRepositoryStorageManagementPanel $version = idx($versions, $device->getPHID()); if ($version) { $version_number = $version->getRepositoryVersion(); - $version_number = phutil_tag( - 'a', - array( - 'href' => "/diffusion/pushlog/view/{$version_number}/", - ), - $version_number); + + $href = null; + if ($repository->isHosted()) { + $href = "/diffusion/pushlog/view/{$version_number}/"; + } else { + $commit = id(new DiffusionCommitQuery()) + ->setViewer($viewer) + ->withIDs(array($version_number)) + ->executeOne(); + if ($commit) { + $href = $commit->getURI(); + } + } + + if ($href) { + $version_number = phutil_tag( + 'a', + array( + 'href' => $href, + ), + $version_number); + } } else { $version_number = '-'; } diff --git a/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php index b271c16741..fad8d4019e 100644 --- a/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php +++ b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php @@ -82,6 +82,53 @@ final class DiffusionRepositoryClusterEngine extends Phobject { } + /** + * @task sync + */ + public function synchronizeWorkingCopyAfterHostingChange() { + if (!$this->shouldEnableSynchronization()) { + return; + } + + $repository = $this->getRepository(); + $repository_phid = $repository->getPHID(); + + $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( + $repository_phid); + $versions = mpull($versions, null, 'getDevicePHID'); + + // After converting a hosted repository to observed, or vice versa, we + // need to reset version numbers because the clocks for observed and hosted + // repositories run on different units. + + // We identify all the cluster leaders and reset their version to 0. + // We identify all the cluster followers and demote them. + + // This allows the cluter to start over again at version 0 but keep the + // same leaders. + + if ($versions) { + $max_version = (int)max(mpull($versions, 'getRepositoryVersion')); + foreach ($versions as $version) { + $device_phid = $version->getDevicePHID(); + + if ($version->getRepositoryVersion() == $max_version) { + PhabricatorRepositoryWorkingCopyVersion::updateVersion( + $repository_phid, + $device_phid, + 0); + } else { + PhabricatorRepositoryWorkingCopyVersion::demoteDevice( + $repository_phid, + $device_phid); + } + } + } + + return $this; + } + + /** * @task sync */ @@ -149,14 +196,18 @@ final class DiffusionRepositoryClusterEngine extends Phobject { $max_version = (int)max(mpull($versions, 'getRepositoryVersion')); if ($max_version > $this_version) { - $fetchable = array(); - foreach ($versions as $version) { - if ($version->getRepositoryVersion() == $max_version) { - $fetchable[] = $version->getDevicePHID(); + if ($repository->isHosted()) { + $fetchable = array(); + foreach ($versions as $version) { + if ($version->getRepositoryVersion() == $max_version) { + $fetchable[] = $version->getDevicePHID(); + } } - } - $this->synchronizeWorkingCopyFromDevices($fetchable); + $this->synchronizeWorkingCopyFromDevices($fetchable); + } else { + $this->synchornizeWorkingCopyFromRemote(); + } PhabricatorRepositoryWorkingCopyVersion::updateVersion( $repository_phid, @@ -329,6 +380,47 @@ final class DiffusionRepositoryClusterEngine extends Phobject { } + public function synchronizeWorkingCopyAfterDiscovery($new_version) { + if (!$this->shouldEnableSynchronization()) { + return; + } + + $repository = $this->getRepository(); + $repository_phid = $repository->getPHID(); + if ($repository->isHosted()) { + return; + } + + $viewer = $this->getViewer(); + + $device = AlmanacKeys::getLiveDevice(); + $device_phid = $device->getPHID(); + + // NOTE: We are not holding a lock here because this method is only called + // from PhabricatorRepositoryDiscoveryEngine, which already holds a device + // lock. Even if we do race here and record an older version, the + // consequences are mild: we only do extra work to correct it later. + + $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( + $repository_phid); + $versions = mpull($versions, null, 'getDevicePHID'); + + $this_version = idx($versions, $device_phid); + if ($this_version) { + $this_version = (int)$this_version->getRepositoryVersion(); + } else { + $this_version = -1; + } + + if ($new_version > $this_version) { + PhabricatorRepositoryWorkingCopyVersion::updateVersion( + $repository_phid, + $device_phid, + $new_version); + } + } + + /** * @task sync */ @@ -471,13 +563,6 @@ final class DiffusionRepositoryClusterEngine extends Phobject { return false; } - // TODO: It may eventually make sense to try to version and synchronize - // observed repositories (so that daemons don't do reads against out-of - // date hosts), but don't bother for now. - if (!$repository->isHosted()) { - return false; - } - $device = AlmanacKeys::getLiveDevice(); if (!$device) { return false; @@ -487,6 +572,50 @@ final class DiffusionRepositoryClusterEngine extends Phobject { } + /** + * @task internal + */ + private function synchornizeWorkingCopyFromRemote() { + $repository = $this->getRepository(); + $device = AlmanacKeys::getLiveDevice(); + + $local_path = $repository->getLocalPath(); + $fetch_uri = $repository->getRemoteURIEnvelope(); + + if ($repository->isGit()) { + $this->requireWorkingCopy(); + + $argv = array( + 'fetch --prune -- %P %s', + $fetch_uri, + '+refs/*:refs/*', + ); + } else { + throw new Exception(pht('Remote sync only supported for git!')); + } + + $future = DiffusionCommandEngine::newCommandEngine($repository) + ->setArgv($argv) + ->setSudoAsDaemon(true) + ->setCredentialPHID($repository->getCredentialPHID()) + ->setProtocol($repository->getRemoteProtocol()) + ->newFuture(); + + $future->setCWD($local_path); + + try { + $future->resolvex(); + } catch (Exception $ex) { + $this->logLine( + pht( + 'Synchronization of "%s" from remote failed: %s', + $device->getName(), + $ex->getMessage())); + throw $ex; + } + } + + /** * @task internal */ @@ -560,17 +689,7 @@ final class DiffusionRepositoryClusterEngine extends Phobject { $local_path = $repository->getLocalPath(); if ($repository->isGit()) { - if (!Filesystem::pathExists($local_path)) { - throw new Exception( - pht( - 'Repository "%s" does not have a working copy on this device '. - 'yet, so it can not be synchronized. Wait for the daemons to '. - 'construct one or run `bin/repository update %s` on this host '. - '("%s") to build it explicitly.', - $repository->getDisplayName(), - $repository->getMonogram(), - $device->getName())); - } + $this->requireWorkingCopy(); $argv = array( 'fetch --prune -- %s %s', @@ -622,4 +741,24 @@ final class DiffusionRepositoryClusterEngine extends Phobject { } return $this; } + + private function requireWorkingCopy() { + $repository = $this->getRepository(); + $local_path = $repository->getLocalPath(); + + if (!Filesystem::pathExists($local_path)) { + $device = AlmanacKeys::getLiveDevice(); + + throw new Exception( + pht( + 'Repository "%s" does not have a working copy on this device '. + 'yet, so it can not be synchronized. Wait for the daemons to '. + 'construct one or run `bin/repository update %s` on this host '. + '("%s") to build it explicitly.', + $repository->getDisplayName(), + $repository->getMonogram(), + $device->getName())); + } + } + } diff --git a/src/applications/diffusion/query/lowlevel/DiffusionLowLevelGitRefQuery.php b/src/applications/diffusion/query/lowlevel/DiffusionLowLevelGitRefQuery.php index 1f17f5fe9f..038d833670 100644 --- a/src/applications/diffusion/query/lowlevel/DiffusionLowLevelGitRefQuery.php +++ b/src/applications/diffusion/query/lowlevel/DiffusionLowLevelGitRefQuery.php @@ -6,30 +6,33 @@ */ final class DiffusionLowLevelGitRefQuery extends DiffusionLowLevelQuery { - private $isTag; - private $isOriginBranch; + private $refTypes; - public function withIsTag($is_tag) { - $this->isTag = $is_tag; - return $this; - } - - public function withIsOriginBranch($is_origin_branch) { - $this->isOriginBranch = $is_origin_branch; + public function withRefTypes(array $ref_types) { + $this->refTypes = $ref_types; return $this; } protected function executeQuery() { + $ref_types = $this->refTypes; + if ($ref_types) { + $type_branch = PhabricatorRepositoryRefCursor::TYPE_BRANCH; + $type_tag = PhabricatorRepositoryRefCursor::TYPE_TAG; + + $ref_types = array_fuse($ref_types); + + $with_branches = isset($ref_types[$type_branch]); + $with_tags = isset($ref_types[$type_tag]); + } else { + $with_branches = true; + $with_tags = true; + } + $repository = $this->getRepository(); $prefixes = array(); - $any = ($this->isTag || $this->isOriginBranch); - if (!$any) { - throw new Exception(pht('Specify types of refs to query.')); - } - - if ($this->isOriginBranch) { + if ($with_branches) { if ($repository->isWorkingCopyBare()) { $prefix = 'refs/heads/'; } else { @@ -39,7 +42,7 @@ final class DiffusionLowLevelGitRefQuery extends DiffusionLowLevelQuery { $prefixes[] = $prefix; } - if ($this->isTag) { + if ($with_tags) { $prefixes[] = 'refs/tags/'; } diff --git a/src/applications/diffusion/query/lowlevel/DiffusionLowLevelResolveRefsQuery.php b/src/applications/diffusion/query/lowlevel/DiffusionLowLevelResolveRefsQuery.php index 4e9ed246d9..8f9493a67a 100644 --- a/src/applications/diffusion/query/lowlevel/DiffusionLowLevelResolveRefsQuery.php +++ b/src/applications/diffusion/query/lowlevel/DiffusionLowLevelResolveRefsQuery.php @@ -66,8 +66,11 @@ final class DiffusionLowLevelResolveRefsQuery // First, resolve branches and tags. $ref_map = id(new DiffusionLowLevelGitRefQuery()) ->setRepository($repository) - ->withIsTag(true) - ->withIsOriginBranch(true) + ->withRefTypes( + array( + PhabricatorRepositoryRefCursor::TYPE_BRANCH, + PhabricatorRepositoryRefCursor::TYPE_TAG, + )) ->execute(); $ref_map = mgroup($ref_map, 'getShortName'); diff --git a/src/applications/repository/engine/PhabricatorRepositoryDiscoveryEngine.php b/src/applications/repository/engine/PhabricatorRepositoryDiscoveryEngine.php index 78a40986e3..9e0e13c720 100644 --- a/src/applications/repository/engine/PhabricatorRepositoryDiscoveryEngine.php +++ b/src/applications/repository/engine/PhabricatorRepositoryDiscoveryEngine.php @@ -63,6 +63,7 @@ final class PhabricatorRepositoryDiscoveryEngine private function discoverCommitsWithLock() { $repository = $this->getRepository(); + $viewer = $this->getViewer(); $vcs = $repository->getVersionControlSystem(); switch ($vcs) { @@ -104,6 +105,14 @@ final class PhabricatorRepositoryDiscoveryEngine $this->commitCache[$ref->getIdentifier()] = true; } + $version = $this->getObservedVersion($repository); + if ($version !== null) { + id(new DiffusionRepositoryClusterEngine()) + ->setViewer($viewer) + ->setRepository($repository) + ->synchronizeWorkingCopyAfterDiscovery($version); + } + return $refs; } @@ -121,9 +130,15 @@ final class PhabricatorRepositoryDiscoveryEngine $this->verifyGitOrigin($repository); } + // TODO: This should also import tags, but some of the logic is still + // branch-specific today. + $branches = id(new DiffusionLowLevelGitRefQuery()) ->setRepository($repository) - ->withIsOriginBranch(true) + ->withRefTypes( + array( + PhabricatorRepositoryRefCursor::TYPE_BRANCH, + )) ->execute(); if (!$branches) { @@ -642,4 +657,49 @@ final class PhabricatorRepositoryDiscoveryEngine return true; } + + private function getObservedVersion(PhabricatorRepository $repository) { + if ($repository->isHosted()) { + return null; + } + + if ($repository->isGit()) { + return $this->getGitObservedVersion($repository); + } + + return null; + } + + private function getGitObservedVersion(PhabricatorRepository $repository) { + $refs = id(new DiffusionLowLevelGitRefQuery()) + ->setRepository($repository) + ->execute(); + if (!$refs) { + return null; + } + + // In Git, the observed version is the most recently discovered commit + // at any repository HEAD. It's possible for this to regress temporarily + // if a branch is pushed and then deleted. This is acceptable because it + // doesn't do anything meaningfully bad and will fix itself on the next + // push. + + $ref_identifiers = mpull($refs, 'getCommitIdentifier'); + $ref_identifiers = array_fuse($ref_identifiers); + + $version = queryfx_one( + $repository->establishConnection('w'), + 'SELECT MAX(id) version FROM %T WHERE repositoryID = %d + AND commitIdentifier IN (%Ls)', + id(new PhabricatorRepositoryCommit())->getTableName(), + $repository->getID(), + $ref_identifiers); + + if (!$version) { + return null; + } + + return (int)$version['version']; + } + } diff --git a/src/applications/repository/engine/PhabricatorRepositoryPullEngine.php b/src/applications/repository/engine/PhabricatorRepositoryPullEngine.php index 9e151ecd81..3d7dce90fa 100644 --- a/src/applications/repository/engine/PhabricatorRepositoryPullEngine.php +++ b/src/applications/repository/engine/PhabricatorRepositoryPullEngine.php @@ -108,27 +108,27 @@ final class PhabricatorRepositoryPullEngine } else { $this->executeSubversionCreate(); } - } else { - if (!$repository->isHosted()) { - $this->logPull( - pht( - 'Updating the working copy for repository "%s".', - $repository->getDisplayName())); - if ($is_git) { - $this->verifyGitOrigin($repository); - $this->executeGitUpdate(); - } else if ($is_hg) { - $this->executeMercurialUpdate(); - } + } + + id(new DiffusionRepositoryClusterEngine()) + ->setViewer($viewer) + ->setRepository($repository) + ->synchronizeWorkingCopyBeforeRead(); + + if (!$repository->isHosted()) { + $this->logPull( + pht( + 'Updating the working copy for repository "%s".', + $repository->getDisplayName())); + if ($is_git) { + $this->verifyGitOrigin($repository); + $this->executeGitUpdate(); + } else if ($is_hg) { + $this->executeMercurialUpdate(); } } if ($repository->isHosted()) { - id(new DiffusionRepositoryClusterEngine()) - ->setViewer($viewer) - ->setRepository($repository) - ->synchronizeWorkingCopyBeforeRead(); - if ($is_git) { $this->installGitHook(); } else if ($is_svn) { diff --git a/src/applications/repository/engine/PhabricatorRepositoryRefEngine.php b/src/applications/repository/engine/PhabricatorRepositoryRefEngine.php index 3909cc83b0..da89db75e0 100644 --- a/src/applications/repository/engine/PhabricatorRepositoryRefEngine.php +++ b/src/applications/repository/engine/PhabricatorRepositoryRefEngine.php @@ -452,7 +452,10 @@ final class PhabricatorRepositoryRefEngine private function loadGitBranchPositions(PhabricatorRepository $repository) { return id(new DiffusionLowLevelGitRefQuery()) ->setRepository($repository) - ->withIsOriginBranch(true) + ->withRefTypes( + array( + PhabricatorRepositoryRefCursor::TYPE_BRANCH, + )) ->execute(); } @@ -463,7 +466,10 @@ final class PhabricatorRepositoryRefEngine private function loadGitTagPositions(PhabricatorRepository $repository) { return id(new DiffusionLowLevelGitRefQuery()) ->setRepository($repository) - ->withIsTag(true) + ->withRefTypes( + array( + PhabricatorRepositoryRefCursor::TYPE_TAG, + )) ->execute(); }