diff --git a/pillar/__init__.py b/pillar/__init__.py
index 66d7f062..7eb06611 100644
--- a/pillar/__init__.py
+++ b/pillar/__init__.py
@@ -477,6 +477,7 @@ class PillarServer(BlinkerCompatibleEve):
 
         # Pillar-defined Celery task modules:
         celery_task_modules = [
+            'pillar.celery.badges',
             'pillar.celery.email_tasks',
             'pillar.celery.file_link_tasks',
             'pillar.celery.search_index_tasks',
diff --git a/pillar/api/eve_settings.py b/pillar/api/eve_settings.py
index 65a8a471..b12d3b8e 100644
--- a/pillar/api/eve_settings.py
+++ b/pillar/api/eve_settings.py
@@ -152,6 +152,14 @@ users_schema = {
         },
     },
 
+    'badges': {
+        'type': 'dict',
+        'schema': {
+            'html': {'type': 'string'},  # HTML fetched from Blender ID.
+            'expires': {'type': 'datetime'},  # When we should fetch it again.
+        },
+    },
+
     # Properties defined by extensions. Extensions should use their name (see the
     # PillarExtension.name property) as the key, and are free to use whatever they want as value,
     # but we suggest a dict for future extendability.
diff --git a/pillar/badge_sync.py b/pillar/badge_sync.py
new file mode 100644
index 00000000..7979a947
--- /dev/null
+++ b/pillar/badge_sync.py
@@ -0,0 +1,203 @@
+import collections
+import datetime
+import logging
+import typing
+from urllib.parse import urljoin
+
+import bson
+import requests
+
+from pillar import current_app
+from pillar.api.utils import utcnow
+
+SyncUser = collections.namedtuple('SyncUser', 'user_id token bid_user_id')
+BadgeHTML = collections.namedtuple('BadgeHTML', 'html expires')
+log = logging.getLogger(__name__)
+
+
+class StopRefreshing(Exception):
+    """Indicates that Blender ID is having problems.
+
+    Further badge refreshes should be put on hold to avoid bludgeoning
+    a suffering Blender ID.
+    """
+
+
+def find_users_to_sync() -> typing.Iterable[SyncUser]:
+    """Return user information of syncable users with badges."""
+
+    now = utcnow()
+    tokens_coll = current_app.db('tokens')
+    cursor = tokens_coll.aggregate([
+        # Find all users who have a 'badge' scope in their OAuth token.
+        {'$match': {
+            'token': {'$exists': True},
+            'oauth_scopes': 'badge',
+            'expire_time': {'$gt': now},
+        }},
+        {'$lookup': {
+            'from': 'users',
+            'localField': 'user',
+            'foreignField': '_id',
+            'as': 'user'
+        }},
+
+        # Prevent 'user' from being an array.
+        {'$unwind': {'path': '$user'}},
+
+        # Get the Blender ID user ID only.
+        {'$unwind': {'path': '$user.auth'}},
+        {'$match': {'user.auth.provider': 'blender-id'}},
+
+        # Only select those users whose badge doesn't exist or has expired.
+        {'$match': {
+            'user.badges.expires': {'$not': {'$gt': now}}
+        }},
+
+        # Make sure that the badges that expire last are also refreshed last.
+        {'$sort': {'user.badges.expires': 1}},
+
+        # Reduce the document to the info we're after.
+        {'$project': {
+            'token': True,
+            'user._id': True,
+            'user.auth.user_id': True,
+            'user.badges.expires': True,
+        }},
+    ])
+
+    log.debug('Aggregating tokens and users')
+    for user_info in cursor:
+        log.debug('User %s has badges %s',
+                  user_info['user']['_id'], user_info['user'].get('badges'))
+        yield SyncUser(
+            user_id=user_info['user']['_id'],
+            token=user_info['token'],
+            bid_user_id=user_info['user']['auth']['user_id'])
+
+
+def fetch_badge_html(session: requests.Session, user: SyncUser, size: str) \
+        -> typing.Optional[BadgeHTML]:
+    """Fetch a Blender ID badge for this user.
+
+    :param session: HTTP session to perform the request with.
+    :param user: The user to fetch the badge for.
+    :param size: Size indication for the badge images, see the Blender ID
+        documentation/code. As of this writing valid sizes are {'s', 'm', 'l'}.
+    :return: The badge HTML with its expiry timestamp, or None when Blender ID
+        has no badge for this user or rejected this request (HTTP 204, 400,
+        403, or 404).
+    :raise StopRefreshing: when Blender ID is unreachable or responds with a
+        5xx status code.
+    """
+    my_log = log.getChild('fetch_badge_html')
+
+    blender_id_endpoint = current_app.config['BLENDER_ID_ENDPOINT']
+    url = urljoin(blender_id_endpoint, f'api/badges/{user.bid_user_id}/html/{size}')
+
+    my_log.debug('Fetching badge HTML at %s for user %s', url, user.user_id)
+    try:
+        resp = session.get(url, headers={'Authorization': f'Bearer {user.token}'})
+    except requests.ConnectionError as ex:
+        my_log.warning('Unable to connect to Blender ID at %s: %s', url, ex)
+        raise StopRefreshing() from ex
+
+    if resp.status_code == 204:
+        my_log.debug('No badges for user %s', user.user_id)
+        return None
+    if resp.status_code == 403:
+        my_log.warning('Tried fetching %s for user %s but received a 403: %s',
+                       url, user.user_id, resp.text)
+        return None
+    if resp.status_code == 400:
+        my_log.warning('Blender ID did not accept our GET request at %s for user %s: %s',
+                       url, user.user_id, resp.text)
+        return None
+    if resp.status_code >= 500:
+        # Any 5xx means Blender ID itself is in trouble, not just this user.
+        my_log.warning('Blender ID returned server error %d on %s for user %s, '
+                       'aborting all badge refreshes: %s',
+                       resp.status_code, url, user.user_id, resp.text)
+        raise StopRefreshing()
+    if resp.status_code == 404:
+        my_log.warning('Blender ID has no user %s for our user %s', user.bid_user_id, user.user_id)
+        return None
+    resp.raise_for_status()
+
+    my_log.debug('Received new badge HTML from %s for user %s', url, user.user_id)
+    badge_expiry = badge_expiry_config()
+    return BadgeHTML(
+        html=resp.text,
+        expires=utcnow() + badge_expiry,
+    )
+
+
+def refresh_all_badges(only_user_id: typing.Optional[bson.ObjectId] = None, *,
+                       dry_run=False,
+                       timelimit: datetime.timedelta):
+    """Re-fetch all badges for all users, except when already refreshed recently.
+
+    Users for which Blender ID has no badge HTML get an empty badge stored,
+    with the regular expiry, so that they are not re-queried on every run.
+
+    :param only_user_id: Only refresh this user. This is expected to be used
+        sparingly during manual maintenance / debugging sessions only. It does
+        fetch all users to refresh, and in Python code skips all except the
+        given one.
+    :param dry_run: if True the changes are described in the log, but not performed.
+    :param timelimit: Refreshing will stop after this time. This allows for cron(-like)
+        jobs to run without overlapping, even when the number of badges to refresh
+        becomes larger than possible within the period of the cron job.
+    :raise ValueError: when BLENDER_ID_BADGE_EXPIRY is not a non-zero timedelta.
+    """
+    from requests.adapters import HTTPAdapter
+    my_log = log.getChild('refresh_all_badges')
+
+    # Test the config before we start looping over the world.
+    badge_expiry = badge_expiry_config()
+    if not badge_expiry or not isinstance(badge_expiry, datetime.timedelta):
+        raise ValueError('BLENDER_ID_BADGE_EXPIRY not configured properly, should be a timedelta')
+
+    session = requests.Session()
+    session.mount('https://', HTTPAdapter(max_retries=5))
+    users_coll = current_app.db('users')
+
+    deadline = utcnow() + timelimit
+
+    num_updates = 0
+    for user_info in find_users_to_sync():
+        if utcnow() > deadline:
+            my_log.info('Stopping badge refresh because the timelimit %s (H:MM:SS) was hit.',
+                        timelimit)
+            break
+
+        if only_user_id and user_info.user_id != only_user_id:
+            my_log.debug('Skipping user %s', user_info.user_id)
+            continue
+        try:
+            badge_info = fetch_badge_html(session, user_info, 's')
+        except StopRefreshing:
+            my_log.error('Blender ID has internal problems, stopping badge refreshing at user %s',
+                         user_info)
+            break
+
+        if badge_info is None:
+            # Blender ID has no badge HTML for this user. Store an empty badge
+            # with the regular expiry instead of crashing on None below; this
+            # also prevents this user from being re-fetched on every run.
+            badge_info = BadgeHTML(html='', expires=utcnow() + badge_expiry)
+
+        num_updates += 1
+        my_log.info('Updating badges HTML for Blender ID %s, user %s',
+                    user_info.bid_user_id, user_info.user_id)
+        if not dry_run:
+            result = users_coll.update_one({'_id': user_info.user_id},
+                                           {'$set': {'badges': badge_info._asdict()}})
+            if result.matched_count != 1:
+                my_log.warning('Unable to update badges for user %s', user_info.user_id)
+    my_log.info('Updated badges of %d users%s', num_updates, ' (dry-run)' if dry_run else '')
+
+
+def badge_expiry_config() -> datetime.timedelta:
+    """Return the BLENDER_ID_BADGE_EXPIRY setting, or None when it is not configured."""
+    return current_app.config.get('BLENDER_ID_BADGE_EXPIRY')
diff --git a/pillar/celery/badges.py b/pillar/celery/badges.py
new file mode 100644
index 00000000..c4389c98
--- /dev/null
+++ b/pillar/celery/badges.py
@@ -0,0 +1,20 @@
+"""Badge HTML synchronisation.
+
+Note that this module can only be imported when an application context is
+active. Best to late-import this in the functions where it's needed.
+"""
+import datetime
+import logging
+
+from pillar import current_app, badge_sync
+
+log = logging.getLogger(__name__)
+
+
+@current_app.celery.task(ignore_result=True)
+def sync_badges_for_users(timelimit_seconds: int):
+    """Synchronises Blender ID badges for the most-urgent users."""
+
+    timelimit = datetime.timedelta(seconds=timelimit_seconds)
+    log.info('Refreshing badges, timelimit is %s (H:MM:SS)', timelimit)
+    badge_sync.refresh_all_badges(timelimit=timelimit)
diff --git a/pillar/cli/__init__.py b/pillar/cli/__init__.py
index 421ed0e9..50d8d63c 100644
--- a/pillar/cli/__init__.py
+++ b/pillar/cli/__init__.py
@@ -13,6 +13,7 @@ from pillar.cli.maintenance import manager_maintenance
 from pillar.cli.operations import manager_operations
 from pillar.cli.setup import manager_setup
 from pillar.cli.elastic import manager_elastic
+from . import badges
 
 from pillar.cli import translations
 
@@ -24,3 +25,4 @@ manager.add_command("maintenance", manager_maintenance)
 manager.add_command("setup", manager_setup)
 manager.add_command("operations", manager_operations)
 manager.add_command("elastic", manager_elastic)
+manager.add_command("badges", badges.manager)
diff --git a/pillar/cli/badges.py b/pillar/cli/badges.py
new file mode 100644
index 00000000..92d7ec90
--- /dev/null
+++ b/pillar/cli/badges.py
@@ -0,0 +1,40 @@
+import datetime
+import logging
+
+from flask_script import Manager
+from pillar import current_app, badge_sync
+from pillar.api.utils import utcnow
+
+log = logging.getLogger(__name__)
+
+manager = Manager(current_app, usage="Badge operations")
+
+
+@manager.option('-u', '--user', dest='email', default='', help='Email address of the user to sync')
+@manager.option('-a', '--all', dest='sync_all', action='store_true', default=False,
+                help='Sync all users')
+@manager.option('--go', action='store_true', default=False,
+                help='Actually perform the sync; otherwise it is a dry-run.')
+def sync(email: str = '', sync_all: bool=False, go: bool=False):
+    """Sync Blender ID badges of one user or of all users; dry-run unless --go is given."""
+    if bool(email) == bool(sync_all):
+        raise ValueError('Use either --user or --all.')
+
+    if email:
+        users_coll = current_app.db('users')
+        db_user = users_coll.find_one({'email': email}, projection={'_id': True})
+        if not db_user:
+            raise ValueError(f'No user with email {email!r} found')
+        specific_user = db_user['_id']
+    else:
+        specific_user = None
+
+    if not go:
+        log.info('Performing dry-run, not going to change the user database.')
+    start_time = utcnow()
+    badge_sync.refresh_all_badges(specific_user, dry_run=not go,
+                                  timelimit=datetime.timedelta(hours=1))
+    end_time = utcnow()
+    log.info('%s took %s (H:MM:SS)',
+             'Updating user badges' if go else 'Dry-run',
+             end_time - start_time)
diff --git a/pillar/config.py b/pillar/config.py
index 93e3adad..dd7383dc 100644
--- a/pillar/config.py
+++ b/pillar/config.py
@@ -1,6 +1,8 @@
+from collections import defaultdict
+import datetime
 import os.path
 from os import getenv
-from collections import defaultdict
+
 import requests.certs
 
 # Certificate file for communication with other systems.
@@ -204,8 +206,18 @@ CELERY_BEAT_SCHEDULE = {
         'schedule': 600,  # every N seconds
         'args': ('gcs', 100)
     },
+    'refresh-blenderid-badges': {
+        'task': 'pillar.celery.badges.sync_badges_for_users',
+        'schedule': 600,  # every N seconds
+        'args': (540, ),  # time limit in seconds, keep shorter than 'schedule'
+    }
 }
 
+# Badges will be re-fetched every timedelta.
+# TODO(Sybren): A proper value should be determined after we actually have users with badges.
+BLENDER_ID_BADGE_EXPIRY = datetime.timedelta(hours=4)
+
+
 # Mapping from user role to capabilities obtained by users with that role.
 USER_CAPABILITIES = defaultdict(**{
     'subscriber': {'subscriber', 'home-project'},
diff --git a/pillar/tests/__init__.py b/pillar/tests/__init__.py
index 8f58e556..b502f9d0 100644
--- a/pillar/tests/__init__.py
+++ b/pillar/tests/__init__.py
@@ -349,15 +349,21 @@ class AbstractPillarTest(TestMinimal):
         with flask.request_started.connected_to(signal_handler, self.app):
             yield
 
-    def create_valid_auth_token(self, user_id, token='token'):
+    # TODO: rename to 'create_auth_token' now that 'expire_in_days' can be negative.
+    def create_valid_auth_token(self,
+                                user_id: ObjectId,
+                                token='token',
+                                *,
+                                oauth_scopes: typing.Optional[typing.List[str]]=None,
+                                expire_in_days=1) -> dict:
         from pillar.api.utils import utcnow
 
-        future = utcnow() + datetime.timedelta(days=1)
+        future = utcnow() + datetime.timedelta(days=expire_in_days)
 
         with self.app.test_request_context():
             from pillar.api.utils import authentication as auth
 
-            token_data = auth.store_token(user_id, token, future, None)
+            token_data = auth.store_token(user_id, token, future, oauth_scopes=oauth_scopes)
 
         return token_data
 
diff --git a/pillar/web/users/routes.py b/pillar/web/users/routes.py
index b4162207..79136eb5 100644
--- a/pillar/web/users/routes.py
+++ b/pillar/web/users/routes.py
@@ -70,6 +70,9 @@ def oauth_callback(provider):
     db_user = find_user_in_db(user_info, provider=provider)
     db_id, status = upsert_user(db_user)
 
+    # TODO(Sybren): If the user doesn't have any badges, but the access token
+    # does have 'badge' scope, we should fetch the badges in the background.
+
     if oauth_user.access_token:
         # TODO(Sybren): make nr of days configurable, or get from OAuthSignIn subclass.
         token_expiry = utcnow() + datetime.timedelta(days=15)
diff --git a/tests/test_badge_sync.py b/tests/test_badge_sync.py
new file mode 100644
index 00000000..d491f429
--- /dev/null
+++ b/tests/test_badge_sync.py
@@ -0,0 +1,224 @@
+import datetime
+
+import requests
+import responses
+
+from pillar.tests import AbstractPillarTest
+
+httpmock = responses.RequestsMock()
+
+
+class AbstractSyncTest(AbstractPillarTest):
+    def setUp(self):
+        super().setUp()
+        self.uid1 = self.create_user(24 * '1')
+        self.uid2 = self.create_user(24 * '2')
+
+        # Make sure the users have different auth info.
+        with self.app.app_context():
+            users_coll = self.app.db('users')
+            users_coll.update_one(
+                {'_id': self.uid1},
+                {'$set': {'auth': [
+                    {'provider': 'local', 'user_id': '47', 'token': ''},
+                    {'provider': 'blender-id', 'user_id': '1947', 'token': ''},
+                ]}})
+            users_coll.update_one(
+                {'_id': self.uid2},
+                {'$set': {'auth': [
+                    {'provider': 'blender-id', 'user_id': '4488', 'token': ''},
+                    {'provider': 'local', 'user_id': '48', 'token': ''},
+                ]}})
+
+        self.create_valid_auth_token(self.uid1, token='find-this-token-uid1',
+                                     oauth_scopes=['email', 'badge'])
+        self.create_valid_auth_token(self.uid1, token='no-badge-scope',
+                                     oauth_scopes=['email'])
+        self.create_valid_auth_token(self.uid1, token='expired',
+                                     oauth_scopes=['email', 'badge'],
+                                     expire_in_days=-1)
+
+        self.create_valid_auth_token(self.uid2, token='find-this-token-uid2',
+                                     oauth_scopes=['email', 'badge'])
+        self.create_valid_auth_token(self.uid2, token='no-badge-scope',
+                                     oauth_scopes=['email'])
+        self.create_valid_auth_token(self.uid2, token='expired',
+                                     oauth_scopes=['email', 'badge'],
+                                     expire_in_days=-1)
+
+        from pillar import badge_sync
+        self.sync_user1 = badge_sync.SyncUser(self.uid1, 'find-this-token-uid1', '1947')
+        self.sync_user2 = badge_sync.SyncUser(self.uid2, 'find-this-token-uid2', '4488')
+
+
+class FindUsersToSyncTest(AbstractSyncTest):
+    def test_no_badge_fetched_yet(self):
+        from pillar import badge_sync
+        with self.app.app_context():
+            found = set(badge_sync.find_users_to_sync())
+        self.assertEqual({self.sync_user1, self.sync_user2}, found)
+
+    def _update_badge_expiry(self, delta_minutes1, delta_minutes2):
+        """Make badges of userN expire in delta_minutesN minutes."""
+        from pillar.api.utils import utcnow, remove_private_keys
+        now = utcnow()
+
+        # Do the update via Eve so that that flow is covered too.
+        users_coll = self.app.db('users')
+        db_user1 = users_coll.find_one(self.uid1)
+        db_user1['badges'] = {
+            'html': 'badge for user 1',
+            'expires': now + datetime.timedelta(minutes=delta_minutes1)
+        }
+        r, _, _, status = self.app.put_internal('users',
+                                                remove_private_keys(db_user1),
+                                                _id=self.uid1)
+        self.assertEqual(200, status, r)
+
+        db_user2 = users_coll.find_one(self.uid2)
+        db_user2['badges'] = {
+            'html': 'badge for user 2',
+            'expires': now + datetime.timedelta(minutes=delta_minutes2)
+        }
+        r, _, _, status = self.app.put_internal('users',
+                                                remove_private_keys(db_user2),
+                                                _id=self.uid2)
+        self.assertEqual(200, status, r)
+
+    def test_badge_fetched_recently(self):
+        from pillar import badge_sync
+
+        # Badges of user1 expired, user2 didn't yet.
+        with self.app.app_context():
+            self._update_badge_expiry(-5, 5)
+            found = list(badge_sync.find_users_to_sync())
+        self.assertEqual([self.sync_user1], found)
+
+        # Badges of both users expired, but user2 expired longer ago.
+        with self.app.app_context():
+            self._update_badge_expiry(-5, -10)
+            found = list(badge_sync.find_users_to_sync())
+        self.assertEqual([self.sync_user2, self.sync_user1], found)
+
+        # Badges of both not expired yet.
+        with self.app.app_context():
+            self._update_badge_expiry(2, 3)
+            found = list(badge_sync.find_users_to_sync())
+        self.assertEqual([], found)
+
+
+class FetchHTMLTest(AbstractSyncTest):
+    @httpmock.activate
+    def test_happy(self):
+        from pillar import badge_sync
+        from pillar.api.utils import utcnow
+
+        def check_request(request: requests.PreparedRequest):
+            if request.headers['Authorization'] != 'Bearer find-this-token-uid1':
+                return 403, {}, 'BAD TOKEN'
+            return 200, {'Content-Type': 'text/html; charset=utf-8'}, 'твоја мајка'.encode()
+
+        httpmock.add_callback('GET', 'http://id.local:8001/api/badges/1947/html/s', check_request)
+
+        with self.app.app_context():
+            badge_html = badge_sync.fetch_badge_html(requests.Session(), self.sync_user1, 's')
+            expected_expire = utcnow() + self.app.config['BLENDER_ID_BADGE_EXPIRY']
+
+        self.assertEqual('твоја мајка', badge_html.html)
+        margin = datetime.timedelta(minutes=1)
+        self.assertLess(expected_expire - margin, badge_html.expires)
+        self.assertGreater(expected_expire + margin, badge_html.expires)
+
+    @httpmock.activate
+    def test_internal_server_error(self):
+        from pillar import badge_sync
+
+        httpmock.add('GET', 'http://id.local:8001/api/badges/1947/html/s',
+                     body='oops', status=500)
+
+        with self.assertRaises(badge_sync.StopRefreshing), self.app.app_context():
+            badge_sync.fetch_badge_html(requests.Session(), self.sync_user1, 's')
+
+    @httpmock.activate
+    def test_bad_gateway(self):
+        from pillar import badge_sync
+
+        # Any 5xx status should stop the refresh, not just a 500.
+        httpmock.add('GET', 'http://id.local:8001/api/badges/1947/html/s',
+                     body='oops', status=502)
+
+        with self.assertRaises(badge_sync.StopRefreshing), self.app.app_context():
+            badge_sync.fetch_badge_html(requests.Session(), self.sync_user1, 's')
+
+    @httpmock.activate
+    def test_no_badge(self):
+        from pillar import badge_sync
+
+        httpmock.add('GET', 'http://id.local:8001/api/badges/1947/html/s',
+                     body='', status=204)
+        with self.app.app_context():
+            badge_html = badge_sync.fetch_badge_html(requests.Session(), self.sync_user1, 's')
+        self.assertIsNone(badge_html)
+
+    @httpmock.activate
+    def test_no_such_user(self):
+        from pillar import badge_sync
+
+        httpmock.add('GET', 'http://id.local:8001/api/badges/1947/html/s',
+                     body='Not Found', status=404)
+        with self.app.app_context():
+            badge_html = badge_sync.fetch_badge_html(requests.Session(), self.sync_user1, 's')
+        self.assertIsNone(badge_html)
+
+    @httpmock.activate
+    def test_no_connection_possible(self):
+        from pillar import badge_sync
+
+        with self.assertRaises(badge_sync.StopRefreshing), self.app.app_context():
+            badge_sync.fetch_badge_html(requests.Session(), self.sync_user1, 's')
+
+
+class RefreshAllTest(AbstractSyncTest):
+    @httpmock.activate
+    def test_happy(self):
+        from pillar import badge_sync
+
+        httpmock.add('GET', 'http://id.local:8001/api/badges/1947/html/s',
+                     body='badges for Agent 47')
+        httpmock.add('GET', 'http://id.local:8001/api/badges/4488/html/s',
+                     body='badges for that other user')
+
+        with self.app.app_context():
+            badge_sync.refresh_all_badges(timelimit=datetime.timedelta(seconds=4))
+
+        db_user1 = self.get('/api/users/me', auth_token=self.sync_user1.token).json
+        db_user2 = self.get('/api/users/me', auth_token=self.sync_user2.token).json
+        self.assertEqual('badges for Agent 47', db_user1['badges']['html'])
+        self.assertEqual('badges for that other user', db_user2['badges']['html'])
+
+    @httpmock.activate
+    def test_no_badge(self):
+        from pillar import badge_sync
+
+        # User 1 has no badges; this should not prevent user 2 from being refreshed.
+        httpmock.add('GET', 'http://id.local:8001/api/badges/1947/html/s',
+                     body='', status=204)
+        httpmock.add('GET', 'http://id.local:8001/api/badges/4488/html/s',
+                     body='badges for that other user')
+
+        with self.app.app_context():
+            badge_sync.refresh_all_badges(timelimit=datetime.timedelta(seconds=4))
+
+        db_user1 = self.get('/api/users/me', auth_token=self.sync_user1.token).json
+        db_user2 = self.get('/api/users/me', auth_token=self.sync_user2.token).json
+        self.assertEqual('', db_user1['badges']['html'])
+        self.assertEqual('badges for that other user', db_user2['badges']['html'])
+
+    @httpmock.activate
+    def test_timelimit(self):
+        from pillar import badge_sync
+
+        # This shouldn't hit any connection error, because it should immediately
+        # hit the time limit, before doing any call to Blender ID.
+        with self.app.app_context():
+            badge_sync.refresh_all_badges(timelimit=datetime.timedelta(seconds=-4))