Skip to content

Commit

Permalink
changes to fetch records
Browse files Browse the repository at this point in the history
  • Loading branch information
vinoth-gojek committed Feb 7, 2025
1 parent a9853b8 commit 52f3ea7
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 5 deletions.
Binary file modified caraml-store-spark/prebuilt-jars/custom-dialect.jar
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package dev.caraml.spark.sources.maxCompute

import org.apache.spark.sql.jdbc.JdbcDialect

class CustomDialect extends JdbcDialect {
override def canHandle(url: String): Boolean = {
println("can handle? this one")
println(url.startsWith("jdbc:odps"), url)
url.startsWith("jdbc:odps")
}

override def quoteIdentifier(colName: String): String = {
println("inside quote identifier", colName, s"$colName")
s"$colName"
}

override def getSchemaQuery(table: String): String = {
println("getschemaquery", table)
table
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package dev.caraml.spark.sources.maxCompute

import dev.caraml.spark.{MaxComputeSource}

import dev.caraml.spark.MaxComputeSource
import org.joda.time.DateTime
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.{SQLContext}
import org.apache.spark.sql.jdbc.JdbcDialects
import com.caraml.odps.CustomDialect

object MaxComputeReader {
def createBatchSource(
Expand All @@ -19,23 +18,39 @@ object MaxComputeReader {
val maxComputeJDBCConnectionURL =
"jdbc:odps:https://service.ap-southeast-5.maxcompute.aliyun.com/api/?project=%s" format source.project

val sqlQuery = "select * from %s.%s where to_millis(%s) > %d and to_millis(%s) < %d" format (
val sqlQuery = "(select * from `%s.%s` where to_millis(%s) > %d and to_millis(%s) < %d)" format (
source.dataset, source.table, source.eventTimestampColumn, start.getMillis, source.eventTimestampColumn, end.getMillis
)

// val sqlQuery = "(select * from `%s.%s`)" format (
// source.dataset, source.table
// )
println("query is", sqlQuery)

println(JdbcDialects.get("jdbc:odps:https://service.ap-southeast-5.maxcompute.aliyun.com/api/?project=%s" format source.project))
val customDialect = new CustomDialect()
JdbcDialects.registerDialect(customDialect)
println("custom dialect registered")
println(JdbcDialects.get("jdbc:odps:https://service.ap-southeast-5.maxcompute.aliyun.com/api/?project=%s" format source.project))

val data = sparkSession.read
.format("jdbc")
.option("url", maxComputeJDBCConnectionURL)
// Not setting queryTimeout will fail the query, whereas setting it up actually doesn't make an impact
.option("queryTimeout", 5000)
.option("query", sqlQuery)
.option("dbtable", sqlQuery)
// ,option("query", sqlQuery)
.option("user", maxComputeAccessID)
.option("password", maxComputeAccessKey)
.load()

println(data)
println(data.toDF().show(3))

// data.toDF().registerTempTable("temp_table")
// val valres = sparkSession.sql("select * from temp_table")
//
// println("result from query", valres)
data.toDF()
}
}

0 comments on commit 52f3ea7

Please sign in to comment.