From fe07a147bfc76e660fd1954b29c3f850c3a7a5e8 Mon Sep 17 00:00:00 2001 From: Brian Palmer Date: Mon, 2 Mar 2015 17:20:33 -0700 Subject: [PATCH] initial version --- .gitignore | 15 +++++ Gemfile | 4 ++ LICENSE.txt | 22 +++++++ README.md | 31 +++++++++ Rakefile | 2 + lib/logstash/inputs/kinesis.rb | 90 ++++++++++++++++++++++++++ lib/logstash/inputs/kinesis/version.rb | 7 ++ logstash-input-kinesis.gemspec | 33 ++++++++++ 8 files changed, 204 insertions(+) create mode 100644 .gitignore create mode 100644 Gemfile create mode 100644 LICENSE.txt create mode 100644 README.md create mode 100644 Rakefile create mode 100644 lib/logstash/inputs/kinesis.rb create mode 100644 lib/logstash/inputs/kinesis/version.rb create mode 100644 logstash-input-kinesis.gemspec diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2b2c322 --- /dev/null +++ b/.gitignore @@ -0,0 +1,15 @@ +/.bundle/ +/.yardoc +/Gemfile.lock +/_yardoc/ +/coverage/ +/doc/ +/pkg/ +/spec/reports/ +/tmp/ +*.bundle +*.so +*.o +*.a +mkmf.log +lib/logstash-input-kinesis_jars.rb diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..3d5dafc --- /dev/null +++ b/Gemfile @@ -0,0 +1,4 @@ +source 'https://rubygems.org' + +# Specify your gem's dependencies in logstash-input-kinesis.gemspec +gemspec diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..7de2d50 --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,22 @@ +Copyright (c) 2015 Brian Palmer + +MIT License + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..36b3451 --- /dev/null +++ b/README.md @@ -0,0 +1,31 @@ +# Logstash::Input::Kinesis + +TODO: Write a gem description + +## Installation + +Add this line to your application's Gemfile: + +```ruby +gem 'logstash-input-kinesis' +``` + +And then execute: + + $ bundle + +Or install it yourself as: + + $ gem install logstash-input-kinesis + +## Usage + +TODO: Write usage instructions here + +## Contributing + +1. Fork it ( https://github.com/[my-github-username]/logstash-input-kinesis/fork ) +2. Create your feature branch (`git checkout -b my-new-feature`) +3. Commit your changes (`git commit -am 'Add some feature'`) +4. Push to the branch (`git push origin my-new-feature`) +5. Create a new Pull Request diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..809eb56 --- /dev/null +++ b/Rakefile @@ -0,0 +1,2 @@ +require "bundler/gem_tasks" + diff --git a/lib/logstash/inputs/kinesis.rb b/lib/logstash/inputs/kinesis.rb new file mode 100644 index 0000000..7710e1f --- /dev/null +++ b/lib/logstash/inputs/kinesis.rb @@ -0,0 +1,90 @@ +# encoding: utf-8 +require "logstash/inputs/base" +require "logstash/errors" +require "logstash/environment" +require "logstash/namespace" + +require "logstash/inputs/kinesis/version" +require 'logstash-input-kinesis_jars' + +class LogStash::Inputs::Kinesis < LogStash::Inputs::Base + KCL = com.amazonaws.services.kinesis.clientlibrary.lib.worker + + config_name 'kinesis' + milestone 1 + + # The application name used for the dynamodb coordination table. Must be + # unique for this kinesis stream. + config :application_name, :validate => :string, :default => "logstash" + + # The kinesis stream name. + config :kinesis_stream_name, :validate => :string, :required => true + + def register + # the INFO log level is extremely noisy in KCL + org.apache.commons.logging::LogFactory.getLog("com.amazonaws.services.kinesis"). + logger.setLevel(java.util.logging::Level::WARNING) + + worker_id = java.util::UUID.randomUUID.to_s + creds = com.amazonaws.auth::DefaultAWSCredentialsProviderChain.new() + @config = KCL::KinesisClientLibConfiguration.new( + @application_name, + @kinesis_stream_name, + creds, + worker_id).withInitialPositionInStream(KCL::InitialPositionInStream::TRIM_HORIZON) + end + + def run(output_queue) + worker_factory = WorkerFactory.new(@codec, output_queue, method(:decorate)) + @worker = KCL::Worker.new(worker_factory, @config) + @worker.run() + end + + def teardown + @worker.shutdown if @worker + end + + class WorkerFactory + include com.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessorFactory + def initialize(codec, output_queue, decorator) + @codec = codec + @output_queue = output_queue + @decorator = decorator + end + + def createProcessor + Worker.new(@codec.clone, @output_queue, @decorator) + end + end + + class Worker + include com.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessor + + def initialize(*args) + # nasty hack, because this is the name of a method on IRecordProcessor, but also ruby's constructor + if !@constructed + @codec, @output_queue, @decorator = args + @constructed = true + else + _shard_id, _ = args + @decoder = java.nio.charset::Charset.forName("UTF-8").newDecoder() + end + end + + def processRecords(records, checkpointer) + records.each { |record| process_record(record) } + checkpointer.checkpoint() + end + + def shutdown(checkpointer, reason) + end + + def process_record(record) + raw = @decoder.decode(record.getData).to_s + @codec.decode(raw) do |event| + @decorator.call(event) + @output_queue << event + end + end + end +end diff --git a/lib/logstash/inputs/kinesis/version.rb b/lib/logstash/inputs/kinesis/version.rb new file mode 100644 index 0000000..9d237cc --- /dev/null +++ b/lib/logstash/inputs/kinesis/version.rb @@ -0,0 +1,7 @@ +module Logstash + module Input + module Kinesis + VERSION = "1.0.0" + end + end +end diff --git a/logstash-input-kinesis.gemspec b/logstash-input-kinesis.gemspec new file mode 100644 index 0000000..06e7d94 --- /dev/null +++ b/logstash-input-kinesis.gemspec @@ -0,0 +1,33 @@ +# coding: utf-8 +lib = File.expand_path('../lib', __FILE__) +$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) +require 'logstash/inputs/kinesis/version' + +Gem::Specification.new do |spec| + spec.name = "logstash-input-kinesis" + spec.version = Logstash::Input::Kinesis::VERSION + spec.authors = ["Brian Palmer"] + spec.email = ["brian@codekitchen.net"] + spec.summary = %q{Logstash plugin for Kinesis input} + spec.description = %q{This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program} + spec.homepage = "" + spec.license = "MIT" + + spec.files = %w[Gemfile LICENSE.txt README.md Rakefile] + Dir.glob("lib/logstash/inputs/**/*") + spec.test_files = spec.files.grep(%r{^(test|spec|features)/}) + spec.require_paths = ["lib"] + + # Special flag to let us know this is actually a logstash plugin + spec.metadata = { "logstash_plugin" => "true", "logstash_group" => "input" } + + spec.platform = 'java' + + spec.requirements << "jar 'com.amazonaws:amazon-kinesis-client', '1.2.1'" + + spec.add_runtime_dependency 'jar-dependencies', '0.1.7' + spec.add_runtime_dependency 'ruby-maven', '3.1.1.0.8' + spec.add_runtime_dependency "maven-tools", '1.0.7' + + spec.add_development_dependency "bundler", "~> 1.7" + spec.add_development_dependency "rake", "~> 10.0" +end