From 0527302c0adc4430d9e1c4cf9f898793b187da58 Mon Sep 17 00:00:00 2001 From: Rob Story Date: Tue, 20 Sep 2016 12:45:12 -0400 Subject: [PATCH 1/2] REF: Update to 0.2.1, which includes migrating to the AWS Java SDK to avoid some amazonica bugs --- project.clj | 6 ++--- src/tumalo/es.clj | 61 ++++++++++++++++++++++++++++++++--------------- 2 files changed, 45 insertions(+), 22 deletions(-) diff --git a/project.clj b/project.clj index 695f3ec..e265e86 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject tumalo "0.2.0" +(defproject tumalo "0.2.1" :description "Clojure Elasticsearch Indexing Tools" :license {:name "MIT" :url "https://opensource.org/licenses/MIT"} @@ -6,8 +6,8 @@ [org.clojure/tools.logging "0.3.1"] [org.clojure/core.async "0.2.374"] [ch.qos.logback/logback-classic "1.1.3"] - [amazonica "0.3.59"] - [clojurewerkz/elastisch "2.2.1"] + [clojurewerkz/elastisch "2.2.1" :exclusions [com.fasterxml.jackson.core/jackson-core]] + [com.amazonaws/aws-java-sdk-s3 "1.11.35"] [prismatic/schema "1.1.1"] [clj-http "2.2.0"] [clj-time "0.11.0"]] diff --git a/src/tumalo/es.clj b/src/tumalo/es.clj index f960567..95974fe 100644 --- a/src/tumalo/es.clj +++ b/src/tumalo/es.clj @@ -1,7 +1,6 @@ (ns tumalo.es (:require [clojure.tools.logging :refer [log logf]] [clojure.core.async :refer [chan thread close! >!! !! s3-chan processed-objects) - (if (:truncated? last-object-batch) - (get-next-s3-object-batch! s3-chan - (s3/list-next-batch-of-objects last-object-batch) + (if (.isTruncated object-batch) + (get-next-s3-object-batch! s3-client + s3-chan + (.listNextBatchOfObjects s3-client object-batch) processing-fn) (do (log :info "Finished fetching S3 data! Closing channel.") @@ -152,11 +160,26 @@ es-batch-size :- s/Num s3-batch-size :- s/Num f] + "Index data from S3 to Elasticsearch. + + The batches will be fetched from S3 and written to ES in parallel after processing. 
You + can use es-batch-size and s3-batch-size to balance this parallelism, as each S3 batch + will get written to ES in N batches of es-batch-size. + + The argument 'f' must be a function that can accept an AWS SDK S3Object and return the following response shape: + {:values [docs-ready-for-indexing] + :streams [Closeable]} + + where docs-ready-for-indexing is a sequence of documents prepared for bulk indexing, + e.g. they contain :_index, :_type, and :_id." (let [s3-chan (chan 10) - s3-first-batch (s3/list-objects :bucket-name bucket-name - :prefix prefix - :max-keys s3-batch-size)] - (thread (get-next-s3-object-batch! s3-chan s3-first-batch f)) + s3-client (AmazonS3Client.) + object-request (doto (ListObjectsRequest. ) + (.setBucketName bucket-name) + (.setPrefix prefix) + (.setMaxKeys (int s3-batch-size))) + s3-first-batch (.listObjects s3-client object-request)] + (thread (get-next-s3-object-batch! s3-client s3-chan s3-first-batch f)) (loop [batches-and-streams ( Date: Wed, 21 Sep 2016 10:34:51 -0400 Subject: [PATCH 2/2] REF: Don't be so heavy handed with java version --- project.clj | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/project.clj b/project.clj index e265e86..fbcea2e 100644 --- a/project.clj +++ b/project.clj @@ -14,5 +14,4 @@ :resource-paths ["resources"] :profiles {:dev {:resource-paths ["resources/test"]} :uberjar {:aot :all}} - :global-vars {*warn-on-reflection* true} - :javac-options ["-target" "1.8" "-source" "1.8"]) + :global-vars {*warn-on-reflection* true})