Skip to content

Commit

Permalink
Return all ds in bindings
Browse files Browse the repository at this point in the history
  • Loading branch information
NicoLaval committed Jan 24, 2024
1 parent af96a51 commit c707e23
Showing 1 changed file with 23 additions and 7 deletions.
30 changes: 23 additions & 7 deletions src/main/java/fr/insee/trevas/lab/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,14 @@ public static ScriptEngine initEngineWithSpark(Bindings bindings, SparkSession s
public static Bindings getBindings(Bindings input) {
Bindings output = new SimpleBindings();
input.forEach((k, v) -> {
if (v instanceof PersistentDataset) output.put(k, v);
if (!k.startsWith("$")) {
if (v instanceof PersistentDataset) {
output.put(k + "$PersistentDataset", v);
} else {
output.put(k, v);
}
}
;
});
return output;
}
Expand Down Expand Up @@ -69,17 +76,26 @@ public static SparkConf loadSparkConfig(String stringPath) {
public static Bindings getSparkBindings(Bindings input, Integer limit) {
Bindings output = new SimpleBindings();
input.forEach((k, v) -> {
if (v instanceof PersistentDataset) {
fr.insee.vtl.model.Dataset ds = ((PersistentDataset) v).getDelegate();
if (ds instanceof SparkDataset) {
Dataset<Row> sparkDs = ((SparkDataset) ds).getSparkDataset();
if (!k.startsWith("$")) {
if ((v instanceof PersistentDataset) || (v instanceof SparkDataset)) {
String name = k;
SparkDataset spDs = null;
if (v instanceof PersistentDataset) {
fr.insee.vtl.model.Dataset ds = ((PersistentDataset) v).getDelegate();
name = name + "$PersistentDataset";
spDs = (SparkDataset) ds;
}
if (v instanceof SparkDataset) {
spDs = (SparkDataset) v;
}
Dataset<Row> sparkDs = (spDs).getSparkDataset();
if (limit != null) {
SparkDataset sparkDataset = new SparkDataset(sparkDs.limit(limit));
InMemoryDataset im = new InMemoryDataset(
sparkDataset.getDataPoints(),
sparkDataset.getDataStructure());
output.put(k, im);
} else output.put(k, new SparkDataset(sparkDs)); // useless
output.put(name, im);
} else output.put(name, new SparkDataset(sparkDs)); // useless
}
}
});
Expand Down

0 comments on commit c707e23

Please sign in to comment.