Flume NG MongoDB sink. The source was implemented to populate JSON into MongoDB.
- Clone the repository
- Install latest Maven and build source by 'mvn package'
- Generate classpath by 'mvn dependency:build-classpath'
- Append classpath in $FLUME_HOME/conf/flume-env.sh
- Add the sink definition according to Configuration
type: org.riderzen.flume.sink.MongoSink
host: db host [localhost]
port: db port [27017]
username: db username []
password: db password []
model: single or dynamic, single mean all data will insert into the same collection,
and dynamic means every event will specify cllection name by event header 'collection' [single]
db: db name [events]
collection: default collection name, will used in single model [events]
batch: batch size of insert opertion [100]
autoWrap: indicator of wrap the event body as a JSONObject that has one field [false]
wrapField: use with autoWrap, set the field name of JSONObject [log]
timestampField: date type field that record the creating time of record,
it can be a existing filed name that the sink will convert this filed to date type,
or it's a new filed name that the sink will create it automatically []
the supported date pattern as follows:
"yyyy-MM-dd"
"yyyy-MM-dd HH:mm:ss"
"yyyy-MM-dd HH:mm:ss.SSS"
"yyyy-MM-dd HH:mm:ss Z"
"yyyy-MM-dd HH:mm:ss.SSS Z"
"yyyy-MM-dd'T'HH:mm:ssZ"
"yyyy-MM-dd'T'HH:mm:ss.SSSZ"
"yyyy-MM-dd'T'HH:mm:ssz"
"yyyy-MM-dd'T'HH:mm:ss.SSSz"
authenticationEnabled: true means login by username/password, false means login without authentication [false]
username: required when "authenticationEnabled" is true []
password: required when "authenticationEnabled" is true []
agent2.sources = source2
agent2.channels = channel2
agent2.sinks = sink2
agent2.sources.source2.type = org.riderzen.flume.source.MsgPackSource
agent2.sources.source2.bind = localhost
agent2.sources.source2.port = 1985
agent2.sources.source2.channels = channel2
agent2.sinks.sink2.type = org.riderzen.flume.sink.MongoSink
agent2.sinks.sink2.host = localhost
agent2.sinks.sink2.port = 27017
agent2.sinks.sink2.model = single
agent2.sinks.sink2.collection = events
agent2.sinks.sink2.batch = 100
agent2.sinks.sink2.channel = channel2
agent2.channels.channel2.type = memory
agent2.channels.channel2.capacity = 1000000
agent2.channels.channel2.transactionCapacity = 800
agent2.channels.channel2.keep-alive = 3
The sink supports some headers in dynamic model:
'db': db name
'collection' : collection name