Skip to content

Commit

Permalink
RDB Loader: retry on refused connection (close #8)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Aug 13, 2019
1 parent 8c528a5 commit d667f69
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ object LoaderA {
EitherT(Free.liftF[LoaderA, Either[LoaderError, Unit]](DeleteDir(path)))


/** Block thread for some time */
/** Block thread for some time, milliseconds */
def sleep(timeout: Long): Action[Unit] =
Free.liftF[LoaderA, Unit](Sleep(timeout))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package interpreters
import java.io.IOException
import java.nio.file._
import java.nio.file.attribute.BasicFileAttributes
import java.sql.Connection

import scala.util.control.NonFatal

Expand Down Expand Up @@ -43,6 +44,7 @@ import discovery.ManifestDiscovery
import utils.Common
import implementations._
import com.snowplowanalytics.snowplow.rdbloader.{ Log => ExitLog }
import com.snowplowanalytics.snowplow.rdbloader.loaders.Common.SqlString

/**
* Interpreter performs all actual side-effecting work,
Expand All @@ -67,7 +69,18 @@ class RealWorldInterpreter private[interpreters](

// dbConnection is Either because not required for log dump
// lazy to wait before tunnel established
private lazy val dbConnection = JdbcInterpreter.getConnection(cliConfig.target)
private var dbConnection: Either[LoaderError, Connection] = _

private def getConnection(force: Boolean = false): Either[LoaderError, Connection] = {
if (dbConnection == null) {
dbConnection = JdbcInterpreter.getConnection(cliConfig.target)
}
if (force) {
println("Forcing reconnection to DB")
dbConnection = JdbcInterpreter.getConnection(cliConfig.target)
}
dbConnection
}

private lazy val manifest =
ManifestInterpreter.initialize(cliConfig.target.processingManifest, cliConfig.configYaml.aws.s3.region, utils.Common.DefaultClient) match {
Expand All @@ -82,6 +95,21 @@ class RealWorldInterpreter private[interpreters](
// DB messages that should be printed only to output and if failure is DB-related
private val messagesCopy = collection.mutable.ListBuffer.empty[String]

def executeWithRetry[A](action: Connection => SqlString => Either[LoaderError.StorageTargetError, A])(sql: SqlString) = {
val firstAttempt = for { conn <- getConnection(); r <- action(conn)(sql) } yield r
firstAttempt match {
case Left(LoaderError.StorageTargetError(message)) if message.contains("Connection refused") =>
println(message)
println("Sleeping and making another try")
Thread.sleep(10000)
for {
conn <- getConnection(true)
r <- action(conn)(sql)
} yield r
case other => other
}
}

def run: LoaderA ~> Id = new (LoaderA ~> Id) {

def apply[A](effect: LoaderA[A]): Id[A] = {
Expand Down Expand Up @@ -126,22 +154,18 @@ class RealWorldInterpreter private[interpreters](

case ExecuteUpdate(query) =>
if (query.startsWith("COPY ")) { logCopy(query.split(" ").take(2).mkString(" ")) }
executeWithRetry[Long](JdbcInterpreter.executeUpdate)(query).asInstanceOf[Id[A]]

val result = for {
conn <- dbConnection
res <- JdbcInterpreter.executeUpdate(conn)(query)
} yield res
result.asInstanceOf[Id[A]]
case CopyViaStdin(files, query) =>
for {
conn <- dbConnection
conn <- getConnection()
_ = log(s"Copying ${files.length} files via stdin")
res <- JdbcInterpreter.copyViaStdin(conn, files, query)
} yield res

case ExecuteQuery(query, d) =>
for {
conn <- dbConnection
conn <- getConnection()
res <- JdbcInterpreter.executeQuery(conn)(query)(d)
} yield res

Expand Down Expand Up @@ -180,7 +204,7 @@ class RealWorldInterpreter private[interpreters](
val logs = messages.mkString("\n") + "\n"
TrackerInterpreter.dumpStdout(amazonS3, key, logs)
case Exit(loadResult, dumpResult) =>
dbConnection.foreach(c => c.close())
getConnection().foreach(c => c.close())
TrackerInterpreter.exit(loadResult, dumpResult)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ object JdbcInterpreter {
case NonFatal(e: java.sql.SQLException) if Option(e.getMessage).getOrElse("").contains("is not authorized to assume IAM Role") =>
StorageTargetError("IAM Role with S3 Read permissions is not attached to Redshift instance")
case NonFatal(e) =>
System.err.println("RDB Loader unknown error in executeUpdate")
e.printStackTrace()
println("RDB Loader unknown error in executeUpdate")
e.printStackTrace(System.out)
StorageTargetError(Option(e.getMessage).getOrElse(e.toString))
}

Expand All @@ -65,8 +65,8 @@ object JdbcInterpreter {
}
} catch {
case NonFatal(e) =>
System.err.println("RDB Loader unknown error in executeQuery")
e.printStackTrace()
println("RDB Loader unknown error in executeQuery")
e.printStackTrace(System.out)
StorageTargetError(Option(e.getMessage).getOrElse(e.toString)).asLeft[A]
}

Expand All @@ -75,8 +75,8 @@ object JdbcInterpreter {
Right(conn.setAutoCommit(autoCommit))
} catch {
case e: SQLException =>
System.err.println("setAutocommit error")
e.printStackTrace()
println("setAutocommit error")
e.printStackTrace(System.out)
Left(StorageTargetError(e.toString))
}

Expand Down Expand Up @@ -117,17 +117,27 @@ object JdbcInterpreter {
props.setProperty("user", target.username)
props.setProperty("password", password)


target match {
case r: StorageTarget.RedshiftConfig =>
val url = s"jdbc:redshift://${target.host}:${target.port}/${target.database}"
def connect() =
Either.catchNonFatal(new RedshiftDriver().connect(s"jdbc:redshift://${target.host}:${target.port}/${target.database}", props))

for {
_ <- r.jdbc.validation match {
case Left(error) => error.asLeft
case Right(propertyUpdaters) =>
propertyUpdaters.foreach(f => f(props)).asRight
}
connection <- Either.catchNonFatal(new RedshiftDriver().connect(url, props)).leftMap { x =>
LoaderError.StorageTargetError(x.getMessage)
firstAttempt = connect()
connection <- firstAttempt match {
case Right(c) =>
c.asRight
case Left(e) =>
println("Error during connection acquisition. Sleeping and making another attempt")
e.printStackTrace(System.out)
Thread.sleep(60000)
connect().leftMap(e2 => LoaderError.StorageTargetError(e2.getMessage))
}
} yield connection

Expand All @@ -138,8 +148,8 @@ object JdbcInterpreter {
}
} catch {
case NonFatal(e) =>
System.err.println("RDB Loader getConnection error")
e.printStackTrace()
println("RDB Loader getConnection error")
e.printStackTrace(System.out)
Left(StorageTargetError(s"Problems with establishing DB connection\n${e.getMessage}"))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ object TrackerInterpreter {
val message = toMsg(response, true)

// The only place in interpreters where println used instead of logger as this is async function
if (message.isEmpty) () else System.err.println(message)
if (message.isEmpty) () else println(message)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ object Common {

val BeginTransaction: SqlString = SqlString.unsafeCoerce("BEGIN")
val CommitTransaction: SqlString = SqlString.unsafeCoerce("COMMIT")
val AbortTransaction: SqlString = SqlString.unsafeCoerce("ABORT")

/** ADT representing possible destination of events table */
sealed trait EventsTable { def getDescriptor: String }
Expand Down Expand Up @@ -121,6 +122,31 @@ object Common {
}
}

/**
* Inspect loading result and make an attempt to retry if it failed with "Connection refused"
* @param loadAction set of queries inside a transaction loading atomic and shredded only
* (no vacuum or analyze)
*/
def retryIfFailed(loadAction: LoaderAction[Unit]): LoaderAction[Unit] = {
val retry = loadAction.value.flatMap[Either[LoaderError, Unit]] {
case Left(LoaderError.StorageTargetError(message)) if message.contains("Connection refused") =>
for {
_ <- LoaderA.print(s"Loading failed with [$message], making another attempt")
retransact <- (LoaderA.executeUpdate(Common.AbortTransaction) *> LoaderA.executeUpdate(Common.BeginTransaction)).value
_ <- LoaderA.sleep(60000)
result <- retransact match {
case Right(_) => loadAction.value
case Left(_) => Action.lift(retransact.void)
}
} yield result
case e @ Left(_) =>
LoaderA.print("Loading failed, no retries will be made") *> Action.lift(e)
case success =>
Action.lift(success)
}
LoaderAction(retry)
}

/**
* String representing valid SQL query/statement,
* ready to be executed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ package object rdbloader {
*/
type Action[A] = Free[LoaderA, A]

object Action {
def lift[A](value: A): Action[A] =
Free.pure[LoaderA, A](value)
}

/**
* Loading effect, producing value of type `A` with possible `LoaderError`
*
Expand Down

0 comments on commit d667f69

Please sign in to comment.