|
|
|
|
|
from __future__ import print_function |
|
|
|
import argparse |
|
import os |
|
import os.path as osp |
|
import re |
|
import shutil |
|
import sys |
|
import tempfile |
|
|
|
import requests |
|
import six |
|
import tqdm |
|
|
|
|
|
|
|
|
|
|
|
|
|
# Number of bytes requested from the HTTP response per iteration while a
# download is streamed to its destination (512 KiB).
CHUNK_SIZE = 1024 * 512
|
|
|
|
|
def get_url_from_gdrive_confirmation(contents):
    """Extract the follow-up download URL from a Google Drive response page.

    The text is scanned line by line; the first line that matches one of
    the patterns below decides the result:

    1. an ``href="/uc?export=download..."`` link, returned as an absolute
       ``https://docs.google.com`` URL with HTML ``&amp;`` entities decoded;
    2. a bare ``confirm=<token>`` occurrence (see the note in the body);
    3. a ``"downloadUrl":"..."`` field, returned with its unicode-escaped
       ``=`` and ``&`` characters decoded.

    Args:
        contents: Text of the response body to scan.

    Returns:
        The extracted URL string, ``''`` when only a bare confirm token was
        found, or ``None`` when no line matches any pattern.
    """
    url = ''
    for line in contents.splitlines():
        m = re.search(r'href="(/uc\?export=download[^"]+)', line)
        if m:
            url = 'https://docs.google.com' + m.group(1)
            # The link was taken from an HTML attribute, where the query
            # separators are written as "&amp;"; decode them so the result
            # is a usable URL.
            url = url.replace('&amp;', '&')
            return url

        m = re.search(r'confirm=([^;&]+)', line)
        if m:
            confirm = m.group(1)
            # NOTE(review): ``url`` is still '' at this point, so there is
            # nothing to substitute into and '' is returned. Kept for
            # backward compatibility; callers must treat '' as "no URL".
            # A callable replacement is used so that backslashes in the
            # token are never parsed as replacement-template escapes.
            url = re.sub(r'confirm=([^;&]+)',
                         lambda _match: 'confirm=' + confirm, url)
            return url

        m = re.search(r'"downloadUrl":"([^"]+)', line)
        if m:
            url = m.group(1)
            # The value is embedded in JSON-like text with '=' and '&'
            # written as unicode escapes.
            url = url.replace('\\u003d', '=')
            url = url.replace('\\u0026', '&')
            return url

    return None
|
|
|
|
|
def is_google_drive_url(url):
    """Tell whether ``url`` is a Google Drive ``uc?id=...`` download URL.

    Args:
        url: URL string to test.

    Returns:
        ``True`` when ``url`` starts with ``http://`` or ``https://``
        followed by ``drive.google.com/uc?id=``, ``False`` otherwise.
    """
    # The dots are escaped so that only the literal host name matches;
    # an unescaped '.' would also accept look-alike hosts such as
    # "driveXgoogle.com".
    m = re.match(r'^https?://drive\.google\.com/uc\?id=.*$', url)
    return m is not None
|
|
|
|
|
def download(url, output, quiet):
    """Download ``url`` to ``output``, following Google Drive confirmations.

    For Google Drive URLs the response is re-requested through the link
    extracted from each intermediate page until a response carrying a
    ``Content-Disposition`` header (the file itself) is received.

    Args:
        url: Address to fetch.
        output: Destination file path, a writable binary file-like object,
            or ``None`` to derive a file name from the response headers
            (Google Drive) or from the URL (anything else).
        quiet: When true, neither the progress messages nor the progress
            bar are written to stderr.

    Returns:
        ``output`` (the path or the file object that was written to) on
        success; ``None`` when no follow-up link could be extracted from a
        Google Drive page or when an ``IOError`` occurred during transfer.
    """
    url_origin = url
    sess = requests.session()

    is_gdrive = is_google_drive_url(url)

    while True:
        res = sess.get(url, stream=True)
        if 'Content-Disposition' in res.headers:
            # The server is sending the file itself.
            break
        if not is_gdrive:
            break

        # Google Drive answered with an intermediate page; the real
        # download link has to be pulled out of it.
        url = get_url_from_gdrive_confirmation(res.text)

        # The helper yields None when nothing matched and may yield '' when
        # only a bare token was found; neither can be requested.
        if not url:
            print('Permission denied: %s' % url_origin, file=sys.stderr)
            print("Maybe you need to change permission over "
                  "'Anyone with the link'?", file=sys.stderr)
            return

    if output is None:
        m = None
        if is_gdrive:
            m = re.search('filename="(.*)"',
                          res.headers['Content-Disposition'])
        if m:
            output = m.groups()[0]
            # The header is controlled by the server: neutralise path
            # separators so the name cannot point into another directory.
            for sep in (osp.sep, osp.altsep):
                if sep:
                    output = output.replace(sep, '_')
        else:
            # Non-Drive URL, or a header without a quoted file name.
            output = osp.basename(url)

    output_is_path = isinstance(output, six.string_types)

    if not quiet:
        print('Downloading...', file=sys.stderr)
        print('From:', url_origin, file=sys.stderr)
        print('To:', osp.abspath(output) if output_is_path else output,
              file=sys.stderr)

    if output_is_path:
        # NOTE(review): tempfile.mktemp() only picks a name and is
        # deprecated because another process can claim that name before
        # open() runs. It is kept here so the temporary file is created by
        # open() with the process's default permissions, which shutil.copy
        # then carries over to the final file.
        tmp_file = tempfile.mktemp(
            suffix=tempfile.template,
            prefix=osp.basename(output),
            dir=osp.dirname(output),
        )
        f = open(tmp_file, 'wb')
    else:
        tmp_file = None
        f = output

    pbar = None
    try:
        total = res.headers.get('Content-Length')
        if total is not None:
            total = int(total)
        if not quiet:
            pbar = tqdm.tqdm(total=total, unit='B', unit_scale=True)
        for chunk in res.iter_content(chunk_size=CHUNK_SIZE):
            f.write(chunk)
            if pbar is not None:
                pbar.update(len(chunk))
        if pbar is not None:
            pbar.close()
            pbar = None
        if tmp_file:
            # Flush everything to disk before copying to the final name.
            f.close()
            shutil.copy(tmp_file, output)
    except IOError as e:
        print(e, file=sys.stderr)
        return
    finally:
        # Release whatever is still open, also when the transfer failed.
        if pbar is not None:
            pbar.close()
        if tmp_file:
            # Only the temporary file is owned by this function; a
            # caller-supplied file object is left open. Closing an already
            # closed file is a no-op.
            try:
                f.close()
            except IOError:
                pass
            try:
                os.remove(tmp_file)
            except OSError:
                pass

    return output
|
|
|
|
|
|
|
def main():
    """Command-line entry point: parse the arguments and run the download.

    Positional ``url_or_id`` is treated as a URL, or, when ``--id`` is
    given, as a Google Drive file id from which the download URL is built.
    ``-O -`` sends the downloaded bytes to standard output.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        'url_or_id', help='url or file id (with --id) to download file from')
    parser.add_argument('-O', '--output', help='output filename')
    parser.add_argument('-q', '--quiet', action='store_true',
                        help='suppress standard output')
    parser.add_argument('--id', action='store_true',
                        help='flag to specify file id instead of url')
    args = parser.parse_args()

    # Nothing is printed to stdout here: with "-O -" stdout carries the
    # downloaded data and must not be mixed with diagnostics.
    if args.output == '-':
        if six.PY3:
            # Binary data has to go through the underlying byte stream.
            args.output = sys.stdout.buffer
        else:
            args.output = sys.stdout

    if args.id:
        # Turn a bare file id into the Google Drive download URL form.
        url = 'https://drive.google.com/uc?id={id}'.format(id=args.url_or_id)
    else:
        url = args.url_or_id

    download(url, args.output, args.quiet)
|
|
|
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|