Elastic: parallelise reindexing
It's marginally faster (on our production DB, user reindexing goes down from 5+ minutes to 4 minutes), but will likely become significantly faster once we run Elasticsearch on its own machine.
parent
408db5e060
commit
1d08f6850b
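The diff below moves the per-document work into a nested do_work() function and fans it out over a small thread pool. As a rough standalone sketch of that pattern (the document list, the push_to_index() helper and the thread count are illustrative stand-ins, not Pillar's API):

import concurrent.futures
import logging

log = logging.getLogger(__name__)


def push_to_index(doc):
    """Stand-in for the real per-document indexing call."""
    log.debug('indexing %s', doc)


def reindex(documents, thread_count=3):
    indexed = 0

    def do_work(work_idx_doc):
        nonlocal indexed
        idx, doc = work_idx_doc
        if idx % 100 == 0:
            log.info('Processing document %d', idx + 1)
        push_to_index(doc)
        indexed += 1

    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_count) as executor:
        # Iterating over map()'s results re-raises any worker exception here,
        # in the main thread, instead of silently swallowing it.
        for result in executor.map(do_work, enumerate(documents)):
            log.debug('worker returned: %s', result)

    log.info('Reindexed %d documents', indexed)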
@@ -866,3 +866,11 @@ class PillarServer(BlinkerCompatibleEve):
     @property
     def user_caps(self) -> typing.Mapping[str, typing.FrozenSet[str]]:
         return self._user_caps
+
+    @property
+    def real_app(self) -> 'PillarServer':
+        """The real application object.
+
+        Can be used to obtain the real app object from a LocalProxy.
+        """
+        return self
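The new real_app property matters for the threading below: current_app is a LocalProxy, and a freshly started worker thread has no Flask application context of its own, so it cannot rely on the proxy. Grabbing the real PillarServer object once, in the main thread, lets each worker push its own context. Roughly (the worker body is illustrative):

from pillar import current_app

app = current_app.real_app  # unwrap the LocalProxy while a context is active


def worker():
    # A new thread starts without an app context; push one explicitly
    # using the real application object captured above.
    with app.app_context():
        pass  # code that needs current_app, the database, etc.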
@@ -1,7 +1,8 @@
+import concurrent.futures
 import logging
 import typing
-import bson
 
+import bson
 from flask_script import Manager
 
 from pillar import current_app
@@ -16,6 +17,7 @@ name_to_task = {
     'nodes': index.ResetNodeIndex,
     'users': index.ResetUserIndex,
 }
+REINDEX_THREAD_COUNT = 3
 
 
 @manager_elastic.option('indices', nargs='*')
@@ -47,27 +49,41 @@ def _reindex_users():
     # in prepare_user_data(…)
     users = users_coll.find()
     user_count = users.count()
+    indexed = 0
 
     log.info('Reindexing %d users in Elastic', user_count)
 
     from pillar.celery.search_index_tasks import prepare_user_data
     from pillar.api.search import elastic_indexing
 
-    indexed = 0
-    for idx, user in enumerate(users):
-        if idx % 100 == 0:
-            log.info('Processing user %d/%d', idx+1, user_count)
-        to_index = prepare_user_data('', user=user)
-        if not to_index:
-            log.debug('missing user..')
-            continue
-
-        try:
-            elastic_indexing.push_updated_user(to_index)
-        except(KeyError, AttributeError):
-            log.exception('Field is missing for %s', user)
-        else:
-            indexed += 1
+    app = current_app.real_app
+
+    def do_work(work_idx_user):
+        nonlocal indexed
+        idx, user = work_idx_user
+
+        with app.app_context():
+            if idx % 100 == 0:
+                log.info('Processing user %d/%d', idx+1, user_count)
+            to_index = prepare_user_data('', user=user)
+            if not to_index:
+                log.debug('not indexing user %s', user)
+                return
+
+            try:
+                elastic_indexing.push_updated_user(to_index)
+            except(KeyError, AttributeError):
+                log.exception('Field is missing for %s', user)
+            else:
+                indexed += 1
+
+    with concurrent.futures.ThreadPoolExecutor(max_workers=REINDEX_THREAD_COUNT) as executor:
+        result = executor.map(do_work, enumerate(users))
+
+        # When an exception occurs, it's enough to just iterate over the results.
+        # That will re-raise the exception in the main thread.
+        for ob in result:
+            log.debug('result: %s', ob)
     log.info('Reindexed %d/%d users', indexed, user_count)
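The two comments at the end of that hunk lean on documented concurrent.futures behaviour: executor.map() returns a lazy iterator, and an exception raised inside a worker is stored and only re-raised when its result is consumed. A tiny self-contained illustration:

import concurrent.futures


def work(n):
    if n == 3:
        raise ValueError('boom')
    return n * n


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(work, range(5))
    try:
        for value in results:  # the worker's ValueError surfaces here, in the main thread
            print(value)
    except ValueError as ex:
        print('worker failed:', ex)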
@@ -90,23 +106,35 @@ def _reindex_nodes():
         '_deleted': {'$ne': True},
     })
     node_count = nodes.count()
+    indexed = 0
 
     log.info('Nodes %d will be reindexed in Elastic', node_count)
+    app = current_app.real_app
 
     from pillar.celery.search_index_tasks import prepare_node_data
     from pillar.api.search import elastic_indexing
 
-    indexed = 0
-    for idx, node in enumerate(nodes):
-        if idx % 100 == 0:
-            log.info('Processing node %d/%d', idx+1, node_count)
-        try:
-            to_index = prepare_node_data('', node=node)
-            elastic_indexing.index_node_save(to_index)
-        except (KeyError, AttributeError):
-            log.exception('Node %s is missing Field', node)
-        else:
-            indexed += 1
+    def do_work(work_idx_node):
+        nonlocal indexed
+
+        idx, node = work_idx_node
+        with app.app_context():
+            if idx % 100 == 0:
+                log.info('Processing node %d/%d', idx+1, node_count)
+            try:
+                to_index = prepare_node_data('', node=node)
+                elastic_indexing.index_node_save(to_index)
+            except (KeyError, AttributeError):
+                log.exception('Node %s is missing a field', node)
+            else:
+                indexed += 1
+
+    with concurrent.futures.ThreadPoolExecutor(max_workers=REINDEX_THREAD_COUNT) as executor:
+        result = executor.map(do_work, enumerate(nodes))
+        # When an exception occurs, it's enough to just iterate over the results.
+        # That will re-raise the exception in the main thread.
+        for ob in result:
+            log.debug('result: %s', ob)
     log.info('Reindexed %d/%d nodes', indexed, node_count)
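One design note on the counters: indexed is a plain integer that the worker threads update through nonlocal, and += on an int is not guaranteed to be atomic across threads. That is acceptable here because the number only feeds the final log line, but when an exact count matters a lock is the safer pattern; a small sketch (the Lock is an illustration, not part of this commit):

import concurrent.futures
import threading

indexed = 0
indexed_lock = threading.Lock()


def do_work(item):
    global indexed
    # ... the actual indexing of `item` would happen here ...
    with indexed_lock:  # serialise the increment so the total stays exact
        indexed += 1


with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for _ in executor.map(do_work, range(1000)):
        pass

print(indexed)  # 1000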