From 13f5f91fd848d9bd0f62a9681416943123669e8c Mon Sep 17 00:00:00 2001 From: KurimuzonAkuma Date: Tue, 17 Dec 2024 14:55:56 +0300 Subject: [PATCH] Add QR Login support Thanks to Lonami --- pyrogram/client.py | 33 ++++++++++ pyrogram/methods/utilities/start.py | 28 ++++++++- pyrogram/qrlogin.py | 97 +++++++++++++++++++++++++++++ 3 files changed, 156 insertions(+), 2 deletions(-) create mode 100644 pyrogram/qrlogin.py diff --git a/pyrogram/client.py b/pyrogram/client.py index beab6ae212..8db15d0e9b 100644 --- a/pyrogram/client.py +++ b/pyrogram/client.py @@ -54,6 +54,7 @@ from pyrogram.storage import Storage, FileStorage, MemoryStorage from pyrogram.types import User, TermsOfService from pyrogram.utils import ainput +from pyrogram.qrlogin import QRLogin from .connection import Connection from .connection.transport import TCP, TCPAbridged from .dispatcher import Dispatcher @@ -507,6 +508,38 @@ async def authorize(self) -> User: return signed_up + async def authorize_qr(self, except_ids: List[int] = []) -> User: + from qrcode import QRCode + qr_login = QRLogin(self, except_ids) + + while True: + try: + log.info("Waiting for QR code being scanned.") + + signed_in = await qr_login.wait() + + if signed_in: + return signed_in + except asyncio.TimeoutError: + log.info("Recreating QR code.") + await qr_login.recreate() + print("\x1b[2J") + print(f"Welcome to Pyrogram (version {__version__})") + print(f"Pyrogram is free software and comes with ABSOLUTELY NO WARRANTY. Licensed\n" + f"under the terms of the {__license__}.\n\n") + print("Scan the QR code below to login") + print("Settings -> Privacy and Security -> Active Sessions -> Scan QR Code.\n") + + qrcode = QRCode(version=1) + qrcode.add_data(qr_login.url) + qrcode.print_ascii(invert=True) + except SessionPasswordNeeded: + print(f"Password hint: {await self.get_password_hint()}") + await self.check_password( + await ainput("Enter 2FA password: ", hide=self.hide_password) + ) + continue + def set_parse_mode(self, parse_mode: Optional["enums.ParseMode"]): """Set the parse mode to be used globally by the client. diff --git a/pyrogram/methods/utilities/start.py b/pyrogram/methods/utilities/start.py index 922ea72bc2..045891f960 100644 --- a/pyrogram/methods/utilities/start.py +++ b/pyrogram/methods/utilities/start.py @@ -17,6 +17,7 @@ # along with Pyrogram. If not, see . import logging +from typing import List import pyrogram from pyrogram import raw @@ -26,13 +27,28 @@ class Start: async def start( - self: "pyrogram.Client" + self: "pyrogram.Client", + use_qr: bool = False, + except_ids: List[int] = [], ): """Start the client. This method connects the client to Telegram and, in case of new sessions, automatically manages the authorization process using an interactive prompt. + .. note:: + + You should install ``qrcode`` package if you want to use QR code authorization. + + Parameters: + use_qr (``bool``, *optional*): + Use QR code authorization instead of the interactive prompt. + For new authorizations only. + Defaults to False. + + except_ids (List of ``int``, *optional*): + List of already logged-in user IDs, to prevent logging in twice with the same user. + Returns: :obj:`~pyrogram.Client`: The started client itself. @@ -59,7 +75,15 @@ async def main(): try: if not is_authorized: - await self.authorize() + if use_qr: + try: + import qrcode + await self.authorize_qr(except_ids=except_ids) + except ImportError: + log.warning("qrcode package not found, falling back to authorization prompt") + await self.authorize() + else: + await self.authorize() if self.takeout and not await self.storage.is_bot(): self.takeout_id = (await self.invoke(raw.functions.account.InitTakeoutSession())).id diff --git a/pyrogram/qrlogin.py b/pyrogram/qrlogin.py new file mode 100644 index 0000000000..52565f38f6 --- /dev/null +++ b/pyrogram/qrlogin.py @@ -0,0 +1,97 @@ +# Pyrogram - Telegram MTProto API Client Library for Python +# Copyright (C) 2017-present Dan +# +# This file is part of Pyrogram. +# +# Pyrogram is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pyrogram is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Pyrogram. If not, see . + +import asyncio +import base64 +import datetime +import logging +from typing import List, Optional + +import pyrogram +from pyrogram import filters, handlers, raw, types +from pyrogram.methods.messages.inline_session import get_session + +log = logging.getLogger(__name__) + +class QRLogin: + def __init__(self, client, except_ids: List[int] = []): + self.client = client + self.request = raw.functions.auth.ExportLoginToken( + api_id=client.api_id, + api_hash=client.api_hash, + except_ids=except_ids + ) + self.r = None + + async def recreate(self): + self.r = await self.client.invoke(self.request) + + return self.r + + async def wait(self, timeout: float = None) -> Optional["types.User"]: + if timeout is None: + if not self.r: + raise asyncio.TimeoutError + + timeout = self.r.expires - int(datetime.datetime.now().timestamp()) + + event = asyncio.Event() + + async def raw_handler(client, update, users, chats): + event.set() + + await self.client.dispatcher.start() + + handler = self.client.add_handler( + handlers.RawUpdateHandler( + raw_handler, + filters=filters.create( + lambda _, __, u: isinstance(u, raw.types.UpdateLoginToken) + ) + ) + ) + + try: + await asyncio.wait_for(event.wait(), timeout=timeout) + finally: + self.client.remove_handler(*handler) + await self.client.dispatcher.stop() + + await self.recreate() + + if isinstance(self.r, raw.types.auth.LoginTokenMigrateTo): + session = await get_session(self.client, self.r.dc_id) + self.r = await session.invoke( + raw.functions.auth.ImportLoginToken( + token=self.token + ) + ) + + if isinstance(self.r, raw.types.auth.LoginTokenSuccess): + user = types.User._parse(self.client, self.r.authorization.user) + + await self.client.storage.user_id(user.id) + await self.client.storage.is_bot(False) + + return user + + raise TypeError('Unexpected login token response: {}'.format(self.r)) + + @property + def url(self) -> str: + return f"tg://login?token={base64.urlsafe_b64encode(self.r.token).decode('utf-8')}"