From ad9a981cda7b487119de82c10a3df671fe6459d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Fri, 12 May 2017 13:37:12 +0200 Subject: [PATCH] Added p.a.users.add_user_to_group() function --- pillar/api/users/__init__.py | 37 ++++++++++++++++++++++++++ pillar/tests/__init__.py | 5 ++++ tests/test_api/test_users.py | 50 ++++++++++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+) create mode 100644 tests/test_api/test_users.py diff --git a/pillar/api/users/__init__.py b/pillar/api/users/__init__.py index 15289df8..78c16b11 100644 --- a/pillar/api/users/__init__.py +++ b/pillar/api/users/__init__.py @@ -1,6 +1,43 @@ +import logging + +import bson +from flask import current_app + from . import hooks from .routes import blueprint_api +log = logging.getLogger(__name__) + + +def add_user_to_group(user_id: bson.ObjectId, group_id: bson.ObjectId): + """Makes the user member of the given group. + + Directly uses MongoDB, so that it doesn't require any special permissions. + """ + + from pymongo.results import UpdateResult + + assert isinstance(user_id, bson.ObjectId) + assert isinstance(group_id, bson.ObjectId) + + log.info('Adding user %s to group %s', user_id, group_id) + + users_coll = current_app.db('users') + db_user = users_coll.find_one(user_id, projection={'groups': 1}) + if db_user is None: + raise ValueError('user %s not found', user_id, group_id) + + groups = set(db_user.get('groups', [])) + groups.add(group_id) + + # Sort the groups so that we have predictable, repeatable results. + result: UpdateResult = users_coll.update_one( + {'_id': db_user['_id']}, + {'$set': {'groups': sorted(groups)}}) + + if result.matched_count == 0: + raise ValueError('Unable to add user %s to group %s; user not found.') + def setup_app(app, api_prefix): app.on_pre_GET_users += hooks.check_user_access diff --git a/pillar/tests/__init__.py b/pillar/tests/__init__.py index ab5b0edf..601e58e9 100644 --- a/pillar/tests/__init__.py +++ b/pillar/tests/__init__.py @@ -359,6 +359,11 @@ class AbstractPillarTest(TestMinimal): proj_coll = self.app.db()['projects'] return proj_coll.find_one(project_id) + def fetch_user_from_db(self, user_id=ctd.EXAMPLE_USER['_id']): + with self.app.app_context(): + users_coll = self.app.db('users') + return users_coll.find_one(user_id) + @staticmethod def join_url_params(params): """Constructs a query string from a dictionary and appends it to a url. diff --git a/tests/test_api/test_users.py b/tests/test_api/test_users.py new file mode 100644 index 00000000..6010dfbb --- /dev/null +++ b/tests/test_api/test_users.py @@ -0,0 +1,50 @@ +import bson + +from pillar.tests import AbstractPillarTest + + +class UsersTest(AbstractPillarTest): + def setUp(self, **kwargs): + super().setUp(**kwargs) + + self.group_names = [f'test{num}' for num in range(10)] + self.group_map = self.create_standard_groups(additional_groups=self.group_names) + + def test_add_user_to_group_happy(self): + from pillar.api import users + + user_id = bson.ObjectId(24 * '1') + + self.create_user(user_id, roles={'subscriber'}, groups=[self.group_map['subscriber']]) + + db_user = self.fetch_user_from_db(user_id) + self.assertEqual([self.group_map['subscriber']], db_user['groups']) + + with self.app.test_request_context(): + users.add_user_to_group(user_id, self.group_map['test1']) + + db_user = self.fetch_user_from_db(user_id) + self.assertEqual([ + self.group_map['subscriber'], + self.group_map['test1'], + ], db_user['groups']) + + def test_add_user_to_group_no_initial_groups(self): + from pillar.api import users + + user_id = bson.ObjectId(24 * '1') + self.create_user(user_id, roles=set()) + + with self.app.test_request_context(): + # Ensure the user doesn't even have a 'groups' property. + users_coll = self.app.db('users') + users_coll.update_one({'_id': user_id}, {'$unset': {'groups': 1}}) + db_user = self.fetch_user_from_db(user_id) + self.assertNotIn('groups', db_user) + + users.add_user_to_group(user_id, self.group_map['test1']) + + db_user = self.fetch_user_from_db(user_id) + self.assertEqual([ + self.group_map['test1'], + ], db_user['groups'])