Reworked subscription reconciliation to use Blender ID instead of Store
This commit is contained in:
88
cloud/cli.py
88
cloud/cli.py
@@ -1,8 +1,11 @@
|
|||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
from urllib.parse import urljoin
|
||||||
|
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
from flask_script import Manager
|
from flask_script import Manager
|
||||||
|
import requests
|
||||||
|
|
||||||
from pillar.cli import manager
|
from pillar.cli import manager
|
||||||
from pillar.api import service
|
from pillar.api import service
|
||||||
@@ -39,39 +42,76 @@ def reconcile_subscribers():
|
|||||||
|
|
||||||
import threading
|
import threading
|
||||||
import concurrent.futures
|
import concurrent.futures
|
||||||
from pillar.api.blender_cloud.subscription import fetch_subscription_info
|
|
||||||
|
from pillar.auth import UserClass
|
||||||
|
from pillar.api.blender_cloud.subscription import do_update_subscription
|
||||||
|
|
||||||
|
sessions = threading.local()
|
||||||
|
|
||||||
service.fetch_role_to_group_id_map()
|
service.fetch_role_to_group_id_map()
|
||||||
|
|
||||||
users_coll = current_app.data.driver.db['users']
|
users_coll = current_app.data.driver.db['users']
|
||||||
unsubscribed_users = []
|
found = users_coll.find({'auth.provider': 'blender-id'})
|
||||||
found = users_coll.find({'roles': 'subscriber'})
|
count_users = found.count()
|
||||||
user_count = found.count()
|
count_skipped = count_processed = 0
|
||||||
log.info('Processing %i users', user_count)
|
log.info('Processing %i users', count_users)
|
||||||
|
|
||||||
lock = threading.Lock()
|
lock = threading.Lock()
|
||||||
|
|
||||||
real_current_app = current_app._get_current_object()
|
real_current_app = current_app._get_current_object()
|
||||||
|
|
||||||
|
api_token = real_current_app.config['BLENDER_ID_USER_INFO_TOKEN']
|
||||||
|
api_url = real_current_app.config['BLENDER_ID_USER_INFO_API']
|
||||||
|
|
||||||
def do_user(idx, user):
|
def do_user(idx, user):
|
||||||
log.info('Processing %i/%i %s', idx + 1, user_count, user['email'])
|
nonlocal count_skipped, count_processed
|
||||||
|
|
||||||
with real_current_app.app_context():
|
log.info('Processing %i/%i %s', idx + 1, count_users, user['email'])
|
||||||
user_store = fetch_subscription_info(user['email'])
|
|
||||||
|
|
||||||
if not user_store:
|
# Get the Requests session for this thread.
|
||||||
log.error('Unable to reach store, aborting')
|
try:
|
||||||
return
|
sess = sessions.session
|
||||||
|
except AttributeError:
|
||||||
if not user_store or user_store['cloud_access'] == 0:
|
sess = sessions.session = requests.Session()
|
||||||
action = 'revoke'
|
|
||||||
with lock:
|
|
||||||
unsubscribed_users.append(user['email'])
|
|
||||||
else:
|
|
||||||
action = 'grant'
|
|
||||||
|
|
||||||
|
# Get the info from Blender ID
|
||||||
|
bid_user_ids = [auth['user_id']
|
||||||
|
for auth in user['auth']
|
||||||
|
if auth['provider'] == 'blender-id']
|
||||||
|
if not bid_user_ids:
|
||||||
with lock:
|
with lock:
|
||||||
service.do_badger(action, role='subscriber', user_id=user['_id'])
|
count_skipped += 1
|
||||||
|
return
|
||||||
|
bid_user_id = bid_user_ids[0]
|
||||||
|
|
||||||
|
url = urljoin(api_url, bid_user_id)
|
||||||
|
resp = sess.get(url, headers={'Authorization': f'Bearer {api_token}'})
|
||||||
|
|
||||||
|
if resp.status_code == 404:
|
||||||
|
log.info('User %s with Blender ID %s not found, skipping', user['email'], bid_user_id)
|
||||||
|
with lock:
|
||||||
|
count_skipped += 1
|
||||||
|
return
|
||||||
|
|
||||||
|
if resp.status_code != 200:
|
||||||
|
log.error('Unable to reach Blender ID (code %d), aborting', resp.status_code)
|
||||||
|
with lock:
|
||||||
|
count_skipped += 1
|
||||||
|
return
|
||||||
|
|
||||||
|
bid_user = resp.json()
|
||||||
|
if not bid_user:
|
||||||
|
log.error('Unable to parse response for user %s, aborting', user['email'])
|
||||||
|
with lock:
|
||||||
|
count_skipped += 1
|
||||||
|
return
|
||||||
|
|
||||||
|
# Actually update the user, and do it thread-safe just to be sure.
|
||||||
|
with real_current_app.app_context():
|
||||||
|
local_user = UserClass.construct('', user)
|
||||||
|
with lock:
|
||||||
|
do_update_subscription(local_user, bid_user)
|
||||||
|
count_processed += 1
|
||||||
|
|
||||||
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
|
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
|
||||||
future_to_user = {executor.submit(do_user, idx, user): user
|
future_to_user = {executor.submit(do_user, idx, user): user
|
||||||
@@ -83,13 +123,9 @@ def reconcile_subscribers():
|
|||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
log.exception('Error updating user %s', user)
|
log.exception('Error updating user %s', user)
|
||||||
|
|
||||||
if not unsubscribed_users:
|
log.info('Done reconciling %d subscribers', count_users)
|
||||||
log.info('No unsubscribed users')
|
log.info(' processed: %d', count_processed)
|
||||||
return
|
log.info(' skipped : %d', count_skipped)
|
||||||
|
|
||||||
print('The following %i users have been unsubscribed' % len(unsubscribed_users))
|
|
||||||
for user in unsubscribed_users:
|
|
||||||
print(user)
|
|
||||||
|
|
||||||
|
|
||||||
manager.add_command("cloud", manager_cloud)
|
manager.add_command("cloud", manager_cloud)
|
||||||
|
Reference in New Issue
Block a user