Anna Sirota
cc900979d6
Based on the proposal #93 Email subjects change to the following: ``` Add-on approved: "Genuinely Disgusting" New comment on Add-on "Genuinely Disgusting" Add-on rated: "Genuinely Disgusting" Add-on reported: "Genuinely Disgusting" Add-on rating reported: "Genuinely Disgusting" Add-on changes requested: "Genuinely Disgusting" Add-on review requested: "Genuinely Disgusting" ``` Notification emails can also be viewed in the admin `Email previews` section (with fake objects constructed and passed into email template contexts). Indicating why the email was sent is not in this change because we currently don't store why **precisely** (can only guess it's either because of `moderators` group or extension follow, the state of either can change after the notification was generated). This can be part of the `Notification` records however, after that the reason can be included in the email. Reviewed-on: #96 Reviewed-by: Oleg-Komarov <oleg-komarov@noreply.localhost>
241 lines
7.1 KiB
Python
241 lines
7.1 KiB
Python
from html.parser import HTMLParser
|
|
from typing import Optional
|
|
from urllib.parse import urljoin
|
|
import datetime
|
|
import itertools
|
|
import logging
|
|
import re
|
|
import time
|
|
|
|
from urllib.parse import (
|
|
parse_qsl,
|
|
ParseResult,
|
|
unquote_to_bytes,
|
|
urlencode as urllib_urlencode,
|
|
)
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth import get_user_model
|
|
from django.contrib.sites.shortcuts import get_current_site
|
|
from django.core.exceptions import ValidationError
|
|
from django.core.validators import validate_ipv46_address
|
|
from django.http import HttpRequest
|
|
from django.http.response import HttpResponseRedirectBase
|
|
from django.urls import reverse
|
|
from django.utils.encoding import force_bytes, force_str
|
|
from django.utils.http import _urlparse
|
|
import django.utils.text
|
|
|
|
|
|
User = get_user_model()
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
IPV4_WITH_PORT = re.compile(r"([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):[0-9]+")
|
|
"""Regexp matching an IPv4 address with a port number."""
|
|
|
|
IPV6_WITH_PORT = re.compile(r"\[([0-9:]+)\]:[0-9]+")
|
|
"""Regexp matching an IPv6 address with a port number."""
|
|
|
|
|
|
def urlencode(items):
|
|
"""A Unicode-safe URLencoder."""
|
|
try:
|
|
return urllib_urlencode(items)
|
|
except UnicodeEncodeError:
|
|
return urllib_urlencode([(k, force_bytes(v)) for k, v in items])
|
|
|
|
|
|
def utc_millesecs_from_epoch(for_datetime=None):
|
|
"""Returns millesconds from the Unix epoch in UTC.
|
|
|
|
If `for_datetime` is None, the current datetime will be used.
|
|
"""
|
|
if not for_datetime:
|
|
for_datetime = datetime.datetime.now()
|
|
# Number of seconds.
|
|
seconds = time.mktime(for_datetime.utctimetuple())
|
|
# timetuple() doesn't care about more precision than seconds, but we do.
|
|
# Add microseconds as a fraction of a second to keep the precision.
|
|
seconds += for_datetime.microsecond / 1000000.0
|
|
# Now convert to milliseconds.
|
|
return int(seconds * 1000)
|
|
|
|
|
|
def slugify(s: str):
|
|
"""Convert a given string to a URL slug.
|
|
|
|
Do it the same way Django does it, but replace underscores with dashes first.
|
|
"""
|
|
return django.utils.text.slugify(s.replace('_', '-'))
|
|
|
|
|
|
def urlparams(url_, hash=None, **query):
|
|
"""Add a fragment and/or query parameters to a URL.
|
|
|
|
New query params will be appended to existing parameters, except duplicate
|
|
names, which will be replaced.
|
|
"""
|
|
url = _urlparse(force_str(url_))
|
|
|
|
fragment = hash if hash is not None else url.fragment
|
|
|
|
# Use dict(parse_qsl) so we don't get lists of values.
|
|
query_dict = dict(parse_qsl(force_str(url.query))) if url.query else {}
|
|
query_dict.update((k, force_bytes(v) if v is not None else v) for k, v in query.items())
|
|
query_string = urlencode(
|
|
[(k, unquote_to_bytes(v)) for k, v in query_dict.items() if v is not None]
|
|
)
|
|
result = ParseResult(url.scheme, url.netloc, url.path, url.params, query_string, fragment)
|
|
return result.geturl()
|
|
|
|
|
|
def send_mail(*args, **kwargs):
|
|
"""A wrapper around django.core.mail.EmailMessage."""
|
|
pass # TODO implement send_mail
|
|
|
|
|
|
def chunked(seq, n):
|
|
"""Yield successive n-sized chunks from seq.
|
|
|
|
>>> for group in chunked(range(8), 3):
|
|
... print group
|
|
[0, 1, 2]
|
|
[3, 4, 5]
|
|
[6, 7]
|
|
"""
|
|
seq = iter(seq)
|
|
while True:
|
|
rv = list(itertools.islice(seq, 0, n))
|
|
if not rv:
|
|
break
|
|
yield rv
|
|
|
|
|
|
class HttpResponseTemporaryRedirect(HttpResponseRedirectBase):
|
|
"""Similar to HTTP 302 but keeps the request method and body so we can redirect POSTs too."""
|
|
|
|
status_code = 307
|
|
|
|
|
|
def clean_ip_address(request: HttpRequest) -> str:
|
|
"""Retrieve a valid IP address from the given request.
|
|
|
|
Raises a django.code.exceptions.ValidationError
|
|
if no valid IP address could be determined.
|
|
"""
|
|
ip_address = get_client_ip(request)
|
|
validate_ipv46_address(ip_address)
|
|
return ip_address
|
|
|
|
|
|
def get_client_ip(request: HttpRequest) -> str:
|
|
"""Returns the IP of the request, accounting for the possibility of being
|
|
behind a proxy.
|
|
"""
|
|
x_forwarded_for: Optional[str] = request.META.get('HTTP_X_FORWARDED_FOR', None)
|
|
if x_forwarded_for:
|
|
# X_FORWARDED_FOR returns client1, proxy1, proxy2,...
|
|
remote_addr = x_forwarded_for.split(', ', 1)[0].strip()
|
|
ip_address = _remove_port_nr(remote_addr)
|
|
try:
|
|
# X_FORWARDED_FOR can contain bogus, only use it if it's valid
|
|
validate_ipv46_address(ip_address)
|
|
return ip_address
|
|
except ValidationError:
|
|
logger.warning('Unable to parse X-Forwarded-For %s', x_forwarded_for)
|
|
|
|
remote_addr = request.META.get('REMOTE_ADDR', '')
|
|
if not remote_addr:
|
|
return ''
|
|
|
|
# REMOTE_ADDR can also be 'ip1,ip2' if people mess around with HTTP
|
|
# headers (we've seen this happen). Don't trust anything in that case.
|
|
if ',' in remote_addr:
|
|
return ''
|
|
|
|
return _remove_port_nr(remote_addr)
|
|
|
|
|
|
def _remove_port_nr(remote_addr: str) -> str:
|
|
# Occasionally the port number is included in REMOTE_ADDR.
|
|
# This needs to be filtered out so that downstream requests
|
|
# can just interpret the value as actual IP address.
|
|
|
|
if len(remote_addr) > 128:
|
|
# Prevent DoS attacks by not allowing obvious nonsense.
|
|
return ''
|
|
|
|
if ':' not in remote_addr:
|
|
return remote_addr
|
|
|
|
ipv4_with_port_match = IPV4_WITH_PORT.match(remote_addr)
|
|
if ipv4_with_port_match:
|
|
return ipv4_with_port_match.group(1)
|
|
|
|
ipv6_with_port_match = IPV6_WITH_PORT.match(remote_addr)
|
|
if ipv6_with_port_match:
|
|
return ipv6_with_port_match.group(1)
|
|
|
|
return remote_addr
|
|
|
|
|
|
def absolutify(url: str, request=None) -> str:
|
|
"""Return an absolute URL."""
|
|
if url and url.startswith(('http://', 'https://')):
|
|
return url
|
|
|
|
proto = 'http' if settings.DEBUG else 'https'
|
|
domain = get_current_site(request).domain
|
|
return urljoin(f'{proto}://{domain}', url)
|
|
|
|
|
|
def absolute_url(
|
|
view_name: str, args: Optional[tuple] = None, kwargs: Optional[dict] = None
|
|
) -> str:
|
|
"""Same as django.urls.reverse() but returned as an absolute URL."""
|
|
relative_url = reverse(view_name, args=args, kwargs=kwargs)
|
|
return absolutify(relative_url)
|
|
|
|
|
|
class HTMLFilter(HTMLParser):
|
|
skip_text_of = ('a', 'style')
|
|
text = ''
|
|
skip_tag_text = False
|
|
|
|
def handle_starttag(self, tag, attrs):
|
|
if tag in self.skip_text_of:
|
|
self.skip_tag_text = True
|
|
for name, value in attrs:
|
|
if name == 'href':
|
|
self.skip_tag_text = True
|
|
self.text += value
|
|
if tag in ('quote', 'q'):
|
|
self.text += '“'
|
|
|
|
def handle_endtag(self, tag):
|
|
if tag in self.skip_text_of:
|
|
self.skip_tag_text = False
|
|
if tag in ('quote', 'q'):
|
|
self.text += '”\n\n'
|
|
|
|
def handle_data(self, data):
|
|
if self.skip_tag_text:
|
|
return
|
|
self.text += data
|
|
|
|
|
|
def html_to_text(data: str) -> str:
|
|
f = HTMLFilter()
|
|
f.feed(data)
|
|
lines = [_.lstrip(' \t') for _ in f.text.split('\n')]
|
|
skip_empty = 0
|
|
for line in lines:
|
|
if not re.match(r'^\s*$', line):
|
|
break
|
|
skip_empty += 1
|
|
return '\n'.join(lines[skip_empty:])
|