You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
The documentation of `ray.worker.get` mentions that this method issues a warning if it is called inside an async context. The problem I am encountering is likely caused by this.
import ray
import time
from ray.util import ActorPool
from deap import algorithms, base, creator, gp, tools
......
@ray.remote(num_cpus=1)
class Ray_Deap_Map():
    """Ray actor that evaluates one batch of individuals at a time.

    Each actor optionally re-runs the caller-supplied setup callables on
    start-up so that the scope normally created at module level exists in
    the actor process as well. For GA only ``creator_setup`` is needed;
    GP needs both ``creator_setup`` and ``pset_creator``.
    """

    def __init__(self, creator_setup=None, pset_creator=None):
        """Store the setup callables and invoke each one that was given.

        Args:
            creator_setup: optional zero-argument callable, run first.
            pset_creator: optional zero-argument callable, run second.
        """
        # issue 946? A non-trivial start-up time prevents bad load
        # balancing across a cluster.
        time.sleep(0.01)

        # Recreate the scope from the driver's globals, in this order.
        self.creator_setup = creator_setup
        self.pset_creator = pset_creator
        for setup_hook in (creator_setup, pset_creator):
            if setup_hook is not None:
                setup_hook()

    def ray_remote_eval_batch(self, f, zipped_input):
        """Apply ``f`` to every element of a batch.

        Args:
            f: callable applied to each element.
            zipped_input: ``(batch, batch_id)`` pair.

        Returns:
            A list of ``(f(element), batch_id)`` tuples in batch order; the
            id is attached so the caller can reorder the batches.
        """
        batch_items, batch_id = zipped_input
        tagged_results = []
        for element in batch_items:
            tagged_results.append((f(element), batch_id))
        return tagged_results
class Ray_Deap_Map_Manager():
    """Split a ``map`` call into batches and evaluate them on Ray actors.

    The number of workers is taken from the CPU count Ray reports for the
    cluster. With a single worker (or none) the work is done locally in the
    calling process; otherwise one ``Ray_Deap_Map`` actor is created per
    batch and the batches are evaluated through an ``ActorPool``.
    """

    def __init__(self, creator_setup=None, pset_creator=None):
        """Record the worker count and the setup callables for the actors.

        Args:
            creator_setup: optional callable forwarded to every actor.
            pset_creator: optional callable forwarded to every actor
                (for GA it is not needed; GP needs both).
        """
        # Can adjust the number of processes in ray.init or when launching
        # the cluster.
        self.n_workers = int(ray.cluster_resources()['CPU'])
        # Recreate scope from global (e.g. GP needs the toolbox too).
        self.creator_setup = creator_setup
        self.pset_creator = pset_creator

    def map(self, func, iterable):
        """Return ``[func(item) for item in iterable]``, in input order.

        Args:
            func: callable applied to every item.
            iterable: items to evaluate; any iterable is accepted because
                it is materialised into a list first.

        Returns:
            A list of results in the same order as the input. An empty
            input yields an empty list without creating any actors.
        """
        items = list(iterable)
        if not items:
            # Nothing to evaluate; also avoids dividing by a zero worker
            # count below.
            return []

        if self.n_workers <= 1:
            # Zero or one worker: a plain local map is enough (and handy
            # for testing). list() forces evaluation so it can be timed.
            return list(map(func, items))

        # Never use more workers than there are items.
        n_workers = min(len(items), self.n_workers)
        # Ceiling division: the smallest batch size that needs no more than
        # n_workers batches.
        n_per_batch = -(-len(items) // n_workers)
        batches = [items[start:start + n_per_batch]
                   for start in range(0, len(items), n_per_batch)]
        id_for_reorder = range(len(batches))

        # One actor per batch, so no actor is started only to sit idle.
        eval_pool = ActorPool(
            [Ray_Deap_Map.remote(self.creator_setup, self.pset_creator)
             for _ in batches])
        unordered_results = list(eval_pool.map_unordered(
            lambda actor, input_tuple: actor.ray_remote_eval_batch.remote(func, input_tuple),
            zip(batches, id_for_reorder)))

        # Every batch is non-empty and each of its entries carries the batch
        # id, so sorting on the first entry's id restores the input order.
        ordered_batch_results = sorted(unordered_results,
                                       key=lambda batch: batch[0][1])
        # Flatten the batches into one list of results, dropping the ids.
        return [result for batch in ordered_batch_results for result, _ in batch]
# This is what we register as map in deap toolbox.
# For GA no need to provide pset_creator. Both needed for GP
def ray_deap_map(func, pop, creator_setup=None, pset_creator=None):
    """Map ``func`` over ``pop`` through a freshly built manager.

    This is the function registered as ``map`` in the DEAP toolbox. The
    manager decides whether batching is needed and creates the remote
    actors that do the work.

    Args:
        func: callable applied to every member of ``pop``.
        pop: the individuals to evaluate.
        creator_setup: optional setup callable passed to the manager.
        pset_creator: optional setup callable passed to the manager
            (not needed for GA; GP needs both).

    Returns:
        Whatever ``Ray_Deap_Map_Manager.map`` returns for ``func`` and ``pop``.
    """
    return Ray_Deap_Map_Manager(creator_setup, pset_creator).map(func, pop)
# Build and return the DEAP toolbox. The `......` line marks setup code that
# was elided from this snippet, so only the two registrations below are visible.
def Tool(pset, max_c, ids):
toolbox = base.Toolbox()
......
# Register EvaluationBacktest as "evaluate" with toolbox, max_c and ids
# supplied up front; the call site passes toolbox.evaluate to toolbox.map.
toolbox.register("evaluate", EvaluationBacktest, toolbox=toolbox, max_c=max_c, ids=ids)
# Register the Ray-backed ray_deap_map as "map". Creator_Setup and
# Pset_Creator are defined outside this snippet; presumably they recreate the
# creator classes and primitive set inside each actor -- TODO confirm.
toolbox.register("map", ray_deap_map, creator_setup=Creator_Setup,
pset_creator=Pset_Creator)
return toolbox
# Evaluate one individual: compile its expression with toolbox.compile, run the
# compiled function (arguments and the rest of the body are elided as `......`),
# and return the result as a one-element tuple.
def EvaluationBacktest(individual, toolbox, max_c, ids):
func = toolbox.compile(expr=individual)
......
try:
df = func(......)
......
# NOTE(review): a bare `except:` also catches KeyboardInterrupt and SystemExit,
# and turns every failure into the fitness -100 without logging it; consider
# `except Exception:` so real errors are not hidden.
except:
result = -100
# The trailing comma makes this return the tuple (result,).
return result,
# Call site (per the traceback, inside deap's eaMuPlusLambda): evaluates the
# individuals in invalid_ind through the registered "map", i.e. ray_deap_map.
fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
Here is the error message:
2021-06-10 21:12:18,290 WARNING worker.py:1115 -- The log monitor on node ALPHA failed with the following error:
OSError: [WinError 87] 参数错误。
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 364, in <module>
log_monitor.run()
File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 285, in run
self.open_closed_files()
File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 172, in open_closed_files
self.close_all_files()
File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 107, in close_all_files
os.kill(file_info.worker_pid, 0)
SystemError: <built-in function kill> returned a result with an error set
2021-06-10 21:12:18,295 WARNING worker.py:1115 -- A worker died or was killed while executing task ffffffffffffffff151a7c83ccb29b66d20c565901000000.
Traceback (most recent call last):
File "D:\Factor\GA\GA_LongShortStrategy.py", line 569, in <module>
pop = algorithms.eaMuPlusLambda(pop, toolbox, mu=mu, lambda_=lambda_,
File "C:\Users\Administrator\anaconda3\lib\site-packages\deap\algorithms.py", line 320, in eaMuPlusLambda
fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
File "D:\Factor\GA\RayMap.py", line 89, in ray_deap_map
results = map_ray_manager.map(func, pop)
File "D:\Factor\GA\RayMap.py", line 70, in map
unordered_results = list(eval_pool.map_unordered(
File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\util\actor_pool.py", line 91, in map_unordered
yield self.get_next_unordered()
File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\util\actor_pool.py", line 210, in get_next_unordered
return ray.get(future)
File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\client_mode_hook.py", line 47, in wrapper
return func(*args, **kwargs)
File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\worker.py", line 1483, in get
raise value
RayActorError: The actor died unexpectedly before finishing this task.
Any help would be appreciated.
The text was updated successfully, but these errors were encountered:
If we set `log_to_driver` to True, we sometimes get the error report below, but we still get results.
2021-06-11 19:05:38,691 WARNING worker.py:1115 -- The log monitor on node ALPHA failed with the following error:
OSError: [WinError 87] 参数错误。
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 364, in <module>
log_monitor.run()
File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 285, in run
self.open_closed_files()
File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 172, in open_closed_files
self.close_all_files()
File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 107, in close_all_files
os.kill(file_info.worker_pid, 0)
SystemError: <class 'OSError'> returned a result with an error set
GA 耗时:03小时16分20秒 # GA running time: 03 hours 16 minutes 20 seconds
The code that prints the program's running time is on the last line of the script (i.e. the program ran to completion).
The documentation of `ray.worker.get` mentions that this method issues a warning if it is called inside an async context. The problem I am encountering is likely caused by this.
Here is the error message:
Any help would be appreciated.
The text was updated successfully, but these errors were encountered: