From b155b0916e441a299c111031ed25e56fd694ba70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Tue, 11 Jul 2017 14:57:57 +0200 Subject: [PATCH] CLI reconcile_users: process 10 users in parallel --- cloud/cli.py | 46 +++++++++++++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/cloud/cli.py b/cloud/cli.py index e9d6819..e94342c 100644 --- a/cloud/cli.py +++ b/cloud/cli.py @@ -37,6 +37,8 @@ def create_groups(): def reconcile_subscribers(): """For every user, check their subscription status with the store.""" + import threading + import concurrent.futures from pillar.api.blender_cloud.subscription import fetch_subscription_info service.fetch_role_to_group_id_map() @@ -47,27 +49,45 @@ def reconcile_subscribers(): user_count = found.count() log.info('Processing %i users', user_count) - for idx, user in enumerate(found): - log.info('Processing %i/%i %s', idx+1, user_count, user['email']) + lock = threading.Lock() - user_store = fetch_subscription_info(user['email']) - if not user_store: - log.error('Unable to reach store, aborting') - break + real_current_app = current_app._get_current_object() - if not user_store or user_store['cloud_access'] == 0: - action = 'revoke' - unsubscribed_users.append(user['email']) - else: - action = 'grant' + def do_user(idx, user): + log.info('Processing %i/%i %s', idx + 1, user_count, user['email']) - service.do_badger(action, 'subscriber', user_id=user['_id']) + with real_current_app.app_context(): + user_store = fetch_subscription_info(user['email']) + + if not user_store: + log.error('Unable to reach store, aborting') + return + + if not user_store or user_store['cloud_access'] == 0: + action = 'revoke' + with lock: + unsubscribed_users.append(user['email']) + else: + action = 'grant' + + with lock: + service.do_badger(action, 'subscriber', user_id=user['_id']) + + with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: + future_to_user = {executor.submit(do_user, idx, user): user + for idx, user in enumerate(found)} + for future in concurrent.futures.as_completed(future_to_user): + user = future_to_user[future] + try: + future.result() + except Exception as ex: + log.exception('Error updating user %s', user) if not unsubscribed_users: log.info('No unsubscribed users') return - print('The following users have been unsubscribed') + print('The following %i users have been unsubscribed' % len(unsubscribed_users)) for user in unsubscribed_users: print(user)