Added CLI command for moving top-level nodes between projects.
Also introduces a slightly nicer way to get the database interface, and an object-oriented way to allow dependency injection.
This commit is contained in:
@@ -509,3 +509,11 @@ class PillarServer(Eve):
|
|||||||
links.sort(key=lambda t: len(t[0]) + 100 * ('/api/' in t[0]))
|
links.sort(key=lambda t: len(t[0]) + 100 * ('/api/' in t[0]))
|
||||||
|
|
||||||
pprint(links)
|
pprint(links)
|
||||||
|
|
||||||
|
def db(self):
    """Returns the MongoDB database.

    This is the database object of the data layer's driver; collections are
    obtained from it by subscript, e.g. ``app.db()['nodes']``.

    :rtype: flask_pymongo.wrappers.Database
    """

    return self.data.driver.db
|
||||||
|
@@ -12,6 +12,7 @@ import requests
|
|||||||
import requests.exceptions
|
import requests.exceptions
|
||||||
|
|
||||||
from . import stream_to_gcs, generate_all_links, ensure_valid_link
|
from . import stream_to_gcs, generate_all_links, ensure_valid_link
|
||||||
|
import pillar.api.utils.gcs
|
||||||
|
|
||||||
__all__ = ['PrerequisiteNotMetError', 'change_file_storage_backend']
|
__all__ = ['PrerequisiteNotMetError', 'change_file_storage_backend']
|
||||||
|
|
||||||
@@ -138,7 +139,8 @@ def fetch_file_from_local(file_doc):
|
|||||||
:param file_doc: dict with 'link' key pointing to a path in STORAGE_DIR, and
|
:param file_doc: dict with 'link' key pointing to a path in STORAGE_DIR, and
|
||||||
'content_type' key.
|
'content_type' key.
|
||||||
:type file_doc: dict
|
:type file_doc: dict
|
||||||
:rtype: dict
|
:rtype: dict
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
local_file = open(os.path.join(current_app.config['STORAGE_DIR'], file_doc['file_path']), 'rb')
|
local_file = open(os.path.join(current_app.config['STORAGE_DIR'], file_doc['file_path']), 'rb')
|
||||||
@@ -148,3 +150,42 @@ def fetch_file_from_local(file_doc):
|
|||||||
'local_file': local_file
|
'local_file': local_file
|
||||||
}
|
}
|
||||||
return local_finfo
|
return local_finfo
|
||||||
|
|
||||||
|
|
||||||
|
def gcs_move_to_bucket(file_id, dest_project_id, skip_gcs=False):
    """Transfers a GCS-backed file to the bucket of another project.

    The file and its variations are copied to the destination project's
    bucket (unless skip_gcs is set). After that the file document in MongoDB
    is pointed at the new project and its links are regenerated.

    :param file_id: _id of the document in the 'files' collection.
    :param dest_project_id: _id of the project the file should belong to.
    :param skip_gcs: when True nothing is copied on GCS, only the database
        is updated.
    :raises ValueError: when the file cannot be found, or is not stored on
        the 'gcs' backend.
    :raises RuntimeError: when the MongoDB update did not match exactly one
        document.
    """

    files_coll = current_app.db()['files']

    file_doc = files_coll.find_one(file_id)
    if file_doc is None:
        raise ValueError('File with _id: {} not found'.format(file_id))

    # Files on any other storage backend cannot be handled here.
    if file_doc['backend'] != 'gcs':
        raise ValueError('Only Google Cloud Storage is supported for now.')

    if not skip_gcs:
        # Copy the file itself first, then each of its variations.
        src_project = file_doc['project']
        pillar.api.utils.gcs.copy_to_bucket(file_doc['file_path'], src_project,
                                            dest_project_id)
        for variation in file_doc.get('variations', []):
            pillar.api.utils.gcs.copy_to_bucket(variation['file_path'], src_project,
                                                dest_project_id)
    else:
        log.warning('NOT ACTUALLY MOVING file %s on GCS, just updating MongoDB', file_id)

    # Only now that the copies succeeded, switch the document to the new project.
    log.info('Switching file %s to project %s', file_id, dest_project_id)
    query = {'_id': file_id}
    change = {'$set': {'project': dest_project_id}}
    update_result = files_coll.update_one(query, change)

    matched = update_result.matched_count
    modified = update_result.modified_count
    if matched != 1:
        raise RuntimeError(
            'Unable to update file %s in MongoDB: matched_count=%i; modified_count=%i' % (
                file_id, matched, modified))

    log.info('Switching file %s: matched_count=%i; modified_count=%i',
             file_id, matched, modified)

    # The links have to be regenerated for the file's new project.
    file_doc['project'] = dest_project_id
    utc_now = datetime.datetime.now(tz=bson.tz_util.utc)
    generate_all_links(file_doc, now=utc_now)
|
||||||
|
110
pillar/api/nodes/moving.py
Normal file
110
pillar/api/nodes/moving.py
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
"""Code for moving around nodes."""
|
||||||
|
|
||||||
|
import attr
|
||||||
|
import flask_pymongo.wrappers
|
||||||
|
from bson import ObjectId
|
||||||
|
|
||||||
|
from pillar import attrs_extra
|
||||||
|
import pillar.api.file_storage.moving
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s
class NodeMover(object):
    """Moves nodes, together with their children and files, to another project."""

    # Database handle; must be a flask_pymongo Database instance.
    db = attr.ib(validator=attr.validators.instance_of(flask_pymongo.wrappers.Database))
    # When True, files are not touched on GCS; passed on to the file mover.
    skip_gcs = attr.ib(default=False, validator=attr.validators.instance_of(bool))
    _log = attrs_extra.log('%s.NodeMover' % __name__)

    def change_project(self, node, dest_proj):
        """Moves the given node and all its descendants to dest_proj.

        :param node: the node document to move.
        :type node: dict
        :param dest_proj: the destination project document.
        :type dest_proj: dict
        """

        assert isinstance(node, dict)
        assert isinstance(dest_proj, dict)

        for each_node in self._children(node):
            self._change_project(each_node, dest_proj)

    def _change_project(self, node, dest_proj):
        """Moves one node (not its children) and its files to dest_proj.

        :raises RuntimeError: when the node update did not match exactly one
            document.
        """

        node_id = node['_id']
        proj_id = dest_proj['_id']
        self._log.info('Moving node %s to project %s', node_id, proj_id)

        # Move the picture first, then every file referenced from the
        # properties. The set prevents moving the same file twice.
        already_moved = set()
        self._move_files(already_moved, dest_proj, self._files(node.get('picture', None)))
        property_paths = (
            ('file',),
            ('files', 'file'),
            ('attachments', 'files', 'file'),
        )
        for path in property_paths:
            self._move_files(already_moved, dest_proj,
                             self._files(node['properties'], *path))

        # The node itself switches project only after its files have moved.
        self._log.info('Switching node %s to project %s', node_id, proj_id)
        nodes_coll = self.db['nodes']
        update_result = nodes_coll.update_one({'_id': node_id},
                                              {'$set': {'project': proj_id}})
        if update_result.matched_count == 1:
            return

        raise RuntimeError(
            'Unable to update node %s in MongoDB: matched_count=%i; modified_count=%i' % (
                node_id, update_result.matched_count, update_result.modified_count))

    def _move_files(self, moved_files, dest_proj, file_generator):
        """Moves every file ID from file_generator that was not moved before.

        Each file ID that gets handled is added to the moved_files set.
        """

        for file_id in file_generator:
            if file_id not in moved_files:
                moved_files.add(file_id)
                self.move_file(dest_proj, file_id)

    def move_file(self, dest_proj, file_id):
        """Moves one file to the destination project."""

        dest_proj_id = dest_proj['_id']
        self._log.info('Moving file %s to project %s', file_id, dest_proj_id)
        pillar.api.file_storage.moving.gcs_move_to_bucket(file_id, dest_proj_id,
                                                          skip_gcs=self.skip_gcs)

    def _files(self, file_ref, *properties):
        """Generator, yields the file ObjectIDs found in file_ref.

        :param file_ref: an ObjectId, a list of references, or a dict that is
            descended into with the first of the given property names.
            Falsey values yield nothing.
        :param properties: the keys to follow, one per dict level.
        :raises TypeError: when file_ref is of any other type.
        """

        # Nothing to find in None, empty lists, empty dicts, etc.
        if not file_ref:
            return

        if isinstance(file_ref, ObjectId):
            # A single ID; there should be no keys left to follow.
            assert not properties
            yield file_ref
        elif isinstance(file_ref, list):
            # Every item is searched with the same remaining keys.
            for element in file_ref:
                for found in self._files(element, *properties):
                    yield found
        elif isinstance(file_ref, dict):
            # Descend one level, using the first key.
            try:
                nested = file_ref[properties[0]]
            except KeyError:
                # Keys that do not exist are silently skipped.
                return

            remaining = properties[1:]
            for found in self._files(nested, *remaining):
                yield found
        else:
            raise TypeError('File ref is of type %s, not implemented' % type(file_ref))

    def _children(self, node):
        """Generator, yields the node itself, then recursively its descendants."""

        yield node

        nodes_coll = self.db['nodes']
        for direct_child in nodes_coll.find({'parent': node['_id']}):
            # Explicit loop rather than "yield from", which needs Python 3.3+.
            for descendant in self._children(direct_child):
                yield descendant
|
@@ -169,6 +169,15 @@ class GoogleCloudStorageBucket(object):
|
|||||||
blob.content_disposition = u'attachment; filename="{0}"'.format(name)
|
blob.content_disposition = u'attachment; filename="{0}"'.format(name)
|
||||||
blob.patch()
|
blob.patch()
|
||||||
|
|
||||||
|
def copy_blob(self, blob, to_bucket):
    """Copies a blob from this bucket into another bucket.

    :param blob: the blob to copy, handed to the wrapped bucket as-is.
    :param to_bucket: the destination bucket.
    :type to_bucket: GoogleCloudStorageBucket
    :returns: the new blob.
    """

    assert isinstance(to_bucket, GoogleCloudStorageBucket)

    destination = to_bucket.bucket
    return self.bucket.copy_blob(blob, destination)
|
||||||
|
|
||||||
|
|
||||||
def update_file_name(node):
|
def update_file_name(node):
|
||||||
"""Assign to the GCS blob the same name of the asset node. This way when
|
"""Assign to the GCS blob the same name of the asset node. This way when
|
||||||
@@ -222,3 +231,16 @@ def update_file_name(node):
|
|||||||
if 'files' in node['properties']:
|
if 'files' in node['properties']:
|
||||||
for file_props in node['properties']['files']:
|
for file_props in node['properties']['files']:
|
||||||
_update_name(file_props['file'], file_props)
|
_update_name(file_props['file'], file_props)
|
||||||
|
|
||||||
|
|
||||||
|
def copy_to_bucket(file_path, src_project_id, dest_project_id):
    """Copies one file from a project's bucket to another project's bucket.

    :param file_path: path of the file, looked up in the source bucket.
    :param src_project_id: ID of the project owning the source bucket.
    :param dest_project_id: ID of the project owning the destination bucket.
    """

    log.info('Copying %s from project bucket %s to %s',
             file_path, src_project_id, dest_project_id)

    # Buckets are addressed by the string form of the project ID.
    source = GoogleCloudStorageBucket(str(src_project_id))
    target = GoogleCloudStorageBucket(str(dest_project_id))

    source_blob = source.Get(file_path, to_dict=False)
    source.copy_blob(source_blob, target)
|
||||||
|
17
pillar/attrs_extra.py
Normal file
17
pillar/attrs_extra.py
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
"""Extra functionality for attrs."""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import attr
|
||||||
|
|
||||||
|
|
||||||
|
def log(name):
    """Creates an attr.ib whose default value is a logger.

    The attribute is left out of the generated repr, hash and comparison
    methods.

    :param name: name to pass to logging.getLogger()
    :rtype: attr.ib
    """

    logger = logging.getLogger(name)
    return attr.ib(default=logger, repr=False, hash=False, cmp=False)
|
@@ -444,3 +444,61 @@ def mass_copy_between_backends(src_backend='cdnsun', dest_backend='gcs'):
|
|||||||
|
|
||||||
log.info('%i files copied ok', copied_ok)
|
log.info('%i files copied ok', copied_ok)
|
||||||
log.info('%i files we did not copy', copy_errs)
|
log.info('%i files we did not copy', copy_errs)
|
||||||
|
|
||||||
|
|
||||||
|
@manager.command
@manager.option('-p', '--project', dest='dest_proj_url',
                help='Destination project URL')
@manager.option('-f', '--force', dest='force', action='store_true', default=False,
                help='Move even when already at the given project.')
@manager.option('-s', '--skip-gcs', dest='skip_gcs', action='store_true', default=False,
                help='Skip file handling on GCS, just update the database.')
def move_group_node_project(node_uuid, dest_proj_url, force=False, skip_gcs=False):
    """Copies all files from one project to the other, then moves the nodes.

    The node and all its children are moved recursively.

    :param node_uuid: ID of the top-level node to move.
    :param dest_proj_url: the 'url' field of the destination project.
    :param force: move even when the node is already in that project.
    :param skip_gcs: skip file handling on GCS, only update the database.
    :returns: None on success; 1 when the node cannot be found, 2 when the
        destination project does not exist, 3 when the node already is in
        the destination project (and force is not set), 4 when the node has
        a parent.
    """

    from pillar.api.nodes import moving
    from pillar.api.utils import str2id

    logging.getLogger('pillar').setLevel(logging.INFO)

    db = current_app.db()
    nodes_coll = db['nodes']
    projs_coll = db['projects']

    # Parse CLI args and get the node, source and destination projects.
    node_uuid = str2id(node_uuid)
    node = nodes_coll.find_one({'_id': node_uuid})
    if node is None:
        log.error("Node %s can't be found!", node_uuid)
        return 1

    if node.get('parent', None):
        log.error('Node cannot have a parent, it must be top-level.')
        return 4

    src_proj_id = node['project']
    src_proj = projs_coll.find_one({'_id': src_proj_id})
    dest_proj = projs_coll.find_one({'url': dest_proj_url})

    if src_proj is None:
        # A missing source project is only worth a warning; the node can
        # still be moved. From here on src_proj must not be subscripted.
        log.warning("Node's source project %s doesn't exist!", src_proj_id)
        src_proj_url = '<unknown>'
    else:
        src_proj_url = src_proj['url']

    if dest_proj is None:
        log.error("Destination project url='%s' doesn't exist.", dest_proj_url)
        return 2

    # Compare with the node's own project ID; it equals src_proj['_id'] when
    # the source project exists, and also works when it does not.
    if src_proj_id == dest_proj['_id']:
        if force:
            log.warning("Node is already at project url='%s'!", dest_proj_url)
        else:
            log.error("Node is already at project url='%s'!", dest_proj_url)
            return 3

    log.info("Mass-moving %s (%s) and children from project '%s' (%s) to '%s' (%s)",
             node_uuid, node['name'], src_proj_url, src_proj_id, dest_proj['url'],
             dest_proj['_id'])

    mover = moving.NodeMover(db=db, skip_gcs=skip_gcs)
    mover.change_project(node, dest_proj)

    log.info('Done moving.')
|
||||||
|
@@ -1,4 +1,5 @@
|
|||||||
# Primary requirements
|
# Primary requirements
|
||||||
|
attrs==16.2.0
|
||||||
algoliasearch==1.8.0
|
algoliasearch==1.8.0
|
||||||
bcrypt==2.0.0
|
bcrypt==2.0.0
|
||||||
blinker==1.4
|
blinker==1.4
|
||||||
|
143
tests/test_api/test_nodes_moving.py
Normal file
143
tests/test_api/test_nodes_moving.py
Normal file
@@ -0,0 +1,143 @@
|
|||||||
|
from __future__ import absolute_import, print_function
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
import mock
|
||||||
|
from bson import ObjectId
|
||||||
|
import flask_pymongo.wrappers
|
||||||
|
|
||||||
|
# Required to mock this module, otherwise 'pillar.api.file_storage' doesn't have
|
||||||
|
# the attribute 'moving'
|
||||||
|
import pillar.api.file_storage.moving
|
||||||
|
|
||||||
|
|
||||||
|
class NodeMoverTest(unittest.TestCase):
    """Tests NodeMover against a mocked database and mocked file moving."""

    def setUp(self):
        from pillar.api.nodes import moving

        self.db = mock.MagicMock(spec=flask_pymongo.wrappers.Database)
        self.mover = moving.NodeMover(db=self.db)

    def test_file_generator(self):
        id_a, id_b, id_c, id_d = [ObjectId(24 * char) for char in 'abcd']
        find_files = self.mover._files

        # Degenerate cases.
        for empty in (None, [], {}):
            self.assertEqual([], list(find_files(empty)))

        # Single ID
        self.assertEqual([id_a], list(find_files(id_a)))

        # Keyed single ID
        self.assertEqual([id_a], list(find_files({'file': id_a}, 'file')))

        # Keyed list of IDs
        self.assertEqual([id_a, id_b],
                         list(find_files({'files': [id_a, id_b]}, 'files')))

        # Keyed dict with keyed list of IDs
        nested_dict = {'attachments': {'files': [id_a, id_b]}}
        self.assertEqual([id_a, id_b],
                         list(find_files(nested_dict, 'attachments', 'files')))

        # Keyed dict with list of keyed lists of IDs
        list_of_lists = {'attachments': [
            {'files': [id_a, id_b]},
            {'files': [id_c, id_d]},
        ]}
        found = find_files(list_of_lists, 'attachments', 'files')
        self.assertEqual([id_a, id_b, id_c, id_d], list(found))

        # And one step further
        deeper = {'attachments': [
            {'files': [{'file': id_a}, {'file': id_b}]},
            {'files': [{'file': id_c}, {'file': id_d}]},
        ]}
        found = find_files(deeper, 'attachments', 'files', 'file')
        self.assertEqual([id_a, id_b, id_c, id_d], list(found))

        # Test double IDs; the generator itself does not de-duplicate.
        with_doubles = {'attachments': [
            {'files': [{'file': id_a}, {'file': id_b}]},
            {'files': [{'file': id_a}, {'file': id_d}]},
        ]}
        found = find_files(with_doubles, 'attachments', 'files', 'file')
        self.assertEqual([id_a, id_b, id_a, id_d], list(found))

    @mock.patch('pillar.api.file_storage.moving', autospec=True)
    def test_move_nodes(self, mock_fsmoving):
        id_b, id_c, id_d, id_e = [ObjectId(24 * char) for char in 'bcde']
        node = {
            '_id': ObjectId(24 * 'a'),
            'picture': id_b,
            'properties': {
                'attachments': [
                    {'files': [
                        {'file': id_c},
                        {'file': id_d},
                    ]},
                ],
                'files': [
                    {'file': id_e},
                    {'file': id_b},
                ],
            },
        }
        prid = ObjectId('project_dest')
        new_project = {'_id': prid}

        update_res = mock.Mock()
        update_res.matched_count = 1
        update_res.modified_count = 1
        self.db['nodes'].update_one.return_value = update_res
        self.mover.change_project(node, new_project)

        expected_calls = [mock.call(file_id, prid, skip_gcs=False)
                          for file_id in (id_b, id_e, id_c, id_d)]
        mock_fsmoving.gcs_move_to_bucket.assert_has_calls(expected_calls)

    @mock.patch('pillar.api.file_storage.moving', autospec=True)
    def test_move_node_without_picture_or_att(self, mock_fsmoving):
        id_b = ObjectId(24 * 'b')
        id_e = ObjectId(24 * 'e')
        node = {
            '_id': ObjectId(24 * 'a'),
            'properties': {
                'files': [
                    {'file': id_e},
                    {'file': id_b},
                ],
            },
        }
        prid = ObjectId('project_dest')
        new_project = {'_id': prid}

        update_res = mock.Mock()
        update_res.matched_count = 1
        update_res.modified_count = 1
        self.db['nodes'].update_one.return_value = update_res
        self.mover.change_project(node, new_project)

        expected_calls = [mock.call(file_id, prid, skip_gcs=False)
                          for file_id in (id_e, id_b)]
        mock_fsmoving.gcs_move_to_bucket.assert_has_calls(expected_calls)
|
Reference in New Issue
Block a user