1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271
| import os import base64 import re import binascii import chardet import asyncio import contextvars import hashlib import hmac import aiohttp from datetime import datetime, timezone import mimetypes import aiofiles from . import logger
# Module-level configuration, read once at import time.
log = logger.setup_logger(__name__)

# Required environment. NOTE(review): the first two keys are lower-case while
# the credential keys are upper-case — looks intentional but confirm against
# the deployment environment.
OBS_DOMAIN = os.environ['obs_domain']
OBS_BUCKET = os.environ['obs_bucket']
OBS_SECRET_KEY = os.environ['OBS_SECRET_KEY']
OBS_ACCESS_KEY = os.environ['OBS_ACCESS_KEY']

# Virtual-hosted-style bucket endpoint: <bucket>.<domain>.
OBS_HOST = f"{OBS_BUCKET}.{OBS_DOMAIN}"
OBS_BASE_URL = f"https://{OBS_HOST}"

# Per-upload state shared by the helpers below instead of threading it
# through every call.
FILE_PATH = contextvars.ContextVar("file path")      # local path being uploaded
UPLOAD_TO = contextvars.ContextVar("OBS object", default=None)  # object key
MIME_TYPE = contextvars.ContextVar("mime type")      # detected Content-Type
# NOTE(review): the default is a single dict shared by every context that
# never calls PARTS.set() first — mutations through it would leak between
# uploads; callers should reset it per upload.
PARTS = contextvars.ContextVar("parts", default=dict())
OBS_SESSION = contextvars.ContextVar("obs session")  # aiohttp.ClientSession

# Files larger than this are uploaded via multipart (100 MiB chunks).
MAX_LENGTH = 100 * 1024 * 1024
# Caps the number of concurrent part uploads.
SEM = asyncio.Semaphore(20)
def get_time() -> str:
    """Return the current UTC time as an RFC 1123 HTTP Date string.

    The Date header is part of the signed OBS StringToSign, and RFC 7231
    requires the fixed zone name "GMT" in HTTP dates. The previous
    implementation used %Z, which renders datetime.timezone.utc as "UTC".
    """
    now = datetime.now(tz=timezone.utc)
    return now.strftime("%a, %d %b %Y %H:%M:%S GMT")
def get_authorization(method, headers, uri) -> str:
    """Compute the OBS ``Authorization`` header value for a signed request.

    @method: HTTP verb (e.g. "PUT", "POST", "DELETE")
    @headers: request headers; Content-MD5/Content-Type/Date and any
              x-obs-* entries participate in the signature
    @uri: request path (including query string) relative to the bucket
    return "OBS <access_key>:<base64 hmac-sha1 signature>"
    """
    resource = f"/{OBS_BUCKET}{uri}"
    string_to_sign = "\n".join([
        method,
        headers.get('Content-MD5', ""),
        headers.get('Content-Type', ""),
        headers.get('Date', ""),
        get_canonicalizedheaders(headers) + resource,
    ])
    digest = hmac.new(
        OBS_SECRET_KEY.encode('UTF-8'),
        string_to_sign.encode('UTF-8'),
        hashlib.sha1,
    ).digest()
    signature = binascii.b2a_base64(digest)[:-1].decode('UTF-8')
    return f"OBS {OBS_ACCESS_KEY}:{signature}"
def get_canonicalizedheaders(headers: dict) -> str:
    """Serialize the x-obs-* headers for the OBS StringToSign.

    Header names are lower-cased, sorted, and emitted as "name:value\\n"
    lines, per the OBS signature specification.

    @headers: all request headers; only x-obs-* entries are included
    raises Exception when an x-obs-* header name contains non-ASCII chars
    """
    result = ''
    # Sort on the lower-cased name, but index the dict with the caller's
    # original key: the previous implementation looked up headers[lowered]
    # and raised KeyError for mixed-case names such as 'X-OBS-Acl'.
    for key in sorted(headers, key=str.lower):
        lower_key = key.lower()
        if lower_key.startswith('x-obs-'):
            if any(ord(c) > 127 for c in lower_key):
                raise Exception("header's key contain non-ascii charactor!")
            result += f"{lower_key}:{headers[key]}\n"
    return result
async def get_rfc1864_md5(start=0, end=None, byte_str: bytes = None):
    """Return the base64-encoded MD5 (RFC 1864 Content-MD5) of a payload.

    @start/@end: byte range of the context file, hashed via file_sender,
                 when no explicit payload is given
    @byte_str: hash these bytes instead of reading the file
    return the base64 string suitable for a Content-MD5 header
    """
    hasher = hashlib.md5()
    # 'is not None' rather than truthiness: an empty b"" payload must hash
    # as empty, not silently fall back to streaming the whole file.
    if byte_str is not None:
        hasher.update(byte_str)
    else:
        async for data in file_sender(start, end):
            hasher.update(data)
    return base64.b64encode(hasher.digest()).decode('utf-8')
async def get_encoding(file):
    """Detect the text encoding of *file* by sniffing its raw bytes.

    @file: path of the file to inspect
    return the encoding name chardet reports (may be None if undetectable)
    """
    async with aiofiles.open(file, mode='rb') as stream:
        raw = await stream.read()
    detection = chardet.detect(raw)
    return detection['encoding']
async def get_mimetype():
    """Resolve the Content-Type for the file held in FILE_PATH.

    Falls back to application/octet-stream for unknown extensions; for
    .txt files the detected charset is appended to the guessed type.
    """
    path = FILE_PATH.get()
    guessed, _ = mimetypes.guess_type(path)
    if not guessed:
        return 'application/octet-stream'
    if path.endswith('.txt'):
        detected = await get_encoding(path)
        return f"{guessed};charset={detected}"
    return guessed
async def file_sender(start=0, end=None):
    """Async-generate chunks of the FILE_PATH file.

    @start: byte offset to begin at (0 = beginning of file)
    @end: exclusive byte offset to stop at; None streams to EOF
    yields bytes chunks of at most 64 KiB
    """
    chunk_size = 64 * 1024
    async with aiofiles.open(FILE_PATH.get(), 'rb') as stream:
        if start:
            await stream.seek(start)
        while True:
            if end:
                pos = await stream.tell()
                assert pos <= end, "read too much data!"
                if pos == end:
                    break
                # Never read past the requested range boundary.
                chunk_size = min(chunk_size, end - pos)
            chunk = await stream.read(chunk_size)
            if not chunk:
                break
            yield chunk
def generate_objectname(upload_to=None):
    """Return the OBS object key for the current upload.

    @upload_to: explicit object key; when falsy the key defaults to
                /JOB_NAME/BUILD_ID/<basename of FILE_PATH>
    """
    if upload_to:
        return upload_to
    base = os.path.basename(FILE_PATH.get())
    return "/{}/{}/{}".format(
        os.environ['JOB_NAME'], os.environ['BUILD_ID'], base
    )
async def put(size, start: int = None, end: int = None, params: dict = None):
    """Upload the context file (or one byte range of it) with a signed PUT.

    @size: Content-Length of this request body
    @start/@end: byte range of a multipart part; both None for a simple
                 whole-object upload
    @params: query-string parameters (partNumber/uploadId for part uploads)
    return the public download URL of the object
    raises AssertionError when OBS does not answer 200
    """
    # None sentinel instead of a mutable {} default so calls never share
    # a dict across invocations.
    if params is None:
        params = {}
    uri = UPLOAD_TO.get()
    if params:
        uri = uri + "?" + "&".join([f"{k}={params[k]}" for k in params])
    # Part uploads carry a generic type; the detected mime type only makes
    # sense for the whole object. Test 'is None' — the old truthiness check
    # wrongly treated part 1 (start == 0) as a simple upload.
    mime = MIME_TYPE.get() if start is None else 'application/octet-stream'
    headers = {
        'Host': OBS_HOST,
        'Content-Length': str(size),
        'x-obs-acl': 'public-read',
        'Date': get_time(),
        'Content-Type': mime,
        'Content-MD5': await get_rfc1864_md5(start, end)
    }
    headers['Authorization'] = get_authorization("PUT", headers, uri)

    OBS = OBS_SESSION.get()
    async with (
        SEM,  # bound the number of concurrent part uploads
        OBS.put(uri, data=file_sender(start, end), headers=headers) as resp
    ):
        assert resp.status == 200, \
            f"upload {uri} failed! error: {await resp.text()}"
        if start is not None:
            # Record this part's ETag; merge() needs it to complete the
            # multipart upload.
            parts = PARTS.get()
            parts[params["partNumber"]] = resp.headers['ETag']
            PARTS.set(parts)
    return f"{OBS_BASE_URL}{UPLOAD_TO.get()}"
async def del_uploads(upload_id):
    """Abort the multipart upload *upload_id*, discarding uploaded parts.

    raises AssertionError when OBS does not answer 204
    """
    uri = f"{UPLOAD_TO.get()}?uploadId={upload_id}"
    headers = {'Host': OBS_HOST, 'Date': get_time()}
    headers['Authorization'] = get_authorization("DELETE", headers, uri)

    session = OBS_SESSION.get()
    async with session.delete(uri, headers=headers) as resp:
        assert resp.status == 204, \
            f"delete uploads {upload_id} failed: {await resp.text()}"
def split_file_yield(size, step=None):
    """Yield (start, end, length) triples partitioning *size* bytes.

    @size: total number of bytes to split
    @step: chunk size in bytes; defaults to MAX_LENGTH (kept as a runtime
           fallback so existing one-argument callers are unchanged)
    yields (start, end, length) with end exclusive; the final chunk may be
    shorter. Yields nothing for size == 0.
    """
    if step is None:
        step = MAX_LENGTH
    for offset in range(0, size, step):
        remaining = size - offset
        if remaining > step:
            yield (offset, offset + step, step)
        else:
            yield (offset, size, remaining)
async def init_uploads():
    """Initiate a multipart upload and return the server-assigned UploadId.

    raises AssertionError when OBS does not answer 200
    """
    uri = f"{UPLOAD_TO.get()}?uploads"
    headers = {
        'Host': OBS_HOST,
        'Content-Type': MIME_TYPE.get(),
        'x-obs-acl': 'public-read',
        'Date': get_time()
    }
    headers['Authorization'] = get_authorization("POST", headers, uri)
    session = OBS_SESSION.get()
    async with session.post(uri, headers=headers) as resp:
        assert resp.status == 200, \
            f"init uploads {uri} failed! error: {await resp.text()}"
        body = await resp.text()
    # Pull the UploadId out of the XML response body.
    return re.search(r'UploadId>([^<\s]+)</UploadId', body).group(1)
async def puts(size, upload_id):
    """Upload every chunk of the file concurrently as multipart parts.

    @size: total file size in bytes
    @upload_id: the multipart UploadId returned by init_uploads()
    Part numbers start at 1, as OBS requires. The TaskGroup propagates the
    first failure and cancels the remaining part uploads.
    """
    async with asyncio.TaskGroup() as tg:
        ranges = enumerate(split_file_yield(size), start=1)
        for number, (start, end, length) in ranges:
            query = {"partNumber": number, "uploadId": upload_id}
            tg.create_task(put(length, start, end, query))
async def merge_body() -> tuple[bytes, str]:
    """Build the CompleteMultipartUpload XML body and its Content-MD5.

    return (body_bytes, base64_md5). The previous annotation claimed a bare
    ``bytes`` return although the function always returned this 2-tuple.
    """
    parts = PARTS.get()
    log.debug(parts)
    part_template = "<Part><PartNumber>{}</PartNumber><ETag>{}</ETag></Part>"
    # Parts must appear in ascending part-number order in the XML.
    inner = ''.join(
        part_template.format(key, parts[key]) for key in sorted(parts)
    )
    body = f"<CompleteMultipartUpload>{inner}</CompleteMultipartUpload>"
    log.debug(body)
    bs = body.encode()
    return bs, await get_rfc1864_md5(byte_str=bs)
async def merge(upload_id):
    """Complete the multipart upload by merging all uploaded parts.

    @upload_id: the multipart UploadId whose parts should be merged
    raises AssertionError on an unexpected response status
    """
    uri = f"{UPLOAD_TO.get()}?uploadId={upload_id}"
    body, md5 = await merge_body()
    headers = {
        'Host': OBS_HOST,
        'Content-Length': str(len(body)),
        'Date': get_time(),
        'Content-MD5': md5,
        'Content-Type': 'application/xml;charset=utf-8',
    }
    headers['Authorization'] = get_authorization("POST", headers, uri)

    OBS = OBS_SESSION.get()
    async with OBS.post(uri, headers=headers, data=body) as resp:
        # 500/503 are tolerated deliberately. NOTE(review): looks like a
        # workaround for transient OBS errors on complete — confirm.
        # Message fixed: it previously said "init uploads ... failed",
        # misattributing merge failures to the init step.
        assert resp.status in [200, 500, 503], \
            f"merge uploads {uri} failed! error: {await resp.text()}"
async def chunked_put(size: int) -> str:
    """Upload a large file via the OBS multipart flow and return its URL.

    @size: total file size in bytes
    Flow: initiate -> upload parts concurrently -> complete; on any part
    failure the whole multipart upload is aborted server-side, then the
    original exception is re-raised.
    """
    uid = await init_uploads()
    log.info(f"uploadId: {uid}")

    try:
        await puts(size, uid)
    except Exception:
        log.error(f"call 'puts({size}, {uid})' failed.")
        # Abort so OBS does not keep (and bill for) orphaned parts.
        await del_uploads(uid)
        raise

    log.debug("finish upload all parts of the file")
    await merge(uid)
    log.debug("finish merge all parts of the file")
    return f"{OBS_BASE_URL}{UPLOAD_TO.get()}"
async def upload(
    session: aiohttp.ClientSession,
    file_path: str,
    *,
    upload_to: str = None
) -> str:
    """
    @session: aiohttp.ClientSession, obs aiohttp client session
    @file_path: str, the local file path
    @upload_to: str, the remote path to store, default is:
                /JOB_NAME/BUILD_ID/basename(file_path)

    return the file download url
    """
    FILE_PATH.set(file_path)
    UPLOAD_TO.set(generate_objectname(upload_to))
    MIME_TYPE.set(await get_mimetype())
    OBS_SESSION.set(session)
    # Start from a fresh mapping: the ContextVar default is a single dict
    # shared by every context that never set PARTS, so without this reset
    # part ETags recorded by one upload leak into the next upload made in
    # the same context.
    PARTS.set({})

    size = os.path.getsize(file_path)
    if size > MAX_LENGTH:
        return await chunked_put(size)
    # Await the coroutine directly; wrapping it in create_task only to
    # await it immediately added nothing.
    return await put(size)
|