-
Notifications
You must be signed in to change notification settings - Fork 24
/
rray.py
106 lines (81 loc) · 2.76 KB
/
rray.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#! /usr/bin/env python
#
# Copyright 2023 California Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Authors: Philip G. Brodrick, [email protected]
# Authors: James Montgomery
"""
Ray Wrapper module to circumvent the ray package while maintaining ray-like
syntax in the code. Borrows directly from the implementation in isofit
(see isofit/isofit/wrappers/ray.py)
To enable, set the environment variable `GHG_DEBUG` to any value before
runtime. For example:
$ export ISOFIT_DEBUG=1
$ python parallel_mf.py ...
Additionally, you may pass it as a temporary environment variable via:
$ ISOFIT_DEBUG=1 python parallel_mf.py ...
"""
import logging
import ray
class Remote:
def __init__(self, obj, *args, **kwargs):
self.obj = obj
self.args = args
self.kwargs = kwargs
def __getattr__(self, key):
"""
Returns a Remote object on the key being requested. This enables
ray.remote(Class).func.remote()
"""
return Remote(getattr(self.obj, key))
def remote(self, *args, **kwargs):
return Remote(self.obj, *args, **kwargs)
def get(self):
return self.obj(*self.args, **self.kwargs)
def __repr__(self):
return f"<Remote({self.obj})>"
def __getattr__(key):
"""
Reports any call to Ray that is not emulated
"""
print(f"__getattr__({key})")
logging.error(f"Unsupported operation: {key!r}")
return lambda *a, **kw: None
def remote(obj):
return Remote(obj)
def init(*args, **kwargs):
logging.debug("Ray has been disabled for this run")
def get(jobs):
if hasattr(jobs, "__iter__"):
return [job.get() for job in jobs]
else:
return jobs.get()
def put(obj):
return obj
def shutdown(*args, **kwargs):
pass
class util:
class ActorPool:
def __init__(self, actors):
"""
Emulates https://docs.ray.io/en/latest/_modules/ray/util/actor_pool.html
Parameters
----------
actors: list
List of Remote objects to call
"""
self.actors = [Remote(actor.get()) for actor in actors]
def map_unordered(self, func, iterable):
return [func(*pair).get() for pair in zip(self.actors, iterable)]