import base64
import contextlib
import gzip
import hashlib
import json
import logging
import os
import sys
import time
import typing
from collections import deque
from pathlib import Path

GLOBAL_CACHE_ROOT = Path.home() / '.cache/shaman-client/shasums'
MAX_CACHE_FILES_AGE_SECS = 3600 * 24 * 60  # 60 days
CURRENT_FILE_VERSION = 2

log = logging.getLogger(__name__)


class TimeInfo:
    computing_checksums = 0.0
    checksum_cache_handling = 0.0


def find_files(root: Path) -> typing.Iterable[Path]:
    queue = deque([root])
    while queue:
        path = queue.popleft()

        # Ignore hidden files/dirs; these can be things like '.svn' or '.git',
        # which shouldn't be sent to Shaman.
        if path.name.startswith('.'):
            continue

        if path.is_dir():
            for child in path.iterdir():
                queue.append(child)
            continue

        # Skip .blend1, .blend2, etc.
        if path.suffix.startswith('.blend') and path.suffix[-1].isdecimal():
            continue

        yield path


def compute_checksum(filepath: Path) -> str:
    blocksize = 32 * 1024

    log.debug('SHAsumming %s', filepath)
    with track_time(TimeInfo, 'computing_checksums'):
        hasher = hashlib.sha256()
        with filepath.open('rb') as infile:
            while True:
                block = infile.read(blocksize)
                if not block:
                    break
                hasher.update(block)
        checksum = hasher.hexdigest()
    return checksum


def _cache_key(filepath: Path) -> str:
    fs_encoding = sys.getfilesystemencoding()
    filepath = filepath.absolute()

    # Reverse the directory, because most variation is in the last bytes.
    rev_dir = str(filepath.parent)[::-1]
    cache_path = '%s%s%s' % (filepath.stem, rev_dir, filepath.suffix)
    encoded_path = cache_path.encode(fs_encoding)
    cache_key = base64.urlsafe_b64encode(encoded_path).decode().rstrip('=')

    return cache_key


def chunkstring(string: str, length: int) -> typing.Iterable[str]:
    """Split the string into chunks of the given length.

    For example, chunkstring('abcde', 2) yields 'ab', 'cd', 'e'.
    """
    return (string[i:i + length] for i in range(0, len(string), length))


def is_compressed_blendfile(filepath: Path) -> bool:
    if not filepath.suffix.lower().startswith('.blend'):
        return False

    with filepath.open('rb') as blendfile:
        magic = blendfile.read(3)

    # Gzip header: the magic bytes 0x1f 0x8b followed by the DEFLATE method byte.
    return magic == b'\x1f\x8b\x08'


def compute_cached_checksum(filepath: Path) -> typing.Tuple[Path, str]:
    """Compute the SHA256 checksum in a compression-aware way.

    Returns the tuple `(content_path, checksum)`, where
    `content_path` is either the path to the decompressed file (if
    any) or the filepath itself.

    The caller is responsible for removing the decompressed file.
    """

    with track_time(TimeInfo, 'checksum_cache_handling'):
        cache_key = _cache_key(filepath)
        is_compressed = is_compressed_blendfile(filepath)

        # Don't create filenames that are longer than 255 characters.
        last_parts = Path(*chunkstring(cache_key[10:], 255))
        cache_path = GLOBAL_CACHE_ROOT / cache_key[:10] / last_parts
        current_stat = filepath.stat()

        checksum = parse_cache_file(cache_path, current_stat, is_compressed)
        if checksum:
            return filepath, checksum

    # Determine which path we want to checksum.
    if is_compressed:
        content_path = decompress(filepath)
    else:
        content_path = filepath

    checksum = compute_checksum(content_path)

    with track_time(TimeInfo, 'checksum_cache_handling'):
        write_cache_file(cache_path, current_stat, is_compressed, checksum)

    return content_path, checksum
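
# Usage sketch for compute_cached_checksum(); `upload_to_shaman` is a
# hypothetical caller, shown only to illustrate the cleanup contract:
#
#     content_path, checksum = compute_cached_checksum(Path('anim.blend'))
#     try:
#         upload_to_shaman(content_path, checksum)  # hypothetical
#     finally:
#         if content_path != Path('anim.blend'):
#             content_path.unlink()  # remove the decompressed copy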


def parse_cache_file(cache_path: Path, current_stat: os.stat_result, is_compressed: bool) -> str:
    """Try to parse the cache file as JSON.

    :return: the cached checksum, or '' if not cached.
    """

    try:
        with cache_path.open('r') as cache_file:
            payload = json.load(cache_file)
    except (OSError, ValueError):
        # File may not exist, or have invalid contents.
        return ''

    file_version = payload.get('version', 1)
    if file_version < CURRENT_FILE_VERSION:
        return ''

    checksum_key = 'uncompressed_checksum' if is_compressed else 'checksum'
    checksum = payload.get(checksum_key, '')
    cached_mtime = payload.get('file_mtime', 0.0)
    cached_size = payload.get('file_size', 0)

    if (checksum
            and abs(cached_mtime - current_stat.st_mtime) < 0.01
            and current_stat.st_size == cached_size):
        cache_path.touch()
        return checksum

    return ''


def write_cache_file(cache_path: Path, current_stat: os.stat_result, is_compressed: bool, checksum: str) -> None:
    checksum_key = 'uncompressed_checksum' if is_compressed else 'checksum'
    payload = {
        'version': CURRENT_FILE_VERSION,
        checksum_key: checksum,
        'file_mtime': current_stat.st_mtime,
        'file_size': current_stat.st_size,
        'is_compressed': is_compressed,
    }

    cache_path.parent.mkdir(parents=True, exist_ok=True)
    with cache_path.open('w') as cache_file:
        json.dump(payload, cache_file)
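
# For reference, a cache file written by write_cache_file() is plain JSON;
# the values below are illustrative, not taken from a real cache:
#
#     {"version": 2, "uncompressed_checksum": "9f86d08...",
#      "file_mtime": 1585229875.21, "file_size": 1048576,
#      "is_compressed": true}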


def cleanup_cache():
    if not GLOBAL_CACHE_ROOT.exists():
        return

    with track_time(TimeInfo, 'checksum_cache_handling'):
        queue = deque([GLOBAL_CACHE_ROOT])
        rmdir_queue = []

        now = time.time()
        num_removed_files = 0
        num_removed_dirs = 0
        while queue:
            path = queue.popleft()

            if path.is_dir():
                for child in path.iterdir():
                    queue.append(child)
                rmdir_queue.append(path)
                continue

            assert path.is_file()
            # Sanity check: raises ValueError if we somehow strayed outside
            # the cache root; the return value itself is not used.
            path.relative_to(GLOBAL_CACHE_ROOT)

            age = now - path.stat().st_mtime
            # Don't trust files from the future either.
            if 0 <= age <= MAX_CACHE_FILES_AGE_SECS:
                continue

            path.unlink()
            num_removed_files += 1

        for dirpath in reversed(rmdir_queue):
            assert dirpath.is_dir()
            dirpath.relative_to(GLOBAL_CACHE_ROOT)

            try:
                # Only succeeds when the directory is empty.
                dirpath.rmdir()
                num_removed_dirs += 1
            except OSError:
                pass

        if num_removed_dirs or num_removed_files:
            log.info('Cache Cleanup: removed %d dirs and %d files', num_removed_dirs, num_removed_files)


@contextlib.contextmanager
def track_time(tracker_object: typing.Any, attribute: str):
    """Context manager, adds the duration of its body to `attribute` of `tracker_object`."""
    start_time = time.time()
    yield
    duration = time.time() - start_time
    tracked_so_far = getattr(tracker_object, attribute, 0.0)
    setattr(tracker_object, attribute, tracked_so_far + duration)
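
# Usage sketch: accumulate the wall-clock time of a block onto a class
# attribute, then report it. `do_work` is a hypothetical stand-in:
#
#     with track_time(TimeInfo, 'computing_checksums'):
#         do_work()
#     log.info('spent %.2fs on checksums', TimeInfo.computing_checksums)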


def decompress(filepath: Path) -> Path:
    """Gunzip the file, returning '{filepath}.gunzipped'."""

    decomppath = filepath.with_suffix('%s.gunzipped' % filepath.suffix)

    # Only decompress when there is no up-to-date decompressed copy yet.
    if not decomppath.exists() or filepath.stat().st_mtime >= decomppath.stat().st_mtime:
        with gzip.open(str(filepath), 'rb') as infile, decomppath.open('wb') as outfile:
            while True:
                block = infile.read(32768)
                if not block:
                    break
                outfile.write(block)
    return decomppath
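

if __name__ == '__main__':
    # Minimal demonstration sketch (not part of the original module): checksum
    # every file under a directory given on the command line, then tidy the
    # cache and report where the time went.
    logging.basicConfig(level=logging.INFO)
    root = Path(sys.argv[1]) if len(sys.argv) > 1 else Path('.')
    for fpath in find_files(root):
        content_path, checksum = compute_cached_checksum(fpath)
        print('%s  %s' % (checksum, fpath))
        if content_path != fpath:
            content_path.unlink()  # remove the decompressed copy
    cleanup_cache()
    log.info('%.2fs computing checksums, %.2fs on cache handling',
             TimeInfo.computing_checksums, TimeInfo.checksum_cache_handling)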