Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.5.0 #64

Merged
merged 2 commits into from
Jan 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 63 additions & 27 deletions src/main/java/fr/insee/trevas/lab/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,14 @@ public static ScriptEngine initEngineWithSpark(Bindings bindings, SparkSession s
public static Bindings getBindings(Bindings input) {
Bindings output = new SimpleBindings();
input.forEach((k, v) -> {
if (v instanceof PersistentDataset) output.put(k, v);
if (!k.startsWith("$")) {
if (v instanceof PersistentDataset) {
output.put(k + "$PersistentDataset", v);
} else {
output.put(k, v);
}
}
;
});
return output;
}
Expand Down Expand Up @@ -69,17 +76,26 @@ public static SparkConf loadSparkConfig(String stringPath) {
public static Bindings getSparkBindings(Bindings input, Integer limit) {
Bindings output = new SimpleBindings();
input.forEach((k, v) -> {
if (v instanceof PersistentDataset) {
fr.insee.vtl.model.Dataset ds = ((PersistentDataset) v).getDelegate();
if (ds instanceof SparkDataset) {
Dataset<Row> sparkDs = ((SparkDataset) ds).getSparkDataset();
if (!k.startsWith("$")) {
if ((v instanceof PersistentDataset) || (v instanceof SparkDataset)) {
String name = k;
SparkDataset spDs = null;
if (v instanceof PersistentDataset) {
fr.insee.vtl.model.Dataset ds = ((PersistentDataset) v).getDelegate();
name = name + "$PersistentDataset";
spDs = (SparkDataset) ds;
}
if (v instanceof SparkDataset) {
spDs = (SparkDataset) v;
}
Dataset<Row> sparkDs = (spDs).getSparkDataset();
if (limit != null) {
SparkDataset sparkDataset = new SparkDataset(sparkDs.limit(limit));
InMemoryDataset im = new InMemoryDataset(
sparkDataset.getDataPoints(),
sparkDataset.getDataStructure());
output.put(k, im);
} else output.put(k, new SparkDataset(sparkDs)); // useless
output.put(name, im);
} else output.put(name, new SparkDataset(sparkDs)); // useless
}
}
});
Expand All @@ -90,34 +106,54 @@ public static void writeSparkDatasetsJDBC(Bindings bindings,
Map<String, QueriesForBindingsToSave> queriesForBindingsToSave
) {
queriesForBindingsToSave.forEach((name, values) -> {
SparkDataset dataset = (SparkDataset) bindings.get(name);
Dataset<Row> dsSpark = dataset.getSparkDataset();
String jdbcPrefix = "";
try {
jdbcPrefix = getJDBCPrefix(values.getDbtype());
} catch (Exception e) {
e.printStackTrace();
Object ds = bindings.get(name);
if (!(ds instanceof PersistentDataset)) {
try {
throw new Exception(name + " is not a Persistent datatset (affect it with \"<-\")");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
fr.insee.vtl.model.Dataset dataset = ((PersistentDataset) bindings.get(name)).getDelegate();
if (dataset instanceof SparkDataset) {
String jdbcPrefix = "";
try {
Dataset<Row> dsSpark = ((SparkDataset) dataset).getSparkDataset();
jdbcPrefix = getJDBCPrefix(values.getDbtype());
dsSpark.write()
.mode(SaveMode.Overwrite)
.format("jdbc")
.option("url", jdbcPrefix + values.getUrl())
.option("dbtable", values.getTable())
.option("user", values.getUser())
.option("password", values.getPassword())
.save();
} catch (Exception e) {
e.printStackTrace();
}
}
dsSpark.write()
.mode(SaveMode.Overwrite)
.format("jdbc")
.option("url", jdbcPrefix + values.getUrl())
.option("dbtable", values.getTable())
.option("user", values.getUser())
.option("password", values.getPassword())
.save();
});
}

public static void writeSparkS3Datasets(Bindings bindings, Map<String, S3ForBindings> s3toSave,
ObjectMapper objectMapper,
SparkSession spark) {
s3toSave.forEach((name, values) -> {
SparkDataset dataset = (SparkDataset) bindings.get(name);
try {
writeSparkDataset(objectMapper, spark, values, dataset);
} catch (Exception e) {
e.printStackTrace();
Object ds = bindings.get(name);
if (!(ds instanceof PersistentDataset)) {
try {
throw new Exception(name + " is not a Persistent datatset (affect it with \"<-\")");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
fr.insee.vtl.model.Dataset dataset = ((PersistentDataset) bindings.get(name)).getDelegate();
if (dataset instanceof SparkDataset) {
try {
writeSparkDataset(objectMapper, spark, values, (SparkDataset) dataset);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
Expand Down
Loading