192 lines
5.8 KiB
Python
192 lines
5.8 KiB
Python
from typing import Optional
|
|
from urllib.parse import urljoin
|
|
import datetime
|
|
import itertools
|
|
import logging
|
|
import re
|
|
import time
|
|
|
|
from urllib.parse import (
|
|
parse_qsl,
|
|
ParseResult,
|
|
unquote_to_bytes,
|
|
urlencode as urllib_urlencode,
|
|
)
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth import get_user_model
|
|
from django.contrib.sites.shortcuts import get_current_site
|
|
from django.core.exceptions import ValidationError
|
|
from django.core.validators import validate_ipv46_address
|
|
from django.http import HttpRequest
|
|
from django.http.response import HttpResponseRedirectBase
|
|
from django.utils.encoding import force_bytes, force_str
|
|
from django.utils.http import _urlparse
|
|
import django.utils.text
|
|
|
|
|
|
# The project's active user model, resolved once when this module is imported.
# NOTE(review): calling get_user_model() at import time presumably requires
# Django's app registry to be ready first -- confirm the import order.
User = get_user_model()

# One logger, named after this module, exposed under both historical names.
log = logger = logging.getLogger(__name__)
|
|
|
|
|
|
IPV4_WITH_PORT = re.compile(r"([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):[0-9]+")
"""Regexp matching an IPv4 address with a port number."""

# IPv6 groups are hexadecimal, so the character class must accept a-f/A-F in
# addition to digits and colons; '.' is accepted as well so that IPv4-mapped
# forms such as [::ffff:192.0.2.1]:80 are recognised. The captured group is
# only a candidate address -- callers still validate it.
IPV6_WITH_PORT = re.compile(r"\[([0-9a-fA-F:.]+)\]:[0-9]+")
"""Regexp matching a bracketed IPv6 address with a port number."""
|
|
|
|
|
|
def urlencode(items):
    """A Unicode-safe URL encoder.

    `items` is passed straight to urllib's urlencode, so it may be either a
    mapping or a sequence of ``(key, value)`` pairs. If urllib raises
    UnicodeEncodeError, the values are converted with force_bytes and the
    encoding is retried once.

    Returns the encoded query string.
    """
    try:
        return urllib_urlencode(items)
    except UnicodeEncodeError:
        # urllib accepts mappings as well as pair sequences, but iterating a
        # mapping directly yields only its keys. Normalise to pairs the same
        # way urllib does so the fallback handles both input shapes.
        pairs = items.items() if hasattr(items, 'items') else items
        return urllib_urlencode([(k, force_bytes(v)) for k, v in pairs])
|
|
|
|
|
|
def utc_millesecs_from_epoch(for_datetime=None):
    """Return the number of milliseconds since the Unix epoch (UTC).

    If `for_datetime` is None (or otherwise falsy), the current time is used.

    Timezone-aware datetimes are converted to UTC before the difference to
    the epoch is taken; naive datetimes are interpreted as system local time.
    Precision finer than a millisecond is truncated toward zero.

    Returns an int.
    """
    utc = datetime.timezone.utc
    if not for_datetime:
        moment = datetime.datetime.now(utc)
    else:
        # astimezone() converts aware values and treats naive ones as local
        # time. The previous mktime(utctimetuple()) combination read a UTC
        # tuple as *local* time, which is only right when the host runs in UTC.
        moment = for_datetime.astimezone(utc)

    elapsed = moment - datetime.datetime(1970, 1, 1, tzinfo=utc)
    # Integer arithmetic avoids the float rounding of "seconds * 1000".
    micros = elapsed // datetime.timedelta(microseconds=1)
    # Truncate toward zero, as int() did on the float result before.
    millis = abs(micros) // 1000
    return millis if micros >= 0 else -millis
|
|
|
|
|
|
def slugify(s: str):
    """Turn a string into a URL slug.

    Underscores are swapped for dashes first; the result is then handed to
    Django's own slugify.
    """
    dashed = s.replace('_', '-')
    return django.utils.text.slugify(dashed)
|
|
|
|
|
|
def urlparams(url_, hash=None, **query):
    """Return `url_` with a fragment and/or query parameters applied.

    Keyword arguments are merged into the URL's existing query parameters;
    when a name already exists its value is replaced. A keyword whose value
    is None removes that parameter from the result. If `hash` is not None it
    replaces the URL's fragment, otherwise the existing fragment is kept.
    """
    parsed = _urlparse(force_str(url_))

    # Start from the existing parameters. Building a plain dict means each
    # name keeps a single value (the last one seen), not a list of values.
    merged = {}
    if parsed.query:
        merged.update(parse_qsl(force_str(parsed.query)))

    # Overlay the caller's parameters; None is stored as a removal marker.
    for key, value in query.items():
        merged[key] = None if value is None else force_bytes(value)

    pairs = [
        (key, unquote_to_bytes(value))
        for key, value in merged.items()
        if value is not None
    ]

    new_fragment = parsed.fragment if hash is None else hash
    rebuilt = ParseResult(
        parsed.scheme,
        parsed.netloc,
        parsed.path,
        parsed.params,
        urlencode(pairs),
        new_fragment,
    )
    return rebuilt.geturl()
|
|
|
|
|
|
def send_mail(*args, **kwargs):
    """Placeholder for a wrapper around django.core.mail.EmailMessage.

    Not implemented yet: all arguments are accepted and ignored, no mail is
    sent, and None is returned.
    """
    # TODO implement send_mail
    return None
|
|
|
|
|
|
def chunked(seq, n):
    """Yield consecutive lists of at most `n` items taken from `seq`.

    The last list may be shorter than `n`; nothing is yielded for an empty
    input.

    >>> for group in chunked(range(8), 3):
    ...     print(group)
    [0, 1, 2]
    [3, 4, 5]
    [6, 7]
    """
    source = iter(seq)

    def take():
        # Pull the next (up to) n items off the shared iterator.
        return list(itertools.islice(source, 0, n))

    # iter(callable, sentinel) keeps calling take() until it returns an
    # empty list, i.e. until the source is exhausted.
    yield from iter(take, [])
|
|
|
|
|
|
class HttpResponseTemporaryRedirect(HttpResponseRedirectBase):
    """Redirect response that uses HTTP status 307.

    Behaves like a 302 redirect, except that the request method and body are
    kept, which makes it usable for redirecting POST requests as well.
    """

    status_code = 307
|
|
|
|
|
|
def clean_ip_address(request: HttpRequest) -> str:
    """Return the client IP address of `request` after validating it.

    The address is looked up with get_client_ip() and checked with
    validate_ipv46_address().

    Raises django.core.exceptions.ValidationError when no valid IP address
    could be determined.
    """
    candidate = get_client_ip(request)
    # Raises ValidationError for anything that is not a valid IPv4/IPv6 address.
    validate_ipv46_address(candidate)
    return candidate
|
|
|
|
|
|
def get_client_ip(request: HttpRequest) -> str:
    """Return the IP of the request, accounting for the possibility of being
    behind a proxy.

    The first entry of the X-Forwarded-For header is used when it is a valid
    IPv4/IPv6 address (after stripping a port number). Otherwise REMOTE_ADDR
    is used, also with any port number stripped.

    Returns an empty string when REMOTE_ADDR is missing or contains a comma.
    The REMOTE_ADDR fallback is returned without validation; use
    clean_ip_address() when a validated address is required.
    """
    # NOTE(review): X-Forwarded-For is supplied by the client unless a trusted
    # proxy overwrites it -- confirm the deployment does before relying on
    # this value for anything security-sensitive.
    x_forwarded_for: Optional[str] = request.META.get('HTTP_X_FORWARDED_FOR', None)
    if x_forwarded_for:
        # X_FORWARDED_FOR returns client1, proxy1, proxy2,...
        # Split on the bare comma: the whitespace after it is optional, so
        # splitting on ', ' would leave "client1,proxy1" in one piece and the
        # client address would be rejected as invalid below.
        remote_addr = x_forwarded_for.split(',', 1)[0].strip()
        ip_address = _remove_port_nr(remote_addr)
        try:
            # X_FORWARDED_FOR can contain bogus, only use it if it's valid
            validate_ipv46_address(ip_address)
        except ValidationError:
            logger.warning('Unable to parse X-Forwarded-For %s', x_forwarded_for)
        else:
            return ip_address

    remote_addr = request.META.get('REMOTE_ADDR', '')

    # REMOTE_ADDR can also be 'ip1,ip2' if people mess around with HTTP
    # headers (we've seen this happen). Don't trust anything in that case.
    if not remote_addr or ',' in remote_addr:
        return ''

    return _remove_port_nr(remote_addr)
|
|
|
|
|
|
def _remove_port_nr(remote_addr: str) -> str:
|
|
# Occasionally the port number is included in REMOTE_ADDR.
|
|
# This needs to be filtered out so that downstream requests
|
|
# can just interpret the value as actual IP address.
|
|
|
|
if len(remote_addr) > 128:
|
|
# Prevent DoS attacks by not allowing obvious nonsense.
|
|
return ''
|
|
|
|
if ':' not in remote_addr:
|
|
return remote_addr
|
|
|
|
ipv4_with_port_match = IPV4_WITH_PORT.match(remote_addr)
|
|
if ipv4_with_port_match:
|
|
return ipv4_with_port_match.group(1)
|
|
|
|
ipv6_with_port_match = IPV6_WITH_PORT.match(remote_addr)
|
|
if ipv6_with_port_match:
|
|
return ipv6_with_port_match.group(1)
|
|
|
|
return remote_addr
|
|
|
|
|
|
def absolutify(url: str, request=None) -> str:
    """Return `url` as an absolute URL.

    A URL that already starts with ``http://`` or ``https://`` is returned
    untouched. Anything else is joined onto the current site's domain, using
    ``http`` when settings.DEBUG is true and ``https`` otherwise.
    """
    if url and url.startswith(('http://', 'https://')):
        return url

    if settings.DEBUG:
        scheme = 'http'
    else:
        scheme = 'https'
    host = get_current_site(request).domain
    base = '{}://{}'.format(scheme, host)
    return urljoin(base, url)
|