From ab9b30d57403bf42aab3921fe84c7885a8e81e11 Mon Sep 17 00:00:00 2001
From: Mahesh Thundathil
Date: Tue, 17 May 2016 13:52:12 -0500
Subject: [PATCH] Fix SparkSQL interpreter classloader handling

---
 .../zeppelin/spark/SparkInterpreter.java    |  45 +++---
 .../zeppelin/spark/SparkSqlInterpreter.java |  57 ++++----
 .../zeppelin/spark/ZeppelinContext.java     | 133 ++++++++++--------
 3 files changed, 126 insertions(+), 109 deletions(-)

diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 4d8416dadeb..20eadfd5c48 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -240,28 +240,35 @@ private boolean useHiveContext() {
   }
 
   public SQLContext getSQLContext() {
-    if (sqlc == null) {
-      if (useHiveContext()) {
-        String name = "org.apache.spark.sql.hive.HiveContext";
-        Constructor hc;
-        try {
-          hc = getClass().getClassLoader().loadClass(name)
-              .getConstructor(SparkContext.class);
-          sqlc = (SQLContext) hc.newInstance(getSparkContext());
-        } catch (NoSuchMethodException | SecurityException
-            | ClassNotFoundException | InstantiationException
-            | IllegalAccessException | IllegalArgumentException
-            | InvocationTargetException e) {
-          logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
-          // when hive dependency is not loaded, it'll fail.
-          // in this case SQLContext can be used.
-          sqlc = new SQLContext(getSparkContext());
-        }
-      } else {
-        sqlc = new SQLContext(getSparkContext());
-      }
-    }
-    return sqlc;
+    final ClassLoader old = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(SparkConf.class.getClassLoader());
+      if (sqlc == null) {
+        if (useHiveContext()) {
+          String name = "org.apache.spark.sql.hive.HiveContext";
+          Constructor hc;
+          try {
+            hc = getClass().getClassLoader().loadClass(name)
+                .getConstructor(SparkContext.class);
+            sqlc = (SQLContext) hc.newInstance(getSparkContext());
+          } catch (NoSuchMethodException | SecurityException
+              | ClassNotFoundException | InstantiationException
+              | IllegalAccessException | IllegalArgumentException
+              | InvocationTargetException e) {
+            logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
+            // when hive dependency is not loaded, it'll fail.
+            // in this case SQLContext can be used.
+            sqlc = new SQLContext(getSparkContext());
+          }
+        } else {
+          sqlc = new SQLContext(getSparkContext());
+        }
+      }
+
+      return sqlc;
+    } finally {
+      Thread.currentThread().setContextClassLoader(old);
+    }
   }
 
   public DependencyResolver getDependencyResolver() {
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 5e26fdc94ed..00a08a8fe0e 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -23,7 +23,9 @@
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
+import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.SQLContext;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -111,39 +113,33 @@ public void close() {}
 
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context) {
-    SQLContext sqlc = null;
-    SparkInterpreter sparkInterpreter = getSparkInterpreter();
-
-    if (sparkInterpreter.getSparkVersion().isUnsupportedVersion()) {
-      return new InterpreterResult(Code.ERROR, "Spark "
-          + sparkInterpreter.getSparkVersion().toString() + " is not supported");
-    }
-
-    sqlc = getSparkInterpreter().getSQLContext();
-    SparkContext sc = sqlc.sparkContext();
-    if (concurrentSQL()) {
-      sc.setLocalProperty("spark.scheduler.pool", "fair");
-    } else {
-      sc.setLocalProperty("spark.scheduler.pool", null);
-    }
-
-    sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
-    Object rdd = null;
-    try {
-      // method signature of sqlc.sql() is changed
-      // from def sql(sqlText: String): SchemaRDD (1.2 and prior)
-      // to def sql(sqlText: String): DataFrame (1.3 and later).
-      // Therefore need to use reflection to keep binary compatibility for all spark versions.
-      Method sqlMethod = sqlc.getClass().getMethod("sql", String.class);
-      rdd = sqlMethod.invoke(sqlc, st);
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      throw new InterpreterException(e);
-    }
-
-    String msg = ZeppelinContext.showDF(sc, context, rdd, maxResult);
-    sc.clearJobGroup();
-    return new InterpreterResult(Code.SUCCESS, msg);
+    final ClassLoader old = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(SparkConf.class.getClassLoader());
+      SQLContext sqlc = null;
+      SparkInterpreter sparkInterpreter = getSparkInterpreter();
+
+      if (sparkInterpreter.getSparkVersion().isUnsupportedVersion()) {
+        return new InterpreterResult(Code.ERROR, "Spark "
+            + sparkInterpreter.getSparkVersion().toString() + " is not supported");
+      }
+
+      sqlc = getSparkInterpreter().getSQLContext();
+      SparkContext sc = sqlc.sparkContext();
+      if (concurrentSQL()) {
+        sc.setLocalProperty("spark.scheduler.pool", "fair");
+      } else {
+        sc.setLocalProperty("spark.scheduler.pool", null);
+      }
+
+      sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
+      DataFrame rdd = sqlc.sql(st);
+      String msg = ZeppelinContext.showDF(sc, context, rdd, maxResult);
+      sc.clearJobGroup();
+      return new InterpreterResult(Code.SUCCESS, msg);
+    } finally {
+      Thread.currentThread().setContextClassLoader(old);
+    }
   }
 
   @Override
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
index 658c5abb7cd..9bb71a0db1a 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
@@ -30,6 +30,7 @@
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.catalyst.expressions.Attribute;
@@ -284,79 +285,85 @@ public static String showDF(ZeppelinContext z, Object df) {
 
   public static String showDF(SparkContext sc, InterpreterContext interpreterContext,
       Object df, int maxResult) {
-    Object[] rows = null;
-    Method take;
-    String jobGroup = "zeppelin-" + interpreterContext.getParagraphId();
-    sc.setJobGroup(jobGroup, "Zeppelin", false);
-
-    try {
-      take = df.getClass().getMethod("take", int.class);
-      rows = (Object[]) take.invoke(df, maxResult + 1);
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException | ClassCastException e) {
-      sc.clearJobGroup();
-      throw new InterpreterException(e);
-    }
-
-    List columns = null;
-    // get field names
-    try {
-      // Use reflection because of classname returned by queryExecution changes from
-      // Spark <1.5.2 org.apache.spark.sql.SQLContext$QueryExecution
-      // Spark 1.6.0> org.apache.spark.sql.hive.HiveContext$QueryExecution
-      Object qe = df.getClass().getMethod("queryExecution").invoke(df);
-      Object a = qe.getClass().getMethod("analyzed").invoke(qe);
-      scala.collection.Seq seq = (scala.collection.Seq) a.getClass().getMethod("output").invoke(a);
-
-      columns = (List) scala.collection.JavaConverters.seqAsJavaListConverter(seq)
-          .asJava();
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      throw new InterpreterException(e);
-    }
-
-    String msg = null;
-    for (Attribute col : columns) {
-      if (msg == null) {
-        msg = col.name();
-      } else {
-        msg += "\t" + col.name();
-      }
-    }
-    msg += "\n";
-
-    // ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType,
-    // FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType,
-    // NullType, NumericType, ShortType, StringType, StructType
-
-    try {
-      for (int r = 0; r < maxResult && r < rows.length; r++) {
-        Object row = rows[r];
-        Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
-        Method apply = row.getClass().getMethod("apply", int.class);
-
-        for (int i = 0; i < columns.size(); i++) {
-          if (!(Boolean) isNullAt.invoke(row, i)) {
-            msg += apply.invoke(row, i).toString();
-          } else {
-            msg += "null";
-          }
-          if (i != columns.size() - 1) {
-            msg += "\t";
-          }
-        }
-        msg += "\n";
-      }
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      throw new InterpreterException(e);
-    }
-
-    if (rows.length > maxResult) {
-      msg += "\nResults are limited by " + maxResult + ".";
-    }
-    sc.clearJobGroup();
-    return "%table " + msg;
+    final ClassLoader old = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(SparkConf.class.getClassLoader());
+      Object[] rows = null;
+      Method take;
+      String jobGroup = "zeppelin-" + interpreterContext.getParagraphId();
+      sc.setJobGroup(jobGroup, "Zeppelin", false);
+
+      try {
+        take = df.getClass().getMethod("take", int.class);
+        rows = (Object[]) take.invoke(df, maxResult + 1);
+      } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+          | IllegalArgumentException | InvocationTargetException | ClassCastException e) {
+        sc.clearJobGroup();
+        throw new InterpreterException(e);
+      }
+
+      List columns = null;
+      // get field names
+      try {
+        // Use reflection because of classname returned by queryExecution changes from
+        // Spark <1.5.2 org.apache.spark.sql.SQLContext$QueryExecution
+        // Spark 1.6.0> org.apache.spark.sql.hive.HiveContext$QueryExecution
+        Object qe = df.getClass().getMethod("queryExecution").invoke(df);
+        Object a = qe.getClass().getMethod("analyzed").invoke(qe);
+        scala.collection.Seq seq = (scala.collection.Seq) a.getClass().getMethod("output").invoke(a);
+
+        columns = (List) scala.collection.JavaConverters.seqAsJavaListConverter(seq)
+            .asJava();
+      } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+          | IllegalArgumentException | InvocationTargetException e) {
+        throw new InterpreterException(e);
+      }
+
+      String msg = null;
+      for (Attribute col : columns) {
+        if (msg == null) {
+          msg = col.name();
+        } else {
+          msg += "\t" + col.name();
+        }
+      }
+      msg += "\n";
+
+      // ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType,
+      // FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType,
+      // NullType, NumericType, ShortType, StringType, StructType
+
+      try {
+        for (int r = 0; r < maxResult && r < rows.length; r++) {
+          Object row = rows[r];
+          Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
+          Method apply = row.getClass().getMethod("apply", int.class);
+
+          for (int i = 0; i < columns.size(); i++) {
+            if (!(Boolean) isNullAt.invoke(row, i)) {
+              msg += apply.invoke(row, i).toString();
+            } else {
+              msg += "null";
+            }
+            if (i != columns.size() - 1) {
+              msg += "\t";
+            }
+          }
+          msg += "\n";
+        }
+      } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+          | IllegalArgumentException | InvocationTargetException e) {
+        throw new InterpreterException(e);
+      }
+
+      if (rows.length > maxResult) {
+        msg += "\nResults are limited by " + maxResult + ".";
+      }
+      sc.clearJobGroup();
+      return "%table " + msg;
+    } finally {
+      Thread.currentThread().setContextClassLoader(old);
+    }
   }
 
   /**
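Note for reviewers: every method touched above applies the same idiom: capture the
thread's context classloader, switch it to the classloader that loaded Spark's own
classes, run the Spark work, and restore the original in a finally block so the swap
cannot leak even when the body throws or returns early. Below is a minimal standalone
sketch of that idiom; the class name, main method, and placeholder body are
illustrative only, not part of the patch.

import org.apache.spark.SparkConf;

public class ContextClassLoaderSketch {
  public static void main(String[] args) {
    // Remember the classloader this thread started with.
    final ClassLoader old = Thread.currentThread().getContextClassLoader();
    try {
      // SparkConf was loaded by the interpreter's Spark classloader, so its
      // classloader can resolve Spark (and, transitively, Hive) classes that
      // the thread's current context classloader may not see.
      Thread.currentThread().setContextClassLoader(SparkConf.class.getClassLoader());

      // ... Spark calls such as sqlc.sql(st) would run here ...
    } finally {
      // Restore unconditionally so later code on this thread sees the
      // classloader it expects.
      Thread.currentThread().setContextClassLoader(old);
    }
  }
}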