[actor_pool] Using ActorPool.map_unordered in DEAP causes "A worker died or was killed while executing task" #3

Open
yinlinzhan opened this issue Jun 11, 2021 · 2 comments


@yinlinzhan

The documentation of ray.get (in worker.py) mentions that this method will issue a warning if it is running inside an async context. I suspect the problem I encountered is caused by this.

import ray
import time
from ray.util import ActorPool
from deap import algorithms, base, creator, gp, tools
......

@ray.remote(num_cpus=1)
class Ray_Deap_Map():
    def __init__(self, creator_setup=None, pset_creator=None):
        # issue 946? Ensure non-trivial startup to prevent bad load balancing across a cluster
        time.sleep(0.01)

        # recreate scope from global
        # For GA no need to provide pset_creator. Both needed for GP
        self.creator_setup = creator_setup
        if creator_setup is not None:
            self.creator_setup()

        self.pset_creator = pset_creator
        if pset_creator is not None:
            self.pset_creator()

    def ray_remote_eval_batch(self, f, zipped_input):
        iterable, id_ = zipped_input
        # attach id so we can reorder the batches
        return [(f(i), id_) for i in iterable]

class Ray_Deap_Map_Manager():
    def __init__(self, creator_setup=None, pset_creator=None):

        # Can adjust the number of processes in ray.init or when launching cluster
        self.n_workers = int(ray.cluster_resources()['CPU'])

        # recreate scope from global (for ex need toolbox in gp too)
        self.creator_setup = creator_setup
        self.pset_creator = pset_creator

    def map(self, func, iterable):

        if self.n_workers == 1:
            # only 1 worker, normal listcomp/map will work fine. Useful for testing code?
            ##results = [func(item) for item in iterable]
            results = list(map(func, iterable))  # forced eval to time it
        else:
            # many workers, let's use ActorPool

            if len(iterable) < self.n_workers:
                n_workers = len(iterable)
            else:
                n_workers = self.n_workers

            n_per_batch = int(len(iterable) / n_workers) + 1
            batches = [iterable[i:i + n_per_batch] for i in range(0, len(iterable), n_per_batch)]
            id_for_reorder = range(len(batches))

            eval_pool = ActorPool(
                [Ray_Deap_Map.remote(self.creator_setup, self.pset_creator) for _ in range(n_workers)])

            unordered_results = list(eval_pool.map_unordered(
                lambda actor, input_tuple: actor.ray_remote_eval_batch.remote(func, input_tuple),
                zip(batches, id_for_reorder)))

            # ensure order of batches
            ordered_batch_results = [batch for batch_id in id_for_reorder for batch in unordered_results if
                                     batch_id == batch[0][1]]

            # flatten batches to a list of fitnesses
            results = [item[0] for sublist in ordered_batch_results for item in sublist]

        return results

# This is what we register as map in deap toolbox.
# For GA no need to provide pset_creator. Both needed for GP
def ray_deap_map(func, pop, creator_setup=None, pset_creator=None):
    # Manager will determine if batching is needed and create remote actors to do the work
    map_ray_manager = Ray_Deap_Map_Manager(creator_setup, pset_creator)
    results = map_ray_manager.map(func, pop)
    return results

def Tool(pset, max_c, ids):
    toolbox = base.Toolbox()
    ......
    toolbox.register("evaluate", EvaluationBacktest, toolbox=toolbox, max_c=max_c, ids=ids)
    toolbox.register("map", ray_deap_map, creator_setup=Creator_Setup,
                     pset_creator=Pset_Creator)
    return toolbox

def EvaluationBacktest(individual, toolbox, max_c, ids):
    func = toolbox.compile(expr=individual)
    ......
    try:
        df = func(......)
        ......
    except Exception:  # any failure in the backtest gets a penalty fitness
        result = -100
    return result,


fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
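The batching and reordering done in `map` above can be sketched without Ray. This is a minimal pure-Python illustration; `make_batches`, `eval_batch` and `reassemble` are hypothetical helpers, not part of Ray or DEAP. Each batch is tagged with an id, results are allowed to come back in any order (as `ActorPool.map_unordered` may return them), and the original order is restored with a dict lookup instead of the nested scan used for `ordered_batch_results`.

```python
import math
from typing import Callable, List, Sequence, Tuple, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def make_batches(items: Sequence[T], n_workers: int) -> List[Tuple[Sequence[T], int]]:
    """Split items into at most n_workers contiguous batches, each tagged with an id."""
    if not items:
        return []
    n_workers = max(1, min(n_workers, len(items)))
    # Ceiling division: every worker gets a batch when len(items) divides evenly.
    size = math.ceil(len(items) / n_workers)
    return [(items[start:start + size], batch_id)
            for batch_id, start in enumerate(range(0, len(items), size))]


def eval_batch(f: Callable[[T], R], tagged: Tuple[Sequence[T], int]) -> Tuple[int, List[R]]:
    """Stand-in for the actor method: evaluate a batch and return it with its id."""
    batch, batch_id = tagged
    return batch_id, [f(x) for x in batch]


def reassemble(unordered: List[Tuple[int, List[R]]]) -> List[R]:
    """Restore the original item order from batches that finished in any order."""
    by_id = dict(unordered)
    return [result for batch_id in sorted(by_id) for result in by_id[batch_id]]


if __name__ == "__main__":
    population = list(range(10))
    tagged = make_batches(population, n_workers=4)
    # Simulate map_unordered handing batches back in reverse order.
    unordered = [eval_batch(lambda x: x * x, t) for t in reversed(tagged)]
    print(reassemble(unordered) == [x * x for x in population])  # True
```

Compared with int(len(iterable) / n_workers) + 1, the ceiling division keeps all workers busy when the population divides evenly: 8 individuals on 4 workers give four batches of 2 rather than three batches of 3, 3 and 2.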

Here is the error message:

2021-06-10 21:12:18,290	WARNING worker.py:1115 -- The log monitor on node ALPHA failed with the following error:
OSError: [WinError 87] 参数错误。  # The parameter is incorrect.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 364, in <module>
    log_monitor.run()
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 285, in run
    self.open_closed_files()
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 172, in open_closed_files
    self.close_all_files()
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 107, in close_all_files
    os.kill(file_info.worker_pid, 0)
SystemError: <built-in function kill> returned a result with an error set

2021-06-10 21:12:18,295	WARNING worker.py:1115 -- A worker died or was killed while executing task ffffffffffffffff151a7c83ccb29b66d20c565901000000.
Traceback (most recent call last):

  File "D:\Factor\GA\GA_LongShortStrategy.py", line 569, in <module>
    pop = algorithms.eaMuPlusLambda(pop, toolbox, mu=mu, lambda_=lambda_,

  File "C:\Users\Administrator\anaconda3\lib\site-packages\deap\algorithms.py", line 320, in eaMuPlusLambda
    fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)

  File "D:\Factor\GA\RayMap.py", line 89, in ray_deap_map
    results = map_ray_manager.map(func, pop)

  File "D:\Factor\GA\RayMap.py", line 70, in map
    unordered_results = list(eval_pool.map_unordered(

  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\util\actor_pool.py", line 91, in map_unordered
    yield self.get_next_unordered()

  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\util\actor_pool.py", line 210, in get_next_unordered
    return ray.get(future)

  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\client_mode_hook.py", line 47, in wrapper
    return func(*args, **kwargs)

  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\worker.py", line 1483, in get
    raise value

RayActorError: The actor died unexpectedly before finishing this task.

Any help would be appreciated.

@DMTSource
Owner

I wonder if the log monitor failure on node ALPHA that you are seeing is similar to this one:
ray-project/ray#13511 (comment)

Can you try initializing Ray as suggested in that thread:
ray.init(log_to_driver=False)

@yinlinzhan
Author

We've set log_to_driver to False:

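import ray
from multiprocessing import cpu_count  # assumed source of cpu_count; the import is not shown in the original snippet

num_cpus = min(cpu_count(), 64 - 3)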
num_gpus = 0
log_to_driver = False
rayparam = [num_cpus, num_gpus, log_to_driver]
ray.init(num_cpus=rayparam[0], num_gpus=rayparam[1], log_to_driver=rayparam[2])

If we set log_to_driver to True, we sometimes get the error report below, but the program still runs to completion and produces results:

2021-06-11 19:05:38,691 WARNING worker.py:1115 -- The log monitor on node ALPHA failed with the following error:
OSError: [WinError 87] 参数错误。  # The parameter is incorrect.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 364, in <module>
    log_monitor.run()
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 285, in run
    self.open_closed_files()
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 172, in open_closed_files
    self.close_all_files()
  File "C:\Users\Administrator\anaconda3\lib\site-packages\ray\_private\log_monitor.py", line 107, in close_all_files
    os.kill(file_info.worker_pid, 0)
SystemError: <class 'OSError'> returned a result with an error set

GA 耗时:03小时16分20秒  # GA running time: 03 hours 16 minutes 20 seconds

The line that prints the running time is the last line of the program, so the run did complete.
