From 23029f9324e473d927a56c6860ae2dc0faf1a894 Mon Sep 17 00:00:00 2001 From: pmeinhardt Date: Wed, 3 Feb 2021 22:11:44 +0100 Subject: [PATCH] Iterate on tarpipe upload implementation --- examples/tarpipe.exs | 109 +++++++++++++++++++++++++++---------------- 1 file changed, 70 insertions(+), 39 deletions(-) diff --git a/examples/tarpipe.exs b/examples/tarpipe.exs index 30cd978..01c6d30 100644 --- a/examples/tarpipe.exs +++ b/examples/tarpipe.exs @@ -3,59 +3,90 @@ ctx = SSHKit.Context.new() |> SSHKit.Context.path("/tmp") - |> SSHKit.Context.user("other") - |> SSHKit.Context.group("other") + # |> SSHKit.Context.user("other") + # |> SSHKit.Context.group("other") |> SSHKit.Context.umask("0077") -defmodule Xfer do - # https://github.com/erlang/otp/blob/OTP-23.2.1/lib/ssh/src/ssh.hrl - def to_binary(data) when is_list(data) do - :erlang.iolist_to_binary(data) - catch - _ -> :unicode.characters_to_binary(data) - end +defmodule TP do + def upload!(conn, source, dest, opts \\ []) do + ctx = Keyword.get(opts, :context, SSHKit.Context.new()) - def to_binary(data) when is_binary(data) do - data - end -end + Stream.resource( + fn -> + {:ok, chan} = SSHKit.Channel.open(conn, []) + command = SSHKit.Context.build(ctx, "tar -x") + :success = SSHKit.Channel.exec(chan, command) -source = "test/fixtures" + owner = self() -:ok = - with {:ok, chan} <- SSHKit.Channel.open(conn, []) do - command = SSHKit.Context.build(ctx, "tar -x") + tarpipe = spawn(fn -> + {:ok, tar} = :erl_tar.init(chan, :write, fn + :position, {^chan, position} -> + # IO.inspect(position, label: "position") + {:ok, 0} - case SSHKit.Channel.exec(chan, command) do - :success -> - # In case of failed upload, check command output: - # IO.inspect(SSHKit.Channel.recv(chan)) + :write, {^chan, data} -> + # TODO: Send data in chunks based on channel window size? + # IO.inspect(data, label: "write") + # In case of failing upload, check command output: + # IO.inspect(SSHKit.Channel.recv(chan, 0)) + chunk = to_binary(data) - {:ok, tar} = :erl_tar.init(chan, :write, fn - :position, {^chan, position} -> - # IO.write("tar position: #{inspect(position)}") - {:ok, 0} + receive do + :cont -> + :ok = SSHKit.Channel.send(chan, chunk) + end + send(owner, {:write, chan, self(), chunk}) + :ok - :write, {^chan, data} -> - # TODO: Send data in chunks based on channel window size? - :ok = SSHKit.Channel.send(chan, Xfer.to_binary(data)) - :ok + :close, ^chan -> + # IO.puts("close") + :ok = SSHKit.Channel.eof(chan) + send(owner, {:close, chan, self()}) + :ok + end) - :close, ^chan -> - :ok = SSHKit.Channel.eof(chan) - :ok + :ok = :erl_tar.add(tar, to_charlist(source), to_charlist(Path.basename(source)), []) + :ok = :erl_tar.close(tar) end) - :ok = :erl_tar.add(tar, to_charlist(source), to_charlist(Path.basename(source)), []) + {chan, tarpipe} + end, + fn {chan, tarpipe} -> + send(tarpipe, :cont) - :ok = :erl_tar.close(tar) + receive do + {:write, ^chan, ^tarpipe, data} -> + {[{:write, chan, data}], {chan, tarpipe}} - :failure -> - {:error, :failure} + {:close, ^chan, ^tarpipe} -> + {:halt, {chan, tarpipe}} + end + end, + fn {chan, tarpipe} -> + :ok = SSHKit.Channel.close(chan) + :ok = SSHKit.Channel.flush(chan) + end + ) + end - other -> - other - end + # https://github.com/erlang/otp/blob/OTP-23.2.1/lib/ssh/src/ssh.hrl + def to_binary(data) when is_list(data) do + :erlang.iolist_to_binary(data) + catch + _ -> :unicode.characters_to_binary(data) end + def to_binary(data) when is_binary(data) do + data + end +end + +stream = TP.upload!(conn, "test/fixtures", "upload", context: ctx) + +Enum.each(stream, fn + {:write, chan, data} -> + IO.puts("Upload, sent #{byte_size(data)} bytes") +end) + :ok = SSHKit.close(conn)