flamenco/pkg/shaman/_test_file_store/stored/80/b749c27b2fef7255e7e7b3c2029b03b31299c75ff1f1c72732081c70a713a3/7488.blob
Sybren A. Stüvel 4e8e71e4e2 Initial checkin of Shaman of Flamenco 2
This is not yet working, it's just a direct copy of the Manager of Flamenco
2, with Logrus replaced by Zerolog. The API has been documented in
flamenco-manager.yaml as a starting point for the integration.
2022-03-25 14:10:26 +01:00

231 lines
7.3 KiB
Python

#!/usr/bin/env python3.7
import argparse
import atexit
from collections import deque
from dataclasses import dataclass
from pathlib import Path
import random
import sys
import typing
import requests
import filesystemstuff
import httpstuff
# Command-line interface. Parsing happens at import time because the rest of
# the module reads `root`, `shaman_url` and `cli_args` as globals.
parser = argparse.ArgumentParser()
parser.add_argument('root', type=Path)
parser.add_argument('shaman_url', type=str)
parser.add_argument('--checkout')
parser.add_argument('--sha-only', default=False, action='store_true')
parser.add_argument('--cleanup', default=False, action='store_true', help='Clean up cache files and exit')
cli_args = parser.parse_args()

# Directory whose files are offered to the Shaman, made absolute.
root = cli_args.root.resolve()

# --cleanup only removes the cache files and then exits; nothing is uploaded.
if cli_args.cleanup:
    filesystemstuff.cleanup_cache()
    raise SystemExit('CLEAN!')

shaman_url = httpstuff.normalise_url(cli_args.shaman_url)

# Declaration only: the session is created in main() once a token has been
# fetched. The annotation is the class itself; annotating with
# `requests.Session()` would construct (and discard) a session at import time.
session: requests.Session
@dataclass
class FileInfo:
    """Checksum, size and on-disk location of one file offered to the Shaman."""

    # Checksum of the file contents, as computed by filesystemstuff.
    checksum: str
    # Size of the file in bytes (taken from stat().st_size).
    filesize: int
    # Path of the file on disk, as yielded by filesystemstuff.find_files().
    abspath: Path
# Maps str(path relative to root) -> FileInfo; filled by feed_lines() and
# read by upload_files() to find the file that belongs to a requested path.
global_fileinfo: typing.Dict[str, FileInfo] = {}
def feed_lines() -> typing.Iterable[typing.Tuple[Path, bytes, typing.Optional[Path]]]:
    """Yield one definition-file line for every file found under ``root``.

    As a side effect, the checksum, size and location of each file are
    recorded in ``global_fileinfo`` under the file's path relative to ``root``.

    :returns: tuples of (relative path, UTF-8 encoded definition line,
        path to unlink afterwards). The last item is the content path
        reported by the checksum helper when it differs from the file
        itself, and None otherwise.
    """
    for source_path in filesystemstuff.find_files(root):
        content_path, checksum = filesystemstuff.compute_cached_checksum(source_path)
        filesize = source_path.stat().st_size
        relpath = source_path.relative_to(root)

        # Remember the file so that it can be found again by relative path.
        record = FileInfo(checksum=checksum, filesize=filesize, abspath=source_path)
        global_fileinfo[str(relpath)] = record

        # Only a content path that is a separate file needs removing later.
        if content_path == source_path:
            temp_path = None
        else:
            temp_path = content_path

        definition_line = f'{checksum} {filesize} {relpath}\n'.encode('utf8')
        yield relpath, definition_line, temp_path
def show_stats():
    """Print the timing counters collected in ``filesystemstuff.TimeInfo``."""
    print('filesystemstuff stats:')
    timings = filesystemstuff.TimeInfo
    checksum_secs = timings.computing_checksums
    print(f' computing checksums: {checksum_secs:.3f} seconds')
    cache_secs = timings.checksum_cache_handling
    print(f' handling caching : {cache_secs:.3f} seconds')
def feed(definition_file: bytes, valid_paths: typing.Set[str]) -> typing.Set[str]:
    """Send the definition file to the Shaman and upload the files it asks for.

    The definition file is POSTed to ``checkout/requirements``. Every
    non-empty line of the response has the form ``<response> <rest>``:

    - ``file-unknown <path>``: the file is queued at the front for upload.
    - ``already-uploading <path>``: the file is queued at the back.
    - ``ERROR <rest>``: the remainder of the line is reported and the run
      is aborted.

    :param definition_file: contents of the definition file to send.
    :param valid_paths: the paths that were offered in the definition file;
        the Shaman may only ask for these.
    :returns: the paths that failed to upload in this iteration; empty when
        everything was uploaded.
    :raises SystemExit: on an HTTP status >= 300 or an ``ERROR`` response line.
    :raises RuntimeError: when the Shaman sends an unknown response type or
        asks for a path that was never offered.
    """
    print(f'Feeding {root} to the Shaman')
    resp = session.post(f'{shaman_url}checkout/requirements', data=definition_file, stream=True)
    if resp.status_code >= 300:
        raise SystemExit(f'Error {resp.status_code}: {resp.text}')

    print('==========')
    to_upload = deque()
    for line in resp.iter_lines():
        if not line:
            # Blank lines carry no response; skip them instead of failing
            # on the unpacking below.
            continue

        response, _, path = line.decode().partition(' ')
        print(f'{response}\t{path}')

        # Look at the response type before validating the path: for ERROR
        # lines the remainder is not a path, so checking it against
        # valid_paths first would hide the actual error.
        if response == 'ERROR':
            # Abort rather than return nothing: an empty/None result would be
            # indistinguishable from "all files uploaded" for the caller.
            raise SystemExit(f'ERROR RESPONSE: {path}')
        if response not in ('file-unknown', 'already-uploading'):
            raise RuntimeError(f'UNKNOWN RESPONSE {response!r} FOR PATH {path!r}')

        if path not in valid_paths:
            raise RuntimeError(f'Shaman asked us for path {path!r} which we never offered')

        if response == 'file-unknown':
            # Nobody is uploading this file yet, so try it first.
            to_upload.appendleft(path)
        else:
            # Someone else is already uploading it; try it last.
            to_upload.append(path)

    print('==========')
    print(f'Going to upload {len(to_upload)} files')
    failed_paths = upload_files(to_upload)

    if failed_paths:
        print('Some files did not upload this iteration:')
        for fname in sorted(failed_paths):
            print(f' - {fname}')
    return failed_paths
def upload_files(to_upload: typing.Deque[str]) -> typing.Set[str]:
    """Upload the queued files to the Shaman, one at a time.

    Paths are taken from the left of the queue. While deferral is allowed,
    a 208 response or a dropped connection re-queues the file at the end of
    a reshuffled queue instead of counting as a failure.

    :param to_upload: queue of paths to upload; each path must be a key of
        ``global_fileinfo``.
    :returns: the paths that could not be uploaded in this iteration.
    """
    max_deferred_paths = 8
    max_failed_paths = 8

    failed_paths = set()
    deferred_paths = set()

    def postpone(some_path: str):
        """Re-queue ``some_path`` at the end of a freshly shuffled queue."""
        nonlocal to_upload

        print(' - Shaman asked us to defer uploading this file.')
        deferred_paths.add(some_path)

        # Shuffle everything that is still pending rather than moving just
        # this one file. Another client uploading the same project most
        # likely works through it alphabetically, so a random order avoids
        # running into one deferral after the other.
        reshuffled = list(to_upload)
        random.shuffle(reshuffled)
        reshuffled.append(some_path)
        to_upload = deque(reshuffled)

    while to_upload:
        # Too many failures: give up on this iteration so the caller can
        # request a fresh list of files to upload.
        if len(failed_paths) > max_failed_paths:
            print('Too many failures, going to abort this iteration')
            failed_paths.update(to_upload)
            return failed_paths

        path = to_upload.popleft()
        fileinfo = global_fileinfo[path]

        # Deferring is only offered when the deferral budget is not used up,
        # this file was not deferred before, and other files are still queued.
        can_defer = (
            len(deferred_paths) < max_deferred_paths
            and path not in deferred_paths
            and len(to_upload) > 0
        )
        headers = {'X-Shaman-Original-Filename': path}
        if can_defer:
            headers['X-Shaman-Can-Defer-Upload'] = 'true'

        print(f'Uploading {path} ; can_defer={can_defer}')
        try:
            with fileinfo.abspath.open('rb') as infile:
                resp = session.post(
                    f'{shaman_url}files/{fileinfo.checksum}/{fileinfo.filesize}',
                    data=infile, headers=headers)
                resp.raise_for_status()
        except requests.ConnectionError as ex:
            if can_defer:
                # A connection closed on a request that carried the
                # 'X-Shaman-Can-Defer-Upload: true' header means: defer.
                postpone(path)
            else:
                print(f'Error uploading {path}, might retry later: {ex}')
                failed_paths.add(path)
        else:
            if resp.status_code == 208:
                if can_defer:
                    postpone(path)
                else:
                    print(' - Someone else already finished uploading this file.')
            failed_paths.discard(path)

    return failed_paths
def main():
    """Offer all files under ``root`` to the Shaman and upload what is missing.

    Steps: fetch an authentication token and create the HTTP session, build
    the definition file from the files under ``root``, print it, and (unless
    ``--sha-only`` was given) feed it to the Shaman until nothing fails to
    upload, for at most 50 attempts. With ``--checkout`` a checkout with the
    given ID is requested afterwards.

    :raises SystemExit: when files still fail to upload after all attempts.
    """
    global session

    # Get an authentication token.
    resp = requests.get(f'{shaman_url}get-token')
    resp.raise_for_status()
    session = httpstuff.session(token=resp.text)

    paths_to_unlink = set()

    def unlink_temp_paths():
        """Best-effort removal of the temporary content files at exit."""
        for path in paths_to_unlink:
            try:
                if path.exists():
                    path.unlink()
            except Exception as ex:
                # Deliberately broad: a failed delete must not stop the
                # remaining deletions or the other exit handlers.
                print(f'Error deleting {path}: {ex}')

    # atexit runs handlers in reverse order of registration.
    atexit.register(filesystemstuff.cleanup_cache)
    atexit.register(show_stats)
    atexit.register(unlink_temp_paths)

    print(f'Creating Shaman definition file from {root}')
    allowed_paths = set()
    definition_lines = []
    for relpath, line, content_path in feed_lines():
        allowed_paths.add(str(relpath))
        definition_lines.append(line)
        # feed_lines() yields None when there is nothing to unlink; storing
        # that would make the exit handler report "Error deleting None".
        if content_path is not None:
            paths_to_unlink.add(content_path)

    definition_file = b''.join(definition_lines)
    print(f'Computed SHA sums, definition file is {len(definition_file) // 1024} KiB')
    # Flush the text layer first so that the raw bytes written below cannot
    # overtake the output of the print() calls above.
    sys.stdout.flush()
    sys.stdout.buffer.write(definition_file)

    if cli_args.sha_only:
        return

    for try_count in range(50):
        print(f'========== Upload attempt {try_count+1}')
        failed_paths = feed(definition_file, allowed_paths)
        if not failed_paths:
            break

    print('==========')
    if failed_paths:
        raise SystemExit('Aborted due to repeated upload failure')
    else:
        print(f'All files uploaded succesfully in {try_count+1} iterations')

    if cli_args.checkout:
        print(f'Going to ask for a checkout with ID {cli_args.checkout}')
        resp = session.post(f'{shaman_url}checkout/create/{cli_args.checkout}', data=definition_file)
        resp.raise_for_status()
        print(f'Received status {resp.status_code}: {resp.text}')
    else:
        print('Not asking for a checkout, use --checkout if you want this.')
# Run main() when this file is executed as a script.
if __name__ == '__main__':
    main()