TG-990 feat: Upgrade Spark version to 3.0 #126

Open

wants to merge 60 commits into base: release-4.2.0

Changes from all commits (60 commits)
fc66dbe
Issue SB-23962 feat: Github jira integration.
Apr 1, 2021
a61aea8
Merge pull request #114 from manjudr/git-jira-integration
SanthoshVasabhaktula Apr 1, 2021
5d6f83c
TG-990: Upgrade scala to 2.12 and spark to 3.0.0 versions
anandp504 Jun 29, 2021
8bbe762
TG-990: Upgrade hadoop-client to 2.7.4 version
anandp504 Jul 5, 2021
48cf890
TG-990: Remove scala.App extension for JobManager
anandp504 Jul 5, 2021
574ce9c
TG-990: Remove scala.App extension for JobExecutor
anandp504 Jul 5, 2021
54aa386
TG-990: Use spark-cassandra-connector 3.0.1
anandp504 Jul 26, 2021
dfc20cf
Issue #TG-1089 fix: Druid Exhaust jobs refactoring to read config fro…
sowmya-dixit Sep 20, 2021
e32a6bb
Issue #TG-1089 fix: Druid Exhaust jobs refactoring to read config fro…
sowmya-dixit Sep 21, 2021
d6b7c2a
Issue #TG-1089 fix: Coverage improvement
sowmya-dixit Sep 21, 2021
e8fe964
Issue #TG-1089 fix: Coverage improvement
sowmya-dixit Sep 21, 2021
9460ecc
Issue #TG-1089 fix: Coverage improvement
sowmya-dixit Sep 26, 2021
ec4da3e
Issue #TG-1089 fix: Refactoring
sowmya-dixit Sep 26, 2021
d684248
Issue #TG-1089 fix: Refactoring
sowmya-dixit Sep 26, 2021
85414b3
Issue #TG-1089 fix: Refactoring
sowmya-dixit Sep 26, 2021
712e362
Issue #TG-1089 fix: Refactoring
sowmya-dixit Sep 26, 2021
7c47278
Issue #TG-1089 fix: Refactoring
sowmya-dixit Sep 26, 2021
6433f43
Issue #TG-1089 fix: Refactoring
sowmya-dixit Sep 26, 2021
4dc5578
Issue #TG-1089 fix: Refactoring and Coverage improvement
sowmya-dixit Sep 26, 2021
a6fb685
Merge pull request #130 from sowmya-dixit/release-4.3.0
SanthoshVasabhaktula Sep 27, 2021
1de5ff3
Revert "Issue #TG-1089 fix: Druid Exhaust jobs refactoring to read co…
sowmya-dixit Sep 28, 2021
6043e0f
Merge pull request #131 from project-sunbird/revert-130-release-4.3.0
RevathiKotla Sep 29, 2021
17c8863
Revert "TG-1089 Revert " Druid Exhaust jobs refactoring to read confi…
sowmya-dixit Oct 4, 2021
c1590bb
Merge pull request #132 from project-sunbird/revert-131-revert-130-re…
sowmya-dixit Oct 4, 2021
6aed810
Issue #TG-1100 fix: Druid On-demand Exhaust Job refactoring - bug fix
sowmya-dixit Oct 4, 2021
3f0bcae
Issue #TG-1100 fix: Druid On-demand Exhaust Job refactoring - test cases
sowmya-dixit Oct 4, 2021
855c560
Issue #TG-1100 fix: Druid On-demand Exhaust Job refactoring - test cases
sowmya-dixit Oct 4, 2021
5e1afe6
Issue #TG-1100 fix: Hot-fix changes patch
sowmya-dixit Oct 4, 2021
39d88b2
Issue #TG-1100 fix: Druid On-demand Exhaust Job refactoring - test cases
sowmya-dixit Oct 4, 2021
772f4c8
Issue #TG-1100 fix: Druid On-demand Exhaust Job refactoring - test cases
sowmya-dixit Oct 5, 2021
72a85c9
Merge pull request #133 from sowmya-dixit/release-4.3.0
RevathiKotla Oct 5, 2021
95fbcc4
Merge pull request #134 from project-sunbird/release-4.3.0
sowmya-dixit Oct 26, 2021
7305f9c
Issue #TG-1116 fix: Enhance druid reports to generate metrics with ou…
sowmya-dixit Oct 27, 2021
1498ba1
Issue #TG-1116 fix: Enhance druid reports to generate metrics with ou…
sowmya-dixit Oct 27, 2021
10636e3
Issue #TG-1116 fix: Enhance druid reports to generate metrics with ou…
sowmya-dixit Nov 2, 2021
95c2b5e
Issue #TG-1116 fix: Fix build issues
sowmya-dixit Nov 2, 2021
b4778b2
Issue #TG-1116 fix: Improve coverage
sowmya-dixit Nov 3, 2021
2402894
Merge pull request #135 from sowmya-dixit/release-4.4.0
RevathiKotla Nov 3, 2021
dc95e41
Issue #TG-1116 fix: Enhance druid reports to generate metrics with ou…
sowmya-dixit Nov 23, 2021
32dbe57
Merge pull request #136 from sowmya-dixit/release-4.4.0
sowmya-dixit Nov 24, 2021
7d943c7
streamQuery issue fix
Dec 3, 2021
11d95c3
Merge branch 'release-4.4.0' of github.com:project-sunbird/sunbird-co…
Dec 10, 2021
cb44698
handle huge amount of data by streamQuery in OnDemandDruidExhaustJob
Dec 10, 2021
0482087
Merge branch 'release-4.4.0' of github.com:shikshalokam/sunbird-core-…
Dec 10, 2021
0e74106
handle huge amount of data by streamQuery in OnDemandDruidExhaustJob
Dec 10, 2021
43a901e
Merge branch 'release-4.4.0' of github.com:shikshalokam/sunbird-core-…
Dec 10, 2021
f3fbf27
handle huge amount of data by streamQuery in OnDemandDruidExhaustJob
Dec 13, 2021
e0e65e4
Issue SB-000 fix: Ignoring the druid query processing model testcases
manjudr Dec 13, 2021
d76dcaf
Merge pull request #140 from manjudr/testcase-fixes
sowmya-dixit Dec 13, 2021
a4f4e73
Merge branch 'release-4.4.0' of github.com:project-sunbird/sunbird-co…
Dec 13, 2021
bddc7b1
remove storage config logs in onDemand
Dec 13, 2021
32f45c5
Merge pull request #139 from shikshalokam/release-4.4.0
sowmya-dixit Dec 13, 2021
105463b
sorting rows in the csv for onDemandDruidExhaust Data Product
Shakthieshwari Feb 9, 2022
a27b7ca
sorting csv none changes
Shakthieshwari Feb 23, 2022
a9cd240
Merge pull request #142 from shikshalokam/release-4.8.0
sowmya-dixit Feb 24, 2022
124250a
add filter logic to the ML data product
Shakthieshwari Apr 23, 2022
63c03fd
SB-29643: Allow JobManager to fetch data from CephS3 storage (#145)
shobhit-vashistha Apr 26, 2022
7078a76
Merge pull request #149 from shikshalokam/release-4.9.0
sowmya-dixit Apr 26, 2022
19b589e
Merge pull request #150 from project-sunbird/release-4.8.0
sowmya-dixit May 6, 2022
9b0f2fe
Issue SB-28788 feat: Upgrade spark to 3.0.0
anandp504 May 9, 2022
Binary file added batch-models/lib/scruid_2.12-2.4.0.jar
Binary file not shown.
58 changes: 39 additions & 19 deletions batch-models/pom.xml
@@ -87,7 +87,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
<version>2.7.4</version>
<scope>provided</scope>
</dependency>
<dependency>
@@ -138,24 +138,24 @@
<version>4.1.0</version>
<scope>test</scope>
</dependency>
<!-- -->
<!--
<dependency>
<groupId>optional</groupId>
<artifactId>optional_${scala.version}</artifactId>
<version>1.0</version>
</dependency>

-->
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_${scala.maj.version}</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>breeze_${scala.maj.version}</artifactId>
<version>0.11.2</version>
<scope>provided</scope>
<version>3.0.1</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.scalanlp</groupId>-->
<!-- <artifactId>breeze_${scala.maj.version}</artifactId>-->
<!-- <version>0.11.2</version>-->
<!-- <scope>provided</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.maj.version}</artifactId>
@@ -164,7 +164,7 @@
<exclusions>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<artifactId>spark-core_${scala.maj.version}</artifactId>
</exclusion>
</exclusions>
</dependency>
@@ -208,29 +208,31 @@
</dependency>
<dependency>
<groupId>net.liftweb</groupId>
<artifactId>lift-json_2.11</artifactId>
<artifactId>lift-json_${scala.maj.version}</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>com.flyberrycapital</groupId>
<artifactId>scala-slack_2.11</artifactId>
<version>0.1.0</version>
<!--<artifactId>scala-slack_2.11</artifactId>
<version>0.1.0</version> -->
<artifactId>scala-slack_${scala.maj.version}</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
<groupId>com.github.java-json-tools</groupId>
<artifactId>json-schema-validator</artifactId>
<version>2.2.8</version>
<scope>test</scope>
</dependency>
<dependency>
<!--<dependency>
<groupId>org.sunbird</groupId>
<artifactId>video-streaming</artifactId>
<version>1.0</version>
</dependency>
</dependency>-->
<dependency>
<groupId>org.sunbird</groupId>
<artifactId>cloud-store-sdk</artifactId>
<version>1.2.6</version>
<artifactId>cloud-store-sdk_${scala.maj.version}</artifactId>
<version>1.3.0</version>
<exclusions>
<exclusion>
<groupId>com.microsoft.azure</groupId>
@@ -240,24 +242,42 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.maj.version}.0-RC1</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
<version>3.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>ing.wbaa.druid</groupId>
<artifactId>scruid_${scala.maj.version}</artifactId>
<version>2.4.0</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/scruid_2.11-2.4.0.jar</systemPath>
<systemPath>${project.basedir}/lib/scruid_2.12-2.4.0.jar</systemPath>
</dependency>
<dependency>
<groupId>pl.allegro.tech</groupId>
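The pom.xml changes above replace hard-coded `_2.11` artifact suffixes with `${scala.maj.version}` throughout. For those references to resolve, the module's `<properties>` block must define the Scala version properties. A minimal sketch of what that block would look like for this Scala 2.12 / Spark 3.0 upgrade — the exact patch versions here are assumptions, not shown in this diff:

```xml
<properties>
  <!-- Full Scala version; 2.12.10 is assumed, any 2.12.x consistent
       with spark-cassandra-connector 3.0.1 would work -->
  <scala.version>2.12.10</scala.version>
  <!-- Major version used as the artifact suffix, e.g. lift-json_2.12 -->
  <scala.maj.version>2.12</scala.maj.version>
</properties>
```

With these properties in place, `lift-json_${scala.maj.version}` resolves to `lift-json_2.12`, matching the upgraded binary artifacts such as `scruid_2.12-2.4.0.jar`.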
@@ -0,0 +1,70 @@
package org.ekstep.analytics.exhaust

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType
import org.ekstep.analytics.framework.{FrameworkContext, JobConfig, JobContext, StorageConfig}
import org.ekstep.analytics.framework.util.CommonUtil
import org.sunbird.cloud.storage.conf.AppConf

trait BaseReportsJob {

  def getReportingFrameworkContext()(implicit fc: Option[FrameworkContext]): FrameworkContext = {
    fc match {
      case Some(value) => {
        value
      }
      case None => {
        new FrameworkContext();
      }
    }
  }

  def openSparkSession(config: JobConfig): SparkSession = {

    val modelParams = config.modelParams.getOrElse(Map[String, Option[AnyRef]]());
    val sparkCassandraConnectionHost = modelParams.get("sparkCassandraConnectionHost")
    val sparkElasticsearchConnectionHost = modelParams.get("sparkElasticsearchConnectionHost")
    val sparkRedisConnectionHost = modelParams.get("sparkRedisConnectionHost")
    val sparkUserDbRedisIndex = modelParams.get("sparkUserDbRedisIndex")
    val sparkUserDbRedisPort = modelParams.get("sparkUserDbRedisPort")
    JobContext.parallelization = CommonUtil.getParallelization(config)
    val readConsistencyLevel = modelParams.getOrElse("cassandraReadConsistency", "LOCAL_QUORUM").asInstanceOf[String];
    val writeConsistencyLevel = modelParams.getOrElse("cassandraWriteConsistency", "LOCAL_QUORUM").asInstanceOf[String]
    val sparkSession = CommonUtil.getSparkSession(JobContext.parallelization, config.appName.getOrElse(config.model), sparkCassandraConnectionHost, sparkElasticsearchConnectionHost, Option(readConsistencyLevel), sparkRedisConnectionHost, sparkUserDbRedisIndex, sparkUserDbRedisPort)
    setReportsStorageConfiguration(config)(sparkSession)
    sparkSession;

  }

  def setReportsStorageConfiguration(config: JobConfig)(implicit spark: SparkSession) {

    val modelParams = config.modelParams.getOrElse(Map[String, Option[AnyRef]]());
    val store = modelParams.getOrElse("store", "local").asInstanceOf[String];
    val storageKey = modelParams.getOrElse("storageKeyConfig", "reports_storage_key").asInstanceOf[String];
    val storageSecret = modelParams.getOrElse("storageSecretConfig", "reports_storage_secret").asInstanceOf[String];
    store.toLowerCase() match {
      case "s3" =>
        spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AppConf.getConfig(storageKey));
        spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", AppConf.getConfig(storageSecret));
      case "azure" =>
        val storageKeyValue = AppConf.getConfig(storageKey);
        spark.sparkContext.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
        spark.sparkContext.hadoopConfiguration.set(s"fs.azure.account.key.$storageKeyValue.blob.core.windows.net", AppConf.getConfig(storageSecret))
        spark.sparkContext.hadoopConfiguration.set(s"fs.azure.account.keyprovider.$storageKeyValue.blob.core.windows.net", "org.apache.hadoop.fs.azure.SimpleKeyProvider")
      case _ =>

    }

  }

  def getStorageConfig(config: JobConfig, key: String): StorageConfig = {

    val modelParams = config.modelParams.getOrElse(Map[String, Option[AnyRef]]());
    val container = modelParams.getOrElse("storageContainer", "reports").asInstanceOf[String]
    val storageKey = modelParams.getOrElse("storageKeyConfig", "reports_storage_key").asInstanceOf[String];
    val storageSecret = modelParams.getOrElse("storageSecretConfig", "reports_storage_secret").asInstanceOf[String];
    val store = modelParams.getOrElse("store", "local").asInstanceOf[String]
    StorageConfig(store, container, key, Option(storageKey), Option(storageSecret));
  }

}
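The trait above drives everything off `config.modelParams`: `openSparkSession` reads connection hosts and Cassandra consistency levels, while `setReportsStorageConfiguration` and `getStorageConfig` read the `store`, container, and credential-key names. A hypothetical sketch of the `modelParams` section such a job config might carry, using only the keys the trait actually reads — the surrounding field names and values are illustrative assumptions, not taken from this PR:

```json
{
  "model": "druid-dataset",
  "modelParams": {
    "store": "azure",
    "storageContainer": "reports",
    "storageKeyConfig": "reports_storage_key",
    "storageSecretConfig": "reports_storage_secret",
    "sparkCassandraConnectionHost": "127.0.0.1",
    "cassandraReadConsistency": "LOCAL_QUORUM",
    "cassandraWriteConsistency": "LOCAL_QUORUM"
  }
}
```

Note that `storageKeyConfig` names an indirection: it is the key under which `AppConf` looks up the actual credential, so secrets stay out of the job config itself.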