
julep: a @select statement #13763

Open
malmaud opened this issue Oct 26, 2015 · 58 comments
Labels: julep (Julia Enhancement Proposal), multithreading (Base.Threads and related functionality), parallelism (Parallel or distributed computation)

Comments

@malmaud
Contributor

malmaud commented Oct 26, 2015

Motivation

Now that we have channels, a mechanism to wait on multiple events is especially useful. Go has shown that the combination of channels and select is a strong unifying mechanism for asynchronous programming, from fine-grained synchronization to building complex parallel pipelines. Here's one simple demonstration of how select works in Go and the kinds of programs it enables; I won't go fully into the utility of select here, since so much has already been written about it.

Syntax

I propose taking inspiration from Go's syntax:

    @select begin
        if c1 |> x
            "Got $x from c1"
        elseif c2
            "Got a message from c2"
        elseif c3 <| :write_test
            "Wrote to c3"
        elseif task |> z
            "Task finished and returned $z"
        end
    end

This will wait for a value to be available on channel c1 or c2, for capacity to store a value to become available on channel c3, or for the task task to complete. Whichever completes first will then go on to execute its corresponding body. I'll call statements such as c3 <| :write_test a "clause", and the actual values being waited upon (like c3) "events" (I'm not attached to this nomenclature at all).

A complementary non-blocking form is available by giving @select a default else block, which executes immediately if none of the clauses are immediately available.

A clause has three possible forms:

  1. event |> value (a "take" clause kind)

If event is an AbstractChannel, wait for a value to become available in the channel and assign take!(event) to value.
If event is a Task, wait for the task to complete and assign the task's return value to value.

  2. event <| value (a "put" clause kind)

Only supported for AbstractChannels. Wait for the channel to have capacity to store an element, and then call put!(event, value).

  3. event (a "wait" clause kind)

Calls wait on event, discarding the return value. Usable on any "waitable" event, i.e. any object with a wait method defined (most commonly channels, tasks, Condition objects, and processes).

A functional form of the macro, select, will be provided as well for cases where the number of clauses is not known statically. However, as in Go, the macro variant is the preferred style whenever feasible.

Implementation

In #13661, I provide a simple but functional and reasonably performant implementation. I suspect that this implementation will go through rounds of optimization and gain support for multithreading before Julia 1.0, but it gets the ball rolling.

Non-blocking case

This case is simpler: a function _isready is defined that decides whether a clause is immediately available for execution. For abstract channels, it checks whether the channel has a value ready to be taken (for a "take" clause) or has room for a value to be put into it (for a "put" clause); for tasks, it checks whether the task is done. No other event types are supported. This is a limitation of wait being edge-triggered: for general objects with wait defined, there is no predefined notion of what it means to have a value "available".
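For illustration, here is a minimal sketch of the non-blocking check in current Julia, restricted to "take" clauses on Channels and Tasks. The names _isready_take and select_nowait are hypothetical (this is not the code in #13661), and a "put" check is left out because Base has no public query for a channel's spare capacity. It assumes a single thread: nothing else runs between isready and take! under cooperative scheduling, but another thread could.

```julia
# Hypothetical helpers, not the PR's code: readiness of a "take" clause.
_isready_take(c::Channel) = isready(c)   # a value can be taken right now
_isready_take(t::Task) = istaskdone(t)   # the task has already finished

# Non-blocking select over "take" clauses. Returns (index, value) for the first
# clause that is immediately available, or `nothing`, meaning "run the default".
function select_nowait(events...)
    for (i, ev) in enumerate(events)
        if _isready_take(ev)
            # fetch returns a finished task's result; take! removes the channel's value
            return (i, ev isa Task ? fetch(ev) : take!(ev))
        end
    end
    return nothing
end

c1, c2 = Channel{Int}(1), Channel{Int}(1)
put!(c2, 7)
select_nowait(c1, c2)   # (2, 7)
select_nowait(c1, c2)   # nothing, so the default branch would run
```

Returning nothing instead of evaluating a default inline means the default is only chosen after every clause has been checked, wherever a macro lets the user write it.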

Blocking case

One task is scheduled for each clause (this set is called the "waiters"):

  1. Each task calls _wait on its event within a try-catch block. For channel events, _wait waits for the channel to have a value available for taking or room for putting, depending on the clause kind. For a task, _wait waits for the task to finish and returns the task's return value. Otherwise, _wait just calls wait on the event.

  2. When a task gets past its _wait (and hence becomes the "winner"), it signals all its "rival" tasks (all other waiters) to terminate before they modify the state of their events. To signal a blocked rival, it throws to the rival a custom error type that interrupts the rival's _wait call and also passes the rival a reference to the winner task. The rival handles the error in a catch block, returns control to the winner, and then exits.

If the rival hasn't yet been scheduled, it is deleted from the workqueue. This scheme ensures only one waiter will end up manipulating the state of its event, no matter how the waiters are scheduled or what other tasks happen to be scheduled.
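The interruption step can be sketched with current Julia primitives. This is an assumption-laden stand-in, not the PR's code: SelectInterrupt is a hypothetical error type, and schedule(task, exc; error=true) is used in place of the raw throwto/yieldto described above. The Julia manual cautions against rescheduling an arbitrary already-started task, so treat this purely as an illustration of the idea.

```julia
# Hypothetical stand-in for the PR's custom interrupt error type.
struct SelectInterrupt <: Exception end

c = Channel{Int}(1)

# A rival waiter blocked in wait(c), as in the blocking scheme described above.
rival = @async try
    wait(c)
    :won
catch err
    err isa SelectInterrupt || rethrow()
    :interrupted                 # bow out without touching c
end

yield()                          # let the rival run until it blocks in wait(c)

# The winner interrupts the rival by raising SelectInterrupt inside it.
schedule(rival, SelectInterrupt(); error=true)
fetch(rival)                     # :interrupted
```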

For channels, it is important that between the call to _wait and the call to take! or put!, no other task manipulates the channel. In this scheme, the scheduler is never run between those calls: all that happens is a sequence of yieldto calls amongst the waiters.

This correctness is with respect to Julia's current cooperative multitasking; additional locking will be needed to make it thread-safe. As the complexity of Go's select implementation shows, getting correct and high-performance select functionality in the presence of preemptive multithreading is a subtle problem, and I don't think it should block a reasonable @select implementation that works in Julia today.

@malmaud added the parallelism (Parallel or distributed computation) and julep (Julia Enhancement Proposal) labels Oct 26, 2015
@ssfrr
Contributor

ssfrr commented Oct 26, 2015

I really like the idea of a nice way to do select, but I wonder if there's a way to do it with a bit less special syntax? In particular the |> and <| operators seem easy to confuse. What do you think about a @case macro that takes a (maybe blocking) expression and an expression that gets evaluated when the first expression finishes? That way anything that blocks can be used (e.g. streams). If one of the cases doesn't block (e.g. when there's data available in a channel) then it just wins and the other cases don't get run.

@select begin
    @case x = read(c1) begin
        "Got $x from c1"
    end
    @case take!(c2) begin
        "Got a message from c2"
    end
    @case write(c3, :write_test) begin
        "Wrote to c3"
    end
    @case z = wait(task) begin
        "Task finished and returned $z"
    end
    @case sleep(3) begin
        "waited for 3 seconds and nothing happened"
    end
end

Downsides to this:

  1. less concise.
  2. problems if one of the waiting expressions swallows exceptions because the throwto won't work properly to cancel that task
  3. problems if one of the waiting expressions modifies some state before calling a nested blocking function

2 and 3 may be a good argument against something that works generally on anything that blocks, as people might not realize the requirements for working within a @select.

@malmaud
Contributor Author

malmaud commented Oct 26, 2015

  1. Ya, I definitely like how that is less magical. But I do worry that for people writing a lot of select statements, as could certainly happen if you're using them to implement a multilayer pipeline, all those begins and ends could get cumbersome. A hybrid is also possible that might bring back some conciseness, where the if/else part is kept but the clause syntax uses the kind of normal-looking Julia expressions you're proposing:
@select begin
  if x=take!(c2)
    ...
  elseif put!(c3, :test)
   ...
  end
end

Or if we really don't want to overload the meaning of if/else, maybe there's still some way to not have to use so many begin/ends. @parallel essentially overloads the meaning of for, so there is already precedent for multiprocessing macros to overload the meaning of control structures.

  2. Since select, as I proposed it, works on tasks, you could just wrap your clause in a task:
@case @schedule(write(...))

so I think this point really amounts to a change in syntax (whether the macro will automatically do that, essentially) but not semantics. By allowing arbitrary objects that have wait defined to be used in clauses, we're already accepting that non-selected cases can have arbitrary side effects.

  3. I think that situation is mitigated by the above point about implicitly wrapping in tasks. The exception will always be caught by the waiter, interrupting its wait(::Task) call. However, that doesn't affect the underlying task.

@malmaud
Contributor Author

malmaud commented Oct 26, 2015

Just to clarify though, even if you make the syntax for clauses look like a regular Julia expression like x=take!(channel), the macro must still do some magic and translate it to something like

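# inside one waiter task; SelectInterrupt and kill_rivals are placeholders for the PR's machinery
try
  wait(channel)        # block until a value is available, without taking it
catch err
  isa(err, SelectInterrupt) || rethrow()
  return               # a rival won: leave channel untouched
end
kill_rivals()          # no scheduler run between here and take!, so nobody else can take
x = take!(channel)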

since

try
  x = take!(channel)
catch err
  isa(err, SelectInterrupt) || rethrow()
  ...
end

can lead to race conditions involving other rivals that are directly or indirectly waiting to put into channel.

That said, I still do like that more normal-looking syntax.

@Tetralux
Contributor

@malmaud

quote
    if x = take!(c)
        println(x)
    end
end

This raises a syntax error due to the assignment: ERROR: syntax: unexpected "=".
Although I'm not sure why; it seems like an oversight.
I mean, an expression passed to a macro doesn't need to be a valid Julia expression, right?

@malmaud
Contributor Author

malmaud commented Oct 26, 2015

That's a good point, actually: expressions passed to a macro do have to be syntactically valid Julia expressions, since it is the parsed AST that is passed to the macro. So the if x=take!(c) ... syntax is off the table.

@durcan

durcan commented Oct 27, 2015

I would love a go style select statement, but why not use Julia's Pair type to express the cases (sort of like how Match.jl expresses control flow)? This syntax is concise, allows variable binding, does not overload if / else, and is very easy to compose. Here is what I mean:

@select begin
    x = read(c1)           =>  "Got $x from c1"
    take!(c2)              =>  "Got a message from c2"
    write(c3, :write_test) =>  "Wrote to c3"
    z = wait(task)         =>  "Task finished and returned $z"
    sleep(3)               =>  "waited for 3 seconds and nothing happened"
end

The obvious major problem with this in my view is the non-standard semantic of variable binding. The line

 x = read(c1)         =>  "Got $x from c1"

In the context of the macro, is obviously trying to mean

 Pair(x = read(c1) ,  "Got $x from c1")

But gets parsed by Julia as

 x = Pair(read(c1) ,  "Got $x from c1")
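The precedence problem can be checked directly with Meta.parse, which returns the same AST a macro would receive (c1 and body are placeholder names; nothing is evaluated):

```julia
ex = Meta.parse("x = read(c1) => body")
ex.head             # :(=)   the assignment is the outermost node
ex.args[1]          # :x
ex.args[2].args[1]  # :(=>)  the Pair is built on the right-hand side

# Parenthesizing the binding yields the intended shape: a Pair whose
# first element is the assignment.
ex2 = Meta.parse("(x = read(c1)) => body")
ex2.args[1]         # :(=>)
ex2.args[2].head    # :(=)
```

So a macro could either detect the x = (... => ...) shape and rewrite it, or require users to parenthesize the binding.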

This can be fixed in the macro, but may cause a lot of confusion. I personally have no problem using some other infix operator (like |>) for variable binding, but I suppose the orthodox way of doing this is to realize that Julia provides a built-in local binding mechanism and just use that:

@select begin
    let x = read(c1)
        x                  =>  "Got $x from c1"
    end
    take!(c2)              =>  "Got a message from c2"
    write(c3, :write_test) =>  "Wrote to c3"
    let z = wait(task)
        z                  =>  "Task finished and returned $z"
    end
    sleep(3)               =>  "waited for 3 seconds and nothing happened"
end

@malmaud
Contributor Author

malmaud commented Oct 27, 2015

Like you say, the precedence of => seems to make this unworkable (although it does look good, and I think some people are still debating what precedence => should end up having).

With the 'let' notation, how would you express 'put' clauses? E.g., try to put 'x' into 'c', and if that happens, execute this block of code.

@durcan

durcan commented Oct 27, 2015

Unless I am missing something, 'put' does not require binding a new name.

@select begin
    put!(c1, :a)  =>  "put :a in c1"
    put!(c2, :b)  =>  "put :b in c2"
end

Even if it somehow did, what would prevent you from using a let block? I disagree that the current precedence of = vs => makes this unworkable. It would just mean that using = for variable binding without a let block might be a bad idea. We could go with your original binding idea after all:

@select begin
    take!(c1) |> x  =>  "took $x from c1"
    take!(c2) |> y  =>  "took $y from c2"
end

I just happen to like let.

@StefanKarpinski
Member

Let's avoid focusing on the syntax since if we want this in the language it will get its own keyword. Just make it work with a macro and the most basic syntax possible.

@malmaud
Contributor Author

malmaud commented Oct 27, 2015

Yes, I definitely like that =>-based syntax more than overloading if. The problem with using let here is it's a bit of a lie, because you couldn't write something like

@select begin
  let x=take!(c1), y=1
    x=>"got $x and y is $y"
  end
end

You could only use exactly the form

@select begin
  let x=...
     x=> ...
   end
end

which seems a violation of DRY, as well as a bit awkward for the common case where clauses are take! statements on channels where the return value is going to be used in a code block.

@malmaud
Contributor Author

malmaud commented Oct 27, 2015

@StefanKarpinski Well, #13661 has a working macro with my original proposed syntax. Are you suggesting we just talk about critiquing implementation strategy, or what are the next steps you're thinking of? I didn't realize it was going to be a keyword, since @parallel is a macro and that's been working out.

@StefanKarpinski
Member

Well, maybe it won't – but it does seem like a pretty good candidate for a keyword.

@malmaud
Contributor Author

malmaud commented Oct 27, 2015

Sure, whichever. But how should we proceed from here? Maybe get @amitmurthy's opinion once he's back from his travels?

@durcan

durcan commented Oct 28, 2015

For ease of use by those who want to test this functionality out, it might be a good idea to make this @select macro available as a package rather than as a pull request for inclusion in Base.

@malmaud
Contributor Author

malmaud commented Oct 28, 2015

You can always just merge it to your local Julia checkout.

@durcan

durcan commented Oct 28, 2015

I don't have a built-from-source version of Julia readily available so I went ahead and packaged up your select code. I did take the liberty of changing your overloaded if /else to => but I left pretty much everything else alone. So, your original example would be:

@select begin
    c1 |> x           => "Got $x from c1"
    c2                => "Got a message from c2"
    c3 <| :write_test => "Wrote to c3"
    task |> z         => "Task finished and returned $z"
end

Anyway I have run into some problems with the actual functionality of the macro. Here is the closest translation I was able to achieve of that go example linked earlier:

function fibonacci(c, q)
    x, y = 0, 1
    ret = true
    while ret
        @select begin
            c <| x => x, y = y, x+y
            q      => begin
                println("quit")
                ret = false
            end
        end
    end
end

function main()
    c = Channel{Int64}(32)
    q = Channel{Int64}(32)

    @schedule begin
        for i in 0:9
            take!(c) |> println
        end
        put!(q, 0)
    end

    fibonacci(c, q)
end

main()

A more literal rendition of the fibonacci function:

function fibonacci(c, q)
    x, y = 0, 1
    while true
        @select begin
            c <| x => x, y = y, x+y
            q      => begin
                println("quit")
                return
            end
        end
    end
end

Results in a misplaced return statement error. Using a break instead of return results in a "break or continue outside loop" error. It seems that as is, the select macro limits the kind of control flow that can happen from the inside of the chosen body.

@durcan

durcan commented Oct 29, 2015

This problem stems from the context in which the macro places the selected expression. Currently, a set of branch tasks is created, one for each case. These branches wait on their events, and as soon as one "wins" it:

  1. Kills all its rival branches.
  2. Evaluates the selected expression.
  3. Sends the result of the selected expression back to the original context via a Channel.

This is why return or break statements are a problem. I have fixed this by simply moving the evaluation of the selected expression back into the original context. Now, after a task wins, the ordering looks sort of like:

  1. Kills all its rival branches.
  2. Sends its branch ID number back to the original context via a Channel.
  3. Evaluates the expression that corresponds to the winning branch ID in the original context.

I have spent less time thinking about the non-blocking case, but at this point it does most of what I naively expect it to.

@malmaud
Contributor Author

malmaud commented Oct 29, 2015

I think this is vulnerable to a race condition: if you're waiting for a channel to have a value to take, the channel may notify the waiter, but by the time the take! statement executes, another task may already have taken the value. That's what motivated this design in the first place.

@durcan

durcan commented Oct 29, 2015

You are absolutely right. put! and take! need to be called from inside the winning branch task. The evaluation of the associated expression, however, really needs to happen in the original parent task. This also means that any variable binding that results from a take! needs to happen in the original parent task. This is fixable.
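A minimal sketch of this split for "take" clauses, in current Julia and not taken from the package: each waiter task does its own take! (under the channel's lock, after re-checking readiness), and only the winning index and value are sent back, so the caller evaluates the matching body in its own scope. The helper names and the shared Threads.Atomic{Bool} claim flag are assumptions. Losing waiters are not cancelled; they stay parked in wait(c) and exit without taking anything if their channel later fills.

```julia
# Sketch only: helper names and the Atomic{Bool} claim flag are assumptions.

# One waiter per "take" clause. The take! happens here, in the winning waiter,
# under the channel's own lock, so nobody can empty the channel between the
# readiness re-check and the take.
function _take_waiter(c::Channel, i::Int, claimed::Threads.Atomic{Bool}, result::Channel)
    while !claimed[]
        wait(c)                               # block until c is ready; does not consume
        lock(c) do
            # Re-check: another task may have taken the value after we were notified.
            if isready(c) && !Threads.atomic_xchg!(claimed, true)
                put!(result, (i, take!(c)))   # only the single winner gets here
            end
        end
    end
end

# Returns (index of the winning channel, value taken from it).
function select_take(chans::Channel...)
    claimed = Threads.Atomic{Bool}(false)
    result = Channel{Tuple{Int,Any}}(1)
    for (i, c) in enumerate(chans)
        @async _take_waiter(c, i, claimed, result)
    end
    return take!(result)
end

# The matching body runs in the caller, so `return` (or `break`) behaves normally.
function first_message(a::Channel, b::Channel)
    i, v = select_take(a, b)
    if i == 1
        return "a: $v"
    else
        return "b: $v"
    end
end
```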

@malmaud
Contributor Author

malmaud commented Oct 30, 2015

Agreed

@durcan

durcan commented Oct 30, 2015

This should be fixed now.

@malmaud
Contributor Author

malmaud commented Oct 30, 2015

Great. Do you want to submit it as a PR against the select branch? Or I can get around to it sometime and mark you as the author of the commit.

@durcan

durcan commented Oct 31, 2015

I am not super familiar with git, but I am sure I can figure out how to submit a PR against the select branch. It may come as one giant commit though.

Before I do, there are three other strange behaviors of the non-blocking form (the form with a default case) that I am going to address.

  1. Right now the default case needs to come last. This:
c1 = Channel()
c2 = Channel()
put!(c1, 0)
@select begin
    c1 |> x => "c1: $x"
    c2 |> x => "c2: $x"
    _ => "default"
end

returns "c1: 0" as expected. However if we place the default case first:

c1 = Channel()
c2 = Channel()
put!(c1, 0)
@select begin
    _ => "default"
    c1 |> x => "c1: $x"
    c2 |> x => "c2: $x"
end

it returns "default". This could be the intended behavior, but I think it makes more sense for the default case to behave the same regardless of position (that is, only use the default if nothing else is ready).

  2. Currently two or more default cases are allowed. I would want this to throw an error, but all it does now is default to the first one.

  3. Also, the chosen expression must evaluate to a valid rvalue (so, for example, an expression that ends in return does not work). I took some care to avoid this problem in the blocking case, and I will use the same fix here.

@ArchRobison
Contributor

+1 for giving it a language level syntax, since it's the if-else of message-passing programming. One thing I didn't know until recently is that this style also enables timeouts without littering APIs with timeout parameters.

@malmaud
Contributor Author

malmaud commented Nov 6, 2015

Indeed, using select for timeouts is what inspired me initially to push for this.

My question about making it language-level syntax is whether there's actually some syntax someone had in mind that isn't possible with a macro, or whether it's really just about eliminating the @ to signal that people should think of select as "first-class" (not that there's anything wrong with that, per se).

But if it's the latter, then why should @parallel and @spawn, which are the primary entry points to data parallelism and task parallelism, remain macros? Is that an inconsistency that will end up confusing people?

@toivoh
Contributor

toivoh commented Nov 6, 2015

We should consider a syntax for importing a macro under a different name than the original.

@durcan

durcan commented Nov 6, 2015

@vtjnash, I agree that if q in the above example had been a Condition instead of a Channel there could have been a race condition. I really think that this sort of a select construct should be limited to level-triggering things like AbstractChannels.

to me, the ability to handle several conditions without accidentally missing a message while trying to service some other message, or creating a task to handle each one (which might throw an error and die instead of actually forwarding the desired message) were my biggest motivations for creating #6563

I am not sure exactly what you are asking for here. If you want to use select with a Condition just wrap it in a task to turn it into a level trigger:

c1 = Condition()
c2 = Condition()
t = @schedule wait(c2)
while true
    @select begin
        c1 =>  println("this could be missed")
        t  =>  println("no race condition")
        ...
        ...
    end
end

If you really don't want to create a task to handle each Condition, I am not really sure how to proceed with the language as it is (after all, all notify does is reschedule waiting tasks). How would you like something like this to be implemented?
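The edge- versus level-triggered distinction behind this can be seen in a few lines of current Julia: a notify with no waiter is simply lost, while a value put into a Channel stays until someone takes it. (Condition here is Base's single-threaded condition type; the snippet is an illustration, not part of Select.jl.)

```julia
cond = Condition()
notify(cond)                 # no task is waiting yet: this wake-up is lost

waiter = @async (wait(cond); :woken)
yield()                      # waiter is now blocked in wait(cond)
istaskdone(waiter)           # false: the earlier notify did not count

ch = Channel{Int}(1)
put!(ch, 1)                  # the value is stored until someone takes it
late = @async take!(ch)      # a task arriving later still gets it
fetch(late)                  # 1

notify(cond)                 # clean up: now the Condition waiter wakes
fetch(waiter)                # :woken
```

Wrapping wait(c2) in a task, as in the snippet above, turns the one-shot notification into a task that stays "done", which a select can observe at any later time.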

As for the naming thing, it would be very convenient to be able to re-name imports that conflict with stuff in Base, but that may be a separate discussion.

@NHDaly
Member

NHDaly commented Jan 28, 2019

Hey! :)

As multithreading support approaches, has there been any progress on this?

@durcan's Select.jl package hasn't been updated in a while. Is there an alternative being used in the wild? Or are we making do without it for now (since it's maybe not as important without parallel Tasks)?

@NHDaly
Member

NHDaly commented Jan 28, 2019

As far as I can tell from reading the above, we seemed to have settled on a pretty good implementation that is safe as long as you restrict it to "level-triggering things like AbstractChannels".

There is a potential naming conflict with SQL-like select operations, which still needs to be addressed.
It seems not unreasonable to me to choose an entirely different name for this than Go does, instead of select. Especially considering that we made a new name for our coroutines (Tasks), and we spawn them with @async, not go (😉).

  • Maybe we could do something that's just a more verbose variant of select? Like selectfirst or selectready? Or selecttask, selectasync, or selectchannel?
  • Or something different entirely like doready or dofirst or runfirst or onready or even just on?
    • I kinda like onready, actually?
      onready
          take!(c1) => 4
          put!(c2, true) => take!(c3)
      end
      Or the macro variant, @onready begin ... end

Is there anything left to decide on besides the name? It sounds like we were mostly happy with the implementation presented above?

@NHDaly
Member

NHDaly commented Jan 29, 2019

TBH I don't necessarily think select is that great of a name... It's not at all clear what it does just from reading a block of code; you need to have had the concept explained to you before you can understand it. So I kind of like something like onready/dofirst/runfirst/doready, since I think it is a bit less mysterious!

Or something different entirely

Another option:

  • proceed?
    I think this is the most similar to select that I've come up with. It has the same feel (and is maybe equally vague/unclear haha.) This came from listening to Rob Pike explain select here, where he said "control the behavior of your program based on what communications are able to proceed at any moment."
    • Or related: onproceed, proceedable, canproceed, canrun

@JeffBezanson added the multithreading (Base.Threads and related functionality) label Jun 4, 2019
@StefanKarpinski
Member

Multi-argument wait would be a nice way to write this, i.e. wait(ch1, ch2, ch3) would return when one of the arguments is ready and returns the channel that’s ready. Unfortunately, the signature of wait for remote channels is wait(r::RemoteChannel, args...). Why?!? 😭

@NHDaly
Member

NHDaly commented Jul 22, 2019

would return when one of the arguments is ready and returns the channel that’s ready

So to do the other half of the select statement, would you just write an if-else to check which channel returned?

I.e.

ch = wait(ch1, ch2, ch3)
if ch == ch1
    ...
elseif ch == ch2
    ...
elseif ch == ch3
    ...
else
    # Default / timeout case
end

?

It seems not quite as cute/handy/pithy as the select statement, I guess.
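Until something like a multi-argument wait exists, it can be approximated with a hypothetical helper (waitany is not a Base function) and then branched on with an if/elseif chain. This version is not race-free: another task could take from the returned channel before the caller does, and the losing waiter tasks are left blocked, which is exactly the bookkeeping a real select has to handle.

```julia
# Hypothetical helper, not a Base function: block until any channel is ready
# and return it, without taking anything from it.
function waitany(chs::Channel...)
    ready = Channel{Channel}(length(chs))   # room for every waiter, so put! never blocks
    for c in chs
        @async begin
            wait(c)                         # returns once c has a value
            put!(ready, c)
        end
    end
    return take!(ready)                     # the first channel reported ready
end

ch1, ch2, ch3 = Channel{Int}(1), Channel{Int}(1), Channel{Int}(1)
put!(ch2, 3)
ch = waitany(ch1, ch2, ch3)
if ch === ch1
    msg = "ch1: $(take!(ch1))"
elseif ch === ch2
    msg = "ch2: $(take!(ch2))"
else
    msg = "ch3: $(take!(ch3))"
end
```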

@StefanKarpinski
Member

I guess that's bad if you want to do different things when different things are ready, but often you want to do the same thing for all of the things you're waiting on, which is annoying to express with select.

@StefanKarpinski
Member

Maybe

@select begin
    ch1 => expr1
    ch2 => expr2
    ch3 => expr3
end

@amitmurthy
Contributor

Multi-argument wait would be a nice way to write this, i.e. wait(ch1, ch2, ch3) would return when one of the arguments is ready and returns the channel that’s ready. Unfortunately, the signature of wait for remote channels is wait(r::RemoteChannel, args...). Why?!? 😭

Answering the Why?!? 😭 - If I remember correctly, it is because the backing store of a RemoteChannel need only be an AbstractChannel. You can create a RemoteChannel backed by a Dict. An example of a dictionary that can be used as the backing store for a RemoteChannel can be seen here - https://github.com/JuliaAttic/Examples/blob/master/dictchannel.jl

For such a RemoteChannel, you could actually wait for a specific key becoming available.

The thinking at that time was to allow for folks to build different communication patterns (for example publish-subscribe where you may publish/subscribe to a specific "topic") by making the backing store flexible enough to support them.

@StefanKarpinski
Member

Thanks for the explanation, @amitmurthy.

@NHDaly
Member

NHDaly commented Sep 3, 2019

Maybe

@select begin
    ch1 => expr1
    ch2 => expr2
    ch3 => expr3
end

@StefanKarpinski, reading through the discussions above, it sounds like a lot of the syntax-specific questions were about the best way to specify all the ways one might want to wait on a channel: wait on a take!, wait on a put!, wait until there's any value (maybe? does Go have this one?). In particular, we need a nice way to give a name to whatever value is read out of the channel.

That's what led to the syntaxes proposed in #13763 (comment) and #13763 (comment), namely:

@select begin
    c1 |> x           => "Got $x from c1"
    c2                => "Got a message from c2"
    c3 <| :write_test => "Wrote to c3"
    task |> z         => "Task finished and returned $z"
    _                 => "default"
end

Which is pretty similar to your suggestion.
It seems like the next step would be to update https://github.com/durcan/Select.jl for the latest julia, and start playing with it?

@NHDaly
Member

NHDaly commented Sep 4, 2019

It seems like the next step would be to update https://github.com/durcan/Select.jl for the latest julia, and start playing with it?

@StefanKarpinski okay I've gone ahead and done that here! :)
NHDaly/Select.jl#1

I've updated that package to Julia 1.3+, multithreaded it (using @spawn instead of @async for the clauses), made it thread-safe (it grabs a separate "clause-lock" so only one of the clauses can proceed even if several finish waiting at the same time), and fixed a couple of weird concurrency bugs. :)

So next, I think we play with it, see if it works, fix bugs, and start brainstorming about the syntax a bit more concretely?

@gfZeng

gfZeng commented Oct 13, 2019

I prefer Clojure's terminology: use @alts instead of @select.

@gfZeng

gfZeng commented Oct 23, 2019

Maybe we can take the syntax from Clojure.

First, let's look at following definitions:

  • fn is a callback function invoked when the take! (or put!) succeeds
  • among operations sharing the same exclusive_lock, only one take! (or put!) is executed.
take!(fn, c, exclusive_lock)
put!(fn, c, v, exclusive_lock)

Now, we do select like this:

begin
    lck = ExclusiveLock()

    @async take!(c, lck) do val
    end

    @async put!(c, val2, lck) do val2
    end
    
end

Well, use macro more convenient:

@alt begin
    take!(c1) do val
    end

    put!(c1, val2) do val2
    end
end

Furthermore, sometimes we need to select on a number of channels that is not known in advance:

ChannelOp{T} = Union{Channel, Tuple{Channel,T}, Tuple{Symbol,T}} where T
function alts(fn::Function, chops::Vector{ChannelOp}) end

alts usage:

chops::Vector{ChannelOp} = [c1]
push!(chops, (c2, val2))
push!(chops, (:default, val3))
alts(chops) do ch, val
end
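A minimal sketch of the non-blocking case (a `:default` entry is present) might look like the following. The name `try_alts` is hypothetical; it handles only take ops and `:default` (put ops are left out), it throws instead of blocking when nothing is ready and no default is given, and it assumes no other task takes from the same channels concurrently. Checking the clauses in a random order is one common way to keep a busy channel from starving the others.

```julia
using Random: shuffle

# Hypothetical non-blocking alts: take ops (a Channel) and (:default, value) only.
function try_alts(fn, ops::AbstractVector)
    default = nothing
    for i in shuffle(collect(eachindex(ops)))   # random order, so no clause always goes first
        op = ops[i]
        if op isa Tuple{Symbol,Any} && first(op) === :default
            default = op                        # remember it, but keep looking for a ready channel
        elseif op isa Channel && isready(op)
            return fn(op, take!(op))            # assumes no concurrent consumer on op
        end
    end
    default === nothing && error("no clause is ready and no :default was given")
    return fn(:default, last(default))
end
```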

@tkf
Member

tkf commented Jan 27, 2020

FYI, there are interesting arguments that select may not be a useful building block for concurrent programming. It may be better to get structured concurrency (#33248) working first, to see how much code really needs select in practice.

Some interesting arguments against select as a primitive:

--- python-trio/trio#242 (comment)

This article seems relevant here: https://medium.com/@elizarov/deadlocks-in-non-hierarchical-csp-e5910d137cc

--- python-trio/trio#242 (comment)

The last link (Deadlocks in non-hierarchical CSP - Roman Elizarov - Medium) explains how you can get deadlocks in rather "innocent" code using select.
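The article's examples involve select. As a much simpler illustration of the underlying hazard (cyclic communication over unbuffered channels, not the article's exact scenario), here is a sketch in which two tasks each send to the other before receiving, so neither send can ever complete. `timedwait` is used only to observe the hang without blocking forever.

```julia
a_to_b = Channel{Int}(0)   # unbuffered: put! waits for a matching take!
b_to_a = Channel{Int}(0)

# Each side sends first and only then receives: a cycle in which nobody takes.
task_a = @async (put!(a_to_b, 1); take!(b_to_a))
task_b = @async (put!(b_to_a, 2); take!(a_to_b))

# Poll for up to 0.5 s instead of blocking forever in wait(task_a).
status = timedwait(() -> istaskdone(task_a) && istaskdone(task_b), 0.5)
```

Here `status` ends up as `:timed_out`, since both tasks stay parked inside `put!`.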

@c42f
Member

c42f commented Sep 7, 2021

Just to link some things up, the latest on this is @tkf's cool extensible select in Julio.jl, based on Reagents.jl.

@tkf
Member

tkf commented Sep 8, 2021

I realize that what I did sounds like the very opposite of my comment just above :)

The main observation for me was that supporting select over arbitrary synchronizations ("events") is a superset of supporting robust cancellation of these synchronizations (which I wanted for structured concurrency). I explained some of this in Cancellable containers · Reagents. Furthermore, select lets us build other interesting concurrent programs. (For more context, see JuliaConcurrent/Reagents.jl#5.)

Of course, "interesting" often means "more bugs." So I think the links I quoted still make very good points. select is maybe like @generated.

@JanisErdmanis

I got bitten by this issue when I tried to write a persistence layer for two communicating finite state machines, so I tried to tackle it. The proposed Select.jl interface is a bit restrictive, as multiple dispatch can sometimes be more convenient for branching. Also, looking at code samples showing how channels and select statements are used in practice, in most cases they appear inside loops, and most instances receive messages [1]. Thus I propose an API somewhat similar to Erlang's:

let
    state = 0
    receive(quit, results) do ch, value
        if ch == quit
            close(quit)
            return Break()
        end
        state = value
    end
end

Note that it would be great to write a plain break in the do block instead of return Break(), as is required now.

Implementing such a receive was the hard part. With Select.jl, such an implementation would create and kill rival tasks at every iteration, which I found wasteful. It also has a starvation issue: a message from one channel may never be received while another channel is flooded. Hence I also tried an implementation where the rival tasks are permanent and push results to a main channel [2].
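The permanent-forwarder approach can be sketched in a few lines. This is an illustrative sketch under stated assumptions, not the code in [2]: `Break` is a hypothetical sentinel type, and because the forwarding tasks keep draining their input channels after the loop stops, values they take after a `Break()` are lost to any other reader. That is one reason exclusive access to the channels matters.

```julia
# Hypothetical sentinel mirroring the Break() used in the proposal.
struct Break end

# Sketch of receive(f, chans...) built from permanent forwarding tasks that
# push (channel, value) pairs into one merged channel.
function receive(f, chans::Channel...)
    merged = Channel{Tuple{Channel,Any}}(Inf)
    forwarders = map(chans) do ch
        Threads.@spawn begin
            for v in ch                # ends once ch is closed and drained
                put!(merged, (ch, v))
            end
        end
    end
    # Close the merged stream once every input has been closed and drained.
    Threads.@spawn begin
        foreach(wait, forwarders)
        close(merged)
    end
    for (ch, v) in merged             # f is only ever called from this task
        f(ch, v) isa Break && break
    end
    return nothing
end
```

Values from a single channel keep their order, but the interleaving across channels depends on scheduling.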

I got curious about how Go implements the select statement and found a rather good high-level explanation on Stack Overflow [3], which also pointed me to the code [4]. I attempted to do something similar (I think) by setting wait_cond for every channel passed to receive to the same condition, so that a put! on any of the channels wakes up a blocking wait. To enter and exit such a receive without losing any inputs, I needed to patch the put_unbuffered function with a simple check of whether the condition had changed between waiting for the lock and acquiring it.

The performance seems excellent: I get about a 5x speedup over the implementation based on Select.jl and about a 2x speedup over the CSP version. I found it comparable to the performance of a single channel with two producers, as previously suggested on Discourse [5]. I am also protected from starvation, since I permute the order of the channels on each iteration. One downside is that the implementation is quite hairy and seems a bit racy, which could put it in a deadlock. However, many cases work as expected [6], and perhaps we can resolve these potential unforeseen issues.

To summarize, the following changes would be needed:

  • Add relocking to the put_unbuffered method, as done in [7]. The same may be needed for put_buffered.
  • receive needs exclusive access for taking values from a channel. Thus, there should be a way to record this within the channel, so that the runtime can throw an error if a user misuses it. Something like adding ch.status = :taken seems like the easiest way forward.
  • Also, it would be nice if break and continue statements within a do block could be translated into some return value.

Any thoughts on whether such changes can be made in Base?

[1] https://github.com/nomad-software/go-channel-compendium
[2] https://github.com/akels/Receive.jl/blob/main/tests/benchmarks/csp.jl
[3] https://stackoverflow.com/questions/37021194/how-are-golang-select-statements-implemented
[4] https://github.com/golang/go/blob/master/src/runtime/select.go
[5] https://discourse.julialang.org/t/best-way-for-a-task-to-read-from-two-channels/68704
[6] https://github.com/akels/Receive.jl/blob/main/tests/runtests.jl
[7] https://github.com/akels/Receive.jl/blob/main/src/patch.jl

@vtjnash
Member

vtjnash commented Oct 14, 2021

In Julia, you can close ch (optionally with a particular message or value), which creates an OOB (out-of-band) quit message, without the need to mux unrelated data into the same stream.
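A minimal sketch of that out-of-band quit, assuming a Julia version that provides `close(ch, excp)` (as the comment above implies). `QuitRequest` and `drain_until_quit` are hypothetical names. Values already buffered before the close are still delivered, and only then does `take!` throw the exception passed to `close`.

```julia
struct QuitRequest <: Exception   # hypothetical quit signal carried by close
    reason::String
end

function drain_until_quit(ch::Channel{Int})
    total = 0
    try
        while true
            total += take!(ch)    # buffered values still arrive after close
        end
    catch err
        err isa QuitRequest || rethrow()
        return (total, err.reason)
    end
end

ch = Channel{Int}(10)
put!(ch, 1); put!(ch, 2)
close(ch, QuitRequest("shutdown"))
drain_until_quit(ch)   # (3, "shutdown")
```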

@JanisErdmanis

Quitting tasks is only one of the uses. In my application the network is highly unreliable, and messages get dropped, manipulated, or reordered. To deal with this, I need to authenticate every message|counter with an HMAC and let the state machine handle it. However, sometimes messages do not arrive, and I have to assume that the last response was not delivered; thus I need to break out of a blocking take! and retry. Currently, the only option is to close the socket and reconnect, but I want to keep it alive as long as possible. With the receive function, I could feed a heartbeat into one of the channels to unblock and retry.

@vtjnash
Member

vtjnash commented Oct 14, 2021

You can also have the heartbeat task itself enqueue the message, rather than having the timer send a message to the receiver to send a message to the client?
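A sketch of that idea with `Base.Timer`: the timer callback itself enqueues a heartbeat into the client's inbox, so a `take!` blocked on the inbox wakes up and the state machine can decide to retry. `start_heartbeat`, `take_or_retry`, and the `:heartbeat`/`:retry` symbols are hypothetical.

```julia
# Periodically inject `msg` into `inbox`; closing the returned Timer stops it.
function start_heartbeat(inbox::Channel, period::Real; msg = :heartbeat)
    return Timer(period; interval = period) do _
        try
            put!(inbox, msg)
        catch err
            err isa InvalidStateException || rethrow()   # inbox was closed
        end
    end
end

# Block for the next message; a heartbeat means "nothing arrived, retry".
function take_or_retry(inbox::Channel)
    msg = take!(inbox)
    return msg === :heartbeat ? :retry : msg
end
```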

@tkf
Member

tkf commented Oct 14, 2021

Erlang is more of an actor language than a CSP language. As such, it doesn't (cannot) support generic selective communication like languages with a CSP flavor (e.g., Go), because its messages are asynchronous. I think Julia is too mature to switch to the actor model at this point, so I don't think Erlang is a good language to take (sole) inspiration from. If we were to add selective communication, I think it should be at least roughly as general as Go's.
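The synchronous side of this distinction is easy to see with an unbuffered `Channel`: `put!` only completes when a taker arrives (a rendezvous), whereas an Erlang-style send to a mailbox returns immediately. A small sketch:

```julia
ch = Channel{Int}(0)                   # unbuffered
sender = @async put!(ch, 42)           # will park until a receiver shows up
yield()                                # give the sender a chance to run
done_before_take = istaskdone(sender)  # false: put! is still waiting for a taker
received = take!(ch)                   # 42; completes the rendezvous
wait(sender)                           # now the sender can finish
```

Because both sides of a rendezvous commit together, a select over such operations has to decide atomically which single communication happens, which is what makes it harder to express on top of asynchronous mailboxes.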

@JanisErdmanis

I hadn't thought of using Timer that way, which indeed seems like a viable option for retrying and timeouts.

Another difficulty I faced was connecting the socket, a persistence layer, and a protocol state machine. From a testing perspective, it makes sense to model a protocol as
run(state, in::Channel, out::Channel), so that I can test the core of a two-party protocol as simply as:

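taskA = @async run(stateA, chA, chB)   # A reads chA, writes chB
resultB = run(stateB, chB, chA)        # B reads chB, writes chA
resultA = fetch(taskA)                 # an assignment inside @async would not be visible here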
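A toy, self-contained version of that test harness might look like this. `run_protocol` is a hypothetical stand-in for `run` (renamed to avoid shadowing `Base.run`), and the "protocol" simply exchanges one numbered message per round.

```julia
# Toy two-party protocol: each round, send one message, then wait for the peer's.
function run_protocol(id::Int, rounds::Int, inbox::Channel{Int}, outbox::Channel{Int})
    received = Int[]
    for r in 1:rounds
        put!(outbox, id * 100 + r)     # send this round's message
        push!(received, take!(inbox))  # wait for the peer's message
    end
    return received
end

# Capacity 1 lets each side send before receiving; with unbuffered channels
# this send-then-receive protocol would deadlock, both sides stuck in put!.
chA = Channel{Int}(1)   # messages addressed to A
chB = Channel{Int}(1)   # messages addressed to B
taskA = @async run_protocol(1, 3, chA, chB)
resultB = run_protocol(2, 3, chB, chA)
resultA = fetch(taskA)
```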

Then another component is a network layer, which collects bytes and forms them into messages that are put into a channel ch_bytes_out when ready. The last part is the persistence layer in the middle, which I wanted to write as:

run(state, ch_bytes_out, events_out, ch_bytes_in, events_in)

which I could test with arbitrary events before plugging in a particular protocol. Using a union channel to combine ch_bytes_out and events_out would be undesirable: if the persistence layer wanted to close a socket and try a different route, it would also close events_out. In addition, when I define the convert method for my message types, I can pass Channel{MyType} to my protocol state machine and expect it to work.

Currently, I have dropped this design in favour of stacking state machines together in the persistence layer. There are some benefits to doing that, like having fewer tasks floating around. But I feel that I have lost some of the ability to be sure that, after sticking my components together, the whole system will work.

@Vikasg7

Vikasg7 commented Jul 9, 2022

Is the following safe for buffered channels?

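function poll!(ch::Channel)
    Base.isbuffered(ch) || error("Not implemented for unbuffered channels")  # isbuffered is not exported
    # hold the channel's lock so no other task can take between the check and take!
    lock(ch) do
        isready(ch) ? take!(ch) : false   # note: `false` is ambiguous for Channel{Bool}
    end
end

function offer!(ch::Channel, val)
    Base.isbuffered(ch) || error("Not implemented for unbuffered channels")
    lock(ch) do
        # ch.data and ch.sz_max are internal fields of Channel
        if length(ch.data) < ch.sz_max
            put!(ch, val)
            return true
        end
        return false
    end
end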
