|
- # Download utils
-
- import os
- import platform
- import subprocess
- import time
- import urllib
- from pathlib import Path
-
- import requests
- import torch
-
-
def gsutil_getsize(url=''):
    """Return the size in bytes of a Google Cloud Storage object, as reported by ``gsutil du``.

    See https://cloud.google.com/storage/docs/gsutil/commands/du

    Args:
        url: Object location, e.g. ``gs://bucket/file``.

    Returns:
        The integer found in the first column of the command output, or 0 when the
        command prints nothing.

    Raises:
        subprocess.CalledProcessError: If the ``gsutil`` command exits with a non-zero status.
        ValueError: If the first output column is not an integer.
    """
    # NOTE(review): url is interpolated into a shell command line; only pass trusted values.
    output = subprocess.check_output(f'gsutil du {url}', shell=True).decode('utf-8')
    columns = output.split()  # expected layout: "<bytes>  <object url>"
    # int() rather than eval(): text produced by an external process must never be executed as Python code.
    return int(columns[0]) if columns else 0  # bytes
-
-
def safe_download(file, url, url2=None, min_bytes=1E0, error_msg=''):
    """Download ``url`` to ``file``, retry once with curl on failure, and remove incomplete results.

    The first attempt uses ``torch.hub.download_url_to_file``. If it raises, or the
    downloaded file is missing or smaller than ``min_bytes``, the partial file is
    removed and curl is run against ``url2`` (or ``url`` again when ``url2`` is not
    given). After both attempts a file that is still missing or too small is deleted
    and an error is printed; no exception is raised for a failed download.

    Args:
        file: Destination path (str or Path).
        url: Primary download URL.
        url2: Optional backup URL used by the curl re-attempt.
        min_bytes: Smallest file size, in bytes, accepted as a complete download.
        error_msg: Extra text printed after the error when the download ultimately fails.
    """
    file = Path(file)
    assert_msg = f"Downloaded file '{file}' does not exist or size is < min_bytes={min_bytes}"

    def is_complete():
        # Single definition of "good download", shared by both checks so they agree at size == min_bytes
        return file.exists() and file.stat().st_size >= min_bytes

    try:  # url1
        print(f'Downloading {url} to {file}...')
        torch.hub.download_url_to_file(url, str(file))
        if not is_complete():  # explicit raise: an assert would be stripped under `python -O`
            raise RuntimeError(assert_msg)
    except Exception as e:  # url2
        file.unlink(missing_ok=True)  # remove partial downloads
        print(f'ERROR: {e}\nRe-attempting {url2 or url} to {file}...')
        try:
            # Argument list without a shell: quotes/metacharacters in the URL or path cannot alter the command.
            # curl download, retry and resume on fail
            subprocess.run(['curl', '-L', url2 or url, '-o', str(file), '--retry', '3', '-C', '-'])
        except OSError as curl_error:  # curl executable missing or not runnable
            print(f'ERROR: unable to run curl: {curl_error}')
    finally:
        if not is_complete():  # check
            file.unlink(missing_ok=True)  # remove partial downloads
            print(f"ERROR: {assert_msg}\n{error_msg}")
        print('')
-
-
def attempt_download(file, repo='ultralytics/yolov5'):  # from utils.downloads import *; attempt_download()
    """Return a local path for ``file``, downloading it first when it does not exist.

    Resolution order:
      1. ``file`` already exists locally: nothing is downloaded.
      2. ``file`` is an http(s) URL: it is downloaded into the current directory under
         its basename (query string removed) and that basename is returned.
      3. Otherwise, if the basename matches an asset of the latest GitHub release of
         ``repo``, the asset is downloaded to ``file``. When the GitHub API cannot be
         queried, a built-in asset list and the last local ``git tag`` (or 'v5.0')
         are used instead.

    Args:
        file: Local path or URL (str or Path). Surrounding whitespace and single quotes are removed.
        repo: GitHub repository in ``owner/name`` form whose release assets are searched.

    Returns:
        str: The basename for URL downloads, otherwise ``str(file)``. The download itself is
        not verified here, so the returned path may not exist if the download failed.
    """
    # Explicit import: a bare `import urllib` does not guarantee that the `urllib.parse` submodule is loaded.
    from urllib.parse import unquote

    file = Path(str(file).strip().replace("'", ''))
    if file.exists():
        return str(file)

    # URL specified
    name = Path(unquote(str(file))).name  # decode '%2F' to '/' etc.
    if str(file).startswith(('http:/', 'https:/')):  # download
        url = str(file).replace(':/', '://')  # Pathlib turns :// -> :/
        name = name.split('?')[0]  # parse authentication https://url.com/file.txt?auth...
        safe_download(file=name, url=url, min_bytes=1E5)
        return name

    # GitHub assets
    file.parent.mkdir(parents=True, exist_ok=True)  # make parent dir (if required)
    try:
        # timeout: without one an unresponsive API host would block this call indefinitely
        response = requests.get(f'https://api.github.com/repos/{repo}/releases/latest', timeout=10).json()
        assets = [x['name'] for x in response['assets']]  # release assets, i.e. ['yolov5s.pt', 'yolov5m.pt', ...]
        tag = response['tag_name']  # i.e. 'v1.0'
    except Exception:  # fallback plan; not a bare except, so Ctrl-C / SystemExit still propagate
        assets = ['yolov5s.pt', 'yolov5m.pt', 'yolov5l.pt', 'yolov5x.pt',
                  'yolov5s6.pt', 'yolov5m6.pt', 'yolov5l6.pt', 'yolov5x6.pt']
        try:
            tag = subprocess.check_output('git tag', shell=True, stderr=subprocess.STDOUT).decode().split()[-1]
        except Exception:  # git unavailable, not a repository, or no tags
            tag = 'v5.0'  # current release

    if name in assets:
        safe_download(file,
                      url=f'https://github.com/{repo}/releases/download/{tag}/{name}',
                      # url2=f'https://storage.googleapis.com/{repo}/ckpt/{name}',  # backup url (optional)
                      min_bytes=1E5,
                      error_msg=f'{file} missing, try downloading from https://github.com/{repo}/releases/')

    return str(file)
-
-
def gdrive_download(id='16TiPfZj7htmTyhntwcZyEEAejOUxuT6m', file='tmp.zip'):
    """Download a file from Google Drive with curl and unzip it when it is a .zip archive.

    Usage: from yolov5.utils.downloads import *; gdrive_download()

    A first request stores Drive's cookie in ./cookie. If a cookie file was written,
    the confirmation token read from it by get_token() is sent with the actual
    download request (large-file path); otherwise the file is fetched directly.

    Args:
        id: Google Drive file id. (The name shadows the builtin but is kept for caller compatibility.)
        file: Destination path. An existing file at this path is removed first.

    Returns:
        int: 0 when the download command succeeded, otherwise a non-zero exit status
        (127 when curl could not be started). The partial file is removed on failure.
    """

    def run(args, **kwargs):
        # Run a command without a shell and return its exit status (127 if it cannot be started).
        # Argument lists keep paths with spaces and shell metacharacters intact.
        try:
            return subprocess.run(args, **kwargs).returncode
        except OSError as e:
            print(f'ERROR: {e}')
            return 127

    t = time.time()
    file = Path(file)
    cookie = Path('cookie')  # gdrive cookie
    print(f'Downloading https://drive.google.com/uc?export=download&id={id} as {file}... ', end='')
    file.unlink(missing_ok=True)  # remove existing file
    cookie.unlink(missing_ok=True)  # remove existing cookie

    # Attempt file download
    endpoint = 'drive.google.com/uc?export=download'
    # First request is only made for its cookie; the response body is discarded
    run(['curl', '-c', './cookie', '-s', '-L', f'{endpoint}&id={id}'], stdout=subprocess.DEVNULL)
    if cookie.exists():  # large file
        cmd = ['curl', '-Lb', './cookie', f'{endpoint}&confirm={get_token()}&id={id}', '-o', str(file)]
    else:  # small file
        cmd = ['curl', '-s', '-L', '-o', str(file), f'{endpoint}&id={id}']
    r = run(cmd)  # execute, capture return
    cookie.unlink(missing_ok=True)  # remove existing cookie

    # Error check
    if r != 0:
        file.unlink(missing_ok=True)  # remove partial
        print('Download error ')  # raise Exception('Download error')
        return r

    # Unzip if archive
    if file.suffix == '.zip':
        print('unzipping... ', end='')
        if run(['unzip', '-q', str(file)]) == 0:
            file.unlink()  # remove zip to free space
        else:
            # Keep the archive: deleting it after a failed extraction would discard the only copy
            print(f'unzip error, keeping {file} ', end='')

    print(f'Done ({time.time() - t:.1f}s)')
    return r
-
-
def get_token(cookie="./cookie"):
    """Return the last whitespace-separated field of the first line in ``cookie`` that contains "download".

    Returns an empty string when no line of the file contains "download".
    """
    with open(cookie) as handle:
        # Lazy scan: stops reading at the first matching line
        candidates = (row.split()[-1] for row in handle if "download" in row)
        return next(candidates, "")
-
- # Google utils: https://cloud.google.com/storage/docs/reference/libraries ----------------------------------------------
- #
- #
- # def upload_blob(bucket_name, source_file_name, destination_blob_name):
- # # Uploads a file to a bucket
- # # https://cloud.google.com/storage/docs/uploading-objects#storage-upload-object-python
- #
- # storage_client = storage.Client()
- # bucket = storage_client.get_bucket(bucket_name)
- # blob = bucket.blob(destination_blob_name)
- #
- # blob.upload_from_filename(source_file_name)
- #
- # print('File {} uploaded to {}.'.format(
- # source_file_name,
- # destination_blob_name))
- #
- #
- # def download_blob(bucket_name, source_blob_name, destination_file_name):
- # # Downloads a blob from a bucket
- # storage_client = storage.Client()
- # bucket = storage_client.get_bucket(bucket_name)
- # blob = bucket.blob(source_blob_name)
- #
- # blob.download_to_filename(destination_file_name)
- #
- # print('Blob {} downloaded to {}.'.format(
- # source_blob_name,
- # destination_file_name))
|