-
-
Notifications
You must be signed in to change notification settings - Fork 600
/
Copy pathclient.py
285 lines (213 loc) · 8.59 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
import logging
import typing
from zeep.proxy import AsyncServiceProxy, ServiceProxy
from zeep.settings import Settings
from zeep.transports import AsyncTransport, Transport
from zeep.wsdl import Document
logger = logging.getLogger(__name__)
class Factory:
    """Look up schema objects of one kind within a single namespace.

    The lookup function is ``types.get_<kind>`` (for example ``get_type``
    when ``kind`` is ``"type"``). Objects are addressed by their local name,
    either as an attribute (``factory.User``) or as an item
    (``factory["User"]``).

    :param types: object exposing ``get_<kind>``, ``namespaces`` and
                  ``get_ns_prefix``
    :param kind: suffix selecting the ``get_<kind>`` lookup method
    :param namespace: a namespace contained in ``types.namespaces``; any
                      other value is resolved through
                      ``types.get_ns_prefix`` (presumably a registered
                      prefix such as ``ns0``)

    """

    def __init__(self, types, kind, namespace):
        self._method = getattr(types, "get_%s" % kind)

        if namespace in types.namespaces:
            self._ns = namespace
        else:
            self._ns = types.get_ns_prefix(namespace)

    def __getattr__(self, key):
        """Return the complexType or simpleType for the given localname.

        Only called when normal attribute lookup fails. Special (dunder)
        names and the private slots of this class are never forwarded to the
        schema lookup.

        :rtype: zeep.xsd.ComplexType or zeep.xsd.AnySimpleType
        :raises AttributeError: for dunder names and for ``_method`` /
                                ``_ns`` when they are not set

        """
        # Protocol probes such as hasattr(obj, "__setstate__") must see a
        # plain AttributeError. ``_method`` / ``_ns`` only reach this point
        # on an instance created without __init__ (copy / pickle), where
        # forwarding them to __getitem__ would recurse without bound.
        is_dunder = key.startswith("__") and key.endswith("__")
        if is_dunder or key in ("_method", "_ns"):
            raise AttributeError(
                "%r object has no attribute %r" % (type(self).__name__, key)
            )
        return self[key]

    def __getitem__(self, key):
        """Return the complexType or simpleType for the given localname.

        The lookup is performed with the qualified name
        ``{namespace}localname``.

        :rtype: zeep.xsd.ComplexType or zeep.xsd.AnySimpleType

        """
        return self._method("{%s}%s" % (self._ns, key))
class Client:
    """The zeep Client.

    :param wsdl: Url/local WSDL location or preparsed WSDL Document
    :param wsse: Stored unchanged as ``self.wsse`` (presumably the
                 WS-Security configuration applied to messages)
    :param transport: Custom transport instance. When ``None`` a new
                      instance of ``_default_transport`` is created.
    :param service_name: The service name for the service binding. Defaults to
                         the first service in the WSDL document.
    :param port_name: The port name for the default binding. Defaults to the
                      first port defined in the service element in the WSDL
                      document.
    :param plugins: a list of Plugin instances
    :param settings: a zeep.Settings() object
    :raises ValueError: if ``wsdl`` is empty

    """

    # The transport *class* that is instantiated when no transport is given.
    _default_transport: typing.Type[
        typing.Union[Transport, AsyncTransport]
    ] = Transport

    def __init__(
        self,
        wsdl,
        wsse=None,
        transport=None,
        service_name=None,
        port_name=None,
        plugins=None,
        settings=None,
    ):
        if not wsdl:
            raise ValueError("No URL given for the wsdl")

        self.settings = settings or Settings()
        self.transport = (
            transport if transport is not None else self._default_transport()
        )

        # A preparsed Document is used as-is; anything else is loaded
        # through the transport.
        if isinstance(wsdl, Document):
            self.wsdl = wsdl
        else:
            self.wsdl = Document(wsdl, self.transport, settings=self.settings)

        self.wsse = wsse
        self.plugins = plugins if plugins is not None else []

        # The default service proxy is created lazily by ``service``.
        self._default_service = None
        self._default_service_name = service_name
        self._default_port_name = port_name
        self._default_soapheaders = None

    @property
    def namespaces(self):
        """The prefix map of the types in the WSDL document."""
        return self.wsdl.types.prefix_map

    @property
    def service(self):
        """The default ServiceProxy instance

        Created on first access with the ``service_name`` / ``port_name``
        given to the constructor and cached afterwards.

        :rtype: ServiceProxy
        :raises ValueError: if no default service could be created

        """
        if self._default_service:
            return self._default_service

        self._default_service = self.bind(
            service_name=self._default_service_name,
            port_name=self._default_port_name,
        )
        if not self._default_service:
            raise ValueError(
                "There is no default service defined. This is usually due to "
                "missing wsdl:service definitions in the WSDL"
            )
        return self._default_service

    def bind(
        self,
        service_name: typing.Optional[str] = None,
        port_name: typing.Optional[str] = None,
    ):
        """Create a new ServiceProxy for the given service_name and port_name.

        The default ServiceProxy instance (`self.service`) always refers to
        the first service/port in the wsdl Document.  Use this when a specific
        port is required.

        Returns ``None`` when the WSDL document defines no services.

        :raises ValueError: if the named service or port does not exist

        """
        if not self.wsdl.services:
            return

        service = self._get_service(service_name)
        port = self._get_port(service, port_name)
        return ServiceProxy(self, port.binding, **port.binding_options)

    def create_service(self, binding_name, address):
        """Create a new ServiceProxy for the given binding name and address.

        :param binding_name: The QName of the binding
        :param address: The address of the endpoint
        :raises ValueError: if no binding with that QName exists

        """
        try:
            binding = self.wsdl.bindings[binding_name]
        except KeyError as exc:
            raise ValueError(
                "No binding found with the given QName. Available bindings "
                "are: %s" % (", ".join(self.wsdl.bindings.keys()))
            ) from exc
        return ServiceProxy(self, binding, address=address)

    def create_message(self, service, operation_name, *args, **kwargs):
        """Create the payload for the given operation.

        :param service: service proxy whose binding builds the message
        :param operation_name: name of the operation to build
        :rtype: lxml.etree._Element

        """
        # The binding also returns the HTTP headers; only the envelope is
        # handed back to the caller.
        envelope, _http_headers = service._binding._create(
            operation_name, args, kwargs, client=self
        )
        return envelope

    def type_factory(self, namespace):
        """Return a type factory for the given namespace.

        Example::

            factory = client.type_factory('ns0')
            user = factory.User(name='John')

        :rtype: Factory

        """
        return Factory(self.wsdl.types, "type", namespace)

    def get_type(self, name):
        """Return the type for the given qualified name.

        :rtype: zeep.xsd.ComplexType or zeep.xsd.AnySimpleType

        """
        return self.wsdl.types.get_type(name)

    def get_element(self, name):
        """Return the element for the given qualified name.

        :rtype: zeep.xsd.Element

        """
        return self.wsdl.types.get_element(name)

    def set_ns_prefix(self, prefix, namespace):
        """Set a shortcut for the given namespace."""
        self.wsdl.types.set_ns_prefix(prefix, namespace)

    def set_default_soapheaders(self, headers):
        """Set the default soap headers which will be automatically used on
        all calls.

        Note that if you pass custom soapheaders using a list then you will
        also need to use that during the operations. Since mixing these use
        cases isn't supported (yet).

        """
        self._default_soapheaders = headers

    def _get_port(self, service, name):
        """Return the port called ``name`` or the first port of the service.

        :raises ValueError: if ``name`` is given but no such port exists
        :raises IndexError: if no name is given and the service has no ports

        """
        if name:
            port = service.ports.get(name)
            if not port:
                raise ValueError("Port not found")
        else:
            port = list(service.ports.values())[0]
        return port

    def _get_service(self, name: typing.Optional[str]):
        """Return the service called ``name`` or the first service.

        Returns ``None`` when no name is given and the WSDL document defines
        no services.

        :raises ValueError: if ``name`` is given but no such service exists

        """
        if name:
            service = self.wsdl.services.get(name)
            if not service:
                raise ValueError("Service not found")
        else:
            service = next(iter(self.wsdl.services.values()), None)
        return service

    def __enter__(self):
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        # Custom transports are not required to provide ``close``.
        if hasattr(self.transport, "close"):
            self.transport.close()
class AsyncClient(Client):
    """Client that defaults to AsyncTransport and creates AsyncServiceProxy
    instances instead of ServiceProxy instances.

    Usable as an asynchronous context manager; the transport is closed with
    ``aclose()`` on exit.

    """

    _default_transport = AsyncTransport

    def bind(
        self,
        service_name: typing.Optional[str] = None,
        port_name: typing.Optional[str] = None,
    ):
        """Create a new AsyncServiceProxy for the given service_name and port_name.

        The default AsyncServiceProxy instance (`self.service`) always refers
        to the first service/port in the wsdl Document.  Use this when a
        specific port is required.

        Returns ``None`` when the WSDL document defines no services.

        """
        if not self.wsdl.services:
            return

        service = self._get_service(service_name)
        port = self._get_port(service, port_name)
        return AsyncServiceProxy(self, port.binding, **port.binding_options)

    def create_service(self, binding_name, address):
        """Create a new AsyncServiceProxy for the given binding name and address.

        :param binding_name: The QName of the binding
        :param address: The address of the endpoint
        :raises ValueError: if no binding with that QName exists

        """
        try:
            binding = self.wsdl.bindings[binding_name]
        except KeyError as exc:
            raise ValueError(
                "No binding found with the given QName. Available bindings "
                "are: %s" % (", ".join(self.wsdl.bindings.keys()))
            ) from exc
        return AsyncServiceProxy(self, binding, address=address)

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type=None, exc_value=None, traceback=None) -> None:
        # Same tolerance as Client.__exit__: a custom transport is not
        # required to provide the closing method.
        if hasattr(self.transport, "aclose"):
            await self.transport.aclose()
class CachingClient(Client):
    """Shortcut to create a caching client, for the lazy people.

    This enables the SqliteCache by default in the transport as was the default
    in earlier versions of zeep.

    Accepts the same arguments as ``Client``. A transport passed by the
    caller, by keyword or by position, is used as-is; otherwise a
    ``Transport`` with a ``SqliteCache`` is created.

    """

    def __init__(self, *args, **kwargs):
        # Don't use setdefault since we want to lazily init the Transport cls
        from zeep.cache import SqliteCache

        # ``transport`` is the third positional parameter of
        # Client.__init__(wsdl, wsse, transport, ...). Injecting the keyword
        # while it was also passed positionally would raise
        # "got multiple values for argument 'transport'".
        transport_index = 2
        if len(args) > transport_index:
            if not args[transport_index]:
                args = (
                    args[:transport_index]
                    + (Transport(cache=SqliteCache()),)
                    + args[transport_index + 1 :]
                )
        else:
            kwargs["transport"] = kwargs.get("transport") or Transport(
                cache=SqliteCache()
            )

        super().__init__(*args, **kwargs)