CLI reconcile_users: process 10 users in parallel
This commit is contained in:
28
cloud/cli.py
28
cloud/cli.py
@@ -37,6 +37,8 @@ def create_groups():
|
|||||||
def reconcile_subscribers():
|
def reconcile_subscribers():
|
||||||
"""For every user, check their subscription status with the store."""
|
"""For every user, check their subscription status with the store."""
|
||||||
|
|
||||||
|
import threading
|
||||||
|
import concurrent.futures
|
||||||
from pillar.api.blender_cloud.subscription import fetch_subscription_info
|
from pillar.api.blender_cloud.subscription import fetch_subscription_info
|
||||||
|
|
||||||
service.fetch_role_to_group_id_map()
|
service.fetch_role_to_group_id_map()
|
||||||
@@ -47,27 +49,45 @@ def reconcile_subscribers():
|
|||||||
user_count = found.count()
|
user_count = found.count()
|
||||||
log.info('Processing %i users', user_count)
|
log.info('Processing %i users', user_count)
|
||||||
|
|
||||||
for idx, user in enumerate(found):
|
lock = threading.Lock()
|
||||||
log.info('Processing %i/%i %s', idx+1, user_count, user['email'])
|
|
||||||
|
|
||||||
|
real_current_app = current_app._get_current_object()
|
||||||
|
|
||||||
|
def do_user(idx, user):
|
||||||
|
log.info('Processing %i/%i %s', idx + 1, user_count, user['email'])
|
||||||
|
|
||||||
|
with real_current_app.app_context():
|
||||||
user_store = fetch_subscription_info(user['email'])
|
user_store = fetch_subscription_info(user['email'])
|
||||||
|
|
||||||
if not user_store:
|
if not user_store:
|
||||||
log.error('Unable to reach store, aborting')
|
log.error('Unable to reach store, aborting')
|
||||||
break
|
return
|
||||||
|
|
||||||
if not user_store or user_store['cloud_access'] == 0:
|
if not user_store or user_store['cloud_access'] == 0:
|
||||||
action = 'revoke'
|
action = 'revoke'
|
||||||
|
with lock:
|
||||||
unsubscribed_users.append(user['email'])
|
unsubscribed_users.append(user['email'])
|
||||||
else:
|
else:
|
||||||
action = 'grant'
|
action = 'grant'
|
||||||
|
|
||||||
|
with lock:
|
||||||
service.do_badger(action, 'subscriber', user_id=user['_id'])
|
service.do_badger(action, 'subscriber', user_id=user['_id'])
|
||||||
|
|
||||||
|
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
|
||||||
|
future_to_user = {executor.submit(do_user, idx, user): user
|
||||||
|
for idx, user in enumerate(found)}
|
||||||
|
for future in concurrent.futures.as_completed(future_to_user):
|
||||||
|
user = future_to_user[future]
|
||||||
|
try:
|
||||||
|
future.result()
|
||||||
|
except Exception as ex:
|
||||||
|
log.exception('Error updating user %s', user)
|
||||||
|
|
||||||
if not unsubscribed_users:
|
if not unsubscribed_users:
|
||||||
log.info('No unsubscribed users')
|
log.info('No unsubscribed users')
|
||||||
return
|
return
|
||||||
|
|
||||||
print('The following users have been unsubscribed')
|
print('The following %i users have been unsubscribed' % len(unsubscribed_users))
|
||||||
for user in unsubscribed_users:
|
for user in unsubscribed_users:
|
||||||
print(user)
|
print(user)
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user