yolov7 / yolov7detect /utils /google_utils.py
Mauricio Guerta
My yolov7detect
640ea1d
raw
history blame
6.32 kB
# Google utils: https://cloud.google.com/storage/docs/reference/libraries
import os
import platform
import subprocess
import time
from pathlib import Path
import requests
import torch
def attempt_download_from_hub(repo_id, hf_token=None):
# https://github.com/fcakyon/yolov5-pip/blob/main/yolov5/utils/downloads.py
from huggingface_hub import hf_hub_download, list_repo_files
from huggingface_hub.utils._errors import RepositoryNotFoundError
from huggingface_hub.utils._validators import HFValidationError
try:
repo_files = list_repo_files(repo_id=repo_id, repo_type='model', token=hf_token)
model_file = [f for f in repo_files if f.endswith('.pt')][0]
file = hf_hub_download(
repo_id=repo_id,
filename=model_file,
repo_type='model',
token=hf_token,
)
return file
except (RepositoryNotFoundError, HFValidationError):
return None
def gsutil_getsize(url=''):
# gs://bucket/file size https://cloud.google.com/storage/docs/gsutil/commands/du
s = subprocess.check_output(f'gsutil du {url}', shell=True).decode('utf-8')
return eval(s.split(' ')[0]) if len(s) else 0 # bytes
""" The attempt_download function has been fixed by @kadirnar.
tag = subprocess.check_output('git tag', shell=True).decode().split()[-1]
IndexError: list index out of range
"""
def attempt_download(file, repo="WongKinYiu/yolov7"):
# Attempt file download if does not exist
file = Path(str(file).strip().replace("'", "").lower())
if not file.exists():
# Set default assets
assets = [
"yolov7.pt",
"yolov7-tiny.pt",
"yolov7x.pt",
"yolov7-d6.pt",
"yolov7-e6.pt",
"yolov7-e6e.pt",
"yolov7-w6.pt",
]
try:
response = requests.get(f"https://api.github.com/repos/{repo}/releases").json() # github api
if len(response) > 0:
release_assets = response[0] # get dictionary of assets
# Get names of assets if it rleases exists
assets = [release_asset["name"] for release_asset in release_assets["assets"]]
# Get first tag which is the latest tag
tag = release_assets.get("tag_name")
except KeyError: # fallback plan
tag = subprocess.check_output("git tag", shell=True).decode().split()[-1]
except subprocess.CalledProcessError: # fallback to default release if can't get tag
tag = "v0.1"
name = file.name
if name in assets:
msg = f"{file} missing, try downloading from https://github.com/{repo}/releases/"
redundant = False # second download option
try: # GitHub
url = f"https://github.com/{repo}/releases/download/{tag}/{name}"
print(f"Downloading {url} to {file}...")
torch.hub.download_url_to_file(url, file)
assert file.exists() and file.stat().st_size > 1e6 # check
except Exception as e: # GCP
print(f"Download error: {e}")
assert redundant, "No secondary mirror"
url = f"https://storage.googleapis.com/{repo}/ckpt/{name}"
print(f"Downloading {url} to {file}...")
# torch.hub.download_url_to_file(url, weights)
os.system(f"curl -L {url} -o {file}")
finally:
if not file.exists() or file.stat().st_size < 1e6: # check
file.unlink(missing_ok=True) # remove partial downloads
print(f"ERROR: Download failure: {msg}")
print("")
return file
return str(file) # return file
def gdrive_download(id='', file='tmp.zip'):
# Downloads a file from Google Drive. from yolov7.utils.google_utils import *; gdrive_download()
t = time.time()
file = Path(file)
cookie = Path('cookie') # gdrive cookie
print(f'Downloading https://drive.google.com/uc?export=download&id={id} as {file}... ', end='')
file.unlink(missing_ok=True) # remove existing file
cookie.unlink(missing_ok=True) # remove existing cookie
# Attempt file download
out = "NUL" if platform.system() == "Windows" else "/dev/null"
os.system(f'curl -c ./cookie -s -L "drive.google.com/uc?export=download&id={id}" > {out}')
if os.path.exists('cookie'): # large file
s = f'curl -Lb ./cookie "drive.google.com/uc?export=download&confirm={get_token()}&id={id}" -o {file}'
else: # small file
s = f'curl -s -L -o {file} "drive.google.com/uc?export=download&id={id}"'
r = os.system(s) # execute, capture return
cookie.unlink(missing_ok=True) # remove existing cookie
# Error check
if r != 0:
file.unlink(missing_ok=True) # remove partial
print('Download error ') # raise Exception('Download error')
return r
# Unzip if archive
if file.suffix == '.zip':
print('unzipping... ', end='')
os.system(f'unzip -q {file}') # unzip
file.unlink() # remove zip to free space
print(f'Done ({time.time() - t:.1f}s)')
return r
def get_token(cookie="./cookie"):
with open(cookie) as f:
for line in f:
if "download" in line:
return line.split()[-1]
return ""
# def upload_blob(bucket_name, source_file_name, destination_blob_name):
# # Uploads a file to a bucket
# # https://cloud.google.com/storage/docs/uploading-objects#storage-upload-object-python
#
# storage_client = storage.Client()
# bucket = storage_client.get_bucket(bucket_name)
# blob = bucket.blob(destination_blob_name)
#
# blob.upload_from_filename(source_file_name)
#
# print('File {} uploaded to {}.'.format(
# source_file_name,
# destination_blob_name))
#
#
# def download_blob(bucket_name, source_blob_name, destination_file_name):
# # Uploads a blob from a bucket
# storage_client = storage.Client()
# bucket = storage_client.get_bucket(bucket_name)
# blob = bucket.blob(source_blob_name)
#
# blob.download_to_filename(destination_file_name)
#
# print('Blob {} downloaded to {}.'.format(
# source_blob_name,
# destination_file_name))