extensions-website/extensions/views/public.py

222 lines
8.7 KiB
Python

from collections import OrderedDict
import logging
from django.contrib.auth import get_user_model
from django.db import connection
from django.db.models import Count, Q
from django.shortcuts import get_object_or_404, redirect
from django.utils.translation import gettext_lazy as _
from django.views.generic.list import ListView
from extensions.models import Extension, Version, Tag
from constants.base import (
EXTENSION_TYPE_SLUGS,
EXTENSION_TYPE_PLURAL,
EXTENSION_TYPE_CHOICES,
)
from stats.models import ExtensionDownload, VersionDownload
import teams.models
User = get_user_model()
log = logging.getLogger(__name__)
class ListedExtensionsView(ListView):
model = Extension
queryset = Extension.objects.listed.prefetch_related(
'authors',
'latest_version__file',
'latest_version__tags',
'preview_set',
'preview_set__file',
'ratings',
'team',
).distinct()
context_object_name = 'extensions'
class HomeView(ListedExtensionsView):
paginate_by = 16
template_name = 'extensions/home.html'
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
q = super().get_queryset().order_by('-average_score')
context['addons'] = q.filter(type=EXTENSION_TYPE_CHOICES.BPY)[:8]
context['themes'] = q.filter(type=EXTENSION_TYPE_CHOICES.THEME)[:8]
return context
def extension_version_download(request, type_slug, slug, version, filename):
"""Download an extension version and count downloads.
The `filename` parameter is used to pass a file name ending with `.zip`.
This is a convention Blender uses to initiate an extension installation on an HTML anchor
drag&drop.
"""
extension_version = get_object_or_404(Version, extension__slug=slug, version=version)
ExtensionDownload.create_from_request(request, object_id=extension_version.extension_id)
VersionDownload.create_from_request(request, object_id=extension_version.pk)
return redirect(extension_version.downloadable_signed_url + f'?filename={filename}')
class SearchView(ListedExtensionsView):
paginate_by = 16
template_name = 'extensions/list.html'
default_sort_by = '-average_score'
sort_by_options = OrderedDict(
[
('-average_score', _('Rating')),
('-download_count', _('Downloads')),
('-date_approved', _('Newest First')),
('date_approved', _('Oldest First')),
('name', _('Title (A-Z)')),
('-name', _('Title (Z-A)')),
]
)
def _get_type_id_by_slug(self):
return next(k for k, v in EXTENSION_TYPE_SLUGS.items() if v == self.kwargs['type_slug'])
def _get_type_by_slug(self):
return EXTENSION_TYPE_PLURAL[self._get_type_id_by_slug()]
def _get_sort_by(self):
sort_by = self.request.GET.get('sort_by', self.default_sort_by)
if sort_by not in self.sort_by_options:
sort_by = self.default_sort_by
return sort_by
def get_queryset(self):
queryset = super().get_queryset().order_by(self._get_sort_by())
if self.kwargs.get('tag_slug'):
queryset = queryset.filter(
latest_version__tags__slug=self.kwargs['tag_slug']
).distinct()
if self.kwargs.get('team_slug'):
queryset = queryset.filter(team__slug=self.kwargs['team_slug'])
if self.kwargs.get('user_id'):
queryset = queryset.filter(
authors__maintainer__user_id=self.kwargs['user_id']
).distinct()
if self.kwargs.get('type_slug'):
_type = self._get_type_id_by_slug()
queryset = queryset.filter(type=_type)
search_query = self.request.GET.get('q')
if not search_query:
return queryset
# WARNING: full-text search support only on postgres
if connection.vendor == 'postgresql':
queryset = self.postgres_fts(queryset, search_query)
else:
filter = Q()
for token in search_query.split():
filter &= (
Q(slug__icontains=token)
| Q(name__icontains=token)
| Q(description__icontains=token)
| Q(latest_version__tags__name__icontains=token)
)
queryset = queryset.filter(filter).distinct()
return queryset
def postgres_fts(self, queryset, search_query):
"""Postgres full text search (fast) and a fuzzy trigram search (slow) as a fallback.
Searches Extension name and description only, ranking name matches higher.
If we need to extend the functionality, it's better to consider using a different approach,
e.g. introduce meilisearch.
Limits the results size to 32 items (2 pages), assuming that nobody will click through many
pages if we failed to present the vital results on the first page.
Relies on indexed expressions:
CREATE INDEX extensions_fts ON extensions_extension USING
gin ((to_tsvector('english', name) || ' ' || to_tsvector('english', description)));
CREATE INDEX extensions_trgm_gin ON extensions_extension USING
gin((((name)::text || ' '::text) || description) gin_trgm_ops);
"""
with connection.cursor() as cursor:
sql = """
select id
from extensions_extension
where (
(to_tsvector('english', name) || ' ' || to_tsvector('english', description))
@@ websearch_to_tsquery('english', %(query)s)
) and is_listed
order by ts_rank(
to_tsvector('english', name),
websearch_to_tsquery('english', %(query)s)
) desc
limit 32"""
cursor.execute(sql, {'query': search_query})
pks = [row[0] for row in cursor.fetchall()]
if not pks:
# fallback to fuzzy trigram search
sql = """
select id
from extensions_extension
where ((name || ' ' || description) %%> %(query)s)
and is_listed
order by %(query)s <<<-> (name || ' ' || description)
limit 32"""
cursor.execute(sql, {'query': search_query})
pks = [row[0] for row in cursor.fetchall()]
# pks are ordered by ranking, keep that order
# this approach is fine under the assumption that the list is small
return sorted(queryset.filter(pk__in=pks).order_by(), key=lambda x: pks.index(x.pk))
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
if self.kwargs.get('user_id'):
context['author'] = get_object_or_404(User, pk=self.kwargs['user_id'])
if self.kwargs.get('tag_slug'):
context['tag'] = get_object_or_404(Tag, slug=self.kwargs['tag_slug'])
if self.kwargs.get('type_slug'):
context['type'] = self._get_type_by_slug()
if self.kwargs.get('team_slug'):
context['team'] = get_object_or_404(teams.models.Team, slug=self.kwargs['team_slug'])
sort_by = self._get_sort_by()
context['sort_by'] = sort_by
context['sort_by_option_name'] = self.sort_by_options.get(sort_by)
context['sort_by_options'] = self.sort_by_options
# Determine which tags to list depending on the context.
tag_type_id = None
if context.get('type'):
tag_type_id = self._get_type_id_by_slug()
elif context.get('tag'):
tag_type_id = context['tag'].type
if tag_type_id:
tags = [
{
'count': t['count'],
'name': t['latest_version__tags__name'],
'slug': t['latest_version__tags__slug'],
}
for t in Extension.objects.listed.select_related('latest_version__tags')
.filter(latest_version__tags__type=tag_type_id)
.values('latest_version__tags__name', 'latest_version__tags__slug')
.annotate(count=Count('id'))
.order_by('latest_version__tags__name')
]
context['tags'] = tags
if 'tag' in context:
# this is silly, but the list is short
tag_slug = context['tag'].slug
for t in tags:
if t['slug'] == tag_slug:
context['current_tag_count'] = t['count']
break
context['total_count'] = super().get_queryset().filter(type=tag_type_id).count()
return context