apacheGH-38828: [R] Ensure that streams can be written to socket connections (apache#38897)

### Rationale for this change

Currently we can't write to a socket connection from R. This is a very useful way to send Arrow data around and should work!

### What changes are included in this PR?

Implements `Tell()` for non-seekable output streams. At least the IPC writer calls `Tell()` to figure out how many bytes have been written.
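
As a rough illustration of the idea, here is a minimal standalone C++ sketch of a byte-counting `Tell()` fallback. `CountingStream`, `native_tell`, and `ExampleSocketLikeTell()` are hypothetical names standing in for the R connection wrapper and for asking a seekable connection for its position; nothing here uses Arrow or cpp11. The sketch also assumes the read counter should advance by the number of bytes returned.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for a connection wrapper; no R, cpp11, or Arrow types.
class CountingStream {
 public:
  // `seekable` plays the role of isSeekable() on the connection and
  // `native_tell` plays the role of asking the connection for its position.
  CountingStream(bool seekable, std::function<int64_t()> native_tell)
      : seekable_(seekable), native_tell_(std::move(native_tell)) {}

  // Pretend to write: only the byte count matters for this sketch.
  void Write(const std::string& data) {
    bytes_written_ += static_cast<int64_t>(data.size());
  }

  // Pretend to read `nbytes` bytes (assumption: count bytes, not calls).
  void Read(int64_t nbytes) { bytes_read_ += nbytes; }

  // Non-seekable streams report their own counters; seekable ones defer
  // to the underlying position query.
  int64_t Tell() const {
    if (!seekable_ && bytes_written_ > 0) return bytes_written_;
    if (!seekable_) return bytes_read_;
    return native_tell_();
  }

 private:
  bool seekable_;
  std::function<int64_t()> native_tell_;
  int64_t bytes_written_ = 0;
  int64_t bytes_read_ = 0;
};

// Position reported for a socket-like (non-seekable) stream after two writes.
int64_t ExampleSocketLikeTell() {
  CountingStream stream(/*seekable=*/false, [] {
    assert(false && "never reached for non-seekable streams");
    return int64_t{-1};
  });
  stream.Write("hello");    // 5 bytes
  stream.Write(", world");  // 7 bytes
  return stream.Tell();
}
```

`ExampleSocketLikeTell()` returns 12, the total number of bytes passed to `Write()`, without ever calling the position query that a non-seekable stream cannot answer.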

### Are these changes tested?

I'm not quite sure how to test this: all output streams we can easily test are seekable. We could try to spin up a socket server on another thread (like the reprex below), but I'm worried that will be flaky.
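
One possible way to exercise a non-seekable stream without a server thread, sketched here in plain C++ rather than in the R test suite, is an in-process socket pair. This assumes a POSIX system; `IsNonSeekable()`, `RoundTripResult`, and `RoundTripThroughSocketPair()` are made-up names for illustration, and the sketch does not touch Arrow or R.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// True when the OS refuses to report a position for `fd` (sockets, pipes).
bool IsNonSeekable(int fd) {
  errno = 0;
  return lseek(fd, 0, SEEK_CUR) == static_cast<off_t>(-1) && errno == ESPIPE;
}

struct RoundTripResult {
  bool writer_non_seekable;  // did lseek() fail with ESPIPE on the writer?
  int64_t bytes_written;     // what a counting Tell() would report
  std::string received;      // bytes read back from the other end
};

// Writes `payload` into one end of a socket pair and reads it back from the
// other end. No port, server thread, or sleep is involved.
RoundTripResult RoundTripThroughSocketPair(const std::string& payload) {
  // Everything is written before anything is read, so keep the payload small
  // enough to fit in the socket buffer.
  assert(payload.size() < 4096);

  RoundTripResult result{false, 0, ""};
  int fds[2];
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return result;

  result.writer_non_seekable = IsNonSeekable(fds[0]);

  size_t offset = 0;
  while (offset < payload.size()) {
    ssize_t n = write(fds[0], payload.data() + offset, payload.size() - offset);
    if (n <= 0) break;
    offset += static_cast<size_t>(n);
    result.bytes_written += n;
  }
  close(fds[0]);  // closing the writer lets the reader see end-of-stream

  char buf[256];
  ssize_t n;
  while ((n = read(fds[1], buf, sizeof(buf))) > 0) {
    result.received.append(buf, static_cast<size_t>(n));
  }
  close(fds[1]);
  return result;
}
```

Because both ends live in the same process and the write completes before the read starts, the result does not depend on timing, which is the main source of flakiness in a background-server approach.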

### Are there any user-facing changes?

Yes (something that should have previously worked now works), although there is no place where we currently document anything about how connections can be used.

``` r
tmp <- tempfile()
proc <- callr::r_bg(function() {
  server <- function() {
    library(arrow)
    
    while (TRUE) {
      writeLines("Listening...")
      con <- socketConnection(host = "localhost", port = 6011, blocking = TRUE,
                              server = TRUE, open = "r+b")
      socketTimeout(con, 3600)
      
      data <- arrow::read_ipc_stream(con, as_data_frame = FALSE)
      print(head(as.data.frame(data)))
      
    }
  }
  
  server()
}, stdout = tmp)

Sys.sleep(0.5)

library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
rb <- arrow::record_batch(iris)

socketDriver <- socketConnection(host = "localhost", 
                                 port = "6011",
                                 blocking = TRUE,
                                 server = FALSE,
                                 open = "w+b")

write_ipc_stream(rb, socketDriver)
Sys.sleep(0.5)
cat(brio::read_file(tmp))
#> Listening...
#>   Sepal.Length Sepal.Width Petal.Length Petal.Width Species
#> 1          5.1         3.5          1.4         0.2  setosa
#> 2          4.9         3.0          1.4         0.2  setosa
#> 3          4.7         3.2          1.3         0.2  setosa
#> 4          4.6         3.1          1.5         0.2  setosa
#> 5          5.0         3.6          1.4         0.2  setosa
#> 6          5.4         3.9          1.7         0.4  setosa
#> Listening...

# Shutdown server
proc$interrupt()
#> [1] TRUE
Sys.sleep(0.5)
proc$is_alive()
#> [1] FALSE
```

<sup>Created on 2023-11-27 with [reprex v2.0.2](https://reprex.tidyverse.org)</sup>
* Closes: apache#38828
* GitHub Issue: apache#38828

Authored-by: Dewey Dunnington <[email protected]>
Signed-off-by: Dewey Dunnington <[email protected]>
paleolimbot authored Apr 3, 2024
1 parent 469430f commit e0d73c5
Showing 14 changed files with 69 additions and 33 deletions.
7 changes: 4 additions & 3 deletions r/R/csv.R
@@ -78,8 +78,9 @@
 #' `col_names`, and the CSV file has a header row that would otherwise be used
 #' to identify column names, you'll need to add `skip = 1` to skip that row.
 #'
-#' @param file A character file name or URI, literal data (either a single string or a [raw] vector),
-#' an Arrow input stream, or a `FileSystem` with path (`SubTreeFileSystem`).
+#' @param file A character file name or URI, connection, literal data (either a
+#' single string or a [raw] vector), an Arrow input stream, or a `FileSystem`
+#' with path (`SubTreeFileSystem`).
 #'
 #' If a file name, a memory-mapped Arrow [InputStream] will be opened and
 #' closed when finished; compression will be detected from the file extension
@@ -894,7 +895,7 @@ readr_to_csv_convert_options <- function(na,
 #' Write CSV file to disk
 #'
 #' @param x `data.frame`, [RecordBatch], or [Table]
-#' @param sink A string file path, URI, or [OutputStream], or path in a file
+#' @param sink A string file path, connection, URI, or [OutputStream], or path in a file
 #' system (`SubTreeFileSystem`)
 #' @param file file name. Specify this or `sink`, not both.
 #' @param include_header Whether to write an initial header line with column names
2 changes: 1 addition & 1 deletion r/R/feather.R
@@ -29,7 +29,7 @@
 #' [write_ipc_file()] can only write V2 files.
 #'
 #' @param x `data.frame`, [RecordBatch], or [Table]
-#' @param sink A string file path, URI, or [OutputStream], or path in a file
+#' @param sink A string file path, connection, URI, or [OutputStream], or path in a file
 #' system (`SubTreeFileSystem`)
 #' @param version integer Feather file version, Version 1 or Version 2. Version 2 is the default.
 #' @param chunk_size For V2 files, the number of rows that each chunk of data
4 changes: 2 additions & 2 deletions r/R/ipc-stream.R
@@ -82,8 +82,8 @@ write_to_raw <- function(x, format = c("stream", "file")) {
 #' a "stream" format and a "file" format, known as Feather. `read_ipc_stream()`
 #' and [read_feather()] read those formats, respectively.
 #'
-#' @param file A character file name or URI, `raw` vector, an Arrow input stream,
-#' or a `FileSystem` with path (`SubTreeFileSystem`).
+#' @param file A character file name or URI, connection, `raw` vector, an
+#' Arrow input stream, or a `FileSystem` with path (`SubTreeFileSystem`).
 #' If a file name or URI, an Arrow [InputStream] will be opened and
 #' closed when finished. If an input stream is provided, it will be left
 #' open.
2 changes: 1 addition & 1 deletion r/R/parquet.R
@@ -90,7 +90,7 @@ read_parquet <- function(file,
 #' article} for examples of this.
 #'
 #' @param x `data.frame`, [RecordBatch], or [Table]
-#' @param sink A string file path, URI, or [OutputStream], or path in a file
+#' @param sink A string file path, connection, URI, or [OutputStream], or path in a file
 #' system (`SubTreeFileSystem`)
 #' @param chunk_size how many rows of data to write to disk at once. This
 #' directly corresponds to how many rows will be in each row group in
5 changes: 3 additions & 2 deletions r/man/read_delim_arrow.Rd
4 changes: 2 additions & 2 deletions r/man/read_feather.Rd
4 changes: 2 additions & 2 deletions r/man/read_ipc_stream.Rd
5 changes: 3 additions & 2 deletions r/man/read_json_arrow.Rd
4 changes: 2 additions & 2 deletions r/man/read_parquet.Rd
2 changes: 1 addition & 1 deletion r/man/write_csv_arrow.Rd
2 changes: 1 addition & 1 deletion r/man/write_feather.Rd
2 changes: 1 addition & 1 deletion r/man/write_ipc_stream.Rd
2 changes: 1 addition & 1 deletion r/man/write_parquet.Rd

Some generated files are not rendered by default.

57 changes: 45 additions & 12 deletions r/src/io.cpp
@@ -212,11 +212,16 @@ void io___BufferOutputStream__Write(
 class RConnectionFileInterface : public virtual arrow::io::FileInterface {
  public:
   explicit RConnectionFileInterface(cpp11::sexp connection_sexp)
-      : connection_sexp_(connection_sexp), closed_(false) {
+      : connection_sexp_(connection_sexp),
+        closed_(false),
+        seekable_(false),
+        bytes_written_(0),
+        bytes_read_(0) {
     check_closed();
+    seekable_ = check_seekable();
   }

-  arrow::Status Close() {
+  arrow::Status Close() override {
     if (closed_) {
       return arrow::Status::OK();
     }
@@ -227,11 +232,21 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface {
         "close() on R connection");
   }

-  arrow::Result<int64_t> Tell() const {
+  arrow::Result<int64_t> Tell() const override {
     if (closed()) {
       return arrow::Status::IOError("R connection is closed");
     }

+    // R connections use seek() with no additional arguments as a tell()
+    // implementation; however, non-seekable connections will error if you
+    // do this. This heuristic allows Tell() to return a reasonable value
+    // (used by at least the IPC writer).
+    if (!seekable_ && bytes_written_ > 0) {
+      return bytes_written_;
+    } else if (!seekable_) {
+      return bytes_read_;
+    }
+
     return SafeCallIntoR<int64_t>(
         [&]() {
           cpp11::sexp result = cpp11::package("base")["seek"](connection_sexp_);
@@ -240,7 +255,7 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface {
         "tell() on R connection");
   }

-  bool closed() const { return closed_; }
+  bool closed() const override { return closed_; }

  protected:
   cpp11::sexp connection_sexp_;
@@ -261,13 +276,14 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface {
     return SafeCallIntoR<int64_t>(
         [&] {
           cpp11::function read_bin = cpp11::package("base")["readBin"];
-          cpp11::writable::raws ptype((R_xlen_t)0);
+          cpp11::writable::raws ptype(static_cast<R_xlen_t>(0));
           cpp11::integers n = cpp11::as_sexp<int>(static_cast<int>(nbytes));

           cpp11::sexp result = read_bin(connection_sexp_, ptype, n);

           int64_t result_size = cpp11::safe[Rf_xlength](result);
           memcpy(out, cpp11::safe[RAW](result), result_size);
+          bytes_read_++;
           return result_size;
         },
         "readBin() on R connection");
@@ -294,6 +310,7 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface {

           cpp11::function write_bin = cpp11::package("base")["writeBin"];
           write_bin(data_raw, connection_sexp_);
+          bytes_written_ += nbytes;
         },
         "writeBin() on R connection");
   }
@@ -312,6 +329,9 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface {

  private:
   bool closed_;
+  bool seekable_;
+  int64_t bytes_written_;
+  int64_t bytes_read_;

   bool check_closed() {
     if (closed_) {
@@ -333,6 +353,15 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface {

     return closed_;
   }
+
+  bool check_seekable() {
+    auto is_seekable_result = SafeCallIntoR<bool>([&] {
+      cpp11::sexp result = cpp11::package("base")["isSeekable"](connection_sexp_);
+      return cpp11::as_cpp<bool>(result);
+    });
+
+    return is_seekable_result.ok() && *is_seekable_result;
+  }
 };

 class RConnectionInputStream : public virtual arrow::io::InputStream,
@@ -341,9 +370,11 @@ class RConnectionInputStream : public virtual arrow::io::InputStream,
   explicit RConnectionInputStream(cpp11::sexp connection_sexp)
       : RConnectionFileInterface(connection_sexp) {}

-  arrow::Result<int64_t> Read(int64_t nbytes, void* out) { return ReadBase(nbytes, out); }
+  arrow::Result<int64_t> Read(int64_t nbytes, void* out) override {
+    return ReadBase(nbytes, out);
+  }

-  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) {
+  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override {
     return ReadBase(nbytes);
   }
 };
@@ -373,13 +404,15 @@ class RConnectionRandomAccessFile : public arrow::io::RandomAccessFile,
     }
   }

-  arrow::Result<int64_t> GetSize() { return size_; }
+  arrow::Result<int64_t> GetSize() override { return size_; }

-  arrow::Status Seek(int64_t pos) { return SeekBase(pos); }
+  arrow::Status Seek(int64_t pos) override { return SeekBase(pos); }

-  arrow::Result<int64_t> Read(int64_t nbytes, void* out) { return ReadBase(nbytes, out); }
+  arrow::Result<int64_t> Read(int64_t nbytes, void* out) override {
+    return ReadBase(nbytes, out);
+  }

-  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) {
+  arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override {
     return ReadBase(nbytes);
   }

@@ -393,7 +426,7 @@ class RConnectionOutputStream : public arrow::io::OutputStream,
   explicit RConnectionOutputStream(cpp11::sexp connection_sexp)
       : RConnectionFileInterface(connection_sexp) {}

-  arrow::Status Write(const void* data, int64_t nbytes) {
+  arrow::Status Write(const void* data, int64_t nbytes) override {
     return WriteBase(data, nbytes);
   }
 };