extensions-website/utils.py
Anna Sirota cc900979d6 Basic email template for notifications (#96)
Based on the proposal #93

Email subjects change to the following:

```
Add-on approved: "Genuinely Disgusting"
New comment on Add-on "Genuinely Disgusting"
Add-on rated: "Genuinely Disgusting"
Add-on reported: "Genuinely Disgusting"
Add-on rating reported: "Genuinely Disgusting"
Add-on changes requested: "Genuinely Disgusting"
Add-on review requested: "Genuinely Disgusting"
```

Notification emails can also be viewed in the admin `Email previews` section (with fake objects constructed and passed into email template contexts).

Indicating why the email was sent is not part of this change, because we currently don't store the **precise** reason (we can only guess that it is either the `moderators` group or an extension follow, and the state of either can change after the notification was generated). The reason could, however, be stored in the `Notification` records; after that it can be included in the email.

Reviewed-on: #96
Reviewed-by: Oleg-Komarov <oleg-komarov@noreply.localhost>
2024-05-02 14:04:22 +02:00

241 lines
7.1 KiB
Python

from html.parser import HTMLParser
from typing import Optional
from urllib.parse import urljoin
import datetime
import itertools
import logging
import re
import time
from urllib.parse import (
parse_qsl,
ParseResult,
unquote_to_bytes,
urlencode as urllib_urlencode,
)
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.sites.shortcuts import get_current_site
from django.core.exceptions import ValidationError
from django.core.validators import validate_ipv46_address
from django.http import HttpRequest
from django.http.response import HttpResponseRedirectBase
from django.urls import reverse
from django.utils.encoding import force_bytes, force_str
from django.utils.http import _urlparse
import django.utils.text
# User model class as returned by Django's get_user_model().
User = get_user_model()
# `log` and `logger` are both obtained with logging.getLogger(__name__), so the
# two names refer to the same logger for this module.
log = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
# Group 1 of either pattern captures the bare address, without the port.
IPV4_WITH_PORT = re.compile(r"([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):[0-9]+")
"""Regexp matching an IPv4 address with a port number."""
# IPv6 groups are hexadecimal, and IPv4-mapped addresses (``::ffff:192.0.2.1``)
# contain dots, so the bracketed part must accept hex digits, ':' and '.'.
IPV6_WITH_PORT = re.compile(r"\[([0-9A-Fa-f:.]+)\]:[0-9]+")
"""Regexp matching a bracketed IPv6 address with a port number."""
def urlencode(items):
    """Encode ``items`` as a URL query string, with a bytes-based fallback.

    The standard library encoder is tried first. If it raises
    ``UnicodeEncodeError``, ``items`` is iterated as ``(key, value)`` pairs,
    every value is converted with ``force_bytes`` and the encoding is retried.
    """
    try:
        encoded = urllib_urlencode(items)
    except UnicodeEncodeError:
        # Second attempt: hand the encoder bytes values instead of text.
        byte_pairs = [(key, force_bytes(value)) for key, value in items]
        encoded = urllib_urlencode(byte_pairs)
    return encoded
def utc_millesecs_from_epoch(for_datetime=None):
    """Return the number of milliseconds since the Unix epoch (UTC).

    If `for_datetime` is None, the current time is used.

    A timezone-aware `for_datetime` is converted to UTC. A naive one is
    interpreted as system local time (this is what ``datetime.astimezone()``
    does for naive values). The result is an ``int``, rounded down to the
    whole millisecond.
    """
    utc = datetime.timezone.utc
    if for_datetime is None:
        for_datetime = datetime.datetime.now(utc)
    # Do not go through time.mktime(utctimetuple()): mktime() reads the tuple
    # as *local* time, which shifts the result by the server's UTC offset.
    aware = for_datetime.astimezone(utc)
    epoch = datetime.datetime(1970, 1, 1, tzinfo=utc)
    # Integer timedelta division avoids float rounding losing a millisecond.
    return (aware - epoch) // datetime.timedelta(milliseconds=1)
def slugify(s: str):
    """Turn ``s`` into a URL slug that uses dashes instead of underscores.

    Underscores are replaced with dashes first; the result is then passed to
    Django's own ``django.utils.text.slugify``.
    """
    dashed = s.replace('_', '-')
    return django.utils.text.slugify(dashed)
def urlparams(url_, hash=None, **query):
    """Return ``url_`` with a new fragment and/or extra query parameters.

    ``hash`` replaces the URL's fragment unless it is None, in which case the
    existing fragment is kept. Keyword arguments are merged into the existing
    query string: a name that is already present is overwritten, and a name
    whose value is None is left out of the resulting query string.
    """
    parsed = _urlparse(force_str(url_))

    if hash is None:
        fragment = parsed.fragment
    else:
        fragment = hash

    # A plain dict keeps a single value per name rather than a list of values.
    params = {}
    if parsed.query:
        params = dict(parse_qsl(force_str(parsed.query)))

    for name, value in query.items():
        params[name] = None if value is None else force_bytes(value)

    pairs = []
    for name, value in params.items():
        if value is None:
            # None marks a parameter that must not appear in the output.
            continue
        pairs.append((name, unquote_to_bytes(value)))

    rebuilt = ParseResult(
        parsed.scheme,
        parsed.netloc,
        parsed.path,
        parsed.params,
        urlencode(pairs),
        fragment,
    )
    return rebuilt.geturl()
def send_mail(*args, **kwargs):
    """Placeholder for a wrapper around django.core.mail.EmailMessage.

    Not implemented yet: every argument is accepted and ignored, nothing is
    sent, and None is returned.
    """
    return None  # TODO implement send_mail
def chunked(seq, n):
    """Split ``seq`` into consecutive lists of at most ``n`` items each.

    The last list may be shorter; nothing is yielded for an empty ``seq``.

    >>> for group in chunked(range(8), 3):
    ...     print(group)
    [0, 1, 2]
    [3, 4, 5]
    [6, 7]
    """
    iterator = iter(seq)

    def take_chunk():
        # Pull up to n further items from the shared iterator.
        return list(itertools.islice(iterator, n))

    # Two-argument iter() calls take_chunk() until it returns an empty list.
    yield from iter(take_chunk, [])
class HttpResponseTemporaryRedirect(HttpResponseRedirectBase):
    """Similar to HTTP 302 but keeps the request method and body so we can redirect POSTs too."""

    # HTTP 307 is the "Temporary Redirect" status code.
    status_code = 307
def clean_ip_address(request: HttpRequest) -> str:
    """Return the client IP address of ``request`` after validating it.

    The address is looked up with ``get_client_ip()`` and then checked with
    Django's ``validate_ipv46_address``.

    Raises a django.core.exceptions.ValidationError
    if no valid IP address could be determined.
    """
    candidate = get_client_ip(request)
    validate_ipv46_address(candidate)
    return candidate
def get_client_ip(request: HttpRequest) -> str:
    """Return the IP address of the request, accounting for a possible proxy.

    The first entry of the ``X-Forwarded-For`` header is used when it is a
    valid IPv4/IPv6 address (a trailing port number is stripped first).
    Otherwise ``REMOTE_ADDR`` is used, also with any port number stripped.

    Returns an empty string when ``REMOTE_ADDR`` is missing or contains a
    comma. The ``REMOTE_ADDR`` fallback is not validated here.
    """
    x_forwarded_for: Optional[str] = request.META.get('HTTP_X_FORWARDED_FOR', None)
    if x_forwarded_for:
        # X_FORWARDED_FOR returns client1, proxy1, proxy2,...
        # The space after each comma is optional, so split on the bare comma
        # and strip whitespace instead of splitting on ', '.
        remote_addr = x_forwarded_for.split(',', 1)[0].strip()
        ip_address = _remove_port_nr(remote_addr)
        try:
            # X_FORWARDED_FOR can contain bogus, only use it if it's valid
            validate_ipv46_address(ip_address)
        except ValidationError:
            logger.warning('Unable to parse X-Forwarded-For %s', x_forwarded_for)
        else:
            return ip_address

    remote_addr = request.META.get('REMOTE_ADDR', '')
    if not remote_addr:
        return ''

    # REMOTE_ADDR can also be 'ip1,ip2' if people mess around with HTTP
    # headers (we've seen this happen). Don't trust anything in that case.
    if ',' in remote_addr:
        return ''

    return _remove_port_nr(remote_addr)
def _remove_port_nr(remote_addr: str) -> str:
# Occasionally the port number is included in REMOTE_ADDR.
# This needs to be filtered out so that downstream requests
# can just interpret the value as actual IP address.
if len(remote_addr) > 128:
# Prevent DoS attacks by not allowing obvious nonsense.
return ''
if ':' not in remote_addr:
return remote_addr
ipv4_with_port_match = IPV4_WITH_PORT.match(remote_addr)
if ipv4_with_port_match:
return ipv4_with_port_match.group(1)
ipv6_with_port_match = IPV6_WITH_PORT.match(remote_addr)
if ipv6_with_port_match:
return ipv6_with_port_match.group(1)
return remote_addr
def absolutify(url: str, request=None) -> str:
    """Return ``url`` as an absolute URL.

    A URL that already starts with ``http://`` or ``https://`` is returned
    unchanged. Anything else is joined onto the domain of the current site,
    using ``http`` when ``settings.DEBUG`` is true and ``https`` otherwise.
    """
    already_absolute = bool(url) and url.startswith(('http://', 'https://'))
    if already_absolute:
        return url

    scheme = 'http' if settings.DEBUG else 'https'
    base = f'{scheme}://{get_current_site(request).domain}'
    return urljoin(base, url)
def absolute_url(
    view_name: str, args: Optional[tuple] = None, kwargs: Optional[dict] = None
) -> str:
    """Reverse ``view_name`` like django.urls.reverse() and make the result absolute.

    ``args`` and ``kwargs`` are passed through to ``reverse()``; the relative
    URL it returns is then handed to ``absolutify()``.
    """
    return absolutify(reverse(view_name, args=args, kwargs=kwargs))
class HTMLFilter(HTMLParser):
    """Accumulate a plain-text rendering of the HTML fed to the parser.

    The result is collected in ``text``:

    * text inside the tags listed in ``skip_text_of`` is dropped;
    * the value of every ``href`` attribute is written out, so a link is
      represented by its URL rather than by its label;
    * a closing ``quote`` or ``q`` tag adds an empty line.
    """

    # Tags whose text content is left out of the output.
    skip_text_of = ('a', 'style')
    # Plain text accumulated so far.
    text = ''
    # True while the parser is inside one of the `skip_text_of` tags.
    skip_tag_text = False

    def handle_starttag(self, tag, attrs):
        """Start skipping text for `skip_text_of` tags and emit any href value."""
        if tag in self.skip_text_of:
            self.skip_tag_text = True
        for name, value in attrs:
            # A bare attribute such as `<a href>` is reported with value None;
            # there is nothing to write out for it.
            if name == 'href' and value:
                # Text skipping is intentionally NOT switched on here: only
                # `skip_text_of` tags ever switch it off again in
                # handle_endtag(), so an href on any other tag (e.g. <link>)
                # would silently drop the rest of the document.
                self.text += value

    def handle_endtag(self, tag):
        """Stop skipping text after `skip_text_of` tags; end quotes with a blank line."""
        if tag in self.skip_text_of:
            self.skip_tag_text = False
        if tag in ('quote', 'q'):
            self.text += '\n\n'

    def handle_data(self, data):
        """Append text content unless it belongs to a skipped tag."""
        if self.skip_tag_text:
            return
        self.text += data
def html_to_text(data: str) -> str:
    """Convert an HTML document to plain text using ``HTMLFilter``.

    Leading spaces and tabs are removed from every line, and blank lines at
    the very beginning of the result are dropped.
    """
    html_filter = HTMLFilter()
    html_filter.feed(data)
    # HTMLParser may hold back trailing text (for example when it contains an
    # '&' that could be the start of a character reference); close() forces
    # all buffered data to be processed so that nothing is lost.
    html_filter.close()

    lines = [line.lstrip(' \t') for line in html_filter.text.split('\n')]

    # Index of the first line that is not empty or whitespace-only.
    first_content = next(
        (index for index, line in enumerate(lines) if line.strip()),
        len(lines),
    )
    return '\n'.join(lines[first_content:])