Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeouts to SSH refillers #1338

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changes/fix_ssh_refiller_timeout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
* Adds a timeout for SSH refillers
* Changes the SSH refiller syntax to use the environment variable `SHESMU_REFILLER_HASH` instead of a command line argument. Use `jq '.refillers = (.refillers[] | .command += " $SHESMU_REFILLER_HASH")'` to restore the command line argument. This change can be safely applied before upgrading since it will be a no-op in the previous version.
20 changes: 14 additions & 6 deletions docs/plugin-sftp.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ To configure an SFTP server, create a file ending in `.sftp` as follows:
"fileRoots": [],
"fileRootsTtl": null,
"functions": {},
"refillers": {}
"refillers": {},
"refillerTimeout": 600
}

Shesmu uses passwordless public key authentication on the remote server. An
Expand Down Expand Up @@ -73,15 +74,17 @@ olive. To create one, add an entry in the `"refillers"` section as follows:
"parameters": {
"count": "i",
"value": "s"
}
},
"timeout": 600
}

This will create `example` as a refiller available to olives. It will take
parameters as defined in the `"parameters"` block; the value of each parameter
is a JSON-enhanced Shesmu type descriptors (see [types in the language
description](language.md#types) for details). When the olive is ready,
Shesmu will compute an order-independent hash from the data. Then, over SSH,
`"command"` will be run with the hash (as a hexadecimal string) after it.
`"command"` will be run with the hash (as a hexadecimal string) in the
environment variable `SHESMU_REFILLER_HASH`.

This program can then decide if the hash matches the last version it has
consumed. If so, it should print: `OK` and exit 0. If it has stale data, it
Expand All @@ -95,21 +98,26 @@ If the program exits non-zero, Shesmu will retry with the same data until
success or the data is updated.

The program should run in a reasonable amount of time. Long-running programs
will have serious performance implications for Shesmu.
will have serious performance implications for Shesmu. A maximum timeout, in
seconds can be specified for the runtime of a refiller. If specified,
`"timeout"` can be set for each refiller, or the top-level `"refillerTimeout"`
can set the default if none is specified per refiller. If both are absent or
`null` a default of 38 minutes is used. This is implemented using the `timeout`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 38?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the maximum amount of time you can keep a Stargate connected unless you're near a blackhole.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

command, which is present on UNIX systems.

As an example, this shell script read the data and places it in a file (in the
same directory):


#!/bin/sh
cd $(dirname $0)
if [ -f current_hash ] && [ "${1}" = "$(cat current_hash)" ]; then
if [ -f current_hash ] && [ "${SHESMU_REFILLER_HASH}" = "$(cat current_hash)" ]; then
echo OK
exit 0
fi
echo UPDATE
cat >current_data
echo "${1}" >current_hash
echo "${SHESMU_REFILLER_HASH}" >current_hash

A more sophisticated version of this script is provided as
`shesmu-json-refiller` if it suits your needs.
Expand Down
1 change: 0 additions & 1 deletion plugin-pinery/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
Expand Down
4 changes: 4 additions & 0 deletions plugin-sftp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
Expand Down
67 changes: 34 additions & 33 deletions plugin-sftp/shesmu-json-refiller
Original file line number Diff line number Diff line change
Expand Up @@ -4,64 +4,65 @@
#
# This does no processing to the data and therefore can be used with any format.

NEW_CHECKSUM=MISSING
HELP=false
NAME=refiller
TARGET_DIR=.

if [ "x${SHESMU_REFILLER_HASH}" = "x" ]; then
HELP=true
fi
set -eu

TEMP=`getopt c:d:hn: "$@"`
TEMP=$(getopt d:hn: "$@")

if [ $? != 0 ] ; then echo "Terminating..." >&2 ; exit 1 ; fi
if [ $? != 0 ]; then
echo "Terminating..." >&2
exit 1
fi

eval set -- "$TEMP"

while true ; do
while true; do
case "$1" in
-c)
NEW_CHECKSUM="$2"
shift 2
;;
-d)
TARGET_DIR="$2"
shift 2
;;
-h)
HELP=true
shift
;;
-n)
NAME="$2"
shift 2
;;
--)
shift
break
;;
*) echo "Internal error!" ; exit 1 ;;
-d)
TARGET_DIR="$2"
shift 2
;;
-h)
HELP=true
shift
;;
-n)
NAME="$2"
shift 2
;;
--)
shift
break
;;
*)
echo "Internal error!"
exit 1
;;
esac
done

if $HELP || [ $# -ne 0 ] || [ "${NEW_CHECKSUM}" = "MISSING" ]
then
echo $0 '-c checksum [-d outputdir] [-n name]'
echo ' -c The checksum provided by Shesmu'
if $HELP || [ $# -ne 0 ]; then
echo $0 '[-d outputdir] [-n name]'
echo ' -d Directory to place output in (default is current directory)'
echo ' -h Display help'
echo ' -n The prefix of the output file (default is refiller)'
echo 'SHESMU_REFILLER_HASH must also be set'
exit 1
fi


cd "${TARGET_DIR}"
if [ -f "${NAME}.checksum" ]; then
EXISTING_CHECKSUM="$(cat "${NAME}.checksum")"
if [ "${NEW_CHECKSUM}" = "${EXISTING_CHECKSUM}" ]; then
if [ "${SHESMU_REFILLER_HASH}" = "${EXISTING_CHECKSUM}" ]; then
echo OK
exit 0
fi
fi
echo UPDATE
cat >"${NAME}.json"
echo "${NEW_CHECKSUM}" >"${NAME}.checksum"
echo "${SHESMU_REFILLER_HASH}" >"${NAME}.checksum"
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class Configuration {
private List<JsonDataSource> jsonSources = List.of();
private String listCommand;
private int port;
private Integer refillerTimeout;
private Map<String, RefillerConfig> refillers = Map.of();
private String user;

Expand Down Expand Up @@ -42,6 +43,10 @@ public int getPort() {
return port;
}

public Integer getRefillerTimeout() {
return refillerTimeout;
}

public Map<String, RefillerConfig> getRefillers() {
return refillers;
}
Expand Down Expand Up @@ -78,6 +83,10 @@ public void setPort(int port) {
this.port = port;
}

public void setRefillerTimeout(Integer refillerTimeout) {
this.refillerTimeout = refillerTimeout;
}

public void setRefillers(Map<String, RefillerConfig> refillers) {
this.refillers = refillers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
public class RefillerConfig {
private String command;
private Map<String, Imyhat> parameters;
private Integer timeout;

public String getCommand() {
return command;
Expand All @@ -15,11 +16,19 @@ public Map<String, Imyhat> getParameters() {
return parameters;
}

public Integer getTimeout() {
return timeout;
}

public void setCommand(String command) {
this.command = command;
}

public void setParameters(Map<String, Imyhat> parameters) {
this.parameters = parameters;
}

public void setTimeout(Integer timeout) {
this.timeout = timeout;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package ca.on.oicr.gsi.shesmu.sftp;

import static org.apache.commons.text.StringEscapeUtils.ESCAPE_XSI;

import ca.on.oicr.gsi.Pair;
import ca.on.oicr.gsi.prometheus.LatencyHistogram;
import ca.on.oicr.gsi.shesmu.plugin.AlgebraicValue;
Expand All @@ -24,7 +26,6 @@
import ca.on.oicr.gsi.shesmu.plugin.refill.Refiller;
import ca.on.oicr.gsi.shesmu.plugin.types.Imyhat;
import ca.on.oicr.gsi.status.SectionRenderer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import io.prometheus.client.Counter;
Expand All @@ -37,6 +38,7 @@
import java.nio.file.Path;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.Consumer;
Expand Down Expand Up @@ -191,14 +193,7 @@ public InputStream fileSystemData() throws Exception {
c -> {
final var roots =
c.getFileRoots().stream()
.map(
p -> {
try {
return MAPPER.writeValueAsString(p);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
})
.map(ESCAPE_XSI::translate)
.collect(Collectors.joining(" "));
return new SshJsonInputSource(
connections,
Expand Down Expand Up @@ -427,7 +422,12 @@ public synchronized Optional<Integer> update(Configuration configuration) {
configuration.getHost(), configuration.getPort(), configuration.getUser());
fileAttributes.invalidateAll();
definer.clearRefillers();
final var defaultRefillerTimeout =
Objects.requireNonNullElse(configuration.getRefillerTimeout(), 38 * 60);
for (final var entry : configuration.getRefillers().entrySet()) {
final var timeout =
Math.max(
Objects.requireNonNullElse(entry.getValue().getTimeout(), defaultRefillerTimeout), 1);
definer.defineRefiller(
entry.getKey(),
String.format(
Expand All @@ -436,10 +436,12 @@ public synchronized Optional<Integer> update(Configuration configuration) {
new Definer.RefillDefiner() {
@Override
public <I> Definer.RefillInfo<I, SshRefiller<I>> info(Class<I> rowType) {

return new Definer.RefillInfo<>() {
@Override
public SshRefiller<I> create() {
return new SshRefiller<>(definer, entry.getKey(), entry.getValue().getCommand());
return new SshRefiller<>(
definer, entry.getKey(), entry.getValue().getCommand(), timeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package ca.on.oicr.gsi.shesmu.sftp;

import static org.apache.commons.text.StringEscapeUtils.ESCAPE_XSI;

import ca.on.oicr.gsi.shesmu.plugin.Utils;
import ca.on.oicr.gsi.shesmu.plugin.refill.Refiller;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand All @@ -16,12 +18,14 @@ public class SshRefiller<T> extends Refiller<T> {
private final Supplier<SftpServer> server;
private final String command;
private final String name;
private final int timeout;
private String lastHash = "";

public SshRefiller(Supplier<SftpServer> server, String name, String command) {
public SshRefiller(Supplier<SftpServer> server, String name, String command, int timeout) {
this.server = server;
this.command = command;
this.name = name;
this.timeout = timeout;
}

@Override
Expand Down Expand Up @@ -67,7 +71,14 @@ public void consume(Stream<T> items) {
if (totalHash.equals(lastHash)) {
return;
}
if (server.get().refill(name, command + " " + totalHash, output)) {
if (server
.get()
.refill(
name,
String.format(
"export SHESMU_REFILLER_HASH=%s; timeout %ds sh -c %s",
totalHash, timeout, ESCAPE_XSI.translate(command)),
output)) {
lastHash = totalHash;
}

Expand Down
9 changes: 5 additions & 4 deletions plugin-sftp/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
requires ca.on.oicr.gsi.shesmu;
requires com.fasterxml.jackson.databind;
requires com.hierynomus.sshj;
requires simpleclient;
requires org.apache.commons.text;
requires org.bouncycastle.pkix;
requires org.bouncycastle.provider;
requires org.bouncycastle.util;
requires org.slf4j.jul;
requires org.slf4j;
requires org.bouncycastle.util;
requires org.bouncycastle.provider;
requires org.bouncycastle.pkix;
requires simpleclient;

provides PluginFileType with
SftpPluginType;
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@
<artifactId>commons-csv</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>1.10.0</version>
</dependency>
</dependencies>
</dependencyManagement>
<modules>
Expand Down