Skip to content

Commit

Permalink
Merge pull request #17 from maheshmt/synthesys-notebook
Browse files Browse the repository at this point in the history
fixing sparksql
  • Loading branch information
maheshmt committed May 17, 2016
2 parents 3ad18a0 + ab9b30d commit 794a56b
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 109 deletions.
45 changes: 27 additions & 18 deletions spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,29 +240,38 @@ private boolean useHiveContext() {
}

public SQLContext getSQLContext() {
if (sqlc == null) {
if (useHiveContext()) {
String name = "org.apache.spark.sql.hive.HiveContext";
Constructor<?> hc;
try {
hc = getClass().getClassLoader().loadClass(name)
.getConstructor(SparkContext.class);
sqlc = (SQLContext) hc.newInstance(getSparkContext());
} catch (NoSuchMethodException | SecurityException
| ClassNotFoundException | InstantiationException
| IllegalAccessException | IllegalArgumentException
| InvocationTargetException e) {
logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
// when hive dependency is not loaded, it'll fail.
// in this case SQLContext can be used.
final ClassLoader old = Thread.currentThread().getContextClassLoader();
try
{
Thread.currentThread().setContextClassLoader(SparkConf.class.getClassLoader());
if (sqlc == null) {
if (useHiveContext()) {
String name = "org.apache.spark.sql.hive.HiveContext";
Constructor<?> hc;
try {
hc = getClass().getClassLoader().loadClass(name)
.getConstructor(SparkContext.class);
sqlc = (SQLContext) hc.newInstance(getSparkContext());
} catch (NoSuchMethodException | SecurityException
| ClassNotFoundException | InstantiationException
| IllegalAccessException | IllegalArgumentException
| InvocationTargetException e) {
logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
// when hive dependency is not loaded, it'll fail.
// in this case SQLContext can be used.
sqlc = new SQLContext(getSparkContext());
}
} else {
sqlc = new SQLContext(getSparkContext());
}
} else {
sqlc = new SQLContext(getSparkContext());
}

return sqlc;
}finally
{
Thread.currentThread().setContextClassLoader(old);
}

return sqlc;
}

public DependencyResolver getDependencyResolver() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
Expand Down Expand Up @@ -111,39 +113,36 @@ public void close() {}

@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
SQLContext sqlc = null;
SparkInterpreter sparkInterpreter = getSparkInterpreter();
final ClassLoader old = Thread.currentThread().getContextClassLoader();
try
{
Thread.currentThread().setContextClassLoader(SparkConf.class.getClassLoader());
SQLContext sqlc = null;
SparkInterpreter sparkInterpreter = getSparkInterpreter();

if (sparkInterpreter.getSparkVersion().isUnsupportedVersion()) {
return new InterpreterResult(Code.ERROR, "Spark "
+ sparkInterpreter.getSparkVersion().toString() + " is not supported");
}

if (sparkInterpreter.getSparkVersion().isUnsupportedVersion()) {
return new InterpreterResult(Code.ERROR, "Spark "
+ sparkInterpreter.getSparkVersion().toString() + " is not supported");
}
sqlc = getSparkInterpreter().getSQLContext();
SparkContext sc = sqlc.sparkContext();
if (concurrentSQL()) {
sc.setLocalProperty("spark.scheduler.pool", "fair");
} else {
sc.setLocalProperty("spark.scheduler.pool", null);
}

sqlc = getSparkInterpreter().getSQLContext();
SparkContext sc = sqlc.sparkContext();
if (concurrentSQL()) {
sc.setLocalProperty("spark.scheduler.pool", "fair");
} else {
sc.setLocalProperty("spark.scheduler.pool", null);
sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
DataFrame rdd = sqlc.sql(st);
String msg = ZeppelinContext.showDF(sc, context, rdd, maxResult);
sc.clearJobGroup();
return new InterpreterResult(Code.SUCCESS, msg);
}

sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
Object rdd = null;
try {
// method signature of sqlc.sql() is changed
// from def sql(sqlText: String): SchemaRDD (1.2 and prior)
// to def sql(sqlText: String): DataFrame (1.3 and later).
// Therefore need to use reflection to keep binary compatibility for all spark versions.
Method sqlMethod = sqlc.getClass().getMethod("sql", String.class);
rdd = sqlMethod.invoke(sqlc, st);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
throw new InterpreterException(e);
finally
{
Thread.currentThread().setContextClassLoader(old);
}

String msg = ZeppelinContext.showDF(sc, context, rdd, maxResult);
sc.clearJobGroup();
return new InterpreterResult(Code.SUCCESS, msg);
}

@Override
Expand Down
133 changes: 71 additions & 62 deletions spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.LinkedList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.expressions.Attribute;
Expand Down Expand Up @@ -284,80 +285,88 @@ public static String showDF(ZeppelinContext z, Object df) {
public static String showDF(SparkContext sc,
InterpreterContext interpreterContext,
Object df, int maxResult) {
Object[] rows = null;
Method take;
String jobGroup = "zeppelin-" + interpreterContext.getParagraphId();
sc.setJobGroup(jobGroup, "Zeppelin", false);
final ClassLoader old = Thread.currentThread().getContextClassLoader();
try
{
Thread.currentThread().setContextClassLoader(SparkConf.class.getClassLoader());
Object[] rows = null;
Method take;
String jobGroup = "zeppelin-" + interpreterContext.getParagraphId();
sc.setJobGroup(jobGroup, "Zeppelin", false);

try {
take = df.getClass().getMethod("take", int.class);
rows = (Object[]) take.invoke(df, maxResult + 1);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException | ClassCastException e) {
sc.clearJobGroup();
throw new InterpreterException(e);
}
try {
take = df.getClass().getMethod("take", int.class);
rows = (Object[]) take.invoke(df, maxResult + 1);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException | ClassCastException e) {
sc.clearJobGroup();
throw new InterpreterException(e);
}

List<Attribute> columns = null;
// get field names
try {
// Use reflection because of classname returned by queryExecution changes from
// Spark <1.5.2 org.apache.spark.sql.SQLContext$QueryExecution
// Spark 1.6.0> org.apache.spark.sql.hive.HiveContext$QueryExecution
Object qe = df.getClass().getMethod("queryExecution").invoke(df);
Object a = qe.getClass().getMethod("analyzed").invoke(qe);
scala.collection.Seq seq = (scala.collection.Seq) a.getClass().getMethod("output").invoke(a);

columns = (List<Attribute>) scala.collection.JavaConverters.seqAsJavaListConverter(seq)
.asJava();
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
throw new InterpreterException(e);
}
List<Attribute> columns = null;
// get field names
try {
// Use reflection because of classname returned by queryExecution changes from
// Spark <1.5.2 org.apache.spark.sql.SQLContext$QueryExecution
// Spark 1.6.0> org.apache.spark.sql.hive.HiveContext$QueryExecution
Object qe = df.getClass().getMethod("queryExecution").invoke(df);
Object a = qe.getClass().getMethod("analyzed").invoke(qe);
scala.collection.Seq seq = (scala.collection.Seq) a.getClass().getMethod("output").invoke(a);

columns = (List<Attribute>) scala.collection.JavaConverters.seqAsJavaListConverter(seq)
.asJava();
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
throw new InterpreterException(e);
}

String msg = null;
for (Attribute col : columns) {
if (msg == null) {
msg = col.name();
} else {
msg += "\t" + col.name();
String msg = null;
for (Attribute col : columns) {
if (msg == null) {
msg = col.name();
} else {
msg += "\t" + col.name();
}
}
}

msg += "\n";
msg += "\n";

// ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType,
// FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType,
// NullType, NumericType, ShortType, StringType, StructType
// ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType,
// FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType,
// NullType, NumericType, ShortType, StringType, StructType

try {
for (int r = 0; r < maxResult && r < rows.length; r++) {
Object row = rows[r];
Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
Method apply = row.getClass().getMethod("apply", int.class);

for (int i = 0; i < columns.size(); i++) {
if (!(Boolean) isNullAt.invoke(row, i)) {
msg += apply.invoke(row, i).toString();
} else {
msg += "null";
}
if (i != columns.size() - 1) {
msg += "\t";
try {
for (int r = 0; r < maxResult && r < rows.length; r++) {
Object row = rows[r];
Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
Method apply = row.getClass().getMethod("apply", int.class);

for (int i = 0; i < columns.size(); i++) {
if (!(Boolean) isNullAt.invoke(row, i)) {
msg += apply.invoke(row, i).toString();
} else {
msg += "null";
}
if (i != columns.size() - 1) {
msg += "\t";
}
}
msg += "\n";
}
msg += "\n";
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
throw new InterpreterException(e);
}
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
throw new InterpreterException(e);
}

if (rows.length > maxResult) {
msg += "\n<font color=red>Results are limited by " + maxResult + ".</font>";
if (rows.length > maxResult) {
msg += "\n<font color=red>Results are limited by " + maxResult + ".</font>";
}
sc.clearJobGroup();
return "%table " + msg;
}finally
{
Thread.currentThread().setContextClassLoader(old);
}
sc.clearJobGroup();
return "%table " + msg;
}

/**
Expand Down

0 comments on commit 794a56b

Please sign in to comment.