Add support for compressed chunks on download
uses lzma compression (4mb chunks currently)
This commit is contained in:
57
bam/cli.py
57
bam/cli.py
@@ -653,15 +653,21 @@ class bam_commands:
|
||||
import struct
|
||||
ID_MESSAGE = 1
|
||||
ID_PAYLOAD = 2
|
||||
ID_PAYLOAD_EMPTY = 3
|
||||
ID_DONE = 4
|
||||
ID_PAYLOAD_APPEND = 3
|
||||
ID_PAYLOAD_EMPTY = 4
|
||||
ID_DONE = 5
|
||||
head = r.raw.read(4)
|
||||
if head != b'BAM\0':
|
||||
fatal("bad header from server")
|
||||
|
||||
file_index = 0
|
||||
is_header_read = True
|
||||
while True:
|
||||
msg_type, msg_size = struct.unpack("<II", r.raw.read(8))
|
||||
if is_header_read:
|
||||
msg_type, msg_size = struct.unpack("<II", r.raw.read(8))
|
||||
else:
|
||||
is_header_read = True
|
||||
|
||||
if msg_type == ID_MESSAGE:
|
||||
sys.stdout.write(r.raw.read(msg_size).decode('utf-8'))
|
||||
sys.stdout.flush()
|
||||
@@ -679,20 +685,45 @@ class bam_commands:
|
||||
os.makedirs(os.path.dirname(f_abs), exist_ok=True)
|
||||
|
||||
with open(f_abs, "wb") as f:
|
||||
tot_size = 0
|
||||
# for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
|
||||
for chunk in iter_content_size(r, msg_size, chunk_size=CHUNK_SIZE):
|
||||
if chunk: # filter out keep-alive new chunks
|
||||
tot_size += len(chunk)
|
||||
f.write(chunk)
|
||||
f.flush()
|
||||
while True:
|
||||
tot_size = 0
|
||||
# No need to worry about filling memory,
|
||||
# total chunk size is capped by the server
|
||||
chunks = []
|
||||
# for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
|
||||
for chunk in iter_content_size(r, msg_size, chunk_size=CHUNK_SIZE):
|
||||
if chunk: # filter out keep-alive new chunks
|
||||
tot_size += len(chunk)
|
||||
# f.write(chunk)
|
||||
# f.flush()
|
||||
chunks.append(chunk)
|
||||
|
||||
sys.stdout.write("\rdownload: [%03d%%]" % ((100 * tot_size) // msg_size))
|
||||
sys.stdout.flush()
|
||||
assert(tot_size == msg_size)
|
||||
sys.stdout.write("\rdownload: [%03d%%]" % ((100 * tot_size) // msg_size))
|
||||
sys.stdout.flush()
|
||||
assert(tot_size == msg_size)
|
||||
|
||||
# decompress all chunks
|
||||
import lzma
|
||||
f.write(lzma.decompress(b''.join(chunks)))
|
||||
f.flush()
|
||||
del chunks
|
||||
|
||||
# take care! - re-reading the next header to see if
|
||||
# we're appending to this file or not
|
||||
msg_type, msg_size = struct.unpack("<II", r.raw.read(8))
|
||||
if msg_type == ID_PAYLOAD_APPEND:
|
||||
continue
|
||||
# otherwise continue the outer loop, without re-reading the header
|
||||
|
||||
# don't re-read the header next iteration
|
||||
is_header_read = False
|
||||
break
|
||||
|
||||
elif msg_type == ID_DONE:
|
||||
break
|
||||
elif msg_type == ID_PAYLOAD_APPEND:
|
||||
# Should only handle in a read-loop above
|
||||
raise Exception("Invalid state for message-type %d" % msg_type)
|
||||
else:
|
||||
raise Exception("Unknown message-type %d" % msg_type)
|
||||
del struct
|
||||
|
@@ -278,14 +278,17 @@ class FileAPI(Resource):
|
||||
# return Response(f, direct_passthrough=True)
|
||||
return Response(response_message_iter(), direct_passthrough=True)
|
||||
elif command == 'checkout_download':
|
||||
# 4mb chunks
|
||||
CHUNK_COMPRESS = 4194304
|
||||
# CHUNK_COMPRESS = 512 # for testing, we can ensure many chunks are supported
|
||||
files = command_args['files']
|
||||
|
||||
def response_message_iter():
|
||||
ID_MESSAGE = 1
|
||||
ID_PAYLOAD = 2
|
||||
ID_PAYLOAD_EMPTY = 3
|
||||
ID_DONE = 4
|
||||
# ID_PAYLOAD_APPEND = 3
|
||||
ID_PAYLOAD_APPEND = 3
|
||||
ID_PAYLOAD_EMPTY = 4
|
||||
ID_DONE = 5
|
||||
import struct
|
||||
|
||||
def report(txt):
|
||||
@@ -295,15 +298,6 @@ class FileAPI(Resource):
|
||||
yield b'BAM\0'
|
||||
|
||||
# pack the file!
|
||||
import tempfile
|
||||
|
||||
# weak! (ignore original opened file)
|
||||
'''
|
||||
filepath_zip = tempfile.mkstemp(suffix=".zip")
|
||||
os.close(filepath_zip[0])
|
||||
filepath_zip = filepath_zip[1]
|
||||
'''
|
||||
|
||||
for f_rel in files:
|
||||
f_abs = os.path.join(project.repository_path, f_rel)
|
||||
if os.path.exists(f_abs):
|
||||
@@ -314,12 +308,20 @@ class FileAPI(Resource):
|
||||
f_size = f.tell()
|
||||
f.seek(0, os.SEEK_SET)
|
||||
|
||||
yield struct.pack('<II', ID_PAYLOAD, f_size)
|
||||
while True:
|
||||
data = f.read(1024)
|
||||
if not data:
|
||||
break
|
||||
yield data
|
||||
id_payload = ID_PAYLOAD
|
||||
|
||||
f_size_left = f_size
|
||||
import lzma
|
||||
while f_size_left:
|
||||
data_raw = f.read(CHUNK_COMPRESS)
|
||||
f_size_left -= len(data_raw)
|
||||
data_lzma = lzma.compress(data_raw)
|
||||
del data_raw
|
||||
assert(f_size_left >= 0)
|
||||
|
||||
yield struct.pack('<II', id_payload, len(data_lzma))
|
||||
yield data_lzma
|
||||
id_payload = ID_PAYLOAD_APPEND
|
||||
else:
|
||||
yield report("%s: %r\n" % ("source missing", f_rel))
|
||||
yield struct.pack('<II', ID_PAYLOAD_EMPTY, 0)
|
||||
|
Reference in New Issue
Block a user