flamenco/pkg/shaman/_test_file_store/stored/d6/fc7289b5196cc96748ea72f882a22c39b8833b457fe854ef4c03a01f5db0d3/7217.blob
Sybren A. Stüvel 4e8e71e4e2 Initial checkin of Shaman of Flamenco 2
This is not yet working; it's just a direct copy of the Manager of Flamenco
2, with Logrus replaced by Zerolog. The API has been documented in
flamenco-manager.yaml as a starting point for the integration.
2022-03-25 14:10:26 +01:00


import base64
import contextlib
import gzip
import hashlib
import json
import logging
import os
import sys
import time
import typing
from collections import deque
from pathlib import Path
GLOBAL_CACHE_ROOT = Path().home() / '.cache/shaman-client/shasums'
MAX_CACHE_FILES_AGE_SECS = 3600 * 24 * 60 # 60 days
CURRENT_FILE_VERSION = 2
log = logging.getLogger(__name__)


class TimeInfo:
    computing_checksums = 0.0
    checksum_cache_handling = 0.0


def find_files(root: Path) -> typing.Iterable[Path]:
    queue = deque([root])
    while queue:
        path = queue.popleft()

        # Ignore hidden files/dirs; these can be things like '.svn' or '.git',
        # which shouldn't be sent to Shaman.
        if path.name.startswith('.'):
            continue

        if path.is_dir():
            for child in path.iterdir():
                queue.append(child)
            continue

        # Skip .blend1, .blend2, etc.
        if path.suffix.startswith('.blend') and path.suffix[-1].isdecimal():
            continue

        yield path


def compute_checksum(filepath: Path) -> str:
    blocksize = 32 * 1024

    log.debug('SHAsumming %s', filepath)
    with track_time(TimeInfo, 'computing_checksums'):
        hasher = hashlib.sha256()
        with filepath.open('rb') as infile:
            while True:
                block = infile.read(blocksize)
                if not block:
                    break
                hasher.update(block)
        checksum = hasher.hexdigest()
    return checksum


def _cache_key(filepath: Path) -> str:
    fs_encoding = sys.getfilesystemencoding()
    filepath = filepath.absolute()

    # Reverse the directory, because most variation is in the last bytes.
    rev_dir = str(filepath.parent)[::-1]
    cache_path = '%s%s%s' % (filepath.stem, rev_dir, filepath.suffix)
    encoded_path = cache_path.encode(fs_encoding)
    cache_key = base64.urlsafe_b64encode(encoded_path).decode().rstrip('=')
    return cache_key
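
# Editorial note (not from the original module): for a hypothetical path such
# as Path('/projects/shot/scene.blend'), the cache key is the URL-safe base64
# encoding of 'scene' + 'tohs/stcejorp/' + '.blend'. Reversing the directory
# puts its most-varying components first, so sibling paths get keys that
# diverge early.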


def chunkstring(string: str, length: int) -> typing.Iterable[str]:
    return (string[0+i:length+i] for i in range(0, len(string), length))
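
# For example, list(chunkstring('abcdefgh', 3)) == ['abc', 'def', 'gh'].
# compute_cached_checksum() below uses this to split long cache keys into
# path components that stay under the 255-character filename limit.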


def is_compressed_blendfile(filepath: Path) -> bool:
    if not filepath.suffix.lower().startswith('.blend'):
        return False

    with filepath.open('rb') as blendfile:
        magic = blendfile.read(3)
    # Gzip streams start with the magic bytes 1F 8B, followed by 08 for DEFLATE.
    return magic == b'\x1f\x8b\x08'


def compute_cached_checksum(filepath: Path) -> typing.Tuple[Path, str]:
    """Compute the SHA256 checksum in a compression-aware way.

    Returns the tuple `(content_path, checksum)`, where `content_path`
    is either the path to the decompressed file (if any) or the
    filepath itself.

    The caller is responsible for removing the decompressed file.
    """
    with track_time(TimeInfo, 'checksum_cache_handling'):
        cache_key = _cache_key(filepath)
        is_compressed = is_compressed_blendfile(filepath)

        # Don't create filenames that are longer than 255 characters.
        last_parts = Path(*chunkstring(cache_key[10:], 255))
        cache_path = GLOBAL_CACHE_ROOT / cache_key[:10] / last_parts
        current_stat = filepath.stat()

        checksum = parse_cache_file(cache_path, current_stat, is_compressed)
        if checksum:
            return filepath, checksum

    # Determine which path we want to checksum.
    if is_compressed:
        content_path = decompress(filepath)
    else:
        content_path = filepath

    checksum = compute_checksum(content_path)

    with track_time(TimeInfo, 'checksum_cache_handling'):
        write_cache_file(cache_path, current_stat, is_compressed, checksum)

    return content_path, checksum
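
# Editorial note (not from the original module): when the input was gzipped,
# the returned content_path is a temporary '.gunzipped' file that the caller
# is expected to remove, for example:
#
#     content_path, checksum = compute_cached_checksum(some_blend_path)
#     if content_path != some_blend_path:
#         content_path.unlink()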


def parse_cache_file(cache_path: Path, current_stat: os.stat_result, is_compressed: bool) -> str:
    """Try to parse the cache file as JSON.

    :return: the cached checksum, or '' if not cached.
    """
    try:
        with cache_path.open('r') as cache_file:
            payload = json.load(cache_file)
    except (OSError, ValueError):
        # File may not exist, or have invalid contents.
        return ''

    file_version = payload.get('version', 1)
    if file_version < CURRENT_FILE_VERSION:
        return ''

    checksum_key = 'uncompressed_checksum' if is_compressed else 'checksum'
    checksum = payload.get(checksum_key, '')
    cached_mtime = payload.get('file_mtime', 0.0)
    cached_size = payload.get('file_size', 0)

    if checksum \
            and abs(cached_mtime - current_stat.st_mtime) < 0.01 \
            and current_stat.st_size == cached_size:
        # Mark the cache file as recently used, so cleanup_cache() keeps it.
        cache_path.touch()
        return checksum

    return ''


def write_cache_file(cache_path: Path, current_stat: os.stat_result, is_compressed: bool, checksum: str) -> None:
    checksum_key = 'uncompressed_checksum' if is_compressed else 'checksum'
    payload = {
        'version': CURRENT_FILE_VERSION,
        checksum_key: checksum,
        'file_mtime': current_stat.st_mtime,
        'file_size': current_stat.st_size,
        'is_compressed': is_compressed,
    }
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    with cache_path.open('w') as cache_file:
        json.dump(payload, cache_file)


def cleanup_cache():
    if not GLOBAL_CACHE_ROOT.exists():
        return

    with track_time(TimeInfo, 'checksum_cache_handling'):
        queue = deque([GLOBAL_CACHE_ROOT])
        rmdir_queue = []

        now = time.time()
        num_removed_files = 0
        num_removed_dirs = 0

        while queue:
            path = queue.popleft()

            if path.is_dir():
                for child in path.iterdir():
                    queue.append(child)
                rmdir_queue.append(path)
                continue

            assert path.is_file()
            # Sanity check; raises ValueError if the path is outside the cache root.
            path.relative_to(GLOBAL_CACHE_ROOT)

            age = now - path.stat().st_mtime
            # Don't trust files from the future either.
            if 0 <= age <= MAX_CACHE_FILES_AGE_SECS:
                continue

            path.unlink()
            num_removed_files += 1

        for dirpath in reversed(rmdir_queue):
            assert dirpath.is_dir()
            dirpath.relative_to(GLOBAL_CACHE_ROOT)

            try:
                dirpath.rmdir()
                num_removed_dirs += 1
            except OSError:
                # Directory is not empty; leave it alone.
                pass

    if num_removed_dirs or num_removed_files:
        log.info('Cache Cleanup: removed %d dirs and %d files', num_removed_dirs, num_removed_files)


@contextlib.contextmanager
def track_time(tracker_object: typing.Any, attribute: str):
    start_time = time.time()
    yield
    duration = time.time() - start_time
    tracked_so_far = getattr(tracker_object, attribute, 0.0)
    setattr(tracker_object, attribute, tracked_so_far + duration)
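
# Each `with track_time(TimeInfo, '<attribute>'):` block adds its wall-clock
# duration to the named attribute, so TimeInfo accumulates totals across all
# calls made during the process lifetime.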


def decompress(filepath: Path) -> Path:
    """Gunzip the file, returning '{filepath}.gunzipped'."""
    decomppath = filepath.with_suffix('%s.gunzipped' % filepath.suffix)
    if not decomppath.exists() or filepath.stat().st_mtime >= decomppath.stat().st_mtime:
        with gzip.open(str(filepath), 'rb') as infile, decomppath.open('wb') as outfile:
            while True:
                block = infile.read(32768)
                if not block:
                    break
                outfile.write(block)
    return decomppath
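

# Editorial sketch, not part of the original module: a minimal command-line
# driver showing how the functions above fit together. It assumes the first
# command-line argument is a directory to scan and uses only names defined in
# this file.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    scan_root = Path(sys.argv[1]) if len(sys.argv) > 1 else Path('.')

    # Expire old cache entries before adding new ones.
    cleanup_cache()

    for path in find_files(scan_root):
        content_path, checksum = compute_cached_checksum(path)
        print('%s  %s' % (checksum, path))

        # compute_cached_checksum() leaves decompressed copies behind;
        # the caller is responsible for removing them.
        if content_path != path:
            content_path.unlink()

    log.info('checksums took %.2fs, cache handling took %.2fs',
             TimeInfo.computing_checksums, TimeInfo.checksum_cache_handling)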