Added p.a.users.add_user_to_group() function
This commit is contained in:
parent
7c5aef033d
commit
ad9a981cda
@ -1,6 +1,43 @@
|
||||
import logging
|
||||
|
||||
import bson
|
||||
from flask import current_app
|
||||
|
||||
from . import hooks
|
||||
from .routes import blueprint_api
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def add_user_to_group(user_id: bson.ObjectId, group_id: bson.ObjectId):
|
||||
"""Makes the user member of the given group.
|
||||
|
||||
Directly uses MongoDB, so that it doesn't require any special permissions.
|
||||
"""
|
||||
|
||||
from pymongo.results import UpdateResult
|
||||
|
||||
assert isinstance(user_id, bson.ObjectId)
|
||||
assert isinstance(group_id, bson.ObjectId)
|
||||
|
||||
log.info('Adding user %s to group %s', user_id, group_id)
|
||||
|
||||
users_coll = current_app.db('users')
|
||||
db_user = users_coll.find_one(user_id, projection={'groups': 1})
|
||||
if db_user is None:
|
||||
raise ValueError('user %s not found', user_id, group_id)
|
||||
|
||||
groups = set(db_user.get('groups', []))
|
||||
groups.add(group_id)
|
||||
|
||||
# Sort the groups so that we have predictable, repeatable results.
|
||||
result: UpdateResult = users_coll.update_one(
|
||||
{'_id': db_user['_id']},
|
||||
{'$set': {'groups': sorted(groups)}})
|
||||
|
||||
if result.matched_count == 0:
|
||||
raise ValueError('Unable to add user %s to group %s; user not found.')
|
||||
|
||||
|
||||
def setup_app(app, api_prefix):
|
||||
app.on_pre_GET_users += hooks.check_user_access
|
||||
|
@ -359,6 +359,11 @@ class AbstractPillarTest(TestMinimal):
|
||||
proj_coll = self.app.db()['projects']
|
||||
return proj_coll.find_one(project_id)
|
||||
|
||||
def fetch_user_from_db(self, user_id=ctd.EXAMPLE_USER['_id']):
|
||||
with self.app.app_context():
|
||||
users_coll = self.app.db('users')
|
||||
return users_coll.find_one(user_id)
|
||||
|
||||
@staticmethod
|
||||
def join_url_params(params):
|
||||
"""Constructs a query string from a dictionary and appends it to a url.
|
||||
|
50
tests/test_api/test_users.py
Normal file
50
tests/test_api/test_users.py
Normal file
@ -0,0 +1,50 @@
|
||||
import bson
|
||||
|
||||
from pillar.tests import AbstractPillarTest
|
||||
|
||||
|
||||
class UsersTest(AbstractPillarTest):
|
||||
def setUp(self, **kwargs):
|
||||
super().setUp(**kwargs)
|
||||
|
||||
self.group_names = [f'test{num}' for num in range(10)]
|
||||
self.group_map = self.create_standard_groups(additional_groups=self.group_names)
|
||||
|
||||
def test_add_user_to_group_happy(self):
|
||||
from pillar.api import users
|
||||
|
||||
user_id = bson.ObjectId(24 * '1')
|
||||
|
||||
self.create_user(user_id, roles={'subscriber'}, groups=[self.group_map['subscriber']])
|
||||
|
||||
db_user = self.fetch_user_from_db(user_id)
|
||||
self.assertEqual([self.group_map['subscriber']], db_user['groups'])
|
||||
|
||||
with self.app.test_request_context():
|
||||
users.add_user_to_group(user_id, self.group_map['test1'])
|
||||
|
||||
db_user = self.fetch_user_from_db(user_id)
|
||||
self.assertEqual([
|
||||
self.group_map['subscriber'],
|
||||
self.group_map['test1'],
|
||||
], db_user['groups'])
|
||||
|
||||
def test_add_user_to_group_no_initial_groups(self):
|
||||
from pillar.api import users
|
||||
|
||||
user_id = bson.ObjectId(24 * '1')
|
||||
self.create_user(user_id, roles=set())
|
||||
|
||||
with self.app.test_request_context():
|
||||
# Ensure the user doesn't even have a 'groups' property.
|
||||
users_coll = self.app.db('users')
|
||||
users_coll.update_one({'_id': user_id}, {'$unset': {'groups': 1}})
|
||||
db_user = self.fetch_user_from_db(user_id)
|
||||
self.assertNotIn('groups', db_user)
|
||||
|
||||
users.add_user_to_group(user_id, self.group_map['test1'])
|
||||
|
||||
db_user = self.fetch_user_from_db(user_id)
|
||||
self.assertEqual([
|
||||
self.group_map['test1'],
|
||||
], db_user['groups'])
|
Loading…
x
Reference in New Issue
Block a user