Skip to content

Commit

Permalink
Iterate on tarpipe implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
pmeinhardt authored and andreasknoepfle committed Jan 11, 2023
1 parent 23029f9 commit 77f66db
Showing 1 changed file with 31 additions and 16 deletions.
47 changes: 31 additions & 16 deletions examples/tarpipe.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,35 @@ defmodule TP do

Stream.resource(
fn ->
{:ok, chan} = SSHKit.Channel.open(conn, [])
command = SSHKit.Context.build(ctx, "tar -x")
:success = SSHKit.Channel.exec(chan, command)

owner = self()

tarpipe = spawn(fn ->
{:ok, chan} = SSHKit.Channel.open(conn, [])
command = SSHKit.Context.build(ctx, "tar -x")
:success = SSHKit.Channel.exec(chan, command)

# TODO: What if command immediately exits or does not exist?
# IO.inspect(SSHKit.Channel.recv(chan, 1000))

{:ok, tar} = :erl_tar.init(chan, :write, fn
:position, {^chan, position} ->
# IO.inspect(position, label: "position")
{:ok, 0}

:write, {^chan, data} ->
# TODO: Send data in chunks based on channel window size?
# IO.inspect(data, label: "write")
IO.inspect(data, label: "write")
# In case of failing upload, check command output:
# IO.inspect(SSHKit.Channel.recv(chan, 0))
chunk = to_binary(data)

receive do
:cont ->
:ok = SSHKit.Channel.send(chan, chunk)
case SSHKit.Channel.send(chan, chunk) do
:ok -> send(owner, {:write, chan, self(), chunk})
other -> send(owner, {:error, chan, self(), other})
end
end
send(owner, {:write, chan, self(), chunk})
:ok

:close, ^chan ->
Expand All @@ -48,24 +53,34 @@ defmodule TP do

:ok = :erl_tar.add(tar, to_charlist(source), to_charlist(Path.basename(source)), [])
:ok = :erl_tar.close(tar)

:ok = SSHKit.Channel.close(chan)
end)

{chan, tarpipe}
tarpipe
end,
fn {chan, tarpipe} ->
fn tarpipe ->
send(tarpipe, :cont)

receive do
{:write, ^chan, ^tarpipe, data} ->
{[{:write, chan, data}], {chan, tarpipe}}
{:write, chan, ^tarpipe, data} ->
{[{:write, chan, data}], tarpipe}

{:close, chan, ^tarpipe} ->
{:halt, tarpipe}

{:close, ^chan, ^tarpipe} ->
{:halt, {chan, tarpipe}}
{:error, chan, ^tarpipe, error} ->
IO.inspect(error, label: "received error")
{:halt, tarpipe}
end

# case Tarpipe.proceed(tarpipe) do
# {:write, …} -> {[], tarpipe}
# {:error, …} -> raise
# end
end,
fn {chan, tarpipe} ->
:ok = SSHKit.Channel.close(chan)
:ok = SSHKit.Channel.flush(chan)
fn tarpipe ->
nil # :ok = Tarpipe.close(tarpipe)
end
)
end
Expand Down

0 comments on commit 77f66db

Please sign in to comment.