diff --git a/cloud/cli.py b/cloud/cli.py index ed450a7..cb78743 100644 --- a/cloud/cli.py +++ b/cloud/cli.py @@ -1,8 +1,11 @@ #!/usr/bin/env python import logging +from urllib.parse import urljoin + from flask import current_app from flask_script import Manager +import requests from pillar.cli import manager from pillar.api import service @@ -39,39 +42,76 @@ def reconcile_subscribers(): import threading import concurrent.futures - from pillar.api.blender_cloud.subscription import fetch_subscription_info + + from pillar.auth import UserClass + from pillar.api.blender_cloud.subscription import do_update_subscription + + sessions = threading.local() service.fetch_role_to_group_id_map() users_coll = current_app.data.driver.db['users'] - unsubscribed_users = [] - found = users_coll.find({'roles': 'subscriber'}) - user_count = found.count() - log.info('Processing %i users', user_count) + found = users_coll.find({'auth.provider': 'blender-id'}) + count_users = found.count() + count_skipped = count_processed = 0 + log.info('Processing %i users', count_users) lock = threading.Lock() real_current_app = current_app._get_current_object() + api_token = real_current_app.config['BLENDER_ID_USER_INFO_TOKEN'] + api_url = real_current_app.config['BLENDER_ID_USER_INFO_API'] + def do_user(idx, user): - log.info('Processing %i/%i %s', idx + 1, user_count, user['email']) + nonlocal count_skipped, count_processed - with real_current_app.app_context(): - user_store = fetch_subscription_info(user['email']) + log.info('Processing %i/%i %s', idx + 1, count_users, user['email']) - if not user_store: - log.error('Unable to reach store, aborting') - return - - if not user_store or user_store['cloud_access'] == 0: - action = 'revoke' - with lock: - unsubscribed_users.append(user['email']) - else: - action = 'grant' + # Get the Requests session for this thread. + try: + sess = sessions.session + except AttributeError: + sess = sessions.session = requests.Session() + # Get the info from Blender ID + bid_user_ids = [auth['user_id'] + for auth in user['auth'] + if auth['provider'] == 'blender-id'] + if not bid_user_ids: with lock: - service.do_badger(action, role='subscriber', user_id=user['_id']) + count_skipped += 1 + return + bid_user_id = bid_user_ids[0] + + url = urljoin(api_url, bid_user_id) + resp = sess.get(url, headers={'Authorization': f'Bearer {api_token}'}) + + if resp.status_code == 404: + log.info('User %s with Blender ID %s not found, skipping', user['email'], bid_user_id) + with lock: + count_skipped += 1 + return + + if resp.status_code != 200: + log.error('Unable to reach Blender ID (code %d), aborting', resp.status_code) + with lock: + count_skipped += 1 + return + + bid_user = resp.json() + if not bid_user: + log.error('Unable to parse response for user %s, aborting', user['email']) + with lock: + count_skipped += 1 + return + + # Actually update the user, and do it thread-safe just to be sure. + with real_current_app.app_context(): + local_user = UserClass.construct('', user) + with lock: + do_update_subscription(local_user, bid_user) + count_processed += 1 with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: future_to_user = {executor.submit(do_user, idx, user): user @@ -83,13 +123,9 @@ def reconcile_subscribers(): except Exception as ex: log.exception('Error updating user %s', user) - if not unsubscribed_users: - log.info('No unsubscribed users') - return - - print('The following %i users have been unsubscribed' % len(unsubscribed_users)) - for user in unsubscribed_users: - print(user) + log.info('Done reconciling %d subscribers', count_users) + log.info(' processed: %d', count_processed) + log.info(' skipped : %d', count_skipped) manager.add_command("cloud", manager_cloud)