From 1d08f6850bb754e474bcad61df3604614877ded6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?=
Date: Tue, 9 Jan 2018 17:05:31 +0100
Subject: [PATCH] Elastic: parallelise reindexing

It's marginally faster (on our production DB, user reindexing goes down
from 5+ minutes to 4 minutes), but will likely become significantly
faster when we run Elasticsearch on its own machine.
---
 pillar/__init__.py    |  8 +++++
 pillar/cli/elastic.py | 80 +++++++++++++++++++++++++++++--------------
 2 files changed, 62 insertions(+), 26 deletions(-)

diff --git a/pillar/__init__.py b/pillar/__init__.py
index a6ae2571..f0959d65 100644
--- a/pillar/__init__.py
+++ b/pillar/__init__.py
@@ -866,3 +866,11 @@ class PillarServer(BlinkerCompatibleEve):
     @property
     def user_caps(self) -> typing.Mapping[str, typing.FrozenSet[str]]:
         return self._user_caps
+
+    @property
+    def real_app(self) -> 'PillarServer':
+        """The real application object.
+
+        Can be used to obtain the real app object from a LocalProxy.
+        """
+        return self
diff --git a/pillar/cli/elastic.py b/pillar/cli/elastic.py
index afe9f37d..b7237b2c 100644
--- a/pillar/cli/elastic.py
+++ b/pillar/cli/elastic.py
@@ -1,7 +1,8 @@
+import concurrent.futures
 import logging
 import typing
 
-import bson
+import bson
 from flask_script import Manager
 
 from pillar import current_app
@@ -16,6 +17,7 @@ name_to_task = {
     'nodes': index.ResetNodeIndex,
     'users': index.ResetUserIndex,
 }
+REINDEX_THREAD_COUNT = 3
 
 
 @manager_elastic.option('indices', nargs='*')
@@ -47,27 +49,41 @@ def _reindex_users():
     # in prepare_user_data(…)
     users = users_coll.find()
     user_count = users.count()
+    indexed = 0
 
     log.info('Reindexing %d users in Elastic', user_count)
 
     from pillar.celery.search_index_tasks import prepare_user_data
     from pillar.api.search import elastic_indexing
 
-    indexed = 0
-    for idx, user in enumerate(users):
-        if idx % 100 == 0:
-            log.info('Processing user %d/%d', idx+1, user_count)
-        to_index = prepare_user_data('', user=user)
-        if not to_index:
-            log.debug('missing user..')
-            continue
+    app = current_app.real_app
 
-        try:
-            elastic_indexing.push_updated_user(to_index)
-        except(KeyError, AttributeError):
-            log.exception('Field is missing for %s', user)
-        else:
-            indexed += 1
+    def do_work(work_idx_user):
+        nonlocal indexed
+        idx, user = work_idx_user
+
+        with app.app_context():
+            if idx % 100 == 0:
+                log.info('Processing user %d/%d', idx+1, user_count)
+            to_index = prepare_user_data('', user=user)
+            if not to_index:
+                log.debug('not indexing user %s', user)
+                return
+
+            try:
+                elastic_indexing.push_updated_user(to_index)
+            except (KeyError, AttributeError):
+                log.exception('Field is missing for %s', user)
+            else:
+                indexed += 1
+
+    with concurrent.futures.ThreadPoolExecutor(max_workers=REINDEX_THREAD_COUNT) as executor:
+        result = executor.map(do_work, enumerate(users))
+
+        # When an exception occurs, it's enough to just iterate over the results.
+        # That will re-raise the exception in the main thread.
+        for ob in result:
+            log.debug('result: %s', ob)
 
     log.info('Reindexed %d/%d users', indexed, user_count)
 
@@ -90,23 +106,35 @@ def _reindex_nodes():
         '_deleted': {'$ne': True},
     })
     node_count = nodes.count()
+    indexed = 0
 
     log.info('Nodes %d will be reindexed in Elastic', node_count)
+    app = current_app.real_app
 
     from pillar.celery.search_index_tasks import prepare_node_data
     from pillar.api.search import elastic_indexing
 
-    indexed = 0
-    for idx, node in enumerate(nodes):
-        if idx % 100 == 0:
-            log.info('Processing node %d/%d', idx+1, node_count)
-        try:
-            to_index = prepare_node_data('', node=node)
-            elastic_indexing.index_node_save(to_index)
-        except (KeyError, AttributeError):
-            log.exception('Node %s is missing Field', node)
-        else:
-            indexed += 1
+    def do_work(work_idx_node):
+        nonlocal indexed
+
+        idx, node = work_idx_node
+        with app.app_context():
+            if idx % 100 == 0:
+                log.info('Processing node %d/%d', idx+1, node_count)
+            try:
+                to_index = prepare_node_data('', node=node)
+                elastic_indexing.index_node_save(to_index)
+            except (KeyError, AttributeError):
+                log.exception('Node %s is missing a field', node)
+            else:
+                indexed += 1
+
+    with concurrent.futures.ThreadPoolExecutor(max_workers=REINDEX_THREAD_COUNT) as executor:
+        result = executor.map(do_work, enumerate(nodes))
+        # When an exception occurs, it's enough to just iterate over the results.
+        # That will re-raise the exception in the main thread.
+        for ob in result:
+            log.debug('result: %s', ob)
 
     log.info('Reindexed %d/%d nodes', indexed, node_count)
 
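
A note on the pattern both reindexing functions now share: executor.map() does not
raise a worker's exception at submission time; the exception is stored and only
re-raised in the consuming thread when the corresponding result is iterated, which
is why the patch drains the result iterator even though do_work() always returns
None. A minimal standalone sketch of this behaviour (plain stdlib Python; the
process_all/do_work names and the simulated failure are illustrative, not part of
Pillar):

import concurrent.futures
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

# Mirrors REINDEX_THREAD_COUNT in the patch.
THREAD_COUNT = 3


def process_all(items) -> int:
    """Run do_work() over enumerate(items) in a thread pool, as the patch does."""
    processed = 0

    def do_work(work_idx_item):
        # Shared counter via nonlocal, matching the patch's `indexed` counter.
        nonlocal processed
        idx, item = work_idx_item
        if item is None:
            # Simulate the KeyError/AttributeError cases the patch logs.
            raise KeyError('item %d is missing a field' % idx)
        log.info('Processing item %d: %r', idx, item)
        processed += 1

    with concurrent.futures.ThreadPoolExecutor(max_workers=THREAD_COUNT) as executor:
        result = executor.map(do_work, enumerate(items))
        # executor.map() stores each worker's exception and re-raises it only
        # when that result is consumed, so draining the iterator is what
        # surfaces failures in the main thread.
        for ob in result:
            log.debug('result: %s', ob)

    return processed


if __name__ == '__main__':
    print(process_all(['a', 'b', 'c']))  # prints 3
    try:
        process_all(['a', None, 'c'])
    except KeyError as ex:
        print('re-raised in main thread:', ex)

As in the patch, the shared counter is bumped via nonlocal from several threads;
`processed += 1` is a read-modify-write that CPython does not guarantee to be
atomic, so a threading.Lock would be the stricter choice, though for a handful of
I/O-bound workers the simple counter is a reasonable trade-off.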