108 lines
4.5 KiB
Python
108 lines
4.5 KiB
Python
# ##### BEGIN GPL LICENSE BLOCK #####
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU General Public License
|
|
# as published by the Free Software Foundation; either version 2
|
|
# of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software Foundation,
|
|
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
#
|
|
# ##### END GPL LICENSE BLOCK #####
|
|
|
|
|
|
import json
|
|
import webbrowser
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
from urllib.parse import parse_qs, quote as urlquote, urlparse
|
|
|
|
import requests
|
|
|
|
|
|
class SimpleOAuthAuthenticator(object):
|
|
def __init__(self, server_url, client_id, ports):
|
|
self.server_url = server_url
|
|
self.client_id = client_id
|
|
self.ports = ports
|
|
|
|
def _get_tokens(self, authorization_code=None, refresh_token=None, grant_type="authorization_code"):
|
|
data = {
|
|
"grant_type": grant_type,
|
|
"state": "random_state_string",
|
|
"client_id": self.client_id,
|
|
"scopes": "read write",
|
|
}
|
|
if hasattr(self, 'redirect_uri'):
|
|
data["redirect_uri"] = self.redirect_uri
|
|
if authorization_code:
|
|
data['code'] = authorization_code
|
|
if refresh_token:
|
|
data['refresh_token'] = refresh_token
|
|
|
|
response = requests.post(
|
|
'%s/o/token/' % self.server_url,
|
|
data=data
|
|
)
|
|
if response.status_code != 200:
|
|
print("error retrieving refresh tokens %s" % response.status_code)
|
|
print(response.content)
|
|
return None, None, None
|
|
|
|
response_json = json.loads(response.content)
|
|
refresh_token = response_json ['refresh_token']
|
|
access_token = response_json ['access_token']
|
|
return access_token, refresh_token, response_json
|
|
|
|
def get_new_token(self, register=True, redirect_url=None):
|
|
class HTTPServerHandler(BaseHTTPRequestHandler):
|
|
html_template = '<html>%(head)s<h1>%(message)s</h1></html>'
|
|
def do_GET(self):
|
|
self.send_response(200)
|
|
self.send_header('Content-type', 'text/html')
|
|
self.end_headers()
|
|
if 'code' in self.path:
|
|
self.auth_code = self.path.split('=')[1]
|
|
# Display to the user that they no longer need the browser window
|
|
if redirect_url:
|
|
redirect_string = (
|
|
'<head><meta http-equiv="refresh" content="0;url=%(redirect_url)s"></head>'
|
|
'<script> window.location.href="%(redirect_url)s"; </script>' % {'redirect_url': redirect_url}
|
|
)
|
|
else:
|
|
redirect_string = ""
|
|
self.wfile.write(bytes(self.html_template % {'head': redirect_string, 'message': 'You may now close this window.'}, 'utf-8'))
|
|
qs = parse_qs(urlparse(self.path).query)
|
|
self.server.authorization_code = qs['code'][0]
|
|
else:
|
|
self.wfile.write(bytes(self.html_template % {'head': '', 'message': 'Authorization failed.'}, 'utf-8'))
|
|
|
|
for port in self.ports:
|
|
try:
|
|
httpServer = HTTPServer(('localhost', port), HTTPServerHandler)
|
|
except OSError:
|
|
continue
|
|
break
|
|
self.redirect_uri = "http://localhost:%s/consumer/exchange/" % port
|
|
authorize_url = (
|
|
"/o/authorize?client_id=%s&state=random_state_string&response_type=code&"
|
|
"redirect_uri=%s" % (self.client_id, self.redirect_uri)
|
|
)
|
|
if register:
|
|
authorize_url = "%s/accounts/register/?next=%s" % (self.server_url, urlquote(authorize_url))
|
|
else:
|
|
authorize_url = "%s%s" % (self.server_url, authorize_url)
|
|
webbrowser.open_new(authorize_url)
|
|
|
|
httpServer.handle_request()
|
|
authorization_code = httpServer.authorization_code
|
|
return self._get_tokens(authorization_code=authorization_code)
|
|
|
|
def get_refreshed_token(self, refresh_token):
|
|
return self._get_tokens(refresh_token=refresh_token, grant_type="refresh_token")
|