extensions-website/utils.py

182 lines
5.3 KiB
Python

from typing import Optional
import datetime
import itertools
import logging
import re
import time
from urllib.parse import (
parse_qsl,
ParseResult,
unquote_to_bytes,
urlencode as urllib_urlencode,
)
from django.contrib.auth import get_user_model
from django.core.exceptions import ValidationError
from django.core.validators import validate_ipv46_address
from django.http import HttpRequest
from django.http.response import HttpResponseRedirectBase
from django.utils.encoding import force_bytes, force_str
from django.utils.http import _urlparse
import django.utils.text
# The user model configured for this project.
User = get_user_model()

# Module-level logger; it is exposed under two names (`logger` and `log`),
# both referring to the same logger object for this module.
logger = logging.getLogger(__name__)
log = logger
IPV4_WITH_PORT = re.compile(r"([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):[0-9]+")
"""Regexp matching an IPv4 address with a port number."""

# IPv6 groups are hexadecimal, so a-f/A-F must be accepted alongside digits;
# '.' is accepted as well to cover the IPv4-mapped form (e.g. ::ffff:192.0.2.1).
# The captured group is only a candidate: callers still validate it as an IP.
IPV6_WITH_PORT = re.compile(r"\[([0-9a-fA-F:.]+)\]:[0-9]+")
"""Regexp matching a bracketed IPv6 address with a port number."""
def urlencode(items):
    """A Unicode-safe URLencoder.

    `items` may be a mapping or a sequence of (key, value) pairs, exactly as
    accepted by urllib's urlencode. If encoding fails with a
    UnicodeEncodeError, the values are converted to bytes and the encoding
    is retried.
    """
    try:
        return urllib_urlencode(items)
    except UnicodeEncodeError:
        # Iterating a mapping directly yields only its keys, so take the
        # (key, value) pairs explicitly to keep the fallback working for
        # both of the input shapes the primary path accepts.
        pairs = items.items() if hasattr(items, 'items') else items
        return urllib_urlencode([(k, force_bytes(v)) for k, v in pairs])
def utc_millesecs_from_epoch(for_datetime=None):
    """
    Returns milliseconds from the Unix epoch in UTC.

    If `for_datetime` is None, the current datetime will be used.
    Timezone-aware datetimes are converted using their own UTC offset;
    naive datetimes are interpreted as local time.
    The result is an int, truncated to whole milliseconds.
    """
    if for_datetime is None:
        for_datetime = datetime.datetime.now(datetime.timezone.utc)
    # datetime.timestamp() honours tzinfo, so the result does not depend on
    # the time zone of the process. (Feeding utctimetuple() to time.mktime()
    # would re-interpret the UTC fields as local time.)
    # The sub-second part is stripped first so this conversion is exact.
    whole_seconds = int(for_datetime.replace(microsecond=0).timestamp())
    # Add the sub-second part back with integer arithmetic to avoid losing
    # a millisecond to floating point rounding.
    return whole_seconds * 1000 + for_datetime.microsecond // 1000
def slugify(s: str):
    """Turn the given string into a URL slug.

    Underscores are swapped for dashes up front; the result is then handed
    to Django's own slugify.
    """
    dashed = s.replace('_', '-')
    return django.utils.text.slugify(dashed)
def urlparams(url_, hash=None, **query):
    """
    Return `url_` with a fragment and/or query parameters added.

    Keyword arguments become query parameters: they are appended after the
    parameters already present in the URL, and a keyword whose name is
    already present replaces the existing value. A parameter whose value is
    None is left out of the resulting URL. `hash`, when not None, replaces
    the URL's fragment.
    """
    parsed = _urlparse(force_str(url_))

    # Start from the parameters already in the URL. dict(parse_qsl(...))
    # keeps a single value per name instead of a list of values.
    params = {}
    if parsed.query:
        params = dict(parse_qsl(force_str(parsed.query)))

    # Overlay the new parameters; None is kept as a marker for "drop this".
    for name, value in query.items():
        params[name] = None if value is None else force_bytes(value)

    pairs = []
    for name, value in params.items():
        if value is None:
            continue
        pairs.append((name, unquote_to_bytes(value)))
    query_string = urlencode(pairs)

    if hash is None:
        fragment = parsed.fragment
    else:
        fragment = hash

    rebuilt = ParseResult(
        parsed.scheme, parsed.netloc, parsed.path, parsed.params, query_string, fragment
    )
    return rebuilt.geturl()
def send_mail(*args, **kwargs):
    """Placeholder for a wrapper around django.core.mail.EmailMessage.

    Not implemented yet: any arguments are accepted and ignored, nothing is
    sent, and None is returned.
    """
    # TODO implement send_mail
    return None
def chunked(seq, n):
    """
    Generate consecutive lists of up to n items taken from seq.

    The last list holds whatever is left over and may be shorter.

    >>> for group in chunked(range(8), 3):
    ...     print(group)
    [0, 1, 2]
    [3, 4, 5]
    [6, 7]
    """
    iterator = iter(seq)
    chunk = list(itertools.islice(iterator, 0, n))
    # An empty chunk means the underlying iterator is exhausted.
    while chunk:
        yield chunk
        chunk = list(itertools.islice(iterator, 0, n))
class HttpResponseTemporaryRedirect(HttpResponseRedirectBase):
    """Similar to HTTP 302 but keeps the request method and body so we can redirect POSTs too."""

    # HTTP 307 (Temporary Redirect).
    status_code = 307
def clean_ip_address(request: HttpRequest) -> str:
    """Return the client IP address of the given request, once validated.

    Raises a django.core.exceptions.ValidationError
    if the address found for the request is not a valid IPv4 or IPv6 address.
    """
    candidate = get_client_ip(request)
    # Raises ValidationError unless the candidate is a valid IPv4/IPv6 address.
    validate_ipv46_address(candidate)
    return candidate
def get_client_ip(request: HttpRequest) -> str:
    """Returns the IP of the request, accounting for the possibility of being
    behind a proxy.

    The first entry of the X-Forwarded-For header is used when it is a valid
    IP address (after removing any port number). Otherwise REMOTE_ADDR is
    used, with any port number removed. An empty string is returned when
    REMOTE_ADDR is missing or contains a comma.
    """
    x_forwarded_for: Optional[str] = request.META.get('HTTP_X_FORWARDED_FOR', None)
    if x_forwarded_for:
        # X_FORWARDED_FOR returns client1, proxy1, proxy2,...
        # Split on the bare comma and strip afterwards: the whitespace after
        # the comma is optional, so 'client1,proxy1' must be handled as well.
        remote_addr = x_forwarded_for.split(',', 1)[0].strip()
        ip_address = _remove_port_nr(remote_addr)
        try:
            # X_FORWARDED_FOR can contain bogus, only use it if it's valid
            validate_ipv46_address(ip_address)
        except ValidationError:
            logger.warning('Unable to parse X-Forwarded-For %s', x_forwarded_for)
        else:
            return ip_address

    remote_addr = request.META.get('REMOTE_ADDR', '')
    if not remote_addr:
        return ''

    # REMOTE_ADDR can also be 'ip1,ip2' if people mess around with HTTP
    # headers (we've seen this happen). Don't trust anything in that case.
    if ',' in remote_addr:
        return ''

    return _remove_port_nr(remote_addr)
def _remove_port_nr(remote_addr: str) -> str:
    """Return the given address with any port number removed.

    Occasionally the port number is included in REMOTE_ADDR. It is filtered
    out here so that downstream code can interpret the value as an actual
    IP address. Input longer than 128 characters yields an empty string;
    input that matches neither address-with-port pattern is returned as is.
    """
    # Prevent DoS attacks by not allowing obvious nonsense.
    if len(remote_addr) > 128:
        return ''

    # Without a colon there cannot be a port number to strip.
    if ':' in remote_addr:
        for pattern in (IPV4_WITH_PORT, IPV6_WITH_PORT):
            found = pattern.match(remote_addr)
            if found:
                return found.group(1)

    return remote_addr