diff --git a/cloud/cli.py b/cloud/cli.py index e9d6819..e94342c 100644 --- a/cloud/cli.py +++ b/cloud/cli.py @@ -37,6 +37,8 @@ def create_groups(): def reconcile_subscribers(): """For every user, check their subscription status with the store.""" + import threading + import concurrent.futures from pillar.api.blender_cloud.subscription import fetch_subscription_info service.fetch_role_to_group_id_map() @@ -47,27 +49,45 @@ def reconcile_subscribers(): user_count = found.count() log.info('Processing %i users', user_count) - for idx, user in enumerate(found): - log.info('Processing %i/%i %s', idx+1, user_count, user['email']) + lock = threading.Lock() - user_store = fetch_subscription_info(user['email']) - if not user_store: - log.error('Unable to reach store, aborting') - break + real_current_app = current_app._get_current_object() - if not user_store or user_store['cloud_access'] == 0: - action = 'revoke' - unsubscribed_users.append(user['email']) - else: - action = 'grant' + def do_user(idx, user): + log.info('Processing %i/%i %s', idx + 1, user_count, user['email']) - service.do_badger(action, 'subscriber', user_id=user['_id']) + with real_current_app.app_context(): + user_store = fetch_subscription_info(user['email']) + + if not user_store: + log.error('Unable to reach store, aborting') + return + + if not user_store or user_store['cloud_access'] == 0: + action = 'revoke' + with lock: + unsubscribed_users.append(user['email']) + else: + action = 'grant' + + with lock: + service.do_badger(action, 'subscriber', user_id=user['_id']) + + with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: + future_to_user = {executor.submit(do_user, idx, user): user + for idx, user in enumerate(found)} + for future in concurrent.futures.as_completed(future_to_user): + user = future_to_user[future] + try: + future.result() + except Exception as ex: + log.exception('Error updating user %s', user) if not unsubscribed_users: log.info('No unsubscribed users') return - print('The following users have been unsubscribed') + print('The following %i users have been unsubscribed' % len(unsubscribed_users)) for user in unsubscribed_users: print(user)