
Commit

Merge pull request #2 from rob/PR_amazon-api
PR amazon api
Rob Story authored and GitHub Enterprise committed Sep 21, 2016
2 parents ee0930c + 18fec5e commit bb06aa7
Showing 2 changed files with 46 additions and 24 deletions.
project.clj: 9 changes (4 additions, 5 deletions)
@@ -1,18 +1,17 @@
-(defproject tumalo "0.2.0"
+(defproject tumalo "0.2.1"
   :description "Clojure Elasticsearch Indexing Tools"
   :license {:name "MIT"
             :url "https://opensource.org/licenses/MIT"}
   :dependencies [[org.clojure/clojure "1.8.0"]
                  [org.clojure/tools.logging "0.3.1"]
                  [org.clojure/core.async "0.2.374"]
                  [ch.qos.logback/logback-classic "1.1.3"]
-                 [amazonica "0.3.59"]
-                 [clojurewerkz/elastisch "2.2.1"]
+                 [clojurewerkz/elastisch "2.2.1" :exclusions [com.fasterxml.jackson.core/jackson-core]]
+                 [com.amazonaws/aws-java-sdk-s3 "1.11.35"]
                  [prismatic/schema "1.1.1"]
                  [clj-http "2.2.0"]
                  [clj-time "0.11.0"]]
   :resource-paths ["resources"]
   :profiles {:dev {:resource-paths ["resources/test"]}
              :uberjar {:aot :all}}
-  :global-vars {*warn-on-reflection* true}
-  :javac-options ["-target" "1.8" "-source" "1.8"])
+  :global-vars {*warn-on-reflection* true})
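An editorial note on the dependency changes: aws-java-sdk-s3 pulls in its own com.fasterxml.jackson.core artifacts, so excluding jackson-core from elastisch presumably lets a single version win cleanly rather than leaving the choice to the dependency resolver. A minimal REPL sketch (an assumption that it runs inside this project; jackson jars normally set Implementation-Version in their manifest) to confirm which jackson-core ended up on the classpath:

;; Ask the loaded JsonFactory class which jar version it came from.
(-> (Class/forName "com.fasterxml.jackson.core.JsonFactory")
    (.getPackage)
    (.getImplementationVersion))
;; => a version string such as "2.6.x" (whichever version won resolution)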
src/tumalo/es.clj: 61 changes (42 additions, 19 deletions)
@@ -1,15 +1,17 @@
 (ns tumalo.es
   (:require [clojure.tools.logging :refer [log logf]]
             [clojure.core.async :refer [chan thread close! >!! <!!]]
-            [amazonica.aws.s3 :as s3]
             [clojurewerkz.elastisch.rest.bulk :as esb]
             [clojurewerkz.elastisch.rest.document :as esd]
             [clojurewerkz.elastisch.rest :as esr]
             [schema.core :as s]
             [tumalo.schemas :as ts])
   (:import [java.util HashMap]
            [clojurewerkz.elastisch.rest Connection]
-           [clojure.core.async.impl.channels ManyToManyChannel]))
+           [clojure.core.async.impl.channels ManyToManyChannel]
+           [com.amazonaws.services.s3 AmazonS3Client]
+           [com.amazonaws.services.s3.model ListObjectsRequest ObjectListing
+                                            S3ObjectSummary]))
 
 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 ;;; Connections
@@ -67,7 +69,7 @@
     batch-size :- s/Num]
   (let [partitioned-docs (partition-all batch-size docs)]
     (doseq [doc-batch partitioned-docs
-            :let [bulk-batch (esb/bulk-index doc-batch)]]
+            :let [bulk-batch (esb/bulk-create doc-batch)]]
       (esb/bulk-with-index-and-type pool target-index-name target-mapping-type bulk-batch))))
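The switch from esb/bulk-index to esb/bulk-create changes the bulk semantics: index actions overwrite an existing document with the same :_id, while create actions fail for ids that already exist. A minimal sketch of the call path, assuming the pool connection and esb alias from this file and a hypothetical index name:

;; create semantics: documents whose :_id already exists are rejected
;; rather than silently overwritten.
(let [docs [{:_id "1" :title "first"}
            {:_id "2" :title "second"}]]
  (esb/bulk-with-index-and-type pool "articles" "article"
                                (esb/bulk-create docs)))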


@@ -113,31 +115,37 @@
 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 ;;; Indexing from S3
 
-(defn reduce-objects-and-streams
+(s/defn reduce-objects-and-streams
   "Reducer: get the S3 object, process it, and concat the returned values and streams
   onto the :values and :streams sequences in the accumulating map"
-  [processing-fn accum s3-object]
-  (let [s3-object (s3/get-object (:bucket-name s3-object) (:key s3-object))
+  [s3-client :- AmazonS3Client
+   processing-fn
+   accum :- {s/Keyword s/Any}
+   s3-object-summary :- S3ObjectSummary]
+  (let [bucket-name (.getBucketName s3-object-summary)
+        object-key (.getKey s3-object-summary)
+        s3-object (.getObject s3-client bucket-name object-key)
         processed-obj-and-streams (processing-fn s3-object)]
     {:values (concat (:values accum) (:values processed-obj-and-streams))
      :streams (concat (:streams accum) (:streams processed-obj-and-streams))}))
 
 
 (s/defn get-next-s3-object-batch!
   "Get next batch of S3 objects and put them on the `chan`"
-  [s3-chan :- ManyToManyChannel
-   last-object-batch :- {s/Keyword s/Any}
+  [s3-client :- AmazonS3Client
+   s3-chan :- ManyToManyChannel
+   object-batch :- ObjectListing
    processing-fn]
   (logf :info "Fetching next batch of %s objects from %s"
-        (:max-keys last-object-batch) (:bucket-name last-object-batch))
-  (let [object-summaries (:object-summaries last-object-batch)
-        reducer (partial reduce-objects-and-streams processing-fn)
+        (.getMaxKeys object-batch) (.getBucketName object-batch))
+  (let [object-summaries (.getObjectSummaries object-batch)
+        reducer (partial reduce-objects-and-streams s3-client processing-fn)
         processed-objects (reduce reducer {:values [] :streams []} object-summaries)]
     (log :info "S3 Objects fetched, putting on channel to ES writer!")
     (>!! s3-chan processed-objects)
-    (if (:truncated? last-object-batch)
-      (get-next-s3-object-batch! s3-chan
-                                 (s3/list-next-batch-of-objects last-object-batch)
+    (if (.isTruncated object-batch)
+      (get-next-s3-object-batch! s3-client
+                                 s3-chan
+                                 (.listNextBatchOfObjects s3-client object-batch)
                                  processing-fn)
       (do
         (log :info "Finished fetching S3 data! Closing channel.")
@@ -152,11 +160,26 @@
    es-batch-size :- s/Num
    s3-batch-size :- s/Num
    f]
+  "Index data from S3 to Elasticsearch.
+  Batches are fetched from S3 and written to ES in parallel after processing. Use
+  es-batch-size and s3-batch-size to balance this parallelism: each S3 batch is
+  written to ES in batches of es-batch-size documents.
+  The argument 'f' must be a function that accepts an AWS SDK S3Object and returns
+  the following response shape:
+  {:values [docs-ready-for-indexing]
+   :streams [Closeable]}
+  where docs-ready-for-indexing is a sequence of documents prepared for bulk
+  indexing, e.g. each contains :_index, :_type, and :_id."
   (let [s3-chan (chan 10)
-        s3-first-batch (s3/list-objects :bucket-name bucket-name
-                                        :prefix prefix
-                                        :max-keys s3-batch-size)]
-    (thread (get-next-s3-object-batch! s3-chan s3-first-batch f))
+        s3-client (AmazonS3Client.)
+        object-request (doto (ListObjectsRequest.)
+                         (.setBucketName bucket-name)
+                         (.setPrefix prefix)
+                         (.setMaxKeys (int s3-batch-size)))
+        s3-first-batch (.listObjects s3-client object-request)]
+    (thread (get-next-s3-object-batch! s3-client s3-chan s3-first-batch f))
     (loop [batches-and-streams (<!! s3-chan)]
       (let [bulk-batch (:values batches-and-streams)
             streams (:streams batches-and-streams)]
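To make the documented contract for f concrete, here is a hypothetical processing function (the index name, type, and id scheme are invented for illustration) that turns each line of an S3 object into an indexable document and returns the reader under :streams so the caller can close it after the bulk write:

;; Hypothetical `f`: one ES document per line of the object's content.
;; `vec` forces realization while the stream is still open.
(defn lines->docs
  [^com.amazonaws.services.s3.model.S3Object s3-object]
  (let [rdr (clojure.java.io/reader (.getObjectContent s3-object))]
    {:values (vec (map-indexed (fn [i line]
                                 {:_index "articles"
                                  :_type "article"
                                  :_id (str (.getKey s3-object) "-" i)
                                  :body line})
                               (line-seq rdr)))
     :streams [rdr]}))

Only the {:values [...] :streams [...]} shape is required by the docstring; everything else above is a stand-in.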
