diff --git a/pom.xml b/pom.xml
index 40ec64c..c51f879 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,6 +48,11 @@
commons-lang3
3.4
+
+ org.neo4j.driver
+ neo4j-java-driver
+ 1.0.3
+
diff --git a/src/main/java/org/graylog/plugins/outputs/neo4j/Neo4jOutput.java b/src/main/java/org/graylog/plugins/outputs/neo4j/Neo4jOutput.java
index a39c716..1b71cc1 100644
--- a/src/main/java/org/graylog/plugins/outputs/neo4j/Neo4jOutput.java
+++ b/src/main/java/org/graylog/plugins/outputs/neo4j/Neo4jOutput.java
@@ -1,14 +1,15 @@
package org.graylog.plugins.outputs.neo4j;
-import com.floreysoft.jmte.Engine;
+import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
-import org.apache.commons.lang3.StringUtils;
-import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
+import org.graylog.plugins.outputs.neo4j.transport.INeo4jTransport;
+import org.graylog.plugins.outputs.neo4j.transport.Neo4jTransports;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.ConfigurationField;
+import org.graylog2.plugin.configuration.fields.DropdownField;
import org.graylog2.plugin.configuration.fields.TextField;
import org.graylog2.plugin.outputs.MessageOutput;
import org.graylog2.plugin.outputs.MessageOutputConfigurationException;
@@ -16,16 +17,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
+
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,37 +26,40 @@ public class Neo4jOutput implements MessageOutput {
private Configuration configuration;
private final AtomicBoolean isRunning = new AtomicBoolean(false);
- private WebTarget cypher;
+ private INeo4jTransport transport;
- private static final String CK_NEO4J_URL = "neo4j_url";
- private static final String CK_NEO4J_STARTUP_QUERY = "neo4j_startup_query";
- private static final String CK_NEO4J_QUERY = "neo4j_query";
- private static final String CK_NEO4J_USER = "neo4j_user";
- private static final String CK_NEO4J_PASSWORD = "neo4j_password";
+ public static final String CK_PROTOCOL = "neo4j_protocol";
+ public static final String CK_NEO4J_URL = "neo4j_url";
+ public static final String CK_NEO4J_STARTUP_QUERY = "neo4j_startup_query";
+ public static final String CK_NEO4J_QUERY = "neo4j_query";
+ public static final String CK_NEO4J_USER = "neo4j_user";
+ public static final String CK_NEO4J_PASSWORD = "neo4j_password";
@Inject
- public Neo4jOutput(@Assisted Stream stream, @Assisted Configuration configuration) throws MessageOutputConfigurationException {
- this.configuration = configuration;
+ public Neo4jOutput(@Assisted Stream stream, @Assisted Configuration config) throws MessageOutputConfigurationException {
+ configuration = config;
+ final Neo4jTransports transportSelection;
+ switch (configuration.getString(CK_PROTOCOL).toUpperCase(Locale.ENGLISH)) {
+ case "BOLT":
+ transportSelection = Neo4jTransports.BOLT;
+ break;
+ case "HTTP":
+ transportSelection = Neo4jTransports.HTTP;
+ break;
+
+ default:
+ throw new MessageOutputConfigurationException("Unknown protocol " + configuration.getString(CK_PROTOCOL));
+ }
- URI databaseUri;
try {
- URL baseUrl = new URL(configuration.getString(CK_NEO4J_URL));
- databaseUri = new URL(baseUrl, StringUtils.removeEnd(baseUrl.getPath(), "/") + "/transaction/commit").toURI();
- } catch (URISyntaxException e) {
- throw new MessageOutputConfigurationException("Syntax error in neo4j URL");
- } catch (MalformedURLException e) {
- throw new MessageOutputConfigurationException("Malformed neo4j URL");
+ transport = Neo4jTransports.create(transportSelection, configuration);
+ } catch (Exception e) {
+ final String error = "Error initializing " + INeo4jTransport.class;
+ LOG.error(error, e);
+ throw new MessageOutputConfigurationException(error);
}
- HttpAuthenticationFeature auth = HttpAuthenticationFeature.basic
- (configuration.getString(CK_NEO4J_USER), configuration.getString(CK_NEO4J_PASSWORD));
-
- Client client = ClientBuilder.newClient();
- client.register(auth);
- cypher = client.target(databaseUri);
- if (! configuration.getString(CK_NEO4J_STARTUP_QUERY).isEmpty()) {
- postQuery(parseQuery(configuration.getString(CK_NEO4J_STARTUP_QUERY)));
- }
+ LOG.info("Neo4j output started!");
isRunning.set(true);
}
@@ -75,17 +70,7 @@ public boolean isRunning() {
@Override
public void write(Message message) throws Exception {
- Iterator messageFields = message.getFields().entrySet().iterator();
- Map model = new HashMap<>();
- while (messageFields.hasNext()) {
- Map.Entry pair = (Map.Entry) messageFields.next();
- model.put(String.valueOf(pair.getKey()), String.valueOf(pair.getValue()));
- }
- Engine engine = new Engine();
- String queryString = engine.transform(configuration.getString(CK_NEO4J_QUERY), model);
- List> query = parseQuery(queryString);
-
- postQuery(query);
+ transport.send(message);
}
@Override
@@ -96,42 +81,10 @@ public void write(List messages) throws Exception {
}
- private List> parseQuery(String queryString) {
- List> query = new ArrayList<>();
-
- queryString = queryString.replace("\n", " ");
- queryString = queryString.replace("\t", " ");
- queryString = queryString.replace("\r", "");
- queryString = queryString.replace(";", "");
-
- HashMap statement = new HashMap<>();
- statement.put("statement", queryString);
- query.add(statement);
-
- return query;
- }
-
- private void postQuery(List> queries) {
- HashMap>> payload = new HashMap<>();
- payload.put("statements", queries);
- Response response = cypher
- .request(MediaType.APPLICATION_JSON)
- .post(Entity.entity(payload, MediaType.APPLICATION_JSON), Response.class);
-
- String result = String.format(
- "POST [%s] to [%s], status code [%d], headers: %s, returned data: %s",
- payload, configuration.getString(CK_NEO4J_URL), response.getStatus(), response.getHeaders(),
- response);
- if (response.getStatus() >= 400) {
- LOG.info(result);
- } else {
- LOG.debug(result);
- }
- }
-
@Override
public void stop() {
LOG.info("Stopping Neo4j output");
+ transport.stop();
isRunning.set(false);
}
@@ -151,9 +104,18 @@ public static class Config extends MessageOutput.Config {
public ConfigurationRequest getRequestedConfiguration() {
final ConfigurationRequest configurationRequest = new ConfigurationRequest();
+ final Map protocols = ImmutableMap.of(
+ "HTTP", "HTTP",
+ "Bolt", "Bolt");
+
+ configurationRequest.addField(new DropdownField(
+ CK_PROTOCOL, "Neo4J Protocol", "HTTP", protocols,
+ "The protocol used to connect to Neo4J",
+ ConfigurationField.Optional.NOT_OPTIONAL));
+
configurationRequest.addField(new TextField(
CK_NEO4J_URL, "Neo4j URL", "http://localhost:7474/db/data",
- "URL to Neo4j",
+ "default for Bolt: bolt://localhost:7687/",
ConfigurationField.Optional.NOT_OPTIONAL)
);
@@ -167,7 +129,9 @@ public ConfigurationRequest getRequestedConfiguration() {
CK_NEO4J_QUERY, "Cypher query",
"MERGE (source:HOST { address: '${source}' })\n" +
"MERGE (user_id:USER { user_id: '${user_id}'})\nMERGE (source)-[:CONNECT]->(user_id)",
- "Query will be executed on every log message. Use template substitutions to access message fields: ${took_ms}",
+ "Query will be executed on every log message.\n" +
+ "HTTP Mode: Use template substitutions to access message fields: ${took_ms}\n" +
+ "Bolt Mode: Use curly brackets only to access message fields: {took_ms}",
ConfigurationField.Optional.NOT_OPTIONAL, TextField.Attribute.TEXTAREA)
);
diff --git a/src/main/java/org/graylog/plugins/outputs/neo4j/Neo4jOutputMetaData.java b/src/main/java/org/graylog/plugins/outputs/neo4j/Neo4jOutputMetaData.java
index 963396c..1e6ec79 100644
--- a/src/main/java/org/graylog/plugins/outputs/neo4j/Neo4jOutputMetaData.java
+++ b/src/main/java/org/graylog/plugins/outputs/neo4j/Neo4jOutputMetaData.java
@@ -21,7 +21,7 @@ public String getName() {
@Override
public String getAuthor() {
- return "Graylog, Inc.";
+ return "Graylog, Inc., adapted by Rainer Cording";
}
@Override
diff --git a/src/main/java/org/graylog/plugins/outputs/neo4j/transport/INeo4jTransport.java b/src/main/java/org/graylog/plugins/outputs/neo4j/transport/INeo4jTransport.java
new file mode 100644
index 0000000..98f0729
--- /dev/null
+++ b/src/main/java/org/graylog/plugins/outputs/neo4j/transport/INeo4jTransport.java
@@ -0,0 +1,33 @@
+package org.graylog.plugins.outputs.neo4j.transport;
+
+import org.graylog2.plugin.Message;
+
+/**
+ * Created by dev on 16/08/16.
+ */
+public interface INeo4jTransport {
+ /**
+ * Sends the given message to the remote host. This blocks until there is sufficient capacity to
+ * process the message. It is not guaranteed that the message has been sent once the method call returns because
+ * a queue might be used to dispatch the message.
+ *
+ * @param message message to send to the remote host
+ * @throws InterruptedException
+ */
+ public void send(Message message) throws InterruptedException;
+
+ /**
+ * Tries to send the given message to the remote host. It does not block if there is not enough
+ * capacity to process the message. It is not guaranteed that the message has been sent once the method call
+ * returns because a queue might be used to dispatch the message.
+ *
+ * @param message message to send to the remote host
+ * @return true if the message could be dispatched, false otherwise
+ */
+ public boolean trySend(Message message);
+
+ /**
+ * Stops the transport. Should be used to gracefully shutdown the backend.
+ */
+ public void stop();
+}
diff --git a/src/main/java/org/graylog/plugins/outputs/neo4j/transport/Neo4JBoltTransport.java b/src/main/java/org/graylog/plugins/outputs/neo4j/transport/Neo4JBoltTransport.java
new file mode 100644
index 0000000..939944e
--- /dev/null
+++ b/src/main/java/org/graylog/plugins/outputs/neo4j/transport/Neo4JBoltTransport.java
@@ -0,0 +1,126 @@
+package org.graylog.plugins.outputs.neo4j.transport;
+
+import org.graylog.plugins.outputs.neo4j.Neo4jOutput;
+import org.graylog2.plugin.Message;
+import org.graylog2.plugin.configuration.Configuration;
+import org.graylog2.plugin.outputs.MessageOutputConfigurationException;
+import org.neo4j.driver.v1.*;
+import org.neo4j.driver.v1.exceptions.ClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Created by dev on 16/08/16.
+ */
+public class Neo4JBoltTransport implements INeo4jTransport {
+ private static final Logger LOG = LoggerFactory.getLogger(Neo4JBoltTransport.class);
+
+ private Driver driver;
+ private Configuration configuration;
+ private String parsedCreateQery = null;
+ List fields;
+
+
+ public Neo4JBoltTransport(Configuration config) throws MessageOutputConfigurationException {
+
+
+ configuration = config;
+ fields = new LinkedList();;
+ Session session = null;
+
+ try {
+ driver = GraphDatabase.driver( config.getString(Neo4jOutput.CK_NEO4J_URL),
+ AuthTokens.basic(config.getString(Neo4jOutput.CK_NEO4J_USER), config.getString(Neo4jOutput.CK_NEO4J_PASSWORD)) );
+ session = driver.session();
+
+ //run initialization query only once
+ String createQueryOnce = config.getString(Neo4jOutput.CK_NEO4J_STARTUP_QUERY);
+
+ if (createQueryOnce.length() > 0)
+ session.run(createQueryOnce);
+ }
+ catch (ClientException e){
+ throw new MessageOutputConfigurationException("Malformed neo4j configuration: " + e );
+ }
+ finally {
+ session.close();
+ }
+ //get message fields needed by cypher query
+ String createQuery = config.getString(Neo4jOutput.CK_NEO4J_QUERY);
+ LOG.debug("Bolt protocol, create query: " + createQuery);
+
+ Matcher m = Pattern.compile("\\{([^{}]*)\\}").matcher(createQuery);
+ while (m.find()) {
+ fields.add(m.group(1));
+ LOG.debug("Found field in cypher statement: " + m.group(1));
+ }
+ LOG.info("Identified " + fields.size() + " fields in graph create query.");
+
+ parsedCreateQery = parseQuery(createQuery);
+
+
+ }
+
+ private void postQuery(String query, Map mapping) {
+ Session session = null;
+ try {
+ session = driver.session();
+ session.run(query, mapping).consume();
+ }
+ catch (ClientException e) {
+ LOG.debug("Could not push message to Graph Database: " + e.getMessage());
+ }
+ finally{
+ if (session != null && session.isOpen())
+ session.close();
+ }
+ }
+
+ private String parseQuery(String queryString) {
+
+ queryString = queryString.replace("\n", " ");
+ queryString = queryString.replace("\t", " ");
+ queryString = queryString.replace("\r", "");
+ queryString = queryString.replace(";", "");
+
+ return queryString;
+ }
+
+ @Override
+ public void send(Message message) throws InterruptedException {
+
+ HashMap convertedFields = new HashMap(){};
+
+ for (String field : fields){
+ if (message.hasField(field)) {
+ Object valueForField = message.getField(field);
+ convertedFields.put(field, String.valueOf(valueForField));
+ }
+ else
+ {
+ convertedFields.put(field, null);
+ }
+ }
+ postQuery(parsedCreateQery, convertedFields);
+
+ }
+
+
+
+
+
+ @Override
+ public boolean trySend(Message message) {
+ return false;
+ }
+
+ @Override
+ public void stop() {
+ driver.close();
+ }
+}
diff --git a/src/main/java/org/graylog/plugins/outputs/neo4j/transport/Neo4JHttpTransport.java b/src/main/java/org/graylog/plugins/outputs/neo4j/transport/Neo4JHttpTransport.java
new file mode 100644
index 0000000..3e4ab63
--- /dev/null
+++ b/src/main/java/org/graylog/plugins/outputs/neo4j/transport/Neo4JHttpTransport.java
@@ -0,0 +1,114 @@
+package org.graylog.plugins.outputs.neo4j.transport;
+
+import com.floreysoft.jmte.Engine;
+import org.apache.commons.lang3.StringUtils;
+import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
+import org.graylog.plugins.outputs.neo4j.Neo4jOutput;
+import org.graylog2.plugin.Message;
+import org.graylog2.plugin.configuration.Configuration;
+import org.graylog2.plugin.outputs.MessageOutputConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.*;
+
+/**
+ * Created by dev on 16/08/16.
+ */
+public class Neo4JHttpTransport implements INeo4jTransport {
+ private static final Logger LOG = LoggerFactory.getLogger(Neo4JHttpTransport.class);
+
+ private WebTarget cypher;
+ private Configuration configuration;
+
+ public Neo4JHttpTransport(Configuration config) throws MessageOutputConfigurationException {
+
+ this.configuration = config;
+ URI databaseUri;
+ try {
+ URL baseUrl = new URL(configuration.getString(Neo4jOutput.CK_NEO4J_URL));
+ databaseUri = new URL(baseUrl, StringUtils.removeEnd(baseUrl.getPath(), "/") + "/transaction/commit").toURI();
+ } catch (URISyntaxException e) {
+ throw new MessageOutputConfigurationException("Syntax error in neo4j URL");
+ } catch (MalformedURLException e) {
+ throw new MessageOutputConfigurationException("Malformed neo4j URL: " + e );
+ }
+
+ HttpAuthenticationFeature auth = HttpAuthenticationFeature.basic
+ (configuration.getString(Neo4jOutput.CK_NEO4J_USER), configuration.getString(Neo4jOutput.CK_NEO4J_PASSWORD));
+
+ Client client = ClientBuilder.newClient();
+ client.register(auth);
+ cypher = client.target(databaseUri);
+ if (!configuration.getString(Neo4jOutput.CK_NEO4J_STARTUP_QUERY).isEmpty()) {
+ postQuery(parseQuery(configuration.getString(Neo4jOutput.CK_NEO4J_STARTUP_QUERY)));
+ }
+ }
+ private void postQuery(List> queries) {
+ HashMap>> payload = new HashMap<>();
+ payload.put("statements", queries);
+ Response response = cypher
+ .request(MediaType.APPLICATION_JSON)
+ .post(Entity.entity(payload, MediaType.APPLICATION_JSON), Response.class);
+
+ String result = String.format(
+ "POST [%s] to [%s], status code [%d], headers: %s, returned data: %s",
+ payload, configuration.getString(Neo4jOutput.CK_NEO4J_URL), response.getStatus(), response.getHeaders(),
+ response);
+ if (response.getStatus() >= 400) {
+ LOG.info(result);
+ } else {
+ LOG.debug(result);
+ }
+ }
+
+ private List> parseQuery(String queryString) {
+ List> query = new ArrayList<>();
+
+ queryString = queryString.replace("\n", " ");
+ queryString = queryString.replace("\t", " ");
+ queryString = queryString.replace("\r", "");
+ queryString = queryString.replace(";", "");
+
+ HashMap statement = new HashMap<>();
+ statement.put("statement", queryString);
+ query.add(statement);
+
+ return query;
+ }
+
+ @Override
+ public void send(Message message) throws InterruptedException {
+ Iterator messageFields = message.getFields().entrySet().iterator();
+ Map model = new HashMap<>();
+ while (messageFields.hasNext()) {
+ Map.Entry pair = (Map.Entry) messageFields.next();
+ model.put(String.valueOf(pair.getKey()), String.valueOf(pair.getValue()));
+ }
+ Engine engine = new Engine();
+ String queryString = engine.transform(configuration.getString(Neo4jOutput.CK_NEO4J_QUERY), model);
+ List> query = parseQuery(queryString);
+
+ postQuery(query);
+ }
+
+ @Override
+ public boolean trySend(Message message) {
+ return false;
+ }
+
+ @Override
+ public void stop() {
+
+ }
+}
diff --git a/src/main/java/org/graylog/plugins/outputs/neo4j/transport/Neo4jTransports.java b/src/main/java/org/graylog/plugins/outputs/neo4j/transport/Neo4jTransports.java
new file mode 100644
index 0000000..94b0b0f
--- /dev/null
+++ b/src/main/java/org/graylog/plugins/outputs/neo4j/transport/Neo4jTransports.java
@@ -0,0 +1,40 @@
+package org.graylog.plugins.outputs.neo4j.transport;
+
+import org.graylog2.plugin.configuration.Configuration;
+import org.graylog2.plugin.outputs.MessageOutputConfigurationException;
+
+/**
+ * Created by dev on 16/08/16.
+
+/**
+ * Factory for building a {@link INeo4jTransport}.
+ */
+public enum Neo4jTransports {
+ BOLT,
+ HTTP;
+
+ /**
+ * Creates a {@link INeo4jTransport} from the given protocol and configuration.
+ *
+ * @param transport the transport protocol to use
+ * @param config the {@link Configuration} to pass to the transport
+ * @return An initialized and started {@link INeo4jTransport}
+ */
+ public static INeo4jTransport create(final Neo4jTransports transport, final Configuration config) throws MessageOutputConfigurationException {
+ INeo4jTransport neo4JTransport;
+
+ switch (transport) {
+ case BOLT:
+ neo4JTransport = new Neo4JBoltTransport(config);
+ break;
+ case HTTP:
+ neo4JTransport = new Neo4JHttpTransport(config);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported Neo4J transport: " + transport);
+ }
+
+ return neo4JTransport;
+ }
+
+}
\ No newline at end of file