From 52f3ea7c81f190ce2676fe3bb5d7ea0ab0f51b70 Mon Sep 17 00:00:00 2001 From: Vinoth Kumar Sambath Date: Fri, 7 Feb 2025 11:33:38 +0800 Subject: [PATCH] changes to fetch records --- .../prebuilt-jars/custom-dialect.jar | Bin 971 -> 2975 bytes .../sources/maxCompute/CustomDialect.scala | 21 +++++++++++++++ .../sources/maxCompute/MaxComputeReader.scala | 25 ++++++++++++++---- 3 files changed, 41 insertions(+), 5 deletions(-) create mode 100644 caraml-store-spark/src/main/scala/dev/caraml/spark/sources/maxCompute/CustomDialect.scala diff --git a/caraml-store-spark/prebuilt-jars/custom-dialect.jar b/caraml-store-spark/prebuilt-jars/custom-dialect.jar index e8bbc779c25b20b27244aa4e53972ff01a0ed51e..f8b26e8f6c02cbf788c3638eb4dd486272ed5670 100644 GIT binary patch literal 2975 zcmaKudpy(oAIBM^%a7{B4skf5P6)}uhA8GXml-xSw^>M$>q19VMDDlAby#j$CU-GJ z%qZk?LP#+(xh|AMe%r6}J3sa7oY(j9{eC}xyq@p(WXPEAen4{wmhx>z}U(0W6BK3!>~>XlK#%jG2)>%+gBHRDZ>^ z>zN5$L9tH&t{~Ub^}G^!;t!QK)05MH9m6brzun5q9(KX`>Ng9&@9h1vgTwk@J>0*k ze*X~%zODLtVSO(DBLMml;CaEz_y4FMf-=Eo7Z=wb`?F4BxLefn_7}BklqMH^*GCC@g zj6%)fsdHORm+a6(a63En!eB}%rIZ@HN|oGv$FIFE$vnNt&`8$NZR<4%$c8{x?1PsA zpD$g1p053Po9V%2!1^tG(RsE|Nfy-R-j42yzAbjz8RD=Cgu|>m%{ra*jmerKy75TE z&utB9wuy34wCWg7e4y-XW+DA9Igy0VT=P_sskn1w1|$9q3VEqm(PveY9?L6Q zNd->aagC~m0@1mdqMmxkiPyP981}g22aN^Eo3=#XO0Q&6q)N4`9UW|r#Kp>asCkwWSLO? zagi}d<>`9iuNlNxfD0g z(7y%jl;pISm4@jt{8!HxCON=z)w#`fBo(pO zM^S6GvCHwe@X7pQZ;!LMB2g3{h9Vd5!Uwky-g}9aR8ne#4W-=z4U3w*i?WpwG|0KN z(56QZp||u_ppaIw700KoAV=W9@Nwegt4@NsT>3d>IDE=8Dm_UE{z8O)7jwK5mzgD=sp#~G1j6gCR=tp8Z;aIKqn3baP~2z8}w(-)_DR58;J zGqa8^g*&H^9t!Hn6dAh}=ZXddJd!=4QeGV0mk`l{Q7XHn@zhvN6Hwn<8M|v5{GN%s zm@sJURA{41?$&guc@o_UTE*QU6<;F>4eVc7tD&W`f&k$x?(ouADlLp%6#0`Hl6xX1 zUBnB#%$`EdX?6G3@VFix2g=9cx^7v-SV{`5eCmR7FLWe$P~B*}M-_yIjQGrnQ?L}< z`XuitQmuVFB4j~vghk@l%Ced75S0Lz{du-IvF)2xC3PEyk-`OKk%VF3Y-DMi zpGC7KNTe%LGz3hZS=L=slh>g;0_WZy!Fb+x45>?!?e<*jsOG_-$hqgn((h>>|K z=Qyb-Y(t?ijsTBi)0Y9|HsGn8Q_|Kszc@Fc-=MBy4N@xNYUFg<1)L0=PD`Wt=QB$TqdBQJL)cu>*-99o}I8@JVvmBM6pzLgJ96Xkn1;W0EMrG5Af z=>GiePaP)5l$Ubq9NP`%RBmUi9ksl&^n{>b{ma}z!09;i{yO#PoFeGwRYjP>k?E2& zS^t_Dkq?7c%XEx-j>CZx&&6i#-J~9GrDT7A?6q&6iQKXzIfm`YeI28OL>0FhL7aMe zEs43pAB8@pvV^-Q(6aDP0GM_DQNxd+!JchV4;f;hu@*H^=7g^5-cS24`;Dbywn~x)RM+v)`}eFpSFdlT+;Es267tlej%jw6 zo2h*%LNHijT9gfmFN1)CsuwLopH}1(V|~mr6r59;CqSuHzh_F55w=x^=Q7duBQ7#{ z*tw=$yBZm5xn7Gg0)jRbE2`{ZaiHePwx2irSixB&RQ@sm-Jq9RQc|TOW|Hy6*31I7 z&@BC%!iK-DOe*Qv;X1-S{NU46-mw#i`y)BKrzAibk71QYhXlaydEyEM)a{FM_%`FR z1@iU7XwuheQmlKUwCeXt#Cj!dDsR2LsK$gE(bYbng0gedU77_NZufjXC6W0+(+48~ zsY3tmKBTos=N35?A3oHP#y|X;-$gRl{~}MQfB6izz68#%&pRgM&OtGC{I<1-HDDXVCu7dlE;RGLt@P3-7nG7TB4?onwSt>WxGTBzW~`2Ph93#D-JR&I$6Ev+9ldVp2Y*6 z#bsU1pu9r$E@*gl34-PRWV9X>q1``hc+xagDe1k&>OG>HTt-MEdF@``|gue>dX?R)~ND`8X&)`)Xo?&p7_g^>2mb;idop literal 971 zcmWIWW@h1HVBlb2P|LN6Vn6~cKz4F|u6_Vc72Hr2$%#dYxjA?g3ZN>?Pbnw{s}^D4 z;NW18L9b*IunNWO+BiK$IQ{zFyCJXUe-M;;z!bz^!yw-(A-sA^rsIuIXWo6D z{rvgNzaKyUux@C-p|`N%v}=g>j;7V2ht^d`d@X2R_#r~;R_Pc&09o zvVP6uAID{WVWO^hfOgW^<5trovKGp*E1K8HuxBO~xXnono?Ul9SdPynK{(H?(YB-c zi}gWnF7-X9)?^#y%9@|sH#INdP;O3u@b{Z`hfmGucZps-VfPo=wH+&DD)&vEbt`Gb zf5FMG0{UH+S?b<;xbX6`6VA=gE?=xN@s#)zyR;?q_?@UkNxOOPa+VqX{r_@GPHof; ztNU;Fi@uXpeZ=(ZckqSR_a@9LOlz6xGW&R8xYfl^7nUshH{HB#YyIa9jg?kAUruHB z?q@j4dh4o&@u|`J}0{M)*Vn?G>}c(Zdf{`r>tk&%I+ikX2S zz?+eYivf4y1tudT07=TYb$}8x0w@8QARX|ejawrqaU*~?kcp%bn$#JYL>Lf>9oZA0 e#Et-!Kqk6wEJ;7Wo0Scui3JFM0qNaLARYk2|4uFd diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/sources/maxCompute/CustomDialect.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/sources/maxCompute/CustomDialect.scala new file mode 100644 index 0000000..4c8eb6b --- /dev/null +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/sources/maxCompute/CustomDialect.scala @@ -0,0 +1,21 @@ +package dev.caraml.spark.sources.maxCompute + +import org.apache.spark.sql.jdbc.JdbcDialect + +class CustomDialect extends JdbcDialect { + override def canHandle(url: String): Boolean = { + println("can handle? this one") + println(url.startsWith("jdbc:odps"), url) + url.startsWith("jdbc:odps") + } + + override def quoteIdentifier(colName: String): String = { + println("inside quote identifier", colName, s"$colName") + s"$colName" + } + + override def getSchemaQuery(table: String): String = { + println("getschemaquery", table) + table + } +} \ No newline at end of file diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/sources/maxCompute/MaxComputeReader.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/sources/maxCompute/MaxComputeReader.scala index b7b12e4..9095922 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/sources/maxCompute/MaxComputeReader.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/sources/maxCompute/MaxComputeReader.scala @@ -1,11 +1,10 @@ package dev.caraml.spark.sources.maxCompute -import dev.caraml.spark.{MaxComputeSource} - +import dev.caraml.spark.MaxComputeSource import org.joda.time.DateTime import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.{SQLContext} import org.apache.spark.sql.jdbc.JdbcDialects -import com.caraml.odps.CustomDialect object MaxComputeReader { def createBatchSource( @@ -19,23 +18,39 @@ object MaxComputeReader { val maxComputeJDBCConnectionURL = "jdbc:odps:https://service.ap-southeast-5.maxcompute.aliyun.com/api/?project=%s" format source.project - val sqlQuery = "select * from %s.%s where to_millis(%s) > %d and to_millis(%s) < %d" format ( + val sqlQuery = "(select * from `%s.%s` where to_millis(%s) > %d and to_millis(%s) < %d)" format ( source.dataset, source.table, source.eventTimestampColumn, start.getMillis, source.eventTimestampColumn, end.getMillis ) +// val sqlQuery = "(select * from `%s.%s`)" format ( +// source.dataset, source.table +// ) + println("query is", sqlQuery) + + println(JdbcDialects.get("jdbc:odps:https://service.ap-southeast-5.maxcompute.aliyun.com/api/?project=%s" format source.project)) val customDialect = new CustomDialect() JdbcDialects.registerDialect(customDialect) + println("custom dialect registered") + println(JdbcDialects.get("jdbc:odps:https://service.ap-southeast-5.maxcompute.aliyun.com/api/?project=%s" format source.project)) val data = sparkSession.read .format("jdbc") .option("url", maxComputeJDBCConnectionURL) // Not setting queryTimeout will fail the query, whereas setting it up actually doesn't make an impact .option("queryTimeout", 5000) - .option("query", sqlQuery) + .option("dbtable", sqlQuery) +// ,option("query", sqlQuery) .option("user", maxComputeAccessID) .option("password", maxComputeAccessKey) .load() + println(data) + println(data.toDF().show(3)) + +// data.toDF().registerTempTable("temp_table") +// val valres = sparkSession.sql("select * from temp_table") +// +// println("result from query", valres) data.toDF() } }