Skip to content

Initial hack for ractor (incomplete). #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion lib/async/container/best.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,17 @@ def self.fork?
::Process.respond_to?(:fork) && ::Process.respond_to?(:setpgid)
end

def self.ractor?
defined?(::Ractor)
end

# Determins the best container class based on the underlying Ruby implementation.
# Some platforms, including JRuby, don't support fork. Applications which just want a reasonable default can use this method.
# @returns [Class]
def self.best_container_class
if fork?
return Forked
else
els
return Threaded
end
end
Expand Down
197 changes: 197 additions & 0 deletions lib/async/container/ractor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
# frozen_string_literal: true

# Copyright, 2020, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require_relative 'channel'
require_relative 'error'
require_relative 'notify/pipe'

require 'async/logger'

module Async
module Container
# Represents a running child thread from the point of view of the parent container.
class Ractor < Channel
# Represents a running child thread from the point of view of the child thread.
class Instance < Notify::Pipe
# Wrap an instance around the {Thread} instance from within the threaded child.
# @parameter thread [Thread] The thread intance to wrap.
def self.for(thread)
instance = self.new(thread.out)

return instance
end

def initialize(io)
@name = nil
@thread = ::Thread.current

super
end

# Set the name of the thread.
# @parameter value [String] The name to set.
def name= value
@thread.name = value
end

# Get the name of the thread.
# @returns [String]
def name
@thread.name
end

# Execute a child process using {::Process.spawn}. In order to simulate {::Process.exec}, an {Exit} instance is raised to propagage exit status.
# This creates the illusion that this method does not return (normally).
def exec(*arguments, ready: true, **options)
if ready
self.ready!(status: "(spawn)") if ready
else
self.before_spawn(arguments, options)
end

begin
# TODO prefer **options... but it doesn't support redirections on < 2.7
pid = ::Process.spawn(*arguments, options)
ensure
_, status = ::Process.wait2(pid)

raise Exit, status
end
end
end

def self.fork(**options)
self.new(**options) do |thread|
::Thread.new do
yield Instance.for(thread)
end
end
end

# Initialize the thread.
# @parameter name [String] The name to use for the child thread.
def initialize(name: nil)
super()

@status = nil

@thread = yield(self)
@thread.report_on_exception = false
@thread.name = name

@waiter = ::Thread.new do
begin
@thread.join
rescue Exit => exit
finished(exit.error)
rescue Interrupt
# Graceful shutdown.
finished
rescue Exception => error
finished(error)
else
finished
end
end
end

# Set the name of the thread.
# @parameter value [String] The name to set.
def name= value
@thread.name = value
end

# Get the name of the thread.
# @returns [String]
def name
@thread.name
end

# A human readable representation of the thread.
# @returns [String]
def to_s
"\#<#{self.class} #{@thread.name}>"
end

# Invoke {#terminate!} and then {#wait} for the child thread to exit.
def close
self.terminate!
self.wait
ensure
super
end

# Raise {Interrupt} in the child thread.
def interrupt!
@thread.raise(Interrupt)
end

# Raise {Terminate} in the child thread.
def terminate!
@thread.raise(Terminate)
end

# Wait for the thread to exit and return he exit status.
# @returns [Status]
def wait
if @waiter
@waiter.join
@waiter = nil
end

return @status
end

# A pseudo exit-status wrapper.
class Status
# Initialise the status.
# @parameter error [::Process::Status] The exit status of the child thread.
def initialize(error = nil)
@error = error
end

# Whether the status represents a successful outcome.
# @returns [Boolean]
def success?
@error.nil?
end

# A human readable representation of the status.
def to_s
"\#<#{self.class} #{success? ? "success" : "failure"}>"
end
end

protected

# Invoked by the @waiter thread to indicate the outcome of the child thread.
def finished(error = nil)
if error
Async.logger.error(self) {error}
end

@status = Status.new(error)
self.close_write
end
end
end
end
43 changes: 43 additions & 0 deletions lib/async/container/ractored.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# frozen_string_literal: true

# Copyright, 2017, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require_relative 'generic'
require_relative 'thread'

module Async
module Container
# A multi-thread container which uses {Ractor}.
class Ractored < Generic
# Indicates that this is not a multi-process container.
def self.multiprocess?
false
end

# Start a named ractor and execute the provided block in it.
# @parameter name [String] The name (title) of the ractor.
# @parameter block [Proc] The block to execute in the ractor.
def start(name, &block)
Ractor.new(name: name, &block)
end
end
end
end