diff --git a/changelog b/changelog index 0cacdb6..665ed69 100644 --- a/changelog +++ b/changelog @@ -1,3 +1,9 @@ +## [2.9.13] - 2023-03-20 + +## Fixed + +- Remove Werkzeug dependent libraries + ## [2.9.12] - 2022-11-24 ## Fixed diff --git a/docs/.buildinfo b/docs/.buildinfo index e70e306..96dd94d 100644 --- a/docs/.buildinfo +++ b/docs/.buildinfo @@ -1,4 +1,4 @@ # Sphinx build info version 1 # This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done. -config: 047595274f1acf162baa7e5230ad15ab +config: 8dcbaee98e8c32193226f31d376b58f4 tags: 645f666f9bcd5a90fca523b33c5a78b7 diff --git a/docs/_modules/index.html b/docs/_modules/index.html deleted file mode 100644 index 7f26ef1..0000000 --- a/docs/_modules/index.html +++ /dev/null @@ -1,112 +0,0 @@ - - -
- - -
-# coding: utf-8
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import six
-
-import leancloud
-
-__author__ = "asaka <lan@leancloud.rocks>"
-
-
-PUBLIC_KEY = "*"
-
-
-[文档]class ACL(object):
- def __init__(self, permissions_by_id=None):
- self.permissions_by_id = permissions_by_id or {}
-
-
-
- def _set_access(self, access_type, user_id, allowed):
- if isinstance(user_id, leancloud.User):
- user_id = user_id.id
- elif isinstance(user_id, leancloud.Role):
- user_id = "role:" + user_id.get_name()
- permissions = self.permissions_by_id.get(user_id)
- if permissions is None:
- if not allowed:
- return
- permissions = {}
- self.permissions_by_id[user_id] = permissions
-
- if allowed:
- self.permissions_by_id[user_id][access_type] = True
- elif access_type in self.permissions_by_id[user_id]:
- del self.permissions_by_id[user_id][access_type]
- if not self.permissions_by_id[user_id]:
- del self.permissions_by_id[user_id]
-
- def _get_access(self, access_type, user_id):
- if isinstance(user_id, leancloud.User):
- user_id = user_id.id
- elif isinstance(user_id, leancloud.Role):
- user_id = "role:" + user_id.get_name()
- permissions = self.permissions_by_id.get(user_id)
- if not permissions:
- return False
- return permissions.get(access_type, False)
-
-[文档] def set_read_access(self, user_id, allowed):
- return self._set_access("read", user_id, allowed)
-
-
-
-[文档] def set_write_access(self, user_id, allowed):
- return self._set_access("write", user_id, allowed)
-
-
-
-
-
-
-
-[文档] def set_public_write_access(self, allowed):
- return self.set_write_access(PUBLIC_KEY, allowed)
-
-
-
-[文档] def set_role_read_access(self, role, allowed):
- if isinstance(role, leancloud.Role):
- role = role.get_name()
- if not isinstance(role, six.string_types):
- raise TypeError("role must be a leancloud.Role or str")
- self.set_read_access("role:{0}".format(role), allowed)
-
-[文档] def get_role_read_access(self, role):
- if isinstance(role, leancloud.Role):
- role = role.get_name()
- if not isinstance(role, six.string_types):
- raise TypeError("role must be a leancloud.Role or str")
- return self.get_read_access("role:{0}".format(role))
-
-[文档] def set_role_write_access(self, role, allowed):
- if isinstance(role, leancloud.Role):
- role = role.get_name()
- if not isinstance(role, six.string_types):
- raise TypeError("role must be a leancloud.Role or str")
- self.set_write_access("role:{0}".format(role), allowed)
-
-[文档] def get_role_write_access(self, role):
- if isinstance(role, leancloud.Role):
- role = role.get_name()
- if not isinstance(role, six.string_types):
- raise TypeError("role must be a leancloud.Role or str")
- return self.get_write_access("role:{0}".format(role))
-
-# coding: utf-8
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import os
-import json
-import time
-import hashlib
-import functools
-
-import six
-import requests
-
-import leancloud
-from leancloud import utils
-from leancloud.app_router import AppRouter
-
-__author__ = "asaka <lan@leancloud.rocks>"
-
-
-APP_ID = None
-APP_KEY = None
-MASTER_KEY = None
-HOOK_KEY = None
-if os.getenv("LEANCLOUD_APP_ENV") == "production":
- USE_PRODUCTION = "1"
-elif os.getenv("LEANCLOUD_APP_ENV") == "stage":
- USE_PRODUCTION = "0"
-else: # probably on local machine
- if os.getenv("LEAN_CLI_HAVE_STAGING") == "true":
- USE_PRODUCTION = "0"
- else: # free trial instance only
- USE_PRODUCTION = "1"
-
-USE_HTTPS = True
-# 兼容老版本,如果 USE_MASTER_KEY 为 None ,并且 MASTER_KEY 不为 None,则使用 MASTER_KEY
-# 否则依据 USE_MASTER_KEY 来决定是否使用 MASTER_KEY
-USE_MASTER_KEY = None
-REGION = "CN"
-
-app_router = None
-session = requests.Session()
-request_hooks = {}
-
-SERVER_VERSION = "1.1"
-
-TIMEOUT_SECONDS = 15
-
-
-[文档]def init(app_id, app_key=None, master_key=None, hook_key=None):
- """初始化 LeanCloud 的 AppId / AppKey / MasterKey
-
- :type app_id: string_types
- :param app_id: 应用的 Application ID
- :type app_key: None or string_types
- :param app_key: 应用的 Application Key
- :type master_key: None or string_types
- :param master_key: 应用的 Master Key
- :param hook_key: application's hook key
- :type hook_key: None or string_type
- """
- if (not app_key) and (not master_key):
- raise RuntimeError("app_key or master_key must be specified")
- global APP_ID, APP_KEY, MASTER_KEY, HOOK_KEY
- APP_ID = app_id
- APP_KEY = app_key
- MASTER_KEY = master_key
- if hook_key:
- HOOK_KEY = hook_key
- else:
- HOOK_KEY = os.environ.get("LEANCLOUD_APP_HOOK_KEY")
-
-
-def need_init(func):
- @functools.wraps(func)
- def new_func(*args, **kwargs):
- if APP_ID is None:
- raise RuntimeError("LeanCloud SDK must be initialized")
-
- headers = {
- "Content-Type": "application/json;charset=utf-8",
- "X-LC-Id": APP_ID,
- "X-LC-Hook-Key": HOOK_KEY,
- "X-LC-Prod": USE_PRODUCTION,
- "User-Agent": "AVOS Cloud python-{0} SDK ({1}.{2})".format(
- leancloud.__version__,
- leancloud.version_info.major,
- leancloud.version_info.minor,
- ),
- }
- md5sum = hashlib.md5()
- current_time = six.text_type(int(time.time() * 1000))
- if (USE_MASTER_KEY is None and MASTER_KEY) or USE_MASTER_KEY is True:
- md5sum.update((current_time + MASTER_KEY).encode("utf-8"))
- headers["X-LC-Sign"] = md5sum.hexdigest() + "," + current_time + ",master"
- else:
- # In python 2.x, you can feed this object with arbitrary
- # strings using the update() method, but in python 3.x,
- # you should feed with bytes-like objects.
- md5sum.update((current_time + APP_KEY).encode("utf-8"))
- headers["X-LC-Sign"] = md5sum.hexdigest() + "," + current_time
-
- user = leancloud.User.get_current()
- if user:
- headers["X-LC-Session"] = user._session_token
-
- return func(headers=headers, *args, **kwargs)
-
- return new_func
-
-
-def get_url(part):
- # try to use the base URL from environ
- url = os.environ.get("LC_API_SERVER") or os.environ.get("LEANCLOUD_API_SERVER")
- if url:
- return "{}/{}{}".format(url, SERVER_VERSION, part)
-
- global app_router
- if app_router is None:
- app_router = AppRouter(APP_ID, REGION)
-
- if part.startswith("/push") or part.startswith("/installations"):
- host = app_router.get("push")
- elif part.startswith("/collect"):
- host = app_router.get("stats")
- elif part.startswith("/functions") or part.startswith("/call"):
- host = app_router.get("engine")
- else:
- host = app_router.get("api")
- r = {
- "schema": "https" if USE_HTTPS else "http",
- "version": SERVER_VERSION,
- "host": host,
- "part": part,
- }
- return "{schema}://{host}/{version}{part}".format(**r)
-
-
-[文档]def use_production(flag):
- """调用生产环境 / 开发环境的 cloud func / cloud hook
- 默认调用生产环境。
- """
- global USE_PRODUCTION
- USE_PRODUCTION = "1" if flag else "0"
-
-
-[文档]def use_master_key(flag=True):
- """是否使用 master key 发送请求。
- 如果不调用此函数,会根据 leancloud.init 的参数来决定是否使用 master key。
-
- :type flag: bool
- """
- global USE_MASTER_KEY
- if not flag:
- USE_MASTER_KEY = False
- return
- if not MASTER_KEY:
- raise RuntimeError("LeanCloud SDK master key not specified")
- USE_MASTER_KEY = True
-
-
-def check_error(func):
- @functools.wraps(func)
- def new_func(*args, **kwargs):
- response = func(*args, **kwargs)
- assert isinstance(response, requests.Response)
- if response.headers.get("Content-Type") == "text/html":
- raise leancloud.LeanCloudError(-1, "Bad Request")
-
- content = response.json()
-
- if "error" in content:
- raise leancloud.LeanCloudError(
- content.get("code", 1), content.get("error", "Unknown Error")
- )
-
- return response
-
- return new_func
-
-
-[文档]def use_region(region):
- if region not in ("CN", "US"):
- raise ValueError("currently no nodes in the region")
-
- global REGION
- REGION = region
-
-
-def get_server_time():
- response = check_error(session.get)(get_url("/date"), timeout=TIMEOUT_SECONDS)
- return utils.decode("iso", response.json())
-
-
-def get_app_info():
- return {
- "app_id": APP_ID,
- "app_key": APP_KEY,
- "master_key": MASTER_KEY,
- "hook_key": HOOK_KEY,
- }
-
-
-@need_init
-@check_error
-def get(url, params=None, headers=None):
- if not params:
- params = {}
- else:
- for k, v in six.iteritems(params):
- if isinstance(v, dict):
- params[k] = json.dumps(v, separators=(",", ":"))
- response = session.get(
- get_url(url),
- headers=headers,
- params=params,
- timeout=TIMEOUT_SECONDS,
- hooks=request_hooks,
- )
- return response
-
-
-@need_init
-@check_error
-def post(url, params, headers=None):
- response = session.post(
- get_url(url),
- headers=headers,
- data=json.dumps(params, separators=(",", ":")),
- timeout=TIMEOUT_SECONDS,
- hooks=request_hooks,
- )
- return response
-
-
-@need_init
-@check_error
-def put(url, params, headers=None):
- response = session.put(
- get_url(url),
- headers=headers,
- data=json.dumps(params, separators=(",", ":")),
- timeout=TIMEOUT_SECONDS,
- hooks=request_hooks,
- )
- return response
-
-
-@need_init
-@check_error
-def delete(url, params=None, headers=None):
- response = session.delete(
- get_url(url),
- headers=headers,
- data=json.dumps(params, separators=(",", ":")),
- timeout=TIMEOUT_SECONDS,
- hooks=request_hooks,
- )
- return response
-
-# coding: utf-8
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import six
-
-import leancloud
-from leancloud import utils
-from leancloud.engine import leanengine
-
-
-__author__ = "asaka <lan@leancloud.rocks>"
-
-
-[文档]def run(_cloud_func_name, **params):
- """
- 调用 LeanEngine 上的远程代码
- :param name: 需要调用的远程 Cloud Code 的名称
- :type name: string_types
- :param params: 调用参数
- :return: 调用结果
- """
- response = leancloud.client.post(
- "/functions/{0}".format(_cloud_func_name), params=params
- )
- content = response.json()
- return utils.decode(None, content)["result"]
-
-
-def _run_in_local(_cloud_func_name, **params):
- if not leanengine.root_engine:
- return
- result = leanengine.dispatch_cloud_func(
- leanengine.root_engine.app.cloud_codes, {}, _cloud_func_name, False, params
- )
- return utils.decode(None, result)
-
-
-run.remote = run
-run.local = _run_in_local
-
-
-[文档]def rpc(_cloud_rpc_name, **params):
- """
- 调用 LeanEngine 上的远程代码
- 与 cloud.run 类似,但是允许传入 leancloud.Object 作为参数,也允许传入 leancloud.Object 作为结果
- :param name: 需要调用的远程 Cloud Code 的名称
- :type name: basestring
- :param params: 调用参数
- :return: 调用结果
- """
- encoded_params = {}
- for key, value in params.items():
- if isinstance(params, leancloud.Object):
- encoded_params[key] = utils.encode(value._dump())
- else:
- encoded_params[key] = utils.encode(value)
- response = leancloud.client.post(
- "/call/{}".format(_cloud_rpc_name), params=encoded_params
- )
- content = response.json()
- return utils.decode(None, content["result"])
-
-
-def _rpc_in_local(_cloud_rpc_name, **params):
- if not leanengine.root_engine:
- return
- result = leanengine.dispatch_cloud_func(
- leanengine.root_engine.app.cloud_codes, {}, _cloud_rpc_name, True, params
- )
- return utils.decode(None, result)
-
-
-rpc.remote = rpc
-rpc.local = _rpc_in_local
-
-
-[文档]def request_sms_code(
- phone_number,
- idd="+86",
- sms_type="sms",
- validate_token=None,
- template=None,
- sign=None,
- params=None,
-):
- """
- 请求发送手机验证码
- :param phone_number: 需要验证的手机号码
- :param idd: 号码的所在地国家代码,默认为中国(+86)
- :param sms_type: 验证码发送方式,'voice' 为语音,'sms' 为短信
- :param template: 模版名称
- :param sign: 短信签名名称
- :return: None
- """
- if not isinstance(phone_number, six.string_types):
- raise TypeError("phone_number must be a string")
-
- data = {
- "mobilePhoneNumber": phone_number
- if phone_number.startswith("+")
- else idd + phone_number,
- "smsType": sms_type,
- }
-
- if template is not None:
- data["template"] = template
-
- if sign is not None:
- data["sign"] = sign
-
- if validate_token is not None:
- data["validate_token"] = validate_token
-
- if params is not None:
- data.update(params)
-
- leancloud.client.post("/requestSmsCode", params=data)
-
-
-[文档]def verify_sms_code(phone_number, code):
- """
- 获取到手机验证码之后,验证验证码是否正确。如果验证失败,抛出异常。
- :param phone_number: 需要验证的手机号码
- :param code: 接受到的验证码
- :return: None
- """
- params = {
- "mobilePhoneNumber": phone_number,
- }
- leancloud.client.post("/verifySmsCode/{0}".format(code), params=params)
- return True
-
-
-[文档]class Captcha(object):
- """
- 表示图形验证码
- """
-
- def __init__(self, token, url):
- self.token = token
- self.url = url
-
-[文档] def verify(self, code):
- """
- 验证用户输入与图形验证码是否匹配
- :params code: 用户填写的验证码
- """
- return verify_captcha(code, self.token)
-
-
-[文档]def request_captcha(size=None, width=None, height=None, ttl=None):
- """
- 请求生成新的图形验证码
- :return: Captcha
- """
- params = {
- "size": size,
- "width": width,
- "height": height,
- "ttl": ttl,
- }
- params = {k: v for k, v in params.items() if v is not None}
-
- response = leancloud.client.get("/requestCaptcha", params)
- content = response.json()
- return Captcha(content["captcha_token"], content["captcha_url"])
-
-
-[文档]def verify_captcha(code, token):
- """
- 验证用户输入与图形验证码是否匹配
- :params code: 用户填写的验证码
- :params token: 图形验证码对应的 token
- :return: validate token
- """
- params = {
- "captcha_token": token,
- "captcha_code": code,
- }
- response = leancloud.client.post("/verifyCaptcha", params)
- return response.json()["validate_token"]
-
-
-
-
-# coding: utf-8
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import os
-import sys
-import json
-import warnings
-
-from werkzeug.wrappers import Request
-from werkzeug.wrappers import Response
-from werkzeug.serving import run_simple
-
-import leancloud
-from . import utils
-from . import leanengine
-from .authorization import AuthorizationMiddleware
-from .cookie_session import CookieSessionMiddleware # noqa: F401
-from .cors import CORSMiddleware
-from .https_redirect_middleware import HttpsRedirectMiddleware # noqa: F401
-from .leanengine import LeanEngineApplication
-from .leanengine import LeanEngineError
-from .leanengine import after_delete
-from .leanengine import after_save
-from .leanengine import after_update
-from .leanengine import before_delete
-from .leanengine import before_save
-from .leanengine import before_update
-from .leanengine import context
-from .leanengine import current
-from .leanengine import register_cloud_func
-from .leanengine import register_on_bigquery
-from .leanengine import register_on_login
-from .leanengine import register_on_auth_data
-from .leanengine import register_on_verified
-from .leanengine import user
-
-__author__ = "asaka <lan@leancloud.rocks>"
-
-
-[文档]class Engine(object):
- """
- LeanEngine middleware.
- """
-
- def __init__(self, wsgi_app=None, fetch_user=True):
- """
- LeanEngine middleware constructor.
-
- :param wsgi_app: wsgi callable
- :param fetch_user:
- should fetch user's data from server while prNoneocessing session token.
- :type fetch_user: bool
- """
- self.current = current
- if wsgi_app:
- leanengine.root_engine = self
- self.origin_app = wsgi_app
- self.app = LeanEngineApplication(fetch_user=fetch_user)
- self.cloud_app = context.local_manager.make_middleware(
- CORSMiddleware(AuthorizationMiddleware(self.app))
- )
-
- def __call__(self, environ, start_response):
- request = Request(environ)
- environ[
- "leanengine.request"
- ] = request # cache werkzeug request for other middlewares
-
- if request.path in ("/__engine/1/ping", "/__engine/1.1/ping/"):
- start_response(
- utils.to_native("200 OK"),
- [
- (
- utils.to_native("Content-Type"),
- utils.to_native("application/json"),
- )
- ],
- )
- version = sys.version_info
- return Response(
- json.dumps(
- {
- "version": leancloud.__version__,
- "runtime": "cpython-{0}.{1}.{2}".format(
- version.major, version.minor, version.micro
- ),
- }
- )
- )(environ, start_response)
- if request.path.startswith("/__engine/"):
- return self.cloud_app(environ, start_response)
- if request.path.startswith("/1/functions") or request.path.startswith(
- "/1.1/functions"
- ):
- return self.cloud_app(environ, start_response)
- if request.path.startswith("/1/call") or request.path.startswith("/1.1/call"):
- return self.cloud_app(environ, start_response)
- return self.origin_app(environ, start_response)
-
-[文档] def wrap(self, wsgi_app):
- if leanengine.root_engine:
- raise RuntimeError("It's forbidden that overwriting wsgi_func.")
- leanengine.root_engine = self
- self.origin_app = wsgi_app
- return self
-
-[文档] def register(self, engine):
- if not isinstance(engine, Engine):
- raise TypeError("Please specify an Engine instance")
- self.app.update_cloud_codes(engine.app.cloud_codes)
-
-[文档] def define(self, *args, **kwargs):
- return register_cloud_func(self.app.cloud_codes, *args, **kwargs)
-
-[文档] def on_verified(self, *args, **kwargs):
- return register_on_verified(self.app.cloud_codes, *args, **kwargs)
-
-[文档] def on_login(self, *args, **kwargs):
- return register_on_login(self.app.cloud_codes, *args, **kwargs)
-
-[文档] def on_auth_data(self, *args, **kwargs):
- return register_on_auth_data(self.app.cloud_codes, *args, **kwargs)
-
-[文档] def on_bigquery(self, *args, **kwargs):
- warnings.warn(
- "on_bigquery is deprecated, please use on_insight instead",
- leancloud.LeanCloudWarning,
- )
- return register_on_bigquery(self.app.cloud_codes, *args, **kwargs)
-
-[文档] def before_save(self, *args, **kwargs):
- return before_save(self.app.cloud_codes, *args, **kwargs)
-
-[文档] def after_save(self, *args, **kwargs):
- return after_save(self.app.cloud_codes, *args, **kwargs)
-
-[文档] def before_update(self, *args, **kwargs):
- return before_update(self.app.cloud_codes, *args, **kwargs)
-
-[文档] def after_update(self, *args, **kwargs):
- return after_update(self.app.cloud_codes, *args, **kwargs)
-
-[文档] def before_delete(self, *args, **kwargs):
- return before_delete(self.app.cloud_codes, *args, **kwargs)
-
-[文档] def after_delete(self, *args, **kwargs):
- return after_delete(self.app.cloud_codes, *args, **kwargs)
-
-[文档] def on_insight(self, *args, **kwargs):
- return register_on_bigquery(self.app.cloud_codes, *args, **kwargs)
-
-
-
-[文档] def start(self):
- from gevent.pywsgi import WSGIServer
-
- if not hasattr(leancloud, "APP_ID"):
- APP_ID = os.environ["LEANCLOUD_APP_ID"]
- APP_KEY = os.environ["LEANCLOUD_APP_KEY"]
- MASTER_KEY = os.environ["LEANCLOUD_APP_MASTER_KEY"]
- HOOK_KEY = os.environ["LEANCLOUD_APP_HOOK_KEY"]
- PORT = int(os.environ.get("LEANCLOUD_APP_PORT"))
- leancloud.init(
- APP_ID, app_key=APP_KEY, master_key=MASTER_KEY, hook_key=HOOK_KEY
- )
-
- def application(environ, start_response):
- start_response(
- "200 OK".encode("utf-8"),
- [("Content-Type".encode("utf-8"), "text/plain".encode("utf-8"))],
- )
- return "This is a LeanEngine application."
-
- class NopLogger(object):
- def write(self, s):
- pass
-
- app = self.wrap(application)
- self.server = WSGIServer(("", PORT), app, log=NopLogger())
- print("LeanEngine Cloud Functions app is running, port:", PORT)
- self.server.serve_forever()
-
-
-
-
-__all__ = ["user", "Engine", "LeanEngineError"]
-
-# coding: utf-8
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-from werkzeug import http
-from werkzeug.wrappers import Request
-from secure_cookie.cookie import SecureCookie
-
-from . import utils
-from leancloud.user import User
-
-
-__author__ = "asaka <lan@leancloud.rocks>"
-
-
-[文档]class CookieSessionMiddleware(object):
- """
- 用来在 webhosting 功能中实现自动管理 LeanCloud 用户登录状态的 WSGI 中间件。
- 使用此中间件之后,在处理 web 请求中调用了 `leancloud.User.login()` 方法登录成功后,
- 会将此用户 session token 写入到 cookie 中。
- 后续此次会话都可以通过 `leancloud.User.get_current()` 获取到此用户对象。
-
- :param secret: 对保存在 cookie 中的用户 session token 进行签名时需要的 key,可使用任意方法随机生成,请不要泄漏
- :type secret: str
- :param name: 在 cookie 中保存的 session token 的 key 的名称,默认为 "leancloud:session"
- :type name: str
- :param excluded_paths:
- 指定哪些 URL path 不处理 session token,比如在处理静态文件的 URL path 上不进行处理,防止无谓的性能浪费
- :type excluded_paths: list
- :param fetch_user: 处理请求时是否要从存储服务获取用户数据,
- 如果为 false 的话,
- leancloud.User.get_current() 获取到的用户数据上除了 session_token 之外没有任何其他数据,
- 需要自己调用 fetch() 来获取。
- 为 true 的话,会自动在用户对象上调用 fetch(),这样将会产生一次数据存储的 API 调用。
- 默认为 false
- :type fetch_user: bool
- :param expires: 设置 cookie 的 expires
- :type expires: int or datetime
- :param max_age: 设置 cookie 的 max_age,单位为秒
- :type max_age: int
- """
-
- def __init__(
- self,
- app,
- secret,
- name="leancloud:session",
- excluded_paths=None,
- fetch_user=False,
- expires=None,
- max_age=None,
- ):
- if not secret:
- raise RuntimeError("secret is required")
- self.fetch_user = fetch_user
- self.secret = secret
- self.app = app
- self.name = name
- self.excluded_paths = [
- "/__engine/",
- "/1/functions/",
- "/1.1/functions/",
- "/1/call/",
- "/1.1/call/",
- ]
- self.expires = expires
- self.max_age = max_age
- if excluded_paths:
- self.excluded_paths += excluded_paths
-
- def __call__(self, environ, start_response):
- self.pre_process(environ)
-
- def new_start_response(status, response_headers, exc_info=None):
- self.post_process(environ, response_headers)
- return start_response(status, response_headers, exc_info)
-
- return self.app(environ, new_start_response)
-
-[文档] def pre_process(self, environ):
- request = Request(environ)
- for prefix in self.excluded_paths:
- if request.path.startswith(prefix):
- return
-
- cookie = request.cookies.get(self.name)
- if not cookie:
- return
-
- session = SecureCookie.unserialize(cookie, self.secret)
-
- if "session_token" not in session:
- return
-
- if not self.fetch_user:
- user = User()
- user._session_token = session["session_token"]
- user.id = session["uid"]
- User.set_current(user)
- else:
- user = User.become(session["session_token"])
- User.set_current(user)
-
-[文档] def post_process(self, environ, headers):
- user = User.get_current()
- if not user:
- cookies = http.parse_cookie(environ)
- if self.name in cookies:
- raw = http.dump_cookie(self.name, "", expires=1)
- headers.append((utils.to_native("Set-Cookie"), raw))
- return
- cookie = SecureCookie(
- {"uid": user.id, "session_token": user.get_session_token()}, self.secret
- )
- raw = http.dump_cookie(
- self.name, cookie.serialize(), expires=self.expires, max_age=self.max_age
- )
- headers.append((utils.to_native("Set-Cookie"), raw))
-
-# coding: utf-8
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import os
-
-
-from werkzeug.wrappers import Request
-from werkzeug.utils import redirect
-
-__author__ = "asaka <lan@leancloud.rocks>"
-
-
-is_prod = True if os.environ.get("LEANCLOUD_APP_ENV") == "production" else False
-
-
-[文档]class HttpsRedirectMiddleware(object):
- def __init__(self, wsgi_app):
- self.origin_app = wsgi_app
-
- def __call__(self, environ, start_response):
- request = Request(environ)
- engine_health = "/1.1/functions/_ops/metadatas"
- if (
- is_prod
- and request.path != engine_health
- and request.headers.get("X-Forwarded-Proto") != "https"
- ):
- url = "https://{0}{1}".format(request.host, request.full_path)
- return redirect(url)(environ, start_response)
-
- return self.origin_app(environ, start_response)
-
-# coding: utf-8
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import six
-
-__author__ = "asaka <lan@leancloud.rocks>"
-
-
-[文档]@six.python_2_unicode_compatible
-class LeanCloudError(Exception):
- def __init__(self, code, error):
- self.code = code
- self.error = error
-
- def __str__(self):
- error = (
- self.error
- if isinstance(self.error, six.text_type)
- else self.error.encode("utf-8", "ignore")
- )
- return "LeanCloudError: [{0}] {1}".format(self.code, error)
-
-
-
-
-# coding: utf-8
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import os
-import re
-import io
-import hashlib
-import logging
-import threading
-
-import six
-import requests
-
-import leancloud
-from leancloud import client
-from leancloud import utils
-from leancloud.errors import LeanCloudError
-
-__author__ = "asaka <lan@leancloud.rocks>"
-
-logger = logging.getLogger(__name__)
-
-
-DEFAULT_TIMEOUT = 30
-
-
-[文档]class File(object):
- _class_name = "_File" # walks like a leancloud.Object
-
- def __init__(self, name="", data=None, mime_type=None):
- self._name = name
- self.key = None
- self.id = None
- self.created_at = None
- self.updated_at = None
- self._url = None
- self._successful_url = None
- self._acl = None
- self.current_user = leancloud.User.get_current()
- self.timeout = 30
- self._metadata = {"owner": "unknown"}
- if (
- self.current_user and self.current_user != None
- ): # NOQA: self.current_user may be a thread_local object
- self._metadata["owner"] = self.current_user.id
-
- pattern = re.compile(r"\.([^.]*)$")
- extension = pattern.findall(name)
- if extension:
- self.extension = extension[0].lower()
- else:
- self.extension = ""
-
- self._mime_type = mime_type
-
- if data is None:
- self._source = None
- return
-
- try:
- data.read
- data.tell
- data.seek(0, os.SEEK_END)
- data.seek(0, os.SEEK_SET)
- except Exception:
- if (six.PY3 and isinstance(data, (memoryview, bytes))) or (
- six.PY2 and isinstance(data, (buffer, memoryview, str)) # noqa: F821
- ):
- data = io.BytesIO(data)
- elif data.read:
- data = io.BytesIO(data.read())
- else:
- raise TypeError(
- "Do not know how to handle data, accepts file like object or bytes"
- )
-
- data.seek(0, os.SEEK_SET)
- checksum = hashlib.md5()
- while True:
- chunk = data.read(4096)
- if not chunk:
- break
-
- try:
- checksum.update(chunk)
- except TypeError:
- checksum.update(chunk.encode("utf-8"))
-
- self._metadata["_checksum"] = checksum.hexdigest()
- self._metadata["size"] = data.tell()
-
- # 3.5MB, 1Mbps * 30s
- # increase timeout
- if self._metadata["size"] > 3750000:
- self.timeout = self.timeout * int(self._metadata["size"] / 3750000)
-
- data.seek(0, os.SEEK_SET)
-
- self._source = data
-
- @utils.classproperty
- def query(self):
- return leancloud.Query(self)
-
-[文档] @classmethod
- def create_with_url(cls, name, url, meta_data=None, mime_type=None):
- f = File(name, None, mime_type)
- if meta_data:
- f._metadata.update(meta_data)
-
- if isinstance(url, six.string_types):
- f._url = url
- else:
- raise ValueError("url must be a str / unicode")
-
- f._metadata["__source"] = "external"
- return f
-
-[文档] @classmethod
- def create_without_data(cls, object_id):
- f = File("")
- f.id = object_id
- return f
-
-
-
-[文档] def set_acl(self, acl):
- if not isinstance(acl, leancloud.ACL):
- raise TypeError("acl must be a leancloud.ACL instance")
- self._acl = acl
-
- @property
- def name(self):
- return self._name
-
- @property
- def url(self):
- return self._successful_url
-
- @property
- def mime_type(self):
- return self._mime_type
-
- @mime_type.setter
- def set_mime_type(self, mime_type):
- self._mime_type = mime_type
-
- @property
- def size(self):
- return self._metadata["size"]
-
- @property
- def owner_id(self):
- return self._metadata["owner"]
-
- @property
- def metadata(self):
- return self._metadata
-
-[文档] def get_thumbnail_url(
- self, width, height, quality=100, scale_to_fit=True, fmt="png"
- ):
- if not self.url:
- raise ValueError("invalid url")
-
- if width < 0 or height < 0:
- raise ValueError("invalid height or width params")
-
- if quality > 100 or quality <= 0:
- raise ValueError("quality must between 0 and 100")
-
- mode = 2 if scale_to_fit else 1
-
- return self.url + "?imageView/{0}/w/{1}/h/{2}/q/{3}/format/{4}".format(
- mode, width, height, quality, fmt
- )
-
-[文档] def destroy(self):
- if not self.id:
- return False
- response = client.delete("/files/{0}".format(self.id))
- if response.status_code != 200:
- raise LeanCloudError(1, "the file is not sucessfully destroyed")
-
-
- def _save_to_qiniu(self, token, key):
- self._source.seek(0)
-
- import qiniu
-
- qiniu.set_default(connection_timeout=self.timeout)
- ret, info = qiniu.put_data(token, key, self._source)
- self._source.seek(0)
-
- if info.status_code != 200:
- self._save_callback(token, False)
- raise LeanCloudError(
- 1,
- "the file is not saved, qiniu status code: {0}".format(
- info.status_code
- ),
- )
- self._save_callback(token, True)
-
- def _save_to_s3(self, token, upload_url):
- self._source.seek(0)
- response = requests.put(
- upload_url, data=self._source, headers={"Content-Type": self.mime_type}
- )
- if response.status_code != 200:
- self._save_callback(token, False)
- raise LeanCloudError(1, "The file is not successfully saved to S3")
- self._source.seek(0)
- self._save_callback(token, True)
-
- def _save_external(self):
- data = {
- "name": self._name,
- "ACL": self._acl,
- "metaData": self._metadata,
- "mime_type": self.mime_type,
- "url": self._url,
- }
- response = client.post("/files".format(self._name), data)
- content = response.json()
-
- self.id = content["objectId"]
-
- self._successful_url = self._url
-
- _created_at = utils.decode_date_string(content.get("createdAt"))
- _updated_at = utils.decode_updated_at(content.get("updatedAt"), _created_at)
- if _created_at is not None:
- self.created_at = _created_at
- if _updated_at is not None:
- self.updated_at = _updated_at
-
- def _save_to_qcloud(self, token, upload_url):
- headers = {
- "Authorization": token,
- }
- self._source.seek(0)
- data = {
- "op": "upload",
- "filecontent": self._source.read(),
- }
- response = requests.post(upload_url, headers=headers, files=data)
- self._source.seek(0)
- info = response.json()
- if info["code"] != 0:
- self._save_callback(token, False)
- raise LeanCloudError(
- 1,
- "this file is not saved, qcloud cos status code: {}".format(
- info["code"]
- ),
- )
- self._save_callback(token, True)
-
- def _save_callback(self, token, successed):
- if not token:
- return
-
- def f():
- try:
- client.post("/fileCallback", {"token": token, "result": successed})
- except LeanCloudError as e:
- logger.warning("call file callback failed, error: %s", e)
-
- threading.Thread(target=f).start()
-
-[文档] def save(self):
- if self._url and self.metadata.get("__source") == "external":
- self._save_external()
- elif not self._source:
- pass
- else:
- content = self._get_file_token()
- self._mime_type = content["mime_type"]
- if content["provider"] == "qiniu":
- self._save_to_qiniu(content["token"], content["key"])
- elif content["provider"] == "qcloud":
- self._save_to_qcloud(content["token"], content["upload_url"])
- elif content["provider"] == "s3":
- self._save_to_s3(content.get("token"), content["upload_url"])
- else:
- raise RuntimeError("The provider field in the fetched content is empty")
- self._update_data(content)
-
- def _update_data(self, server_data):
- if "objectId" in server_data:
- self.id = server_data.get("objectId")
- if "name" in server_data:
- self._name = server_data.get("name")
- if "url" in server_data:
- self._url = server_data.get("url")
- self._successful_url = self._url
- if "key" in server_data:
- self.key = server_data.get("key")
- if "mime_type" in server_data:
- self._mime_type = server_data["mime_type"]
- if "metaData" in server_data:
- self._metadata = server_data.get("metaData")
-
- _created_at = utils.decode_date_string(server_data.get("createdAt"))
- _updated_at = utils.decode_updated_at(server_data.get("updatedAt"), _created_at)
- if _created_at is not None:
- self.created_at = _created_at
- if _updated_at is not None:
- self.updated_at = _updated_at
-
- def _get_file_token(self):
- data = {
- "name": self._name,
- "ACL": self._acl,
- "mime_type": self.mime_type,
- "metaData": self._metadata,
- }
- if self.key is not None:
- data["key"] = self.key
- response = client.post("/fileTokens", data)
- content = response.json()
- self.id = content["objectId"]
- self._url = content["url"]
- self.key = content["key"]
- return content
-
-[文档] def fetch(self):
- response = client.get("/files/{0}".format(self.id))
- content = response.json()
- self._update_data(content)
-
-# coding: utf-8
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import math
-
-
-__author__ = "asaka <lan@leancloud.rocks>"
-
-
-[文档]class GeoPoint(object):
- def __init__(self, latitude=0, longitude=0):
- """
-
- :param latitude: 纬度
- :type latitude: int or float
- :param longitude: 经度
- :type longitude: int or float
- :return: GeoPoint
- """
- self._validate(latitude, longitude)
- self._latitude = latitude
- self._longitude = longitude
-
- @classmethod
- def _validate(cls, latitude, longitude):
- if latitude < -90.0:
- raise ValueError("GeoPoint latitude {0} < -90.0".format(latitude))
-
- if latitude > 90.0:
- raise ValueError("GeoPoint latitude {0} > 90.0".format(latitude))
-
- if longitude < -180.0:
- raise ValueError("GeoPoint longitude {0} < -180.0".format(longitude))
-
- if longitude > 180.0:
- raise ValueError("GeoPoint longitude {0} > 180.0".format(longitude))
-
- @property
- def latitude(self):
- """
- 当前对象的纬度
- """
- return self._latitude
-
- @latitude.setter
- def latitude(self, latitude):
- self._validate(latitude, self.longitude)
- self._latitude = latitude
-
- @property
- def longitude(self):
- """
- 当前对象的经度
- """
- return self._longitude
-
- @longitude.setter
- def longitude(self, longitude):
- self._validate(self.latitude, longitude)
- self._longitude = longitude
-
-[文档] def dump(self):
- self._validate(self.latitude, self.longitude)
- return {
- "__type": "GeoPoint",
- "latitude": self.latitude,
- "longitude": self.longitude,
- }
-
-[文档] def radians_to(self, other):
- """
- Returns the distance from this GeoPoint to another in radians.
-
- :param other: point the other GeoPoint
- :type other: GeoPoint
- :rtype: float
- """
- d2r = math.pi / 180.0
- lat1rad = self.latitude * d2r
- long1rad = self.longitude * d2r
-
- lat2rad = other.latitude * d2r
- long2rad = other.longitude * d2r
-
- delta_lat = lat1rad - lat2rad
- delta_long = long1rad - long2rad
-
- sin_delta_lat_div2 = math.sin(delta_lat / 2.0)
- sin_delta_long_div2 = math.sin(delta_long / 2.0)
-
- a = (sin_delta_lat_div2 * sin_delta_lat_div2) + (
- math.cos(lat1rad)
- * math.cos(lat2rad)
- * sin_delta_long_div2
- * sin_delta_long_div2
- )
- a = min(1.0, a)
- return 2 * math.asin(math.sqrt(a))
-
-[文档] def kilometers_to(self, other):
- """
- Returns the distance from this GeoPoint to another in kilometers.
-
- :param other: point the other GeoPoint
- :type other: GeoPoint
- :rtype: float
- """
- return self.radians_to(other) * 6371.0
-
-[文档] def miles_to(self, other):
- """
- Returns the distance from this GeoPoint to another in miles.
-
- :param other: point the other GeoPoint
- :type other: GeoPoint
- :rtype: float
- """
- return self.radians_to(other) * 3958.8
-
- def __eq__(self, other):
- return (
- isinstance(other, GeoPoint)
- and self.latitude == other.latitude
- and self.longitude == other.longitude
- )
-
-# coding: utf-8
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import copy
-import json
-
-import six
-from werkzeug.local import LocalProxy
-
-import leancloud
-from leancloud import utils
-from leancloud import client
-from leancloud import operation
-
-
-__author__ = "asaka <lan@leancloud.rocks>"
-
-
-object_class_map = {}
-
-
-class ObjectMeta(type):
- def __new__(mcs, name, bases, attrs):
- cached_class = object_class_map.get(name)
- if cached_class:
- return cached_class
-
- super_new = super(ObjectMeta, mcs).__new__
-
- # let user define their class_name at subclass-creation stage
- class_name = attrs.pop("class_name", None)
-
- if class_name:
- attrs["_class_name"] = class_name
- elif name == "User":
- attrs["_class_name"] = "_User"
- elif name == "Installation":
- attrs["_class_name"] = "_Installation"
- elif name == "Notification":
- attrs["_class_name"] = "_Notification"
- elif name == "Role":
- attrs["_class_name"] = "_Role"
- elif name == "Conversation":
- attrs["_class_name"] = "_Conversation"
- elif name == "SysMessage":
- attrs["_class_name"] = "_SysMessage"
- else:
- attrs["_class_name"] = name
-
- object_class = super_new(mcs, name, bases, attrs)
- object_class_map[name] = object_class
- return object_class
-
- @property
- def query(cls):
- """
- 获取当前对象的 Query 对象。
-
- :rtype: leancloud.Query
- """
- return leancloud.Query(cls)
-
-
-[文档]class Object(six.with_metaclass(ObjectMeta, object)):
- def __init__(self, **attrs):
- """
- 创建一个新的 leancloud.Object
-
- :param attrs: 对象属性
- :return:
- """
- self.id = None
- self._class_name = self._class_name # for IDE
- self._changes = {}
- self._attributes = {}
- self._flags = {}
- self.created_at = None
- self.updated_at = None
-
- for k, v in six.iteritems(attrs):
- self.set(k, v)
-
-[文档] @classmethod
- def extend(cls, name):
- """
- 派生一个新的 leancloud.Object 子类
-
- :param name: 子类名称
- :type name: string_types
- :return: 派生的子类
- :rtype: ObjectMeta
- """
- if six.PY2 and isinstance(name, six.text_type):
- # In python2, class name must be a python2 str.
- name = name.encode("utf-8")
- return type(name, (cls,), {})
-
-[文档] @classmethod
- def create(cls, class_name, **attributes):
- """
- 根据参数创建一个 leancloud.Object 的子类的实例化对象
-
- :param class_name: 子类名称
- :type class_name: string_types
- :param attributes: 对象属性
- :return: 派生子类的实例
- :rtype: Object
- """
- object_class = cls.extend(class_name)
- return object_class(**attributes)
-
-[文档] @classmethod
- def create_without_data(cls, id_):
- """
- 根据 objectId 创建一个 leancloud.Object,代表一个服务器上已经存在的对象。可以调用 fetch 方法来获取服务器上的数据
-
- :param id_: 对象的 objectId
- :type id_: string_types
- :return: 没有数据的对象
- :rtype: Object
- """
- if cls is Object:
- raise RuntimeError("can not call create_without_data on leancloud.Object")
- obj = cls()
- obj.id = id_
- return obj
-
-[文档] @classmethod
- def save_all(cls, objs):
- """
- 在一个请求中 save 多个 leancloud.Object 对象实例。
-
- :param objs: 需要 save 的对象
- :type objs: list
- """
- if not objs:
- return
- return cls()._deep_save(objs, [])
-
-[文档] @classmethod
- def destroy_all(cls, objs):
- """
- 在一个请求中 destroy 多个 leancloud.Object 对象实例。
-
- :param objs: 需要 destroy 的对象
- :type objs: list
- """
- if not objs:
- return
- if any(x.is_new() for x in objs):
- raise ValueError("Could not destroy unsaved object")
-
- dumped_objs = []
- for obj in objs:
- dumped_obj = {
- "method": "DELETE",
- "path": "/{0}/classes/{1}/{2}".format(
- client.SERVER_VERSION, obj._class_name, obj.id
- ),
- "body": obj._flags,
- }
- dumped_objs.append(dumped_obj)
-
- response = client.post("/batch", params={"requests": dumped_objs}).json()
-
- errors = []
- for idx in range(len(objs)):
- content = response[idx]
- error = content.get("error")
- if error:
- errors.append(
- leancloud.LeanCloudError(error.get("code"), error.get("error"))
- )
-
- if errors:
- # TODO: how to raise list of errors?
- # raise MultipleValidationErrors(errors)
- # add test
- raise errors[0]
-
-
-
- def _dump(self):
- obj = copy.deepcopy(self._attributes)
- for k, v in six.iteritems(obj):
- obj[k] = utils.encode(v)
-
- if self.id is not None:
- obj["objectId"] = self.id
-
- obj["__type"] = "Object"
- obj["className"] = self._class_name
- return obj
-
-[文档] def destroy(self):
- """
- 从服务器上删除这个对象
-
- :rtype: None
- """
- if not self.id:
- return
- client.delete("/classes/{0}/{1}".format(self._class_name, self.id), self._flags)
-
-[文档] def save(self, where=None, fetch_when_save=None):
- """
- 将对象数据保存至服务器
-
- :return: None
- :rtype: None
- """
- if where and not isinstance(where, leancloud.Query):
- raise TypeError(
- "where param type should be leancloud.Query, got %s", type(where)
- )
-
- if where and where._query_class._class_name != self._class_name:
- raise TypeError(
- "where param's class name not equal to the current object's class name"
- )
-
- if where and self.is_new():
- raise TypeError("where params works only when leancloud.Object is saved")
-
- unsaved_children = []
- unsaved_files = []
- self._find_unsaved_children(self._attributes, unsaved_children, unsaved_files)
- if unsaved_children or unsaved_files:
- self._deep_save(unsaved_children, unsaved_files, exclude=self._attributes)
-
- data = self._dump_save()
- fetch_when_save = "true" if fetch_when_save else "false"
-
- if self.is_new():
- response = client.post(
- "/classes/{0}?fetchWhenSave={1}".format(
- self._class_name, fetch_when_save
- ),
- data,
- )
- else:
- url = "/classes/{0}/{1}?fetchWhenSave={2}".format(
- self._class_name, self.id, fetch_when_save
- )
- if where:
- url += "&where=" + json.dumps(
- where.dump()["where"], separators=(",", ":")
- )
- response = client.put(url, data)
-
- self._update_data(response.json())
-
- def _deep_save(self, unsaved_children, unsaved_files, exclude=None):
- if exclude:
- unsaved_children = [x for x in unsaved_children if x != exclude]
-
- for f in unsaved_files:
- f.save()
-
- if not unsaved_children:
- return
- dumped_objs = []
- for obj in unsaved_children:
- if obj.id is None:
- method = "POST"
- path = "/{0}/classes/{1}".format(client.SERVER_VERSION, obj._class_name)
- else:
- method = "PUT"
- path = "/{0}/classes/{1}/{2}".format(
- client.SERVER_VERSION, obj._class_name, obj.id
- )
- body = obj._dump_save()
- dumped_obj = {
- "method": method,
- "path": path,
- "body": body,
- }
- dumped_objs.append(dumped_obj)
-
- response = client.post("/batch", params={"requests": dumped_objs}).json()
-
- errors = []
- for idx, obj in enumerate(unsaved_children):
- content = response[idx]
- error = content.get("error")
- if error:
- errors.append(
- leancloud.LeanCloudError(error.get("code"), error.get("error"))
- )
- else:
- obj._update_data(content["success"])
-
- if errors:
- # TODO: how to raise list of errors?
- # raise MultipleValidationErrors(errors)
- # add test
- raise errors[0]
-
- @classmethod
- def _find_unsaved_children(cls, obj, children, files):
- def callback(o):
- if isinstance(o, Object):
- if o.is_dirty():
- children.append(o)
- return
-
- if isinstance(o, leancloud.File):
- if not o.url or not o.id:
- files.append(o)
- return
-
- utils.traverse_object(obj, callback)
-
-[文档] def is_dirty(self, attr=None):
- # consider renaming to is_changed?
- if attr:
- return attr in self._changes
- else:
- return bool(not self.id or self._changes)
-
- def _to_pointer(self):
- return {
- "__type": "Pointer",
- "className": self._class_name,
- "objectId": self.id,
- }
-
- def _merge_metadata(self, server_data):
- object_id = server_data.get("objectId")
- _created_at = utils.decode_date_string(server_data.get("createdAt"))
- _updated_at = utils.decode_updated_at(server_data.get("updatedAt"), _created_at)
-
- if object_id is not None:
- self.id = object_id
- if _created_at is not None:
- self.created_at = _created_at
- if _updated_at is not None:
- self.updated_at = _updated_at
-
-
-
-[文档] def validate(self, attrs):
- if "ACL" in attrs and not isinstance(attrs["ACL"], leancloud.ACL):
- raise TypeError("acl must be a ACL")
- return True
-
-[文档] def get(self, attr, default=None, deafult=None):
- """
- 获取对象字段的值
-
- :param attr: 字段名
- :type attr: string_types
- :return: 字段值
- """
- # for backward compatibility
- if (deafult is not None) and (default is None):
- default = deafult
-
- # createdAt is stored as string in the cloud but used as datetime object on the client side.
- # We need to make sure that `.created_at` and `.get("createdAt")` return the same value.
- # Otherwise users will get confused.
- if attr == "createdAt":
- if self.created_at is None:
- return None
- else:
- return self.created_at
-
- # Similar to createdAt.
- if attr == "updatedAt":
- if self.updated_at is None:
- return None
- else:
- return self.updated_at
-
- return self._attributes.get(attr, default)
-
-[文档] def relation(self, attr):
- """
- 返回对象上相应字段的 Relation
-
- :param attr: 字段名
- :type attr: string_types
- :return: Relation
- :rtype: leancloud.Relation
- """
- value = self.get(attr)
- if value is not None:
- if not isinstance(value, leancloud.Relation):
- raise TypeError("field %s is not Relation".format(attr))
- value._ensure_parent_and_key(self, attr)
- return value
- return leancloud.Relation(self, attr)
-
-[文档] def has(self, attr):
- """
- 判断此字段是否有值
-
- :param attr: 字段名
- :return: 当有值时返回 True, 否则返回 False
- :rtype: bool
- """
- return attr in self._attributes
-
-[文档] def set(self, key_or_attrs, value=None, unset=False):
- """
- 在当前对象此字段上赋值
-
- :param key_or_attrs: 字段名,或者一个包含 字段名 / 值的 dict
- :type key_or_attrs: string_types or dict
- :param value: 字段值
- :param unset:
- :return: 当前对象,供链式调用
- """
- if isinstance(key_or_attrs, dict) and value is None:
- attrs = key_or_attrs
- keys = attrs.keys()
- for k in keys:
- if isinstance(attrs[k], LocalProxy):
- attrs[k] = attrs[k]._get_current_object()
- else:
- key = key_or_attrs
- if isinstance(value, LocalProxy):
- value = value._get_current_object()
- attrs = {key: utils.decode(key, value)}
-
- if unset:
- for k in attrs.keys():
- attrs[k] = operation.Unset()
-
- self.validate(attrs)
-
- self._merge_metadata(attrs)
-
- keys = list(attrs.keys())
- for k in keys:
- v = attrs[k]
- # TODO: Relation
-
- if not isinstance(v, operation.BaseOp):
- v = operation.Set(v)
-
- self._attributes[k] = v._apply(self._attributes.get(k), self, k)
- if self._attributes[k] == operation._UNSET:
- del self._attributes[k]
- self._changes[k] = v._merge(self._changes.get(k))
-
- return self
-
-[文档] def unset(self, attr):
- """
- 在对象上移除此字段。
-
- :param attr: 字段名
- :return: 当前对象
- """
- return self.set(attr, None, unset=True)
-
-[文档] def increment(self, attr, amount=1):
- """
- 在对象此字段上自增对应的数值,如果数值没有指定,默认为一。
-
- :param attr: 字段名
- :param amount: 自增量
- :return: 当前对象
- """
- return self.set(attr, operation.Increment(amount))
-
-[文档] def add(self, attr, item):
- """
- 在对象此字段对应的数组末尾添加指定对象。
-
- :param attr: 字段名
- :param item: 要添加的对象
- :return: 当前对象
- """
- return self.set(attr, operation.Add([item]))
-
-[文档] def add_unique(self, attr, item):
- """
- 在对象此字段对应的数组末尾添加指定对象,如果此对象并没有包含在字段中。
-
- :param attr: 字段名
- :param item: 要添加的对象
- :return: 当前对象
- """
- return self.set(attr, operation.AddUnique([item]))
-
-[文档] def remove(self, attr, item):
- """
- 在对象此字段对应的数组中,将指定对象全部移除。
-
- :param attr: 字段名
- :param item: 要移除的对象
- :return: 当前对象
- """
- return self.set(attr, operation.Remove([item]))
-
-
-
-
-
-
-
-[文档] def clear(self):
- """
- 将当前对象所有字段全部移除。
-
- :return: 当前对象
- """
- self.set(self._attributes, unset=True)
-
- def _dump_save(self):
- data = {k: v.dump() for k, v in six.iteritems(self._changes)}
- data.update(self._flags)
- return data
-
-[文档] def fetch(self, select=None, include=None):
- """
- 从服务器获取当前对象所有的值,如果与本地值不同,将会覆盖本地的值。
-
- :return: 当前对象
- """
- data = {}
- if select:
- if not isinstance(select, (list, tuple)):
- raise TypeError("select parameter must be a list or a tuple")
- data["keys"] = ",".join(select)
- if include:
- if not isinstance(include, (list, tuple)):
- raise TypeError("include parameter must be a list or a tuple")
- data["include"] = ",".join(include)
- response = client.get(
- "/classes/{0}/{1}".format(self._class_name, self.id), data
- )
- self._update_data(response.json())
-
-[文档] def is_new(self):
- """
- 判断当前对象是否已经保存至服务器。
-
- 该方法为 SDK 内部使用(save 调用此方法 dispatch 保存操作为 REST API 的 POST 和 PUT 请求)。
- 查询对象是否在服务器上存在请使用 is_existed 方法。
-
-
- :rtype: bool
- """
- return False if self.id else True
-
-[文档] def is_existed(self):
- """
- 判断当前对象是否在服务器上已经存在。
-
- :rtype: bool
- """
- return self.has("createdAt")
-
-[文档] def get_acl(self):
- """
- 返回当前对象的 ACL。
-
- :return: 当前对象的 ACL
- :rtype: leancloud.ACL
- """
- return self.get("ACL")
-
-[文档] def set_acl(self, acl):
- """
- 为当前对象设置 ACL
-
- :type acl: leancloud.ACL
- :return: 当前对象
- """
-
- return self.set("ACL", acl)
-
-[文档] def disable_before_hook(self):
- hook_key = client.get_app_info().get("hook_key")
- master_key = client.get_app_info().get("master_key")
- if hook_key or master_key:
- self.ignore_hook("beforeSave")
- self.ignore_hook("beforeUpdate")
- self.ignore_hook("beforeDelete")
- return self
- else:
- raise ValueError("disable_before_hook needs master key or hook key")
-
-[文档] def disable_after_hook(self):
- hook_key = client.get_app_info().get("hook_key")
- master_key = client.get_app_info().get("master_key")
- if hook_key or master_key:
- self.ignore_hook("afterSave")
- self.ignore_hook("afterUpdate")
- self.ignore_hook("afterDelete")
- return self
- else:
- raise ValueError("disable_after_hook needs master key or hook key")
-
-[文档] def ignore_hook(self, hook_name):
- if hook_name not in {
- "beforeSave",
- "afterSave",
- "beforeUpdate",
- "afterUpdate",
- "beforeDelete",
- "afterDelete",
- }:
- raise ValueError("invalid hook name: " + hook_name)
- if "__ignore_hooks" not in self._flags:
- self._flags["__ignore_hooks"] = []
- self._flags["__ignore_hooks"].append(hook_name)
-
- def _update_data(self, server_data):
- self._merge_metadata(server_data)
- for key, value in six.iteritems(server_data):
- self._attributes[key] = utils.decode(key, value)
- self._changes = {}
-
-[文档] @staticmethod
- def as_class(arg):
- def inner_decorator(cls):
- cls._class_name = arg
- return cls
-
- return inner_decorator
-
-# coding: utf-8
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import arrow
-import dateutil.tz as tz
-
-from leancloud.object_ import Object
-from leancloud.errors import LeanCloudError
-from leancloud import client
-
-
-__author__ = "asaka <lan@leancloud.rocks>"
-
-
-
-
-
-[文档]class Notification(Object):
-[文档] def fetch(self, *args, **kwargs):
- """同步服务器的 Notification 数据
- """
- response = client.get("/tables/Notifications/{0}".format(self.id))
- self._update_data(response.json())
-
-[文档] def save(self, *args, **kwargs):
- raise LeanCloudError(code=1, error="Notification does not support modify")
-
-
-def _encode_time(time):
- tzinfo = time.tzinfo
- if tzinfo is None:
- tzinfo = tz.tzlocal()
- return arrow.get(time, tzinfo).to("utc").format("YYYY-MM-DDTHH:mm:ss.SSS") + "Z"
-
-
-[文档]def send(
- data,
- channels=None,
- push_time=None,
- expiration_time=None,
- expiration_interval=None,
- where=None,
- cql=None,
- flow_control=None,
- prod=None,
-):
- """
- 发送推送消息。返回结果为此条推送对应的 _Notification 表中的对象,但是如果需要使用其中的数据,需要调用 fetch() 方法将数据同步至本地。
-
- :param channels: 需要推送的频道
- :type channels: list or tuple
- :param push_time: 推送的时间
- :type push_time: datetime
- :param expiration_time: 消息过期的绝对日期时间
- :type expiration_time: datetime
- :param expiration_interval: 消息过期的相对时间,从调用 API 的时间开始算起,单位是秒
- :type expiration_interval: int
- :param where: 一个查询 _Installation 表的查询条件 leancloud.Query 对象
- :type where: leancloud.Query
- :param cql: 一个查询 _Installation 表的查询条件 CQL 语句
- :type cql: string_types
- :param data: 推送给设备的具体信息,详情查看 https://leancloud.cn/docs/push_guide.html#消息内容_Data
- :rtype: Notification
- :param flow_control: 不为 None 时开启平滑推送,值为每秒推送的目标终端用户数。开启时指定低于 1000 的值,按 1000 计。
- :type: flow_control: int
- :param prod: 仅对 iOS 推送有效,设置将推送发至 APNs 的开发环境(dev)还是生产环境(prod)。
- :type: prod: string
- """
- if expiration_interval and expiration_time:
- raise TypeError("Both expiration_time and expiration_interval can't be set")
-
- params = {
- "data": data,
- }
-
- if prod is None:
- if client.USE_PRODUCTION == "0":
- params["prod"] = "dev"
- else:
- params["prod"] = prod
-
- if channels:
- params["channels"] = channels
- if push_time:
- params["push_time"] = _encode_time(push_time)
- if expiration_time:
- params["expiration_time"] = _encode_time(expiration_time)
- if expiration_interval:
- params["expiration_interval"] = expiration_interval
- if where:
- params["where"] = where.dump().get("where", {})
- if cql:
- params["cql"] = cql
- # Do not change this to `if flow_control`, because 0 is falsy in Python,
- # but `flow_control = 0` will enable smooth push,
- # and it is in fact equivalent to `flow_control = 1000`.
- if flow_control is not None:
- params["flow_control"] = flow_control
-
- result = client.post("/push", params=params).json()
-
- notification = Notification.create_without_data(result["objectId"])
- return notification
-
-# coding: utf-8
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import json
-
-import six
-
-import leancloud
-from leancloud import client
-from leancloud import utils
-from leancloud.file_ import File
-from leancloud.object_ import Object
-from leancloud.errors import LeanCloudError
-
-__author__ = "asaka <lan@leancloud.rocks>"
-
-
-class CQLResult(object):
- """
- CQL 查询结果对象。
-
- Attributes:
- results: 返回的查询结果
-
- count: 如果查询语句包含 count,将保存在此字段
-
- class_name: 查询的 class 名称
- """
-
- __slots__ = ["results", "count", "class_name"]
-
- def __init__(self, results, count, class_name):
- self.results = results
- self.count = count
- self.class_name = class_name
-
-
-class Cursor(object):
- """
- Query.scan 返回结果对象。
- """
-
- def __init__(self, query_class, batch_size, scan_key, params):
- self._params = params
- self._query_class = query_class
-
- if batch_size is not None:
- self._params["limit"] = batch_size
-
- if scan_key is not None:
- self._params["scan_key"] = scan_key
-
- def __iter__(self):
- while True:
- content = client.get(
- "/scan/classes/{}".format(self._query_class._class_name), self._params
- ).json()
- for result in content["results"]:
- obj = self._query_class()
- obj._update_data(result)
- yield obj
-
- if not content.get("cursor"):
- break
-
- self._params["cursor"] = content["cursor"]
-
-
-[文档]class Query(object):
- def __init__(self, query_class):
- """
-
- :param query_class: 要查询的 class 名称或者对象
- :type query_class: string_types or leancloud.ObjectMeta
- """
- if isinstance(query_class, six.string_types):
- if query_class in ("File", "_File"):
- query_class = File
- else:
- query_class = Object.extend(query_class)
-
- if not isinstance(query_class, (type, six.class_types)) or not issubclass(
- query_class, (File, Object)
- ):
- raise ValueError("Query takes string or LeanCloud Object")
-
- self._query_class = query_class
-
- self._where = {}
- self._include = []
- self._include_acl = None
- self._limit = -1
- self._skip = 0
- self._extra = {}
- self._order = []
- self._select = []
-
-[文档] @classmethod
- def or_(cls, *queries):
- """
- 根据传入的 Query 对象,构造一个新的 OR 查询。
-
- :param queries: 需要构造的子查询列表
- :rtype: Query
- """
- if len(queries) < 2:
- raise ValueError("or_ need two queries at least")
- if not all(
- x._query_class._class_name == queries[0]._query_class._class_name
- for x in queries
- ):
- raise TypeError("All queries must be for the same class")
- query = Query(queries[0]._query_class._class_name)
- query._or_query(queries)
- return query
-
-[文档] @classmethod
- def and_(cls, *queries):
- """
- 根据传入的 Query 对象,构造一个新的 AND 查询。
-
- :param queries: 需要构造的子查询列表
- :rtype: Query
- """
- if len(queries) < 2:
- raise ValueError("and_ need two queries at least")
- if not all(
- x._query_class._class_name == queries[0]._query_class._class_name
- for x in queries
- ):
- raise TypeError("All queries must be for the same class")
- query = Query(queries[0]._query_class._class_name)
- query._and_query(queries)
- return query
-
-[文档] @classmethod
- def do_cloud_query(cls, cql, *pvalues):
- """
- 使用 CQL 来构造查询。CQL 语法参考 `这里 <https://cn.avoscloud.com/docs/cql_guide.html>`_。
-
- :param cql: CQL 语句
- :param pvalues: 查询参数
- :rtype: CQLResult
- """
- params = {"cql": cql}
- if len(pvalues) == 1 and isinstance(pvalues[0], (tuple, list)):
- pvalues = json.dumps(pvalues[0])
- if len(pvalues) > 0:
- params["pvalues"] = json.dumps(pvalues)
-
- content = client.get("/cloudQuery", params).json()
-
- objs = []
- query = cls(content["className"])
- for result in content["results"]:
- obj = query._new_object()
- obj._update_data(query._process_result(result))
- objs.append(obj)
-
- return CQLResult(objs, content.get("count"), content.get("className"))
-
-[文档] def dump(self):
- """
- :return: 当前对象的序列化结果
- :rtype: dict
- """
- params = {
- "where": self._where,
- }
- if self._include:
- params["include"] = ",".join(self._include)
- if self._select:
- params["keys"] = ",".join(self._select)
- if self._include_acl is not None:
- params["returnACL"] = json.dumps(self._include_acl)
- if self._limit >= 0:
- params["limit"] = self._limit
- if self._skip > 0:
- params["skip"] = self._skip
- if self._order:
- params["order"] = ",".join(self._order)
- params.update(self._extra)
- return params
-
- def _new_object(self):
- return self._query_class()
-
- def _process_result(self, obj):
- return obj
-
- def _do_request(self, params):
- return client.get(
- "/classes/{0}".format(self._query_class._class_name), params
- ).json()
-
-[文档] def first(self):
- """
- 根据查询获取最多一个对象。
-
- :return: 查询结果
- :rtype: Object
- :raise: LeanCloudError
- """
- params = self.dump()
- params["limit"] = 1
- content = self._do_request(params)
- results = content["results"]
- if not results:
- raise LeanCloudError(101, "Object not found")
- obj = self._new_object()
- obj._update_data(self._process_result(results[0]))
- return obj
-
-[文档] def get(self, object_id):
- """
- 根据 objectId 查询。
-
- :param object_id: 要查询对象的 objectId
- :return: 查询结果
- :rtype: Object
- """
- if not object_id:
- raise LeanCloudError(code=101, error="Object not found.")
- obj = self._query_class.create_without_data(object_id)
- obj.fetch(select=self._select, include=self._include)
- return obj
-
-[文档] def find(self):
- """
- 根据查询条件,获取包含所有满足条件的对象。
-
- :rtype: list
- """
- content = self._do_request(self.dump())
-
- objs = []
- for result in content["results"]:
- obj = self._new_object()
- obj._update_data(self._process_result(result))
- objs.append(obj)
-
- return objs
-
-[文档] def scan(self, batch_size=None, scan_key=None):
- params = self.dump()
- if "skip" in params:
- raise LeanCloudError(1, "Query.scan dose not support skip option")
- if "limit" in params:
- raise LeanCloudError(1, "Query.scan dose not support limit option")
- return Cursor(self._query_class, batch_size, scan_key, params)
-
-[文档] def count(self):
- """
- 返回满足查询条件的对象的数量。
-
- :rtype: int
- """
- params = self.dump()
- params["limit"] = 0
- params["count"] = 1
- content = self._do_request(params)
- return content["count"]
-
-[文档] def skip(self, n):
- """
- 查询条件中跳过指定个数的对象,在做分页时很有帮助。
-
- :param n: 需要跳过对象的个数
- :rtype: Query
- """
- self._skip = n
- return self
-
-[文档] def limit(self, n):
- """
- 设置查询返回结果的数量。如果不设置,默认为 100。最大返回数量为 1000,如果超过这个数量,需要使用多次查询来获取结果。
-
- :param n: 限制结果的数量
- :rtype: Query
- """
- if n > 1000:
- raise ValueError("limit only accept number less than or equal to 1000")
- self._limit = n
- return self
-
-[文档] def include_acl(self, value=True):
- """
- 设置查询结果的对象,是否包含 ACL 字段。需要在控制台选项中开启对应选项才能生效。
-
- :param value: 是否包含 ACL,默认为 True
- :type value: bool
- :rtype: Query
- """
- self._include_acl = value
- return self
-
-[文档] def equal_to(self, key, value):
- """
- 增加查询条件,查询字段的值必须为指定值。
-
- :param key: 查询条件的字段名
- :param value: 查询条件的值
- :rtype: Query
- """
- self._where[key] = utils.encode(value)
- return self
-
-[文档] def size_equal_to(self, key, size):
- """
- 增加查询条件,限制查询结果指定数组字段长度与查询值相同
-
- :param key: 查询条件数组字段名
- :param size: 查询条件值
- :rtype: Query
- """
- self._add_condition(key, "$size", size)
- return self
-
- def _add_condition(self, key, condition, value):
- if not self._where.get(key):
- self._where[key] = {}
- self._where[key][condition] = utils.encode(value)
- return self
-
-[文档] def not_equal_to(self, key, value):
- """
- 增加查询条件,限制查询结果指定字段的值与查询值不同
-
- :param key: 查询条件字段名
- :param value: 查询条件值
- :rtype: Query
- """
- self._add_condition(key, "$ne", value)
- return self
-
-[文档] def less_than(self, key, value):
- """
- 增加查询条件,限制查询结果指定字段的值小于查询值
-
- :param key: 查询条件字段名
- :param value: 查询条件值
- :rtype: Query
- """
- self._add_condition(key, "$lt", value)
- return self
-
-[文档] def greater_than(self, key, value):
- """
- 增加查询条件,限制查询结果指定字段的值大于查询值
-
- :param key: 查询条件字段名
- :param value: 查询条件值
- :rtype: Query
- """
- self._add_condition(key, "$gt", value)
- return self
-
-[文档] def less_than_or_equal_to(self, key, value):
- """
- 增加查询条件,限制查询结果指定字段的值小于等于查询值
-
- :param key: 查询条件字段名
- :param value: 查询条件值
- :rtype: Query
- """
- self._add_condition(key, "$lte", value)
- return self
-
-[文档] def greater_than_or_equal_to(self, key, value):
- """
- 增加查询条件,限制查询结果指定字段的值大于等于查询值
-
- :param key: 查询条件字段名
- :param value: 查询条件值名
- :rtype: Query
- """
- self._add_condition(key, "$gte", value)
- return self
-
-[文档] def contained_in(self, key, values):
- """
- 增加查询条件,限制查询结果指定字段的值在查询值列表中
-
- :param key: 查询条件字段名
- :param values: 查询条件值
- :type values: list or tuple
- :rtype: Query
- """
- self._add_condition(key, "$in", values)
- return self
-
-[文档] def not_contained_in(self, key, values):
- """
- 增加查询条件,限制查询结果指定字段的值不在查询值列表中
-
- :param key: 查询条件字段名
- :param values: 查询条件值
- :type values: list or tuple
- :rtype: Query
- """
- self._add_condition(key, "$nin", values)
- return self
-
-[文档] def contains_all(self, key, values):
- """
- 增加查询条件,限制查询结果指定字段的值全部包含与查询值列表中
-
- :param key: 查询条件字段名
- :param values: 查询条件值
- :type values: list or tuple
- :rtype: Query
- """
- self._add_condition(key, "$all", values)
- return self
-
-[文档] def exists(self, key):
- """
- 增加查询条件,限制查询结果对象包含指定字段
-
- :param key: 查询条件字段名
- :rtype: Query
- """
- self._add_condition(key, "$exists", True)
- return self
-
-[文档] def does_not_exist(self, key):
- """
- 增加查询条件,限制查询结果对象不包含指定字段
-
- :param key: 查询条件字段名
- :rtype: Query
- """
- self._add_condition(key, "$exists", False)
- return self
-
-[文档] def matched(self, key, regex, ignore_case=False, multi_line=False):
- """
- 增加查询条件,限制查询结果对象指定字段满足指定的正则表达式。
-
- :param key: 查询条件字段名
- :param regex: 查询正则表达式
- :param ignore_case: 查询是否忽略大小写,默认不忽略
- :param multi_line: 查询是否匹配多行,默认不匹配
- :rtype: Query
- """
- if not isinstance(regex, six.string_types):
- raise TypeError("matched only accept str or unicode")
- self._add_condition(key, "$regex", regex)
- modifiers = ""
- if ignore_case:
- modifiers += "i"
- if multi_line:
- modifiers += "m"
- if modifiers:
- self._add_condition(key, "$options", modifiers)
- return self
-
-[文档] def matches_query(self, key, query):
- """
- 增加查询条件,限制查询结果对象指定字段的值,与另外一个查询对象的返回结果相同。
-
- :param key: 查询条件字段名
- :param query: 查询对象
- :type query: Query
- :rtype: Query
- """
- dumped = query.dump()
- dumped["className"] = query._query_class._class_name
- self._add_condition(key, "$inQuery", dumped)
- return self
-
-[文档] def does_not_match_query(self, key, query):
- """
- 增加查询条件,限制查询结果对象指定字段的值,与另外一个查询对象的返回结果不相同。
-
- :param key: 查询条件字段名
- :param query: 查询对象
- :type query: Query
- :rtype: Query
- """
- dumped = query.dump()
- dumped["className"] = query._query_class._class_name
- self._add_condition(key, "$notInQuery", dumped)
- return self
-
-[文档] def matches_key_in_query(self, key, query_key, query):
- """
- 增加查询条件,限制查询结果对象指定字段的值,与另外一个查询对象的返回结果指定的值相同。
-
- :param key: 查询条件字段名
- :param query_key: 查询对象返回结果的字段名
- :param query: 查询对象
- :type query: Query
- :rtype: Query
- """
- dumped = query.dump()
- dumped["className"] = query._query_class._class_name
- self._add_condition(key, "$select", {"key": query_key, "query": dumped})
- return self
-
-[文档] def does_not_match_key_in_query(self, key, query_key, query):
- """
- 增加查询条件,限制查询结果对象指定字段的值,与另外一个查询对象的返回结果指定的值不相同。
-
- :param key: 查询条件字段名
- :param query_key: 查询对象返回结果的字段名
- :param query: 查询对象
- :type query: Query
- :rtype: Query
- """
- dumped = query.dump()
- dumped["className"] = query._query_class._class_name
- self._add_condition(key, "$dontSelect", {"key": query_key, "query": dumped})
- return self
-
- def _or_query(self, queries):
- dumped = [q.dump()["where"] for q in queries]
- self._where["$or"] = dumped
- return self
-
- def _and_query(self, queries):
- dumped = [q.dump()["where"] for q in queries]
- self._where["$and"] = dumped
-
- def _quote(self, s):
- # return "\\Q" + s.replace("\\E", "\\E\\\\E\\Q") + "\\E"
- return s
-
-[文档] def contains(self, key, value):
- """
- 增加查询条件,限制查询结果对象指定最短的值,包含指定字符串。在数据量比较大的情况下会比较慢。
-
- :param key: 查询条件字段名
- :param value: 需要包含的字符串
- :rtype: Query
- """
- self._add_condition(key, "$regex", self._quote(value))
- return self
-
-[文档] def startswith(self, key, value):
- """
- 增加查询条件,限制查询结果对象指定最短的值,以指定字符串开头。在数据量比较大的情况下会比较慢。
-
- :param key: 查询条件字段名
- :param value: 需要查询的字符串
- :rtype: Query
- """
- value = value if isinstance(value, six.text_type) else value.decode("utf-8")
- self._add_condition(key, "$regex", "^" + self._quote(value))
- return self
-
-[文档] def endswith(self, key, value):
- """
- 增加查询条件,限制查询结果对象指定最短的值,以指定字符串结尾。在数据量比较大的情况下会比较慢。
-
- :param key: 查询条件字段名
- :param value: 需要查询的字符串
- :rtype: Query
- """
- value = value if isinstance(value, six.text_type) else value.decode("utf-8")
- self._add_condition(key, "$regex", self._quote(value) + "$")
- return self
-
-[文档] def ascending(self, key):
- """
- 限制查询返回结果以指定字段升序排序。
-
- :param key: 排序字段名
- :rtype: Query
- """
- self._order = [key]
- return self
-
-[文档] def add_ascending(self, key):
- """
- 增加查询排序条件。之前指定的排序条件优先级更高。
-
- :param key: 排序字段名
- :rtype: Query
- """
- self._order.append(key)
- return self
-
-[文档] def descending(self, key):
- """
- 限制查询返回结果以指定字段降序排序。
-
- :param key: 排序字段名
- :rtype: Query
- """
- self._order = ["-{0}".format(key)]
- return self
-
-[文档] def add_descending(self, key):
- """
- 增加查询排序条件。之前指定的排序条件优先级更高。
-
- :param key: 排序字段名
- :rtype: Query
- """
- self._order.append("-{0}".format(key))
- return self
-
-[文档] def near(self, key, point):
- """
- 增加查询条件,限制返回结果指定字段值的位置与给定地理位置临近。
-
- :param key: 查询条件字段名
- :param point: 需要查询的地理位置
- :rtype: Query
- """
- if point is None:
- raise ValueError("near query does not accept None")
-
- self._add_condition(key, "$nearSphere", point)
- return self
-
-[文档] def within_radians(self, key, point, max_distance, min_distance=None):
- """
- 增加查询条件,限制返回结果指定字段值的位置在某点的一段距离之内。
-
- :param key: 查询条件字段名
- :param point: 查询地理位置
- :param max_distance: 最大距离限定(弧度)
- :param min_distance: 最小距离限定(弧度)
- :rtype: Query
- """
- self.near(key, point)
- self._add_condition(key, "$maxDistance", max_distance)
- if min_distance is not None:
- self._add_condition(key, "$minDistance", min_distance)
- return self
-
-[文档] def within_miles(self, key, point, max_distance, min_distance=None):
- """
- 增加查询条件,限制返回结果指定字段值的位置在某点的一段距离之内。
-
- :param key: 查询条件字段名
- :param point: 查询地理位置
- :param max_distance: 最大距离限定(英里)
- :param min_distance: 最小距离限定(英里)
- :rtype: Query
- """
- if min_distance is not None:
- min_distance = min_distance / 3958.8
- return self.within_radians(key, point, max_distance / 3958.8, min_distance)
-
-[文档] def within_kilometers(self, key, point, max_distance, min_distance=None):
- """
- 增加查询条件,限制返回结果指定字段值的位置在某点的一段距离之内。
-
- :param key: 查询条件字段名
- :param point: 查询地理位置
- :param max_distance: 最大距离限定(千米)
- :param min_distance: 最小距离限定(千米)
- :rtype: Query
- """
- if min_distance is not None:
- min_distance = min_distance / 6371.0
- return self.within_radians(key, point, max_distance / 6371.0, min_distance)
-
-[文档] def within_geo_box(self, key, southwest, northeast):
- """
- 增加查询条件,限制返回结果指定字段值的位置在指定坐标范围之内。
-
- :param key: 查询条件字段名
- :param southwest: 限制范围西南角坐标
- :param northeast: 限制范围东北角坐标
- :rtype: Query
- """
- self._add_condition(key, "$within", {"$box": [southwest, northeast]})
- return self
-
-[文档] def include(self, *keys):
- """
- 指定查询返回结果中包含关联表字段。
-
- :param keys: 关联子表字段名
- :rtype: Query
- """
- if len(keys) == 1 and isinstance(keys[0], (list, tuple)):
- keys = keys[0]
- self._include += keys
- return self
-
-[文档] def select(self, *keys):
- """
- 指定查询返回结果中只包含某些字段。可以重复调用,每次调用的包含内容都将会被返回。
-
- :param keys: 包含字段名
- :rtype: Query
- """
- if len(keys) == 1 and isinstance(keys[0], (list, tuple)):
- keys = keys[0]
- self._select += keys
- return self
-
-
-[文档]class FriendshipQuery(Query):
- def __init__(self, query_class):
- super(FriendshipQuery, self).__init__(query_class)
- if query_class in ("_Follower", "Follower"):
- self._friendship_tag = "follower"
- elif query_class in ("_Followee", "Followee"):
- self._friendship_tag = "followee"
- else:
- raise TypeError("FriendshipQuery takes only follower or followee")
-
- def _new_object(self):
- return leancloud.User()
-
- def _process_result(self, obj):
- content = obj[self._friendship_tag]
- if content["__type"] == "Pointer" and content["className"] == "_User":
- del content["__type"]
- del content["className"]
- return content
-
-# coding: utf-8
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import leancloud
-from leancloud import operation
-
-__author__ = "asaka <lan@leancloud.rocks>"
-
-
-[文档]class Relation(object):
- def __init__(self, parent, key=None):
- self.parent = parent
- self.key = key
- self.target_class_name = None
-
-[文档] @classmethod
- def reverse_query(cls, parent_class, relation_key, child):
- """
- 创建一个新的 Query 对象,反向查询所有指向此 Relation 的父对象。
-
- :param parent_class: 父类名称
- :param relation_key: 父类中 Relation 的字段名
- :param child: 子类对象
- :return: leancloud.Query
- """
- q = leancloud.Query(parent_class)
- q.equal_to(relation_key, child._to_pointer())
- return q
-
- def _ensure_parent_and_key(self, parent=None, key=None):
- if self.parent is None:
- self.parent = parent
- if self.key is None:
- self.key = key
-
- if self.parent != parent:
- raise TypeError("relation retrieved from two different object")
- if self.key != key:
- raise TypeError("relation retrieved from two different object")
-
-[文档] def add(self, *obj_or_objs):
- """
- 添加一个新的 leancloud.Object 至 Relation。
-
- :param obj_or_objs: 需要添加的对象或对象列表
- """
- objs = obj_or_objs
- if not isinstance(obj_or_objs, (list, tuple)):
- objs = (obj_or_objs,)
- change = operation.Relation(objs, ())
- self.parent.set(self.key, change)
- self.target_class_name = change._target_class_name
-
-[文档] def remove(self, *obj_or_objs):
- """
- 从一个 Relation 中删除一个 leancloud.Object 。
-
- :param obj_or_objs: 需要删除的对象或对象列表
- :return:
- """
- objs = obj_or_objs
- if not isinstance(obj_or_objs, (list, tuple)):
- objs = (obj_or_objs,)
- change = operation.Relation((), objs)
- self.parent.set(self.key, change)
- self.target_class_name = change._target_class_name
-
-
-
- @property
- def query(self):
- """
- 获取指向 Relation 内容的 Query 对象。
-
- :rtype: leancloud.Query
- """
-
- if self.target_class_name is None:
- target_class = leancloud.Object.extend(self.parent._class_name)
- query = leancloud.Query(target_class)
- query._extra["redirectClassNameForKey"] = self.key
- else:
- target_class = leancloud.Object.extend(self.target_class_name)
- query = leancloud.Query(target_class)
-
- query._add_condition("$relatedTo", "object", self.parent._to_pointer())
- query._add_condition("$relatedTo", "key", self.key)
-
- return query
-
-# coding: utf-8
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import re
-
-import six
-
-import leancloud
-
-__author__ = "asaka <lan@leancloud.rocks>"
-
-
-[文档]class Role(leancloud.Object):
- def __init__(self, name=None, acl=None):
- super(Role, self).__init__()
- if name:
- self.set_name(name)
- if acl is None:
- acl = leancloud.ACL()
- acl.set_public_read_access(True)
- self.set_acl(acl)
-
- @property
- def name(self):
- return self.get("name")
-
- @name.setter
- def name(self, name):
- return self.set("name", name)
-
-[文档] def get_name(self):
- """
- 获取 Role 的 name,等同于 role.get('name')
- """
- return self.get("name")
-
-[文档] def set_name(self, name):
- """
- 为 Role 设置 name,等同于 role.set('name', name)
- """
- return self.set("name", name)
-
- @property
- def users(self):
- return self.relation("users")
-
-
-
- @property
- def roles(self):
- return self.relation("roles")
-
-
-
-[文档] def validate(self, attrs):
- if "name" in attrs and attrs["name"] != self.get_name():
- new_name = attrs["name"]
- if not isinstance(new_name, six.string_types):
- raise TypeError("role name must be string_types")
- r = re.compile(r"^[0-9a-zA-Z\-_]+$")
- if not r.match(new_name):
- raise TypeError(
- """
- role's name can only contain alphanumeric characters, _, -, and spaces.
- """
- )
-
- return super(Role, self).validate(attrs)
-
-# coding: utf-8
-
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-
-import threading
-from typing import Optional
-
-import six
-
-from leancloud import client
-from leancloud.errors import LeanCloudError
-from leancloud.query import FriendshipQuery
-from leancloud.object_ import Object
-from leancloud.relation import Relation
-
-__author__ = "asaka"
-
-
-thread_locals = threading.local()
-thread_locals.current_user = None
-
-
-[文档]class User(Object):
- def __init__(self, **attrs):
- self._session_token = None
- super(User, self).__init__(**attrs)
-
-
-
- @property
- def session_token(self):
- return self._session_token
-
- def _merge_metadata(self, attrs):
- if "sessionToken" in attrs:
- self._session_token = attrs.pop("sessionToken")
-
- return super(User, self)._merge_metadata(attrs)
-
-[文档] @classmethod
- def create_follower_query(cls, user_id):
- if not user_id or not isinstance(user_id, six.string_types):
- raise TypeError("invalid user_id: {0}".format(user_id))
- query = FriendshipQuery("_Follower")
- query.equal_to("user", User.create_without_data(user_id))
- return query
-
-[文档] @classmethod
- def create_followee_query(cls, user_id):
- if not user_id or not isinstance(user_id, six.string_types):
- raise TypeError("invalid user_id: {0}".format(user_id))
- query = FriendshipQuery("_Followee")
- query.equal_to("user", User.create_without_data(user_id))
- return query
-
-[文档] @classmethod
- def get_current(cls): # type: () -> Optional[User]
- return getattr(thread_locals, "current_user", None)
-
-
-
-[文档] @classmethod
- def become(cls, session_token):
- """
- 通过 session token 获取用户对象
-
- :param session_token: 用户的 session token
- :return: leancloud.User
- """
- response = client.get("/users/me", params={"session_token": session_token})
- content = response.json()
- user = cls()
- user._update_data(content)
- user._handle_save_result(True)
- if "smsCode" not in content:
- user._attributes.pop("smsCode", None)
- return user
-
- @property
- def is_current(self):
- if not getattr(thread_locals, "current_user", None):
- return False
- return self.id == thread_locals.current_user.id
-
- def _cleanup_auth_data(self):
- if not self.is_current:
- return
- auth_data = self.get("authData")
- if not auth_data:
- return
- keys = list(auth_data.keys())
- for key in keys:
- if not auth_data[key]:
- del auth_data[key]
-
- def _handle_save_result(self, make_current=False):
- if make_current:
- User.set_current(self)
- self._cleanup_auth_data()
- # self._sync_all_auth_data()
- self._attributes.pop("password", None)
-
-[文档] def save(self, make_current=False):
- super(User, self).save()
- self._handle_save_result(make_current)
-
-[文档] def sign_up(self, username=None, password=None):
- """
- 创建一个新用户。新创建的 User 对象,应该使用此方法来将数据保存至服务器,而不是使用 save 方法。
- 用户对象上必须包含 username 和 password 两个字段
- """
- if username:
- self.set("username", username)
- if password:
- self.set("password", password)
-
- username = self.get("username")
- if not username:
- raise TypeError("invalid username: {0}".format(username))
- password = self.get("password")
- if not password:
- raise TypeError("invalid password")
-
- self.save(make_current=True)
-
-[文档] def login(self, username=None, password=None, email=None):
- """
- 登录用户。成功登录后,服务器会返回用户的 sessionToken 。
-
- :param username: 用户名
- :param email: 邮箱地址(username 和 email 这两个参数必须传入一个且仅能传入一个)
- :param password: 用户密码
- """
- if username:
- self.set("username", username)
- if password:
- self.set("password", password)
- if email:
- self.set("email", email)
- # 同时传入 username、email、password 的情况下,这三个字段会一起发给后端。
- # 这时后端会忽略 email,等价于只传 username 和 password。
- # 这里的 login 函数的实现依赖后端的这一行为,没有校验 username 和 email 中调用者传入且仅传入了其中一个参数。
- response = client.post("/login", params=self.dump())
- content = response.json()
- self._update_data(content)
- self._handle_save_result(True)
- if "smsCode" not in content:
- self._attributes.pop("smsCode", None)
-
-[文档] def logout(self):
- if not self.is_current:
- return
- self._cleanup_auth_data()
- del thread_locals.current_user
-
-[文档] @classmethod
- def login_with_mobile_phone(cls, phone_number, password):
- user = User()
- params = {"mobilePhoneNumber": phone_number, "password": password}
- user._update_data(params)
- user.login()
- return user
-
-[文档] def follow(self, target_id):
- """
- 关注一个用户。
-
- :param target_id: 需要关注的用户的 id
- """
- if self.id is None:
- raise ValueError("Please sign in")
- response = client.post(
- "/users/{0}/friendship/{1}".format(self.id, target_id), None
- )
- assert response.ok
-
-[文档] def unfollow(self, target_id):
- """
- 取消关注一个用户。
-
- :param target_id: 需要关注的用户的 id
- :return:
- """
- if self.id is None:
- raise ValueError("Please sign in")
- response = client.delete(
- "/users/{0}/friendship/{1}".format(self.id, target_id), None
- )
- assert response.ok
-
-[文档] @classmethod
- def login_with(cls, platform, third_party_auth_data):
- """
- 把第三方平台号绑定到 User 上
-
- :param platform: 第三方平台名称 base string
- """
- user = User()
- return user.link_with(platform, third_party_auth_data)
-
-[文档] def link_with(self, provider, third_party_auth_data):
- if type(provider) != str:
- raise TypeError("input should be a string")
- auth_data = self.get("authData")
- if not auth_data:
- auth_data = {}
- auth_data[provider] = third_party_auth_data
- self.set("authData", auth_data)
- self.save()
- self._handle_save_result(True)
- return self
-
-[文档] def unlink_from(self, provider):
- """
- 解绑特定第三方平台
- """
- if type(provider) != str:
- raise TypeError("input should be a string")
- self.link_with(provider, None)
- # self._sync_auth_data(provider)
- return self
-
-[文档] def is_linked(self, provider):
- try:
- self.get("authData")[provider]
- except KeyError:
- return False
- return True
-
-[文档] @classmethod
- def signup_or_login_with_mobile_phone(cls, phone_number, sms_code):
- """
- param phone_nubmer: string_types
- param sms_code: string_types
-
- 在调用此方法前请先使用 request_sms_code 请求 sms code
- """
- data = {"mobilePhoneNumber": phone_number, "smsCode": sms_code}
- response = client.post("/usersByMobilePhone", data)
- content = response.json()
- user = cls()
- user._update_data(content)
- user._handle_save_result(True)
- if "smsCode" not in content:
- user._attributes.pop("smsCode", None)
- return user
-
-[文档] def update_password(self, old_password, new_password):
- route = "/users/" + self.id + "/updatePassword"
- params = {"old_password": old_password, "new_password": new_password}
- content = client.put(route, params).json()
- self._update_data(content)
- self._handle_save_result(True)
-
-
-
-
-
-[文档] def set_mobile_phone_number(self, phone_number):
- return self.set("mobilePhoneNumber", phone_number)
-
-
-
-
-
-
-
-
-
-
-
-[文档] def refresh_session_token(self):
- """
- 重置当前用户 `session token`。
- 会使其他客户端已登录用户登录失效。
- """
- response = client.put("/users/{}/refreshSessionToken".format(self.id), None)
- content = response.json()
- self._update_data(content)
- self._handle_save_result(False)
-
-[文档] def is_authenticated(self):
- """
- 判断当前用户对象是否已登录。
- 会先检查此用户对象上是否有 `session_token`,如果有的话,会继续请求服务器验证 `session_token` 是否合法。
- """
- session_token = self.get_session_token()
- if not session_token:
- return False
- try:
- response = client.get("/users/me", params={"session_token": session_token})
- except LeanCloudError as e:
- if e.code == 211:
- return False
- else:
- raise
- return response.status_code == 200
-
-[文档] @classmethod
- def request_password_reset(cls, email):
- params = {"email": email}
- client.post("/requestPasswordReset", params)
-
-[文档] @classmethod
- def request_email_verify(cls, email):
- params = {"email": email}
- client.post("/requestEmailVerify", params)
-
-[文档] @classmethod
- def request_mobile_phone_verify(cls, phone_number, validate_token=None):
- params = {"mobilePhoneNumber": phone_number}
- if validate_token is not None:
- params["validate_token"] = validate_token
- client.post("/requestMobilePhoneVerify", params)
-
-[文档] @classmethod
- def request_password_reset_by_sms_code(cls, phone_number, validate_token=None):
- params = {"mobilePhoneNumber": phone_number}
- if validate_token is not None:
- params["validate_token"] = validate_token
- client.post("/requestPasswordResetBySmsCode", params)
-
-[文档] @classmethod
- def reset_password_by_sms_code(cls, sms_code, new_password):
- params = {"password": new_password}
- client.put("/resetPasswordBySmsCode/" + sms_code, params)
-
- # This should be an instance method.
- # However, to be consistent with other similar methods (`request_password_reset_by_sms_code`),
- # it is implemented as a class method.
-[文档] @classmethod
- def request_change_phone_number(cls, phone_number, ttl=None, validate_token=None):
- params = {"mobilePhoneNumber": phone_number}
- if ttl is not None:
- params["ttl"] = ttl
- if validate_token is not None:
- params["validate_token"] = validate_token
- client.post("/requestChangePhoneNumber", params)
-
- # This should be an instance method and update the local date,
- # but it is implemented as a class method for the same reason as above.
-[文档] @classmethod
- def change_phone_number(cls, sms_code, phone_number):
- params = {"mobilePhoneNumber": phone_number, "code": sms_code}
- client.post("/changePhoneNumber", params)
-
-[文档] @classmethod
- def verify_mobile_phone_number(cls, sms_code):
- client.post("/verifyMobilePhone/" + sms_code, {})
-
-[文档] @classmethod
- def request_login_sms_code(cls, phone_number, validate_token=None):
- params = {"mobilePhoneNumber": phone_number}
- if validate_token is not None:
- params["validate_token"] = validate_token
- client.post("/requestLoginSmsCode", params)
-
' + - '' + - Documentation.gettext("Hide Search Matches") + - "
" - ) - ); - }, - - /** - * helper function to hide the search marks again - */ - hideSearchWords: () => { - document - .querySelectorAll("#searchbox .highlight-link") - .forEach((el) => el.remove()); - document - .querySelectorAll("span.highlighted") - .forEach((el) => el.classList.remove("highlighted")); - const url = new URL(window.location); - url.searchParams.delete("highlight"); - window.history.replaceState({}, "", url); - }, - /** * helper function to focus on search bar */ @@ -210,15 +110,11 @@ const Documentation = { ) return; - const blacklistedElements = new Set([ - "TEXTAREA", - "INPUT", - "SELECT", - "BUTTON", - ]); document.addEventListener("keydown", (event) => { - if (blacklistedElements.has(document.activeElement.tagName)) return; // bail for input elements - if (event.altKey || event.ctrlKey || event.metaKey) return; // bail with special keys + // bail for input elements + if (BLACKLISTED_KEY_CONTROL_ELEMENTS.has(document.activeElement.tagName)) return; + // bail with special keys + if (event.altKey || event.ctrlKey || event.metaKey) return; if (!event.shiftKey) { switch (event.key) { @@ -240,10 +136,6 @@ const Documentation = { event.preventDefault(); } break; - case "Escape": - if (!DOCUMENTATION_OPTIONS.ENABLE_SEARCH_SHORTCUTS) break; - Documentation.hideSearchWords(); - event.preventDefault(); } } diff --git a/docs/_static/documentation_options.js b/docs/_static/documentation_options.js index a844c87..cdab159 100644 --- a/docs/_static/documentation_options.js +++ b/docs/_static/documentation_options.js @@ -10,5 +10,5 @@ var DOCUMENTATION_OPTIONS = { SOURCELINK_SUFFIX: '.txt', NAVIGATION_WITH_KEYS: false, SHOW_SEARCH_SUMMARY: true, - ENABLE_SEARCH_SHORTCUTS: false, + ENABLE_SEARCH_SHORTCUTS: true, }; \ No newline at end of file diff --git a/docs/_static/jquery-3.6.0.js b/docs/_static/jquery-3.6.0.js deleted file mode 100644 index fc6c299..0000000 --- a/docs/_static/jquery-3.6.0.js +++ /dev/null @@ -1,10881 +0,0 @@ -/*! - * jQuery JavaScript Library v3.6.0 - * https://jquery.com/ - * - * Includes Sizzle.js - * https://sizzlejs.com/ - * - * Copyright OpenJS Foundation and other contributors - * Released under the MIT license - * https://jquery.org/license - * - * Date: 2021-03-02T17:08Z - */ -( function( global, factory ) { - - "use strict"; - - if ( typeof module === "object" && typeof module.exports === "object" ) { - - // For CommonJS and CommonJS-like environments where a proper `window` - // is present, execute the factory and get jQuery. - // For environments that do not have a `window` with a `document` - // (such as Node.js), expose a factory as module.exports. - // This accentuates the need for the creation of a real `window`. - // e.g. var jQuery = require("jquery")(window); - // See ticket #14549 for more info. - module.exports = global.document ? - factory( global, true ) : - function( w ) { - if ( !w.document ) { - throw new Error( "jQuery requires a window with a document" ); - } - return factory( w ); - }; - } else { - factory( global ); - } - -// Pass this if window is not defined yet -} )( typeof window !== "undefined" ? window : this, function( window, noGlobal ) { - -// Edge <= 12 - 13+, Firefox <=18 - 45+, IE 10 - 11, Safari 5.1 - 9+, iOS 6 - 9.1 -// throw exceptions when non-strict code (e.g., ASP.NET 4.5) accesses strict mode -// arguments.callee.caller (trac-13335). But as of jQuery 3.0 (2016), strict mode should be common -// enough that all such attempts are guarded in a try block. -"use strict"; - -var arr = []; - -var getProto = Object.getPrototypeOf; - -var slice = arr.slice; - -var flat = arr.flat ? function( array ) { - return arr.flat.call( array ); -} : function( array ) { - return arr.concat.apply( [], array ); -}; - - -var push = arr.push; - -var indexOf = arr.indexOf; - -var class2type = {}; - -var toString = class2type.toString; - -var hasOwn = class2type.hasOwnProperty; - -var fnToString = hasOwn.toString; - -var ObjectFunctionString = fnToString.call( Object ); - -var support = {}; - -var isFunction = function isFunction( obj ) { - - // Support: Chrome <=57, Firefox <=52 - // In some browsers, typeof returns "function" for HTML