T53161 elasticsearch can index nodes now. cli command. NOTE config changes!!
This commit is contained in:
parent
1fe88819f4
commit
a8849ec823
@ -693,6 +693,7 @@ class PillarServer(Eve):
|
||||
|
||||
api.setup_app(self)
|
||||
web.setup_app(self)
|
||||
|
||||
authentication.setup_app(self)
|
||||
|
||||
for ext in self.pillar_extensions.values():
|
||||
|
@ -2,9 +2,11 @@ def setup_app(app):
|
||||
from . import encoding, blender_id, projects, local_auth, file_storage
|
||||
from . import users, nodes, latest, blender_cloud, service, activities
|
||||
from . import organizations
|
||||
from . import search
|
||||
|
||||
encoding.setup_app(app, url_prefix='/encoding')
|
||||
blender_id.setup_app(app, url_prefix='/blender_id')
|
||||
search.setup_app(app, url_prefix='/newsearch')
|
||||
projects.setup_app(app, api_prefix='/p')
|
||||
local_auth.setup_app(app, url_prefix='/auth')
|
||||
file_storage.setup_app(app, url_prefix='/storage')
|
||||
|
@ -0,0 +1,13 @@
|
||||
import logging
|
||||
|
||||
#import bson
|
||||
#from flask import current_app
|
||||
|
||||
from .routes import blueprint_search
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def setup_app(app, url_prefix: str =None):
|
||||
app.register_api_blueprint(
|
||||
blueprint_search, url_prefix=url_prefix)
|
@ -26,15 +26,16 @@ autocomplete = es.analyzer(
|
||||
|
||||
|
||||
class User(es.DocType):
|
||||
"""
|
||||
Elastic document describing user
|
||||
"""
|
||||
"""Elastic document describing user."""
|
||||
|
||||
name = es.String(
|
||||
fielddata=True,
|
||||
analyzer=autocomplete,
|
||||
)
|
||||
|
||||
class Meta:
|
||||
index = 'users'
|
||||
|
||||
|
||||
class Node(es.DocType):
|
||||
"""
|
||||
@ -43,12 +44,39 @@ class Node(es.DocType):
|
||||
|
||||
node_type = es.Keyword()
|
||||
|
||||
x_code = es.String(
|
||||
multi=True,
|
||||
objectID = es.Keyword()
|
||||
|
||||
name = es.String(
|
||||
fielddata=True,
|
||||
analyzer=autocomplete,
|
||||
analyzer=autocomplete
|
||||
)
|
||||
|
||||
user_id = es.Keyword()
|
||||
user_name = es.String(
|
||||
fielddata=True,
|
||||
analyzer=autocomplete
|
||||
)
|
||||
|
||||
description = es.String()
|
||||
|
||||
is_free = es.Boolean()
|
||||
|
||||
project_id = es.Keyword()
|
||||
project_name = es.String()
|
||||
|
||||
media = es.Keyword()
|
||||
|
||||
picture_url = es.Keyword()
|
||||
|
||||
tags = es.Keyword(multi=True)
|
||||
license_notes = es.String()
|
||||
|
||||
created_at = es.Date()
|
||||
updated_at = es.Date()
|
||||
|
||||
class Meta:
|
||||
index = 'nodes'
|
||||
|
||||
|
||||
def create_doc_from_user_data(user_to_index):
|
||||
doc_id = user_to_index['objectID']
|
||||
@ -59,8 +87,25 @@ def create_doc_from_user_data(user_to_index):
|
||||
def create_doc_from_node_data(node_to_index):
|
||||
|
||||
# node stuff
|
||||
doc_id = node_to_index['objectID']
|
||||
doc_id = str(node_to_index['objectID'])
|
||||
doc = Node(_id=doc_id)
|
||||
|
||||
doc.node_type = node_to_index['node_type']
|
||||
doc.name = node_to_index['name']
|
||||
doc.user_id = str(node_to_index['user']['_id'])
|
||||
doc.user_name = node_to_index['user']['full_name']
|
||||
doc.project_id = str(node_to_index['project']['_id'])
|
||||
doc.project_name = node_to_index['project']['name']
|
||||
|
||||
if node_to_index['node_type'] == 'asset':
|
||||
doc.media = node_to_index['media']
|
||||
|
||||
doc.picture_url = node_to_index.get('picture')
|
||||
|
||||
doc.tags = node_to_index.get('tags')
|
||||
doc.license_notes = node_to_index.get('license_notes')
|
||||
|
||||
doc.created_at = node_to_index['created']
|
||||
doc.updated_at = node_to_index['updated']
|
||||
|
||||
return doc
|
||||
|
@ -1,10 +1,25 @@
|
||||
import logging
|
||||
from pillar import current_app
|
||||
from elasticsearch_dsl.connections import connections
|
||||
|
||||
from . import documents
|
||||
|
||||
|
||||
elk_hosts = current_app.config['ELASTIC_SEARCH_HOSTS']
|
||||
|
||||
connections.create_connection(
|
||||
hosts=elk_hosts,
|
||||
sniff_on_start=True,
|
||||
timeout=20)
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def push_updated_user(user_to_index: dict):
|
||||
"""Push an update to the Algolia index when a user item is updated"""
|
||||
"""
|
||||
Push an update to the Elastic index when
|
||||
a user item is updated.
|
||||
"""
|
||||
|
||||
log.warning(
|
||||
'WIP USER ELK INDEXING %s %s',
|
||||
@ -15,11 +30,18 @@ def push_updated_user(user_to_index: dict):
|
||||
def index_node_save(node_to_index: dict):
|
||||
|
||||
log.warning(
|
||||
'WIP USER NODE INDEXING %s',
|
||||
'ELK NODE INDEXING %s',
|
||||
node_to_index.get('objectID'))
|
||||
|
||||
log.warning(node_to_index)
|
||||
|
||||
doc = documents.create_doc_from_node_data(node_to_index)
|
||||
|
||||
log.warning('CREATED ELK DOC')
|
||||
doc.save()
|
||||
|
||||
|
||||
def index_node_delete(delete_id: str):
|
||||
|
||||
log.warning(
|
||||
'WIP NODE DELETE INDEXING %s', delete_id)
|
||||
log.warning('NODE DELETE INDEXING %s', delete_id)
|
||||
documents.Node(id=delete_id).delete()
|
||||
|
69
pillar/api/search/index.py
Normal file
69
pillar/api/search/index.py
Normal file
@ -0,0 +1,69 @@
|
||||
import logging
|
||||
# import time
|
||||
|
||||
# from elasticsearch import helpers
|
||||
# import elasticsearch
|
||||
|
||||
# from elasticsearch.client import IndicesClient
|
||||
|
||||
from elasticsearch.exceptions import NotFoundError
|
||||
from elasticsearch_dsl.connections import connections
|
||||
import elasticsearch_dsl as es
|
||||
|
||||
from pillar import current_app
|
||||
|
||||
from . import documents
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ResetIndexTask(object):
|
||||
"""
|
||||
Clear and build index / mapping
|
||||
"""
|
||||
index = ''
|
||||
doc_types = []
|
||||
name = 'remove index'
|
||||
|
||||
def __init__(self):
|
||||
|
||||
if not self.index:
|
||||
raise ValueError("No index specified")
|
||||
|
||||
if not self.doc_types:
|
||||
raise ValueError("No doc_types specified")
|
||||
|
||||
connections.create_connection(
|
||||
hosts=current_app.config['ELASTIC_SEARCH_HOSTS'],
|
||||
# sniff_on_start=True,
|
||||
retry_on_timeout=True,
|
||||
)
|
||||
|
||||
def execute(self):
|
||||
|
||||
idx = es.Index(self.index)
|
||||
|
||||
try:
|
||||
idx.delete(ignore=404)
|
||||
log.info("Deleted index %s", self.index)
|
||||
except AttributeError:
|
||||
log.warning("Could not delete index '%s', ignoring", self.index)
|
||||
except NotFoundError:
|
||||
log.warning("Could not delete index '%s', ignoring", self.index)
|
||||
|
||||
# create doc types
|
||||
for dt in self.doc_types:
|
||||
idx.doc_type(dt)
|
||||
|
||||
# create index
|
||||
idx.create()
|
||||
|
||||
|
||||
class ResetNodeIndex(ResetIndexTask):
|
||||
index = current_app.config['ELASTIC_INDICES']['NODE']
|
||||
doc_types = [documents.Node]
|
||||
|
||||
|
||||
def reset_node_index():
|
||||
resettask = ResetNodeIndex()
|
||||
resettask.execute()
|
42
pillar/api/search/queries.py
Normal file
42
pillar/api/search/queries.py
Normal file
@ -0,0 +1,42 @@
|
||||
import logging
|
||||
import json
|
||||
from elasticsearch import Elasticsearch
|
||||
from elasticsearch_dsl import Search, Q
|
||||
from elasticsearch_dsl.connections import connections
|
||||
|
||||
from pillar import current_app
|
||||
|
||||
#elk_hosts = current_app.config['ELASTIC_SEARCH_HOSTS']
|
||||
#
|
||||
#connections.create_connection(
|
||||
# hosts=elk_hosts,
|
||||
# sniff_on_start=True,
|
||||
# timeout=20)
|
||||
#
|
||||
client = Elasticsearch()
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def do_search(query: str) -> dict:
|
||||
"""
|
||||
Given user input search for node/stuff
|
||||
"""
|
||||
should = [
|
||||
Q('match', name=query),
|
||||
Q('match', user_name=query),
|
||||
Q('match', project_name=query),
|
||||
Q('match', description=query),
|
||||
Q('term', media=query),
|
||||
Q('term', tags=query),
|
||||
]
|
||||
bool_query = Q('bool', should=should)
|
||||
search = Search(using=client)
|
||||
search.query = bool_query
|
||||
|
||||
if current_app.config['DEBUG']:
|
||||
log.debug(json.dumps(search.to_dict(), indent=4))
|
||||
|
||||
response = search.execute()
|
||||
|
||||
return response.to_dict()
|
33
pillar/api/search/routes.py
Normal file
33
pillar/api/search/routes.py
Normal file
@ -0,0 +1,33 @@
|
||||
import json
|
||||
import logging
|
||||
|
||||
from bson import ObjectId
|
||||
from flask import Blueprint, request, current_app, make_response, url_for
|
||||
from flask import Response
|
||||
from werkzeug import exceptions as wz_exceptions
|
||||
|
||||
from pillar.api.utils import authorization, jsonify, str2id
|
||||
from pillar.api.utils import mongo
|
||||
from pillar.api.utils.authorization import require_login, check_permissions
|
||||
from pillar.auth import current_user
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
blueprint_search = Blueprint('elksearch', __name__)
|
||||
|
||||
from . import queries
|
||||
|
||||
#@authorization.require_login(require_cap='subscriber')
|
||||
@blueprint_search.route('/', methods=['GET'])
|
||||
def search_nodes():
|
||||
|
||||
searchword = request.args.get('q', '')
|
||||
|
||||
if not searchword:
|
||||
return 'You are forgetting a "?q=whatareyoulookingfor"'
|
||||
|
||||
data = queries.do_search(searchword)
|
||||
|
||||
resp = Response(json.dumps(data), mimetype='application/json')
|
||||
return resp
|
@ -16,7 +16,7 @@ def index_user_save(to_index_user: dict):
|
||||
return
|
||||
|
||||
# Create or update Algolia index for the user
|
||||
index_users.save_object()
|
||||
index_users.save_object(to_index_user)
|
||||
|
||||
|
||||
@skip_when_testing
|
||||
|
@ -3,7 +3,6 @@ from bson import ObjectId
|
||||
from pillar import current_app
|
||||
from pillar.api.file_storage import generate_link
|
||||
|
||||
# TODO WIP (stephan) make index backend conditional on settings.
|
||||
from pillar.api.search import elastic_indexing
|
||||
|
||||
from pillar.api.search import algolia_indexing
|
||||
@ -57,11 +56,12 @@ def _handle_picture(node: dict, to_index: dict):
|
||||
is_public=True)
|
||||
|
||||
|
||||
def prepare_node_data(node_id: str):
|
||||
def prepare_node_data(node_id: str, node=None) -> dict:
|
||||
"""
|
||||
Given node build data object with fields to index
|
||||
"""
|
||||
node = _get_node_from_id(node_id)
|
||||
if node_id:
|
||||
node = _get_node_from_id(node_id)
|
||||
|
||||
if node is None:
|
||||
log.warning('Unable to find node %s, not updating Algolia.', node_id)
|
||||
@ -117,7 +117,7 @@ def prepare_node_data(node_id: str):
|
||||
return to_index
|
||||
|
||||
|
||||
def prepare_user_data(user_id: str):
|
||||
def prepare_user_data(user_id: str) -> dict:
|
||||
"""
|
||||
Prepare data to index for user node
|
||||
"""
|
||||
@ -158,8 +158,8 @@ def updated_user(user_id: str):
|
||||
user_to_index = prepare_user_data(user_id)
|
||||
|
||||
for searchoption in current_app.config['SEARCH_BACKENDS']:
|
||||
for searchmodule in SEARCH_BACKENDS[searchoption]:
|
||||
searchmodule.push_updated_user(user_to_index)
|
||||
searchmodule = SEARCH_BACKENDS[searchoption]
|
||||
searchmodule.push_updated_user(user_to_index)
|
||||
|
||||
|
||||
@current_app.celery.task(ignore_result=True)
|
||||
@ -167,7 +167,9 @@ def node_save(node_id: str):
|
||||
|
||||
to_index = prepare_node_data(node_id)
|
||||
|
||||
algolia_indexing.index_node_save(to_index)
|
||||
for searchoption in current_app.config['SEARCH_BACKENDS']:
|
||||
searchmodule = SEARCH_BACKENDS[searchoption]
|
||||
searchmodule.index_node_save(to_index)
|
||||
|
||||
|
||||
@current_app.celery.task(ignore_result=True)
|
||||
@ -177,3 +179,7 @@ def node_delete(node_id: str):
|
||||
# No need to fetch anything from Mongo.
|
||||
delete_id = ObjectId(node_id)
|
||||
algolia_indexing.index_node_delete(delete_id)
|
||||
|
||||
for searchoption in current_app.config['SEARCH_BACKENDS']:
|
||||
searchmodule = SEARCH_BACKENDS[searchoption]
|
||||
searchmodule.index_node_delete(delete_id)
|
||||
|
@ -12,6 +12,8 @@ from pillar.cli.celery import manager_celery
|
||||
from pillar.cli.maintenance import manager_maintenance
|
||||
from pillar.cli.operations import manager_operations
|
||||
from pillar.cli.setup import manager_setup
|
||||
from pillar.cli.elastic import manager_elk
|
||||
|
||||
from pillar.cli import translations
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@ -20,4 +22,5 @@ manager = Manager(current_app)
|
||||
manager.add_command('celery', manager_celery)
|
||||
manager.add_command("maintenance", manager_maintenance)
|
||||
manager.add_command("setup", manager_setup)
|
||||
manager.add_command("operations", manager_operations)
|
||||
manager.add_command("operations", manager_operations)
|
||||
manager.add_command("elastic", manager_elk)
|
||||
|
42
pillar/cli/elastic.py
Normal file
42
pillar/cli/elastic.py
Normal file
@ -0,0 +1,42 @@
|
||||
import logging
|
||||
|
||||
from flask_script import Manager
|
||||
|
||||
from pillar import current_app
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
manager_elk = Manager(
|
||||
current_app, usage="Elastic utilities, like reset_index()")
|
||||
|
||||
|
||||
@manager_elk.command
|
||||
def reset_index(elk_index):
|
||||
"""
|
||||
Destroy and recreate elastic indices
|
||||
|
||||
node, user ...
|
||||
"""
|
||||
#real_current_app = current_app._get_current_object()._get_current_object()
|
||||
|
||||
with current_app.app_context():
|
||||
from pillar.api.search import index
|
||||
if elk_index == 'nodes':
|
||||
index.reset_node_index()
|
||||
|
||||
|
||||
@manager_elk.command
|
||||
def reindex_nodes():
|
||||
|
||||
db = current_app.db()
|
||||
nodes_coll = db['nodes']
|
||||
node_count = nodes_coll.count()
|
||||
|
||||
log.debug('Reindexing %d in Elastic', node_count)
|
||||
|
||||
from pillar.celery.search_index_tasks import prepare_node_data
|
||||
from pillar.api.search import elastic_indexing
|
||||
|
||||
for node in nodes_coll.find():
|
||||
to_index = prepare_node_data('', node=node)
|
||||
elastic_indexing.index_node_save(to_index)
|
@ -75,6 +75,12 @@ ALGOLIA_INDEX_NODES = 'dev_Nodes'
|
||||
|
||||
SEARCH_BACKENDS = ['algolia', 'elastic']
|
||||
|
||||
ELASTIC_SEARCH_HOSTS = ['elasticsearch']
|
||||
ELASTIC_INDICES = {
|
||||
'NODE': 'nodes',
|
||||
'USER': 'users',
|
||||
}
|
||||
|
||||
|
||||
ZENCODER_API_KEY = '-SECRET-'
|
||||
ZENCODER_NOTIFICATIONS_SECRET = '-SECRET-'
|
||||
|
Loading…
x
Reference in New Issue
Block a user