Sybren A. Stüvel
4e8e71e4e2
This is not yet working, it's just a direct copy of the Manager of Flamenco 2, with Logrus replaced by Zerolog. The API has been documented in flamenco-manager.yaml as a starting point for the integration.
231 lines
7.3 KiB
Python
231 lines
7.3 KiB
Python
#!/usr/bin/env python3.7
|
|
|
|
import argparse
|
|
import atexit
|
|
from collections import deque
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
import random
|
|
import sys
|
|
import typing
|
|
|
|
import requests
|
|
|
|
import filesystemstuff
|
|
import httpstuff
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('root', type=Path)
|
|
parser.add_argument('shaman_url', type=str)
|
|
parser.add_argument('--checkout')
|
|
parser.add_argument('--sha-only', default=False, action='store_true')
|
|
parser.add_argument('--cleanup', default=False, action='store_true', help='Clean up cache files and exit')
|
|
cli_args = parser.parse_args()
|
|
|
|
root = cli_args.root.resolve()
|
|
|
|
if cli_args.cleanup:
|
|
filesystemstuff.cleanup_cache()
|
|
raise SystemExit('CLEAN!')
|
|
|
|
shaman_url = httpstuff.normalise_url(cli_args.shaman_url)
|
|
|
|
session: requests.Session()
|
|
|
|
@dataclass
|
|
class FileInfo:
|
|
checksum: str
|
|
filesize: int
|
|
abspath: Path
|
|
|
|
global_fileinfo = {}
|
|
|
|
|
|
def feed_lines() -> typing.Iterable[typing.Tuple[Path, bytes, typing.Optional[Path]]]:
|
|
for filepath in filesystemstuff.find_files(root):
|
|
content_path, checksum = filesystemstuff.compute_cached_checksum(filepath)
|
|
filesize = filepath.stat().st_size
|
|
relpath = filepath.relative_to(root)
|
|
|
|
global_fileinfo[str(relpath)] = FileInfo(
|
|
checksum=checksum,
|
|
filesize=filesize,
|
|
abspath=filepath,
|
|
)
|
|
|
|
file_to_unlink = None if content_path == filepath else content_path
|
|
yield relpath, f'{checksum} {filesize} {relpath}\n'.encode('utf8'), file_to_unlink
|
|
|
|
|
|
def show_stats():
|
|
print('filesystemstuff stats:')
|
|
print(f' computing checksums: {filesystemstuff.TimeInfo.computing_checksums:.3f} seconds')
|
|
print(f' handling caching : {filesystemstuff.TimeInfo.checksum_cache_handling:.3f} seconds')
|
|
|
|
|
|
def feed(definition_file: bytes, valid_paths: typing.Set[str]) -> typing.Set[str]:
|
|
print(f'Feeding {root} to the Shaman')
|
|
resp = session.post(f'{shaman_url}checkout/requirements', data=definition_file, stream=True)
|
|
if resp.status_code >= 300:
|
|
raise SystemExit(f'Error {resp.status_code}: {resp.text}')
|
|
|
|
print('==========')
|
|
to_upload = deque()
|
|
for line in resp.iter_lines():
|
|
response, path = line.decode().split(' ', 1)
|
|
print(f'{response}\t{path}')
|
|
|
|
if path not in valid_paths:
|
|
raise RuntimeError(f'Shaman asked us for path {path!r} which we never offered')
|
|
|
|
if response == 'file-unknown':
|
|
to_upload.appendleft(path)
|
|
elif response == 'already-uploading':
|
|
to_upload.append(path)
|
|
elif response == 'ERROR':
|
|
print(f'ERROR RESPONSE: {path}')
|
|
return
|
|
else:
|
|
print(f'UNKNOWN RESPONSE {response!r} FOR PATH {path!r}')
|
|
return
|
|
|
|
print('==========')
|
|
print(f'Going to upload {len(to_upload)} files')
|
|
|
|
failed_paths = upload_files(to_upload)
|
|
|
|
if failed_paths:
|
|
print('Some files did not upload this iteration:')
|
|
for fname in sorted(failed_paths):
|
|
print(f' - {fname}')
|
|
|
|
return failed_paths
|
|
|
|
|
|
def upload_files(to_upload: typing.Deque[str]) -> typing.Set[str]:
|
|
failed_paths = set()
|
|
deferred_paths = set()
|
|
|
|
def defer(some_path: str):
|
|
nonlocal to_upload
|
|
|
|
print(' - Shaman asked us to defer uploading this file.')
|
|
deferred_paths.add(some_path)
|
|
|
|
# Instead of deferring this one file, randomize the files to upload.
|
|
# This prevents multiple deferrals when someone else is uploading
|
|
# files from the same project (because it probably happens alphabetically).
|
|
all_files = list(to_upload)
|
|
random.shuffle(all_files)
|
|
to_upload = deque(all_files)
|
|
to_upload.append(some_path)
|
|
|
|
MAX_DEFERRED_PATHS = 8
|
|
MAX_FAILED_PATHS = 8
|
|
|
|
while to_upload:
|
|
# After too many failures, just retry to get a fresh set of files to upload.
|
|
if len(failed_paths) > MAX_FAILED_PATHS:
|
|
print('Too many failures, going to abort this iteration')
|
|
failed_paths.update(to_upload)
|
|
return failed_paths
|
|
|
|
path = to_upload.popleft()
|
|
fileinfo = global_fileinfo[path]
|
|
|
|
headers = {
|
|
'X-Shaman-Original-Filename': path
|
|
}
|
|
|
|
# Let the Shaman know whether we can defer uploading this file or not.
|
|
can_defer = bool(len(deferred_paths) < MAX_DEFERRED_PATHS and path not in deferred_paths and len(to_upload))
|
|
if can_defer:
|
|
headers['X-Shaman-Can-Defer-Upload'] = 'true'
|
|
|
|
print(f'Uploading {path} ; can_defer={can_defer}')
|
|
try:
|
|
with fileinfo.abspath.open('rb') as infile:
|
|
resp = session.post(
|
|
f'{shaman_url}files/{fileinfo.checksum}/{fileinfo.filesize}',
|
|
data=infile, headers=headers)
|
|
resp.raise_for_status()
|
|
|
|
if resp.status_code == 208:
|
|
if can_defer:
|
|
defer(path)
|
|
else:
|
|
print(' - Someone else already finished uploading this file.')
|
|
|
|
except requests.ConnectionError as ex:
|
|
if can_defer:
|
|
# Closing the connection with an 'X-Shaman-Can-Defer-Upload: true' header
|
|
# indicates that we should defer the upload.
|
|
defer(path)
|
|
else:
|
|
print(f'Error uploading {path}, might retry later: {ex}')
|
|
failed_paths.add(path)
|
|
else:
|
|
failed_paths.discard(path)
|
|
|
|
return failed_paths
|
|
|
|
|
|
def main():
|
|
global session
|
|
|
|
# Get an authentication token.
|
|
resp = requests.get(f'{shaman_url}get-token')
|
|
resp.raise_for_status()
|
|
session = httpstuff.session(token=resp.text)
|
|
|
|
paths_to_unlink = set()
|
|
def unlink_temp_paths():
|
|
for path in paths_to_unlink:
|
|
try:
|
|
if path.exists():
|
|
path.unlink()
|
|
except Exception as ex:
|
|
print(f'Error deleting {path}: {ex}')
|
|
|
|
|
|
atexit.register(filesystemstuff.cleanup_cache)
|
|
atexit.register(show_stats)
|
|
atexit.register(unlink_temp_paths)
|
|
|
|
print(f'Creating Shaman definition file from {root}')
|
|
allowed_paths = set()
|
|
definition_lines = []
|
|
for relpath, line, content_path in feed_lines():
|
|
allowed_paths.add(str(relpath))
|
|
definition_lines.append(line)
|
|
paths_to_unlink.add(content_path)
|
|
|
|
definition_file = b''.join(definition_lines)
|
|
print(f'Computed SHA sums, definition file is {len(definition_file) // 1024} KiB')
|
|
sys.stdout.buffer.write(definition_file)
|
|
if cli_args.sha_only:
|
|
return
|
|
|
|
for try_count in range(50):
|
|
print(f'========== Upload attempt {try_count+1}')
|
|
failed_paths = feed(definition_file, allowed_paths)
|
|
if not failed_paths:
|
|
break
|
|
|
|
print('==========')
|
|
if failed_paths:
|
|
raise SystemExit('Aborted due to repeated upload failure')
|
|
else:
|
|
print(f'All files uploaded succesfully in {try_count+1} iterations')
|
|
|
|
if cli_args.checkout:
|
|
print(f'Going to ask for a checkout with ID {cli_args.checkout}')
|
|
resp = session.post(f'{shaman_url}checkout/create/{cli_args.checkout}', data=definition_file)
|
|
resp.raise_for_status()
|
|
print(f'Received status {resp.status_code}: {resp.text}')
|
|
else:
|
|
print('Not asking for a checkout, use --checkout if you want this.')
|
|
|
|
if __name__ == '__main__':
|
|
main()
|