From 5ff80e2f6171e50b6ca8cd5031e2ba2c1514d064 Mon Sep 17 00:00:00 2001 From: hlf20010508 Date: Wed, 13 Sep 2023 09:48:51 +0800 Subject: [PATCH] feat: support onedrive session, upload from url, work in memory - set default delete_flag in env - auto relogin - add from_users to filter user, need tg_user_name - add /url file_url to upload from url - add /clear to clear history - can reply error - no temp file anymore, all jobs will be working in memory. --- .dockerignore | 1 - README.md | 12 +- auth_server.py | 34 +-- bot.py | 388 ++++++++++++++++++-------------- docker-compose.yml | 2 + onedrive.py | 214 ++++++++++++++++-- onedrive_large_file_uploader.py | 184 --------------- 7 files changed, 438 insertions(+), 397 deletions(-) delete mode 100644 onedrive_large_file_uploader.py diff --git a/.dockerignore b/.dockerignore index 4e2fd06..e529f83 100755 --- a/.dockerignore +++ b/.dockerignore @@ -4,5 +4,4 @@ !auth_server.py !ssl !templates -!onedrive_large_file_uploader.py !log.py \ No newline at end of file diff --git a/README.md b/README.md index f7bc3e6..a1080f0 100755 --- a/README.md +++ b/README.md @@ -22,25 +22,29 @@ That's why you need to prepare a lot of things to use this bot. - `/start` to start with bot. - `/auth` to authorize telegram and onedrive. - `/status` to show pinned status message. +- `/clear` to clear all history except status message. - `/links message_link range` to transfer sequential restricted content. +- `/url file_url` to upload file through url. - `/autoDelete true/false` decides whether bot can auto delete message. - `/help` for help. Example: `/links https://t.me/c/xxxxxxx/100 2` will transfer `https://t.me/c/xxxxxxx/100` and `https://t.me/c/xxxxxxx/101`. +`/url https://example.com/file.txt` will upload `file.txt` to Onedrive. It calls Onedrive's API, which means Onedrive's server will visit the url and download the file for you. ## Authorization Steps - Send `/auth`. - Wait and you'll receive the login code from telegram. - Visit the uri the bot sends, and submit the code. -- After submission, it will redirect to the authorization uri for OneDrive. Login and authorize. -- If the bot says `Authorization successful!`, everything is done. +- After submission, it will send the authorization uri for OneDrive. Visit, login and authorize. +- If the bot says `Onedrive authorization successful!`, everything is done. ## Usage - Add this bot to a group or channel. - In the group or channel, forward or upload files(or videos, photos). - If you want to transfer restricted content from a group or channel, right click the content, copy the message link, and send the link. - Wait until the transfer completes. You can check status on pinned status message. +- Use `/help` for more information about other commands. ## Preparation - Open `docker-compose.yml` and edit the environment config. @@ -48,6 +52,7 @@ Example: - Create a Telegram bot through [BotFather](https://t.me/BotFather). Record `token` as `tg_bot_token`. - Create a Telegram application on [my.telegram.org](https://my.telegram.org). See [details](https://docs.telethon.dev/en/stable/basic/signing-in.html). Record `api_id` as `tg_api_id`, `api_hash` as `tg_api_hash`. - `tg_user_phone` is the phone number you just used to login to my.telegram.org. +- `tg_user_name` is your telegram user name. Check your profile, find your user name, it should be like `@user`, then record `user` as `tg_user_name`. Optional, default to void. If you don't set this parameter, every one can control your bot. - Create a OneDrive application on [portal.azure.com](https://portal.azure.com/#view/Microsoft_AAD_RegisteredApps/ApplicationsListBlade) App registrations. - Press `New registrations`. - Fill `Name`. @@ -57,7 +62,8 @@ Example: - Press `Register`. - In application's `Overview`, record `Application (client) ID` as `od_client_id`. - Go to application's `Certificates & secrets`, press `Client secrets`, and press `New client secret`. Then fill `Description`, and choose an `Expires`. Finnaly, press `Add`. Record `Value` as `od_client_secret`. -- `remote_root_path` is a directory on OneDrive. Like `/MyFiles/Telegram`. +- `remote_root_path` is a directory on OneDrive. Like `/MyFiles/Telegram`. Default to `/`. +- `delete_flag` decides whether bot can auto delete message. Pass `true` or `false`. Optional, default to `false`. ## Launch Through Docker ```sh diff --git a/auth_server.py b/auth_server.py index dc119c8..ac9ffcf 100644 --- a/auth_server.py +++ b/auth_server.py @@ -10,10 +10,8 @@ app = Flask(__name__) -temp_dir = "temp" - -if not os.path.exists(temp_dir): - os.mkdir(temp_dir) +code_tg = '' +code_od = '' @app.route("/") @@ -23,40 +21,28 @@ def telegram_code_index(): @app.route("/tg", methods=["GET", "POST"]) def telegram_code(): - code_path = os.path.join(temp_dir, "tg_code") + global code_tg if request.method == "POST": - code = request.json["code"] - with open(code_path, "w") as file: - file.write(code) + code_tg = request.json["code"] return jsonify({"success": True}) if request.method == "GET": - if not os.path.exists(code_path): + if not code_tg: return jsonify({"success": False}) else: - code = "" - with open(code_path, "r") as file: - code = file.read() - os.remove(code_path) - return jsonify({"success": True, "code": code}) + return jsonify({"success": True, "code": code_tg}) @app.route("/auth") def onedrive_code(): - code_path = os.path.join(temp_dir, "od_code") + global code_od if not request.args.get("get"): - code = request.args.get("code") - with open(code_path, "w") as file: - file.write(code) + code_od = request.args.get("code") return "Authorization Successful!" else: - if not os.path.exists(code_path): + if not code_od: return jsonify({"success": False}) else: - code = "" - with open(code_path, "r") as file: - code = file.read() - os.remove(code_path) - return jsonify({"success": True, "code": code}) + return jsonify({"success": True, "code": code_od}) if __name__ == "__main__": diff --git a/bot.py b/bot.py index 8842460..2f168d1 100644 --- a/bot.py +++ b/bot.py @@ -9,10 +9,10 @@ import urllib3 import asyncio import math -from time import sleep import subprocess import re import inspect +from io import BytesIO from telethon import TelegramClient, events from telethon.tl import types import requests @@ -21,30 +21,39 @@ urllib3.disable_warnings() -temp_dir = "temp" status_bar = None -delete_flag = False + +cmd_helper = '''- /auth: Authorize for Telegram and OneDrive. +- /status: Show pinned status message. +- /clear: Clear all history except status message. + +- `/links` message_link range: Transfer sequential restricted content. +- `/url` file_url: Upload file through url. +- `/autoDelete true` to auto delete message. +- `/autoDelete false` to not auto delete message. +''' # auth server server_uri = os.environ["server_uri"] - # telegram api tg_api_id = int(os.environ["tg_api_id"]) tg_api_hash = os.environ["tg_api_hash"] tg_user_phone = os.environ["tg_user_phone"] - +tg_user_name = os.environ.get("tg_user_name", None) # telegram bot tg_bot_token = os.environ["tg_bot_token"] - # onedrive od_client_id = os.environ["od_client_id"] od_client_secret = os.environ["od_client_secret"] remote_root_path = os.environ.get("remote_root_path", "/") +delete_flag = True if os.environ.get("delete_flag", "false") == 'true' else False + # clients tg_bot = TelegramClient("bot", tg_api_id, tg_api_hash, sequential_updates=True).start( bot_token=tg_bot_token ) + tg_client = TelegramClient("user", tg_api_id, tg_api_hash, sequential_updates=True) onedrive = Onedrive( @@ -54,23 +63,24 @@ remote_root_path=remote_root_path, ) -def clear_temp(): - for file in os.listdir(temp_dir): - os.remove(os.path.join(temp_dir, file)) - -if not os.path.exists(temp_dir): - os.mkdir(temp_dir) -else: - clear_temp() def cmd_parser(event): return event.text.split()[1:] + +async def clear_history(event): + ids = [] + async for message in tg_client.iter_messages(event.chat_id): + ids.append(message.id) + await tg_client.delete_messages(event.chat_id, ids) + + async def delete_message(message): global delete_flag if delete_flag: await message.delete() + async def check_in_group(event): if isinstance(event.message.peer_id, types.PeerUser): await event.respond(''' @@ -80,23 +90,100 @@ async def check_in_group(event): ''') raise events.StopPropagation + +async def check_login(event): + try: + if await tg_client.get_me(): + return True + else: + await res_not_login(event) + return False + except: + await res_not_login(event) + return False + + async def res_not_login(event): await event.respond(''' You haven't logined to Telegram. - -Use /auth to login. ''') - raise events.StopPropagation + await auth(event) -cmd_helper = '''/auth: Authorize for Telegram and OneDrive. -/status: Show pinned status message. -`/links` message_link range: Transfer sequential restricted content. -`/autoDelete true` to auto delete message. -`/autoDelete false` to not auto delete message. -''' +async def download_part(client, input_location, offset, part_size): + stream = client.iter_download( + input_location, offset=offset, request_size=part_size, limit=part_size + ) + part = await stream.__anext__() + await stream.close() + return part + + +async def multi_parts_uploader( + client, document, name, conn_num=5, progress_callback=None +): + input_location = types.InputDocumentFileLocation( + id=document.id, + access_hash=document.access_hash, + file_reference=document.file_reference, + thumb_size="", + ) + + upload_session = onedrive.multipart_upload_session_builder(name) + uploader = onedrive.multipart_uploader(upload_session, document.size) + + task_list = [] + part_size = 2 * 1024 * 1024 + total_part_num = ( + 1 if part_size >= document.size else math.ceil(document.size / part_size) + ) + current_part_num = 0 + current_size = 0 + offset = 0 + pre_offset = 0 + if progress_callback: + cor = progress_callback(current_size, document.size) + if inspect.isawaitable(cor): + await cor + + buffer = BytesIO() + while current_part_num < total_part_num: + task_list.append( + asyncio.ensure_future(download_part(client, input_location, offset, part_size)) + ) + current_part_num += 1 + if current_part_num < total_part_num: + offset += part_size + if current_part_num % conn_num == 0 or current_part_num == total_part_num: + for part in await asyncio.gather(*task_list): + buffer.write(part) + current_size += len(part) + task_list.clear() + buffer.seek(0) + await onedrive.multipart_upload(uploader, buffer, pre_offset, buffer.getbuffer().nbytes) + pre_offset = offset + buffer = BytesIO() + if progress_callback: + cor = progress_callback(current_size, document.size) + if inspect.isawaitable(cor): + await cor + buffer.close() -@tg_bot.on(events.NewMessage(pattern="/start", incoming=True)) + +def get_link(string): + regex = r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))" + url = re.findall(regex,string) + try: + link = [x[0] for x in url][0] + if link: + return link + else: + return False + except Exception: + return False + + +@tg_bot.on(events.NewMessage(pattern="/start", incoming=True, from_users=tg_user_name)) async def start(event): """Send a message when the command /start is issued.""" await event.respond(''' @@ -105,24 +192,25 @@ async def start(event): Forward or upload files to me, or pass message link to transfer restricted content from group or channel. %s -/help: Ask for help. +- /help: Ask for help. '''%cmd_helper) raise events.StopPropagation -@tg_bot.on(events.NewMessage(pattern="/help", incoming=True)) +@tg_bot.on(events.NewMessage(pattern="/help", incoming=True, from_users=tg_user_name)) async def help(event): """Send a message when the command /help is issued.""" await event.respond(''' %s -To transfer files, forward or upload to me. -To transfer restricted content, right click the content, copy the message link, and send to me. +- To transfer files, forward or upload to me. +- To transfer restricted content, right click the content, copy the message link, and send to me. +- Uploading through url will call Onedrive's API, which means Onedrive's server will visit the url and download the file for you. '''%cmd_helper) raise events.StopPropagation -@tg_bot.on(events.NewMessage(pattern="/auth", incoming=True)) +@tg_bot.on(events.NewMessage(pattern="/auth", incoming=True, from_users=tg_user_name)) async def auth(event): await check_in_group(event) auth_server = subprocess.Popen(('python', 'auth_server.py')) @@ -134,20 +222,20 @@ async def tg_code_callback(): ) res = requests.get(url=os.path.join(server_uri, "tg"), verify=False).json() while not res["success"]: - sleep(1) + await asyncio.sleep(1) res = requests.get( url=os.path.join(server_uri, "tg"), verify=False ).json() return res["code"] - def od_code_callback(): + async def od_code_callback(): res = requests.get( url=os.path.join(server_uri, "auth"), params={"get": True}, verify=False, ).json() while not res["success"]: - sleep(1) + await asyncio.sleep(1) res = requests.get( url=os.path.join(server_uri, "auth"), params={"get": True}, @@ -160,24 +248,26 @@ def od_code_callback(): tg_client = await tg_client.start(tg_user_phone, code_callback=tg_code_callback) await conv.send_message("Login to Telegram successful!") + try: + onedrive.load_session() + except: + auth_url = onedrive.get_auth_url() + await conv.send_message( + "Here are the authorization url of OneDrive:\n\n%s" % auth_url + ) + code = await od_code_callback() + onedrive.auth(code) + await conv.send_message("Onedrive authorization successful!") + async for message in tg_client.iter_messages(event.chat_id, filter=types.InputMessagesFilterPinned()): await tg_client.unpin_message(event.chat_id, message) - - auth_url = onedrive.get_auth_url() - await conv.send_message( - "Here are the authorization url of OneDrive:\n\n%s" % auth_url - ) - code = od_code_callback() - onedrive.auth(code) - await conv.send_message("Authorization successful!") - global status_bar status_bar = await conv.send_message("Status:\n\nNo job yet.") await tg_bot.pin_message(event.chat_id, status_bar) auth_server.kill() - raise events.StopPropagation -@tg_bot.on(events.NewMessage(pattern="/autoDelete", incoming=True)) + +@tg_bot.on(events.NewMessage(pattern="/autoDelete", incoming=True, from_users=tg_user_name)) async def auto_delete(event): global delete_flag error_message = ''' @@ -199,82 +289,77 @@ async def auto_delete(event): await event.respond(error_message) raise events.StopPropagation -@tg_bot.on(events.NewMessage(pattern="/status", incoming=True)) + +@tg_bot.on(events.NewMessage(pattern="/status", incoming=True, from_users=tg_user_name)) async def status(event): await check_in_group(event) - global status_bar - try: + if await check_login(event): + global status_bar async for message in tg_client.iter_messages(event.chat_id, filter=types.InputMessagesFilterPinned()): await tg_client.unpin_message(event.chat_id, message) - except: - await res_not_login(event) - status_bar = await event.respond("Status:\n\nNo job yet.") - await tg_bot.pin_message(event.chat_id, status_bar) + status_bar = await event.respond("Status:\n\nNo job yet.") + await tg_bot.pin_message(event.chat_id, status_bar) raise events.StopPropagation - -async def multi_parts_downloader( - client, document, path, conn_num=10, progress_callback=None -): - async def download_part(input_location, offset, part_size): - stream = client.iter_download( - input_location, offset=offset, request_size=part_size, limit=part_size - ) - part = await stream.__anext__() - await stream.close() - return part - with open(path, "wb") as file: - input_location = types.InputDocumentFileLocation( - id=document.id, - access_hash=document.access_hash, - file_reference=document.file_reference, - thumb_size="", - ) - task_list = [] - part_size = 5 * 1024 * 1024 - total_part_num = ( - 1 if part_size >= document.size else math.ceil(document.size / part_size) - ) - current_part_num = 0 - current_size = 0 - offset = 0 - if progress_callback: - cor = progress_callback(current_size, document.size) - if inspect.isawaitable(cor): - await cor - while current_part_num < total_part_num: - task_list.append( - asyncio.ensure_future(download_part(input_location, offset, part_size)) - ) - current_part_num += 1 - if current_part_num < total_part_num: - offset += part_size - if current_part_num % conn_num == 0 or current_part_num == total_part_num: - for part in await asyncio.gather(*task_list): - file.write(part) - current_size += len(part) - task_list.clear() - if progress_callback: - cor = progress_callback(current_size, document.size) - if inspect.isawaitable(cor): - await cor +@tg_bot.on(events.NewMessage(pattern="/clear", incoming=True, from_users=tg_user_name)) +async def clear(event): + await check_in_group(event) + await check_login(event) + await clear_history(event) + await status(event) + raise events.StopPropagation + + +@tg_bot.on(events.NewMessage(pattern="/url", incoming=True, from_users=tg_user_name)) +async def url(event): + await check_in_group(event) + await check_login(event) -def get_link(string): - regex = r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))" - url = re.findall(regex,string) try: - link = [x[0] for x in url][0] - if link: - return link - else: - return False - except Exception: - return False + cmd = cmd_parser(event) + _url = cmd[0] + name = _url.split('/')[-1] + except: + await delete_message(event) + await event.respond(''' +Command `/url` format wrong. + +Usage: `/url` file_url + ''') + raise events.StopPropagation -@tg_bot.on(events.NewMessage(pattern="/links", incoming=True)) + try: + progress_url = onedrive.upload_from_url(_url) + print(progress_url) + progress = onedrive.upload_from_url_progress(progress_url) + while progress['status'] in ['notStarted', 'inProgress']: + status = "Uploaded: %.2f%%" % float(progress['percentageComplete']) + logger(status) + msg_link = 'https://t.me/c/%d/%d'%(event.message.peer_id.channel_id, event.message.id) + await tg_bot.edit_message(status_bar, 'Status:\n\n%s\n\n%s'%(msg_link, status)) + await asyncio.sleep(5) + progress = onedrive.upload_from_url_progress(progress_url) + + status = "Uploaded: %.2f%%" % float(progress['percentageComplete']) + logger(status) + logger("File uploaded to %s"%os.path.join(onedrive.remote_root_path, name)) + msg_link = 'https://t.me/c/%d/%d'%(event.message.peer_id.channel_id, event.message.id) + await tg_bot.edit_message(status_bar, 'Status:\n\n%s\n\n%s'%(msg_link, status)) + if not delete_flag: + await event.reply('Done.') + await delete_message(event) + await tg_bot.edit_message(status_bar, 'Status:\n\nNo job yet.') + except Exception as e: + logger(e) + await event.reply('Error: %s'%e) + raise events.StopPropagation + + +@tg_bot.on(events.NewMessage(pattern="/links", incoming=True, from_users=tg_user_name)) async def links(event): await check_in_group(event) + await check_login(event) try: cmd = cmd_parser(event) link = cmd[0] @@ -282,12 +367,8 @@ async def links(event): link_body = '/'.join(link.split('/')[:-1]) offsets = int(cmd[1]) await delete_message(event) - try: - for offset in range(offsets): - await tg_client.send_message(event.chat_id, message='%s/%d'%(link_body, head_message_id + offset)) - except: - await delete_message(event) - await res_not_login(event) + for offset in range(offsets): + await tg_client.send_message(event.chat_id, message='%s/%d'%(link_body, head_message_id + offset)) except: await delete_message(event) await event.respond(''' @@ -298,36 +379,22 @@ async def links(event): raise events.StopPropagation raise events.StopPropagation -@tg_bot.on(events.NewMessage(incoming=True)) + +@tg_bot.on(events.NewMessage(incoming=True, from_users=tg_user_name)) async def transfer(event): - clear_temp() - up_or_down = 'Downloaded' + await check_in_group(event) + await check_login(event) + async def callback(current, total): current = current / (1024 * 1024) total = total / (1024 * 1024) - status = "%s %.2fMB out of %.2fMB: %.2f%%"% (up_or_down, current, total, current / total * 100) + status = "Uploaded %.2fMB out of %.2fMB: %.2f%%"% (current, total, current / total * 100) logger(status) msg_link = 'https://t.me/c/%d/%d'%(event.message.peer_id.channel_id, event.message.id) await tg_bot.edit_message(status_bar, 'Status:\n\n%s\n\n%s'%(msg_link, status)) - async def upload(local_path): - nonlocal up_or_down - up_or_down = "Uploaded" - remote_path = await onedrive.upload(local_path, upload_status=callback) - logger("File uploaded to %s"%remote_path) - clear_temp() - if not delete_flag: - await event.reply('Done.') - await tg_bot.edit_message(status_bar, 'Status:\n\nNo job yet.') - - await check_in_group(event) - if event.media and not isinstance(event.media, types.MessageMediaWebPage): - try: - message = await tg_client.get_messages(event.message.peer_id, ids=event.message.id) - except: - await delete_message(event) - await res_not_login(event) + message = await tg_client.get_messages(event.message.peer_id, ids=event.message.id) try: if "document" in event.media.to_dict().keys(): @@ -335,29 +402,24 @@ async def upload(local_path): if "document" in message.media.to_dict().keys(): if event.media.document.id == message.media.document.id: name = "%d%s" % (event.media.document.id, event.file.ext) - local_path = os.path.join(temp_dir, name) - await multi_parts_downloader( - tg_client, - message.media.document, - local_path, - progress_callback=callback, - ) - logger("File saved to %s"%local_path) - await upload(local_path) + await multi_parts_uploader(tg_client, message.media.document, name, progress_callback=callback) + logger("File uploaded to %s" % os.path.join(remote_root_path, name)) await delete_message(message) + await tg_bot.edit_message(status_bar, "Status:\n\nNo job yet.") if "photo" in event.media.to_dict().keys(): if message.media: if "photo" in message.media.to_dict().keys(): if event.media.photo.id == message.media.photo.id: name = "%d%s" % (event.media.photo.id, event.file.ext) - local_path = os.path.join(temp_dir, name) - await message.download_media(file=local_path, progress_callback=callback) - logger("File saved to %s"%local_path) - await upload(local_path) + buffer = await message.download_media(file=bytes, progress_callback=callback) + onedrive.stream_upload(buffer, name) + logger("File uploaded to %s" % os.path.join(remote_root_path, name)) await delete_message(message) + await tg_bot.edit_message(status_bar, "Status:\n\nNo job yet.") except Exception as e: logger(e) + await event.reply('Error: %s'%e) else: msg_link = get_link(event.text) @@ -371,38 +433,34 @@ async def upload(local_path): chat = str(msg_link.split("/")[-2]) else: chat = int('-100' + str(msg_link.split("/")[-2])) - try: - message = await tg_client.get_messages(chat, ids=msg_id) - except: - await delete_message(event) - await res_not_login(event) + + message = await tg_client.get_messages(chat, ids=msg_id) + if message: try: if "document" in message.media.to_dict().keys(): name = "%d%s" % (message.media.document.id, message.file.ext) - local_path = os.path.join(temp_dir, name) - await multi_parts_downloader( - tg_client, - message.media.document, - local_path, - progress_callback=callback, - ) - logger("File saved to %s"%local_path) - await upload(local_path) + await multi_parts_uploader(tg_client, message.media.document, name, progress_callback=callback) + logger("File uploaded to %s" % os.path.join(remote_root_path, name)) await delete_message(event) + await tg_bot.edit_message(status_bar, "Status:\n\nNo job yet.") + if "photo" in message.media.to_dict().keys(): name = "%d%s" % (message.media.photo.id, message.file.ext) - local_path = os.path.join(temp_dir, name) - await message.download_media(file=local_path, progress_callback=callback) - logger("File saved to %s"%local_path) - await upload(local_path) + buffer = await message.download_media(file=bytes, progress_callback=callback) + onedrive.stream_upload(buffer, name) + logger("File uploaded to %s" % os.path.join(remote_root_path, name)) await delete_message(event) + await tg_bot.edit_message(status_bar, "Status:\n\nNo job yet.") + except Exception as e: logger(e) + await event.reply('Error: %s'%e) else: await event.reply("Message not found.") raise events.StopPropagation + def main(): tg_bot.run_until_disconnected() diff --git a/docker-compose.yml b/docker-compose.yml index e6730ec..fe207d5 100755 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,7 +11,9 @@ services: - tg_api_id=$tg_api_id - tg_api_hash=$tg_api_hash - tg_user_phone=$tg_user_phone + - tg_user_name=$tg_user_name - od_client_id=$od_client_id - od_client_secret=$od_client_secret - remote_root_path=$remote_root_path + - delete_flag=$delete_flag command: python bot.py \ No newline at end of file diff --git a/onedrive.py b/onedrive.py index 4c167f5..ccb790c 100644 --- a/onedrive.py +++ b/onedrive.py @@ -5,26 +5,56 @@ :license: MIT, see LICENSE for more details. """ -import onedrivesdk -import os -import onedrive_large_file_uploader # overwrite `ItemRequestBuilder.upload_async` method +from onedrivesdk import HttpProvider, AuthProvider, OneDriveClient +from onedrivesdk.options import HeaderOption +from onedrivesdk.error import OneDriveError +from onedrivesdk.model.upload_session import UploadSession +from onedrivesdk.model.item import Item +from onedrivesdk.request.item_create_session import ItemCreateSessionRequestBuilder +from onedrivesdk.request.item_request_builder import ItemRequestBuilder +from onedrivesdk.request_builder_base import RequestBuilderBase +from onedrivesdk.request_base import RequestBase +import json +import asyncio +def authenticate_request(self, request): + if self._session is None: + raise RuntimeError("""Session must be authenticated + before applying authentication to a request.""") + + if self._session.is_expired() and 'offline_access' in self.scopes: + self.refresh_token() + self.save_session(path='onedrive.session') + + request.append_option( + HeaderOption("Authorization", + "bearer {}".format(self._session.access_token))) + +def create_session(self, item=None): + return ItemCreateSessionRequestBuilder(self.append_to_request_url("createUploadSession"), self._client, item=item) class Onedrive: def __init__(self, client_id, client_secret, redirect_uri, remote_root_path): - api_base_url = "https://api.onedrive.com/v1.0/" - scopes = ["wl.signin", "wl.offline_access", "onedrive.readwrite"] + api_base_url = "https://graph.microsoft.com/v1.0/" + auth_server_url = "https://login.microsoftonline.com/common/oauth2/v2.0/authorize" + + scopes = ["offline_access", "Files.ReadWrite"] - http_provider = onedrivesdk.HttpProvider() - auth_provider = onedrivesdk.AuthProvider( - http_provider=http_provider, client_id=client_id, scopes=scopes + http_provider = HttpProvider() + auth_provider = AuthProvider( + http_provider=http_provider, + client_id=client_id, + scopes=scopes, + auth_server_url=auth_server_url ) self.remote_root_path = remote_root_path self.client_secret = client_secret self.redirect_uri = redirect_uri - self.client = onedrivesdk.OneDriveClient( - api_base_url, auth_provider, http_provider + self.client = OneDriveClient( + api_base_url, + auth_provider, + http_provider ) def get_auth_url(self): @@ -32,17 +62,161 @@ def get_auth_url(self): def auth(self, auth_code): self.client.auth_provider.authenticate( - auth_code, self.redirect_uri, self.client_secret + auth_code, + self.redirect_uri, + self.client_secret ) + self.save_session() + + def save_session(self): + self.client.auth_provider.save_session(path='onedrive.session') - async def upload(self, file_path, upload_status=None): - name = file_path.split("/")[-1] - if upload_status: - await self.client.item(path=self.remote_root_path).children[name].upload_async( - file_path, upload_status=upload_status - ) + def load_session(self): + self.client.auth_provider.load_session(path='onedrive.session') + + def stream_upload(self, buffer, name): + request = self.client.item(path=self.remote_root_path).children[name].content.request() + request.method = 'PUT' + request.send(data=buffer) + + def multipart_upload_session_builder(self, name): + item = Item({}) + session = self.client.item(path=self.remote_root_path).children[name].create_session(item).post() + return session + + def multipart_uploader(self, session, total_length): + return ItemUploadFragmentBuilder(session.upload_url, self.client, total_length) + + async def multipart_upload(self, uploader, buffer, offset, part_size): + tries = 0 + while True: + try: + tries += 1 + uploader.post(offset, part_size, buffer) + except OneDriveError as exc: + if exc.status_code in (408, 500, 502, 503, 504) and tries < 5: + await asyncio.sleep(0.1) + continue + elif exc.status_code == 416: + # Fragment already received + break + elif exc.status_code == 401: + self._client.auth_provider.refresh_token() + continue + else: + raise exc + except ValueError: + # Swallow value errors (usually JSON error) and try again. + continue + break # while True + + def upload_from_url(self, url): + name = url.split("/")[-1] + opts = [ + HeaderOption('Prefer', 'respond-async'), + ] + + request = self.client.item(path=self.remote_root_path).children.request(options=opts) + request.content_type = 'application/json' + request.method = 'POST' + + data = { + "@microsoft.graph.sourceUrl": url, + "name": name, + "file": {} + } + response = request.send(content=data) + + if response.status == 202: + progress_url = response.headers['Location'] + return progress_url else: - await self.client.item(path=self.remote_root_path).children[name].upload_async( - file_path + response_dict = { + 'Status': response.status, + 'Headers': response.headers, + 'Content': response.content + } + raise OneDriveError(response_dict, response.status) + + def upload_from_url_progress(self, url): + response = self.client.http_provider.send( + method="GET", + headers={}, + url=url + ) + response = json.loads(response.content) + return response + + +class ItemUploadFragment(RequestBase): + def __init__(self, request_url, client, options, buffer): + super(ItemUploadFragment, self).__init__(request_url, client, options) + self.method = "PUT" + self._buffer = buffer + + def post(self): + """Sends the POST request + + Returns: + :class:`UploadSession`: + The resulting entity from the operation + """ + entity = UploadSession(json.loads(self.send(data=self._buffer).content)) + return entity + + +class ItemUploadFragmentBuilder(RequestBuilderBase): + def __init__(self, request_url, client, total_length): + super(ItemUploadFragmentBuilder, self).__init__(request_url, client) + self._method_options = {} + self._total_length = total_length + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self._buffer.close() + + def request(self, begin, length, buffer, options=None): + """Builds the request for the ItemUploadFragment + + Args: + begin (int): First byte in range to be uploaded + length (int): Number of bytes in range to be uploaded + options (list of :class:`Option`): + Default to None, list of options to include in the request + + Returns: + :class:`ItemUploadFragment`: + The request + """ + if not (options is None or len(options) == 0): + opts = options.copy() + else: + opts = [] + + self.content_type = "application/octet-stream" + + opts.append( + HeaderOption( + "Content-Range", + "bytes %d-%d/%d" % (begin, begin + length - 1, self._total_length), ) - return os.path.join(self.remote_root_path, name) + ) + opts.append(HeaderOption("Content-Length", str(length))) + + req = ItemUploadFragment(self._request_url, self._client, opts, buffer) + return req + + def post(self, begin, length, buffer, options=None): + """Sends the POST request + + Returns: + :class:`UploadedFragment`: + The resulting UploadSession from the operation + """ + return self.request(begin, length, buffer, options).post() + +# Overwrite the standard upload operation to use this one +AuthProvider.authenticate_request = authenticate_request +ItemRequestBuilder.create_session = create_session \ No newline at end of file diff --git a/onedrive_large_file_uploader.py b/onedrive_large_file_uploader.py deleted file mode 100644 index 33b36ff..0000000 --- a/onedrive_large_file_uploader.py +++ /dev/null @@ -1,184 +0,0 @@ -# -*- coding: utf-8 -*- -""" ------------------------------------------------------------------------------- - Copyright (c) 2015 Microsoft Corporation - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in - all copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - THE SOFTWARE. ------------------------------------------------------------------------------- -""" - -from onedrivesdk.error import OneDriveError -from onedrivesdk.model.upload_session import UploadSession -from onedrivesdk.model.item import Item -from onedrivesdk.options import HeaderOption -from onedrivesdk.request.item_request_builder import ItemRequestBuilder -from onedrivesdk.request_builder_base import RequestBuilderBase -from onedrivesdk.request_base import RequestBase -from onedrivesdk.helpers.file_slice import FileSlice -import json -import math -import os -import inspect -import asyncio - -__PART_SIZE = 10 * 1024 * 1024 -__MAX_SINGLE_FILE_UPLOAD = 10 * 1024 * 1024 - - -class ItemUploadFragment(RequestBase): - def __init__(self, request_url, client, options, file_handle): - super(ItemUploadFragment, self).__init__(request_url, client, options) - self.method = "PUT" - self._file_handle = file_handle - - def post(self): - """Sends the POST request - - Returns: - :class:`UploadSession`: - The resulting entity from the operation - """ - entity = UploadSession(json.loads(self.send(data=self._file_handle).content)) - return entity - - -class ItemUploadFragmentBuilder(RequestBuilderBase): - def __init__(self, request_url, client, content_local_path): - super(ItemUploadFragmentBuilder, self).__init__(request_url, client) - self._method_options = {} - self._file_handle = open(content_local_path, "rb") - self._total_length = os.stat(content_local_path).st_size - - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - self._file_handle.close() - - def request(self, begin, length, options=None): - """Builds the request for the ItemUploadFragment - - Args: - begin (int): First byte in range to be uploaded - length (int): Number of bytes in range to be uploaded - options (list of :class:`Option`): - Default to None, list of options to include in the request - - Returns: - :class:`ItemUploadFragment`: - The request - """ - if not (options is None or len(options) == 0): - opts = options.copy() - else: - opts = [] - - self.content_type = "application/octet-stream" - - opts.append( - HeaderOption( - "Content-Range", - "bytes %d-%d/%d" % (begin, begin + length - 1, self._total_length), - ) - ) - opts.append(HeaderOption("Content-Length", str(length))) - - file_slice = FileSlice(self._file_handle, begin, length=length) - req = ItemUploadFragment(self._request_url, self._client, opts, file_slice) - return req - - def post(self, begin, length, options=None): - """Sends the POST request - - Returns: - :class:`UploadedFragment`: - The resulting UploadSession from the operation - """ - return self.request(begin, length, options).post() - - -async def fragment_upload_async(self, local_path, conflict_behavior=None, upload_status=None): - """Uploads file using PUT using multipart upload if needed. - - Args: - local_path (str): The path to the local file to upload. - conflict_behavior (str): conflict behavior if the file is already - uploaded. Use None value if file should be replaced or "rename", if - the new file should get a new name - upload_status (func): function(current_part, total_parts) to be called - with upload status for each 10MB part - - Returns: - Created entity. - """ - file_size = os.stat(local_path).st_size - if file_size <= __MAX_SINGLE_FILE_UPLOAD: - # fallback to single shot upload if file is small enough - return self.content.request().upload(local_path) - else: - # multipart upload needed for larger files - if conflict_behavior: - item = Item({"@name.conflictBehavior": conflict_behavior}) - else: - item = Item({}) - - session = self.create_session(item).post() - - with ItemUploadFragmentBuilder( - session.upload_url, self._client, local_path - ) as upload_builder: - total_parts = math.ceil(file_size / __PART_SIZE) - for i in range(total_parts): - if upload_status: - cor = upload_status(i * __PART_SIZE, file_size) - if inspect.isawaitable(cor): - await cor - - length = min(__PART_SIZE, file_size - i * __PART_SIZE) - tries = 0 - while True: - try: - tries += 1 - resp = upload_builder.post(i * __PART_SIZE, length) - except OneDriveError as exc: - if exc.status_code in (408, 500, 502, 503, 504) and tries < 5: - await asyncio.sleep(0.1) - continue - elif exc.status_code == 416: - # Fragment already received - break - elif exc.status_code == 401: - self._client.auth_provider.refresh_token() - continue - else: - raise exc - except ValueError: - # Swallow value errors (usually JSON error) and try again. - continue - break # while True - if upload_status: - cor = upload_status(file_size, file_size) # job completed - if inspect.isawaitable(cor): - await cor - # return last response - return resp - - -# Overwrite the standard upload operation to use this one -ItemRequestBuilder.upload_async = fragment_upload_async