support for partial downloads
(use local cache where possible)
bam/cli.py (161 lines changed)
@@ -491,6 +491,10 @@ class bam_commands:
             all_deps=False,
             ):
 
+        # ---------
+        # constants
+        CHUNK_SIZE = 1024
+
         cfg = bam_config.load(abort=True)
 
         if output_dir is None:
@@ -519,6 +523,9 @@ class bam_commands:
                 }),
             }
 
+        # --------------------------------------------------------------------
+        # First request we simply get a list of files to download
+        #
         import requests
         r = requests.get(
                 bam_session.request_url("file"),
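
[note] The payload built just above this hunk double-encodes its data: the "arguments" value is itself a JSON document serialized into a single query-string parameter, so the whole request travels as ordinary GET params. A minimal sketch of the same pattern outside the CLI; the URL, credentials and file name here are placeholders, not values from this commit:

    import json
    import requests

    # hypothetical values, for illustration only
    payload = {
        "command": "checkout_download",
        "arguments": json.dumps({"files": ["shots/01/anim.blend"]}),
    }
    r = requests.get("http://localhost:5000/api/file", params=payload,
                     auth=("user", "password"), stream=True)
    # the server receives "arguments" as a plain string and must json.loads() it
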
@@ -556,7 +563,7 @@ class bam_commands:
                     break
 
             tot_size = 0
-            for chunk in r.iter_content(chunk_size=1024):
+            for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
                 if chunk:  # filter out keep-alive new chunks
                     tot_size += len(chunk)
                     f.write(chunk)
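
[note] This hunk only swaps the hard-coded 1024 for the new CHUNK_SIZE constant. For reference, the streaming pattern it touches looks like this in isolation; the progress line uses integer math, so (100 * tot_size) // expected_size stays within 0..100 as long as tot_size never exceeds the expected size. A minimal sketch, with a hypothetical download() helper and placeholder URL:

    import sys
    import requests

    CHUNK_SIZE = 1024

    def download(url, path, expected_size):
        # stream=True keeps requests from buffering the whole body in memory
        r = requests.get(url, stream=True)
        tot_size = 0
        with open(path, "wb") as f:
            for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
                if chunk:  # filter out keep-alive chunks
                    tot_size += len(chunk)
                    f.write(chunk)
                    # integer percentage, e.g. 512 of 2048 bytes -> [025%]
                    sys.stdout.write("\rdownload: [%03d%%]"
                                     % ((100 * tot_size) // expected_size))
                    sys.stdout.flush()
        sys.stdout.write("\n")
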
@@ -564,6 +571,7 @@ class bam_commands:
 
                     sys.stdout.write("\rdownload: [%03d%%]" % ((100 * tot_size) // msg_size))
                     sys.stdout.flush()
+        del struct
 
         # ---------------
         # extract the zip
@@ -576,32 +584,151 @@ class bam_commands:
         os.remove(dst_dir_data)
         sys.stdout.write("\nwritten: %r\n" % session_rootdir)
 
         # ----
         # Update cache
         cachedir = os.path.join(bam_config.find_rootdir(cwd=session_rootdir, abort=True), ".cache")
         # os.makedirs(cachedir, exist_ok=True)
 
+        # --------------------------------------------------------------------
+        # Second request we simply download the files..
+        #
+        # which we don't have in cache,
+        # note that its possible we have all in cache and don't need to make a second request.
+        files = []
+        with open(os.path.join(session_rootdir, ".bam_paths_remap.json")) as fp:
+            from bam.utils.system import uuid_from_file
+            paths_remap = json.load(fp)
+
+            paths_uuid = bam_session.load_paths_uuid(session_rootdir)
+            print(paths_uuid)
+
+            for f_src, f_dst in paths_remap.items():
+                if f_src == ".":
+                    continue
+
+                uuid = paths_uuid.get(f_src)
+                if uuid is not None:
+                    f_dst_abs = os.path.join(cachedir, f_dst)
+                    if os.path.exists(f_dst_abs):
+                        # check if we need to download this file?
+                        uuid_exists = uuid_from_file(f_dst_abs)
+                        assert(type(uuid) is type(uuid_exists))
+                        if uuid == uuid_exists:
+                            continue
+
+                files.append(f_dst)
+
+            del uuid_from_file
+
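[note] The cache test above compares a per-file identifier recorded in the session (paths_uuid) against one recomputed from the file currently sitting in the cache; uuid_from_file lives in bam.utils.system and is not shown in this diff. Assuming it is a content digest of the file's bytes (an assumption; the real helper may compute its identifier differently), an equivalent check looks like:

    import hashlib

    def uuid_from_file_sketch(path, block_size=1 << 20):
        # assumption: bam's uuid_from_file amounts to a digest over the file
        # contents; the actual implementation in bam.utils.system may differ
        h = hashlib.sha1()
        with open(path, "rb") as f:
            for block in iter(lambda: f.read(block_size), b""):
                h.update(block)
        return h.hexdigest()

    # a cache hit is when the recorded uuid matches the cached file,
    # mirroring the `if uuid == uuid_exists: continue` above
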
+        if files:
+            payload = {
+                "command": "checkout_download",
+                "arguments": json.dumps({
+                    "files": files,
+                    }),
+                }
+            import requests
+            r = requests.get(
+                    bam_session.request_url("file"),
+                    params=payload,
+                    auth=(cfg['user'], cfg['password']),
+                    stream=True,
+                    )
+
+            if r.status_code not in {200, }:
+                # TODO(cam), make into reusable function?
+                print("Error %d:\n%s" % (r.status_code, next(r.iter_content(chunk_size=1024)).decode('utf-8')))
+                return
+
+            # TODO(cam) how to tell if we get back a message payload? or real data???
+            # needed so we don't read past buffer bounds
+            def iter_content_size(r, size, chunk_size=CHUNK_SIZE):
+                while size >= chunk_size:
+                    size -= chunk_size
+                    yield r.raw.read(chunk_size)
+                if size:
+                    yield r.raw.read(size)
+
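[note] iter_content_size exists because several file payloads share one HTTP response body: r.iter_content() would happily read past the current payload into the next message's header, so reads must stop at exactly `size` bytes. A self-contained demonstration of the bounding logic against an in-memory stream, where `raw` stands in for r.raw:

    import io

    CHUNK_SIZE = 4  # tiny on purpose, to show the partial final read

    def iter_content_size(raw, size, chunk_size=CHUNK_SIZE):
        # yield exactly `size` bytes, never reading past the payload boundary
        while size >= chunk_size:
            size -= chunk_size
            yield raw.read(chunk_size)
        if size:
            yield raw.read(size)

    raw = io.BytesIO(b"0123456789NEXT-HEADER")
    chunks = list(iter_content_size(raw, 10))
    assert b"".join(chunks) == b"0123456789"
    assert raw.read(4) == b"NEXT"  # the following message is still intact
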
+            import struct
+            ID_MESSAGE = 1
+            ID_PAYLOAD = 2
+            ID_PAYLOAD_EMPTY = 3
+            ID_DONE = 4
+            head = r.raw.read(4)
+            if head != b'BAM\0':
+                fatal("bad header from server")
+
+            file_index = 0
+            while True:
+                msg_type, msg_size = struct.unpack("<II", r.raw.read(8))
+                if msg_type == ID_MESSAGE:
+                    sys.stdout.write(r.raw.read(msg_size).decode('utf-8'))
+                    sys.stdout.flush()
+                elif msg_type == ID_PAYLOAD_EMPTY:
+                    file_index += 1
+                elif msg_type == ID_PAYLOAD:
+                    f_rel = files[file_index]
+                    f_abs = os.path.join(cachedir, files[file_index])
+                    file_index += 1
+
+                    # server also prints... we could do this a bit different...
+                    sys.stdout.write("file: %r" % f_rel)
+                    sys.stdout.flush()
+
+                    os.makedirs(os.path.dirname(f_abs), exist_ok=True)
+
+                    with open(f_abs, "wb") as f:
+                        tot_size = 0
+                        # for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
+                        for chunk in iter_content_size(r, msg_size, chunk_size=CHUNK_SIZE):
+                            if chunk:  # filter out keep-alive new chunks
+                                tot_size += len(chunk)
+                                f.write(chunk)
+                                f.flush()
+
+                                sys.stdout.write("\rdownload: [%03d%%]" % ((100 * tot_size) // msg_size))
+                                sys.stdout.flush()
+                        assert(tot_size == msg_size)
+
+                elif msg_type == ID_DONE:
+                    break
+                else:
+                    raise Exception("Unknown message-type %d" % msg_type)
+            del struct
+
+        del files
+
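[note] The loop above implements a small framing protocol: a 4-byte b'BAM\0' magic, then a sequence of (type, size) headers packed as two little-endian uint32 values ("<II"), each followed by `size` bytes of payload. The commit only shows the client, so this sketch of the matching writer is inferred, not the actual server code:

    import struct

    ID_MESSAGE = 1
    ID_PAYLOAD = 2
    ID_PAYLOAD_EMPTY = 3
    ID_DONE = 4

    def write_message(stream, msg_type, data=b""):
        # header: message type and payload size, both little-endian uint32
        stream.write(struct.pack("<II", msg_type, len(data)))
        stream.write(data)

    def write_stream(stream, file_payloads):
        stream.write(b"BAM\0")  # magic, checked by the client
        for data in file_payloads:
            if data is None:
                # client already has this file: advance file_index, send no bytes
                write_message(stream, ID_PAYLOAD_EMPTY)
            else:
                write_message(stream, ID_PAYLOAD, data)
        write_message(stream, ID_MESSAGE, b"done\n")
        write_message(stream, ID_DONE)
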
+        # ------------
+        # Update Cache
+        #
+        # TODO, remove stale cache
+        cachedir = os.path.join(bam_config.find_rootdir(cwd=session_rootdir, abort=True), ".cache")
+        # os.makedirs(cachedir, exist_ok=True)
+
+        # we need this to map to project level paths
+        #
+        # Copy cache into our session before applying binary edits.
         with open(os.path.join(session_rootdir, ".bam_paths_remap.json")) as fp:
             paths_remap = json.load(fp)
-            for f_src, f_dst in paths_remap.items():
-                if f_src == ".":
-                    continue
-                f_src_abs = os.path.join(session_rootdir, f_src)
-                if not os.path.exists(f_src_abs):
-                    f_dst_abs = os.path.join(cachedir, f_dst)
-                    os.makedirs(os.path.dirname(f_dst_abs), exist_ok=True)
-                    import shutil
-                    # print("from ", f_src_abs, os.path.exists(f_src_abs))
-                    # print("to ", f_dst_abs, os.path.exists(f_dst_abs))
-                    # print("CREATING: ", f_dst_abs)
-                    shutil.copyfile(f_src_abs, f_dst_abs)
-                    del shutil
+            for f_dst, f_src in paths_remap.items():
+                if f_dst == ".":
+                    continue
+
+                f_src_abs = os.path.join(cachedir, f_src)
+
+                # this should 'almost' always be true
+                if os.path.exists(f_src_abs):
+                    f_dst_abs = os.path.join(session_rootdir, f_dst)
+                    os.makedirs(os.path.dirname(f_dst_abs), exist_ok=True)
+
+                    import shutil
+                    # print("from ", f_dst_abs, os.path.exists(f_dst_abs))
+                    # print("to ", f_src_abs, os.path.exists(f_src_abs))
+                    # print("CREATING: ", f_src_abs)
+                    shutil.copyfile(f_src_abs, f_dst_abs)
+                    del shutil
+        # import time
+        # time.sleep(10000)
 
         del paths_remap, cachedir
+        # ...done updating cache
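
[note] For context when reading the two remap loops: .bam_paths_remap.json appears to map session-relative paths (keys) to project-level paths (values), and the cache is keyed by the project-level path. A hypothetical remap file, illustrative paths only:

    {
        ".": "shots/01",
        "anim.blend": "shots/01/anim.blend",
        "_maps/rock.png": "libs/maps/rock.png"
    }

That is why the cache-check loop joins the value onto cachedir while the final copy loop joins the key onto session_rootdir; the two loops iterate the same dict with the variable names swapped to match the direction of the copy.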