Adding Token generation for users on /tokens
This commit is contained in:
parent
38b4d01581
commit
a45a4b491e
@ -1,12 +1,50 @@
|
||||
from eve import Eve
|
||||
from eve.auth import TokenAuth
|
||||
|
||||
import random
|
||||
import string
|
||||
|
||||
from eve.auth import TokenAuth
|
||||
from eve.auth import BasicAuth
|
||||
from eve.io.mongo import Validator
|
||||
from bson import ObjectId
|
||||
|
||||
|
||||
class TokensAuth(TokenAuth):
|
||||
def check_auth(self, token, allowed_roles, resource, method):
|
||||
tokens = app.data.driver.db['tokens']
|
||||
lookup = {'token': token}
|
||||
token = tokens.find_one(lookup)
|
||||
if not token:
|
||||
return False
|
||||
users = app.data.driver.db['users']
|
||||
lookup = {'firstname': token['username']}
|
||||
if allowed_roles:
|
||||
lookup['role'] = {'$in': allowed_roles}
|
||||
user = users.find_one(lookup)
|
||||
if not user:
|
||||
return False
|
||||
return token
|
||||
|
||||
class BasicsAuth(BasicAuth):
|
||||
def check_auth(self, username, password, allowed_roles, resource, method):
|
||||
return username == 'admin' and password == 'secret'
|
||||
|
||||
|
||||
class MyTokenAuth(BasicsAuth):
|
||||
def __init__(self):
|
||||
self.token_auth = TokensAuth()
|
||||
self.authorized_protected = BasicsAuth.authorized
|
||||
|
||||
def authorized(self, allowed_roles, resource, method):
|
||||
if resource=='tokens':
|
||||
return self.authorized_protected(self, allowed_roles, resource, method)
|
||||
else:
|
||||
return self.token_auth.authorized(allowed_roles, resource, method)
|
||||
|
||||
def authorized_protected(self):
|
||||
pass
|
||||
|
||||
|
||||
class ValidateCustomFields(Validator):
|
||||
def _validate_valid_properties(self, valid_properties, field, value):
|
||||
node_types = app.data.driver.db['node_types']
|
||||
@ -22,23 +60,14 @@ class ValidateCustomFields(Validator):
|
||||
self._error(field, "Must be hi")
|
||||
|
||||
|
||||
class RolesAuth(TokenAuth):
|
||||
def check_auth(self, token, allowed_roles, resource, method):
|
||||
accounts = app.data.driver.db['users']
|
||||
lookup = {'token': token}
|
||||
if allowed_roles:
|
||||
lookup['role'] = {'$in': allowed_roles}
|
||||
account = accounts.find_one(lookup)
|
||||
return account
|
||||
|
||||
|
||||
def add_token(documents):
|
||||
# Don't use this in production:
|
||||
# You should at least make sure that the token is unique.
|
||||
# print ("Adding Token")
|
||||
for document in documents:
|
||||
document["token"] = (''.join(random.choice(string.ascii_uppercase)
|
||||
for x in range(10)))
|
||||
|
||||
app = Eve(validator=ValidateCustomFields, auth=RolesAuth)
|
||||
app.on_insert_users += add_token
|
||||
|
||||
app = Eve(validator=ValidateCustomFields, auth=MyTokenAuth)
|
||||
app.on_insert_tokens += add_token
|
||||
|
@ -1,5 +1,6 @@
|
||||
import os
|
||||
|
||||
from authentication import RolesAuth
|
||||
|
||||
# Enable reads (GET), inserts (POST) and DELETE for resources/collections
|
||||
# (if you omit this line, the API will default to ['GET'] and provide
|
||||
@ -12,8 +13,6 @@ ITEM_METHODS = ['GET', 'PATCH', 'PUT', 'DELETE']
|
||||
|
||||
|
||||
users_schema = {
|
||||
# Schema definition, based on Cerberus grammar. Check the Cerberus project
|
||||
# (https://github.com/nicolaiarocci/cerberus) for details.
|
||||
'firstname': {
|
||||
'type': 'string',
|
||||
'minlength': 1,
|
||||
@ -23,13 +22,7 @@ users_schema = {
|
||||
'type': 'string',
|
||||
'minlength': 1,
|
||||
'maxlength': 15,
|
||||
'required': True,
|
||||
# talk about hard constraints! For the purpose of the demo
|
||||
# 'lastname' is an API entry-point, so we need it to be unique.
|
||||
'unique': True,
|
||||
},
|
||||
# 'role' is a list, and can only contain values from 'allowed'.
|
||||
# changed to string
|
||||
'role': {
|
||||
'type': 'string',
|
||||
'allowed': ["author", "contributor", "copy"],
|
||||
@ -102,6 +95,17 @@ node_types_schema = {
|
||||
}
|
||||
|
||||
|
||||
tokens_schema = {
|
||||
'username': {
|
||||
'type': 'string',
|
||||
'required': True,
|
||||
},
|
||||
'token': {
|
||||
'type': 'string',
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
nodes = {
|
||||
# We choose to override global cache-control directives for this resource.
|
||||
'cache_control': 'max-age=10,must-revalidate',
|
||||
@ -143,25 +147,32 @@ users = {
|
||||
# most global settings can be overridden at resource level
|
||||
'resource_methods': ['GET', 'POST'],
|
||||
|
||||
# Allow 'token' to be returned with POST responses
|
||||
'extra_response_fields': ['token'],
|
||||
|
||||
'public_methods': ['GET', 'POST'],
|
||||
# 'public_item_methods': ['GET'],
|
||||
|
||||
|
||||
'schema': users_schema
|
||||
}
|
||||
|
||||
tokens = {
|
||||
'resource_methods': ['POST'],
|
||||
|
||||
# Allow 'token' to be returned with POST responses
|
||||
'extra_response_fields': ['token'],
|
||||
|
||||
'schema' : tokens_schema
|
||||
}
|
||||
|
||||
DOMAIN = {
|
||||
'users': users,
|
||||
'nodes' : nodes,
|
||||
'node_types': node_types,
|
||||
'tokens': tokens,
|
||||
}
|
||||
|
||||
try:
|
||||
os.environ['TEST_ATTRACT']
|
||||
#print ("Using attract_test database")
|
||||
MONGO_DBNAME = 'attract_test'
|
||||
except:
|
||||
pass
|
||||
|
@ -4,10 +4,17 @@ import unittest
|
||||
from pymongo import MongoClient
|
||||
from bson import ObjectId
|
||||
from datetime import datetime
|
||||
import base64
|
||||
|
||||
|
||||
class AttractTestCase(unittest.TestCase):
|
||||
|
||||
|
||||
def encodeUsrPass(self, user, password):
|
||||
usrPass = "{0}:{1}".format(user, password)
|
||||
b64Val = base64.b64encode(usrPass)
|
||||
return b64Val
|
||||
|
||||
def addUser(self, firstname, lastname, role):
|
||||
return self.app.post('/users', data=dict(
|
||||
firstname=firstname,
|
||||
@ -46,6 +53,21 @@ class AttractTestCase(unittest.TestCase):
|
||||
headers=headers,
|
||||
follow_redirects=True)
|
||||
|
||||
def login(self, username, password):
|
||||
headers = {
|
||||
'content-type': 'application/json',
|
||||
'Authorization': 'Basic {0}'.format(
|
||||
self.encodeUsrPass(username, password))
|
||||
}
|
||||
data = {
|
||||
'username': username,
|
||||
}
|
||||
return self.app.post(
|
||||
'/tokens',
|
||||
data=json.dumps(data),
|
||||
headers=headers,
|
||||
follow_redirects=True)
|
||||
|
||||
def logout(self):
|
||||
return self.app.get('/logout', follow_redirects=True)
|
||||
|
||||
@ -72,6 +94,11 @@ class AttractTestCase(unittest.TestCase):
|
||||
rv = self.addNode('Shot01', '55016a52135d32466fc800be', properties)
|
||||
assert 201 == rv.status_code
|
||||
|
||||
def test_login(self):
|
||||
rv = self.login('admin', 'secret')
|
||||
#print (rv.data)
|
||||
assert 201 == rv.status_code
|
||||
|
||||
def test_empty_db(self):
|
||||
rv = self.app.get('/')
|
||||
assert 401 == rv.status_code
|
||||
@ -93,7 +120,6 @@ class AttractTestCase(unittest.TestCase):
|
||||
"_updated": datetime.now(),
|
||||
"firstname": "TestFirstname",
|
||||
"lastname": "TestLastname",
|
||||
"token": "ANLGNSIEZJ",
|
||||
"role": "author",
|
||||
"_created": datetime.now(),
|
||||
"_etag": "302236e27f51d2e26041ae9de49505d77332b260"
|
||||
@ -108,8 +134,16 @@ class AttractTestCase(unittest.TestCase):
|
||||
"_etag": "0ea3c4f684a0cda85525184d5606c4f4ce6ac5f5"
|
||||
}
|
||||
|
||||
test_token = {
|
||||
"-id": ObjectId("5502f289135d3274cb658ba7"),
|
||||
"username": "TestFirstname",
|
||||
"token": "ANLGNSIEZJ",
|
||||
"_etag": "1e96ed46b133b7ede5ce6ef0d6d4fc53edd9f2ba"
|
||||
}
|
||||
|
||||
db.users.insert(test_user)
|
||||
db.node_types.insert(test_node_type)
|
||||
db.tokens.insert(test_token)
|
||||
|
||||
# Initialize Attract
|
||||
os.environ['TEST_ATTRACT'] = '1'
|
||||
|
Loading…
x
Reference in New Issue
Block a user