Skip to content

Commit

Permalink
Merge pull request #2 from rcord02/master
Browse files Browse the repository at this point in the history
Added support for Bolt protocol (configurable Bolt/HTTP)
  • Loading branch information
Marius Sturm authored Aug 19, 2016
2 parents f365a6a + d324d06 commit 0a181c3
Show file tree
Hide file tree
Showing 7 changed files with 366 additions and 84 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
<version>1.0.3</version>
</dependency>
</dependencies>

<build>
Expand Down
130 changes: 47 additions & 83 deletions src/main/java/org/graylog/plugins/outputs/neo4j/Neo4jOutput.java
Original file line number Diff line number Diff line change
@@ -1,31 +1,23 @@
package org.graylog.plugins.outputs.neo4j;

import com.floreysoft.jmte.Engine;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import org.apache.commons.lang3.StringUtils;
import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
import org.graylog.plugins.outputs.neo4j.transport.INeo4jTransport;
import org.graylog.plugins.outputs.neo4j.transport.Neo4jTransports;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.ConfigurationField;
import org.graylog2.plugin.configuration.fields.DropdownField;
import org.graylog2.plugin.configuration.fields.TextField;
import org.graylog2.plugin.outputs.MessageOutput;
import org.graylog2.plugin.outputs.MessageOutputConfigurationException;
import org.graylog2.plugin.streams.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -34,37 +26,40 @@ public class Neo4jOutput implements MessageOutput {

private Configuration configuration;
private final AtomicBoolean isRunning = new AtomicBoolean(false);
private WebTarget cypher;
private INeo4jTransport transport;

private static final String CK_NEO4J_URL = "neo4j_url";
private static final String CK_NEO4J_STARTUP_QUERY = "neo4j_startup_query";
private static final String CK_NEO4J_QUERY = "neo4j_query";
private static final String CK_NEO4J_USER = "neo4j_user";
private static final String CK_NEO4J_PASSWORD = "neo4j_password";
public static final String CK_PROTOCOL = "neo4j_protocol";
public static final String CK_NEO4J_URL = "neo4j_url";
public static final String CK_NEO4J_STARTUP_QUERY = "neo4j_startup_query";
public static final String CK_NEO4J_QUERY = "neo4j_query";
public static final String CK_NEO4J_USER = "neo4j_user";
public static final String CK_NEO4J_PASSWORD = "neo4j_password";

@Inject
public Neo4jOutput(@Assisted Stream stream, @Assisted Configuration configuration) throws MessageOutputConfigurationException {
this.configuration = configuration;
public Neo4jOutput(@Assisted Stream stream, @Assisted Configuration config) throws MessageOutputConfigurationException {
configuration = config;
final Neo4jTransports transportSelection;
switch (configuration.getString(CK_PROTOCOL).toUpperCase(Locale.ENGLISH)) {
case "BOLT":
transportSelection = Neo4jTransports.BOLT;
break;
case "HTTP":
transportSelection = Neo4jTransports.HTTP;
break;

default:
throw new MessageOutputConfigurationException("Unknown protocol " + configuration.getString(CK_PROTOCOL));
}

URI databaseUri;
try {
URL baseUrl = new URL(configuration.getString(CK_NEO4J_URL));
databaseUri = new URL(baseUrl, StringUtils.removeEnd(baseUrl.getPath(), "/") + "/transaction/commit").toURI();
} catch (URISyntaxException e) {
throw new MessageOutputConfigurationException("Syntax error in neo4j URL");
} catch (MalformedURLException e) {
throw new MessageOutputConfigurationException("Malformed neo4j URL");
transport = Neo4jTransports.create(transportSelection, configuration);
} catch (Exception e) {
final String error = "Error initializing " + INeo4jTransport.class;
LOG.error(error, e);
throw new MessageOutputConfigurationException(error);
}

HttpAuthenticationFeature auth = HttpAuthenticationFeature.basic
(configuration.getString(CK_NEO4J_USER), configuration.getString(CK_NEO4J_PASSWORD));

Client client = ClientBuilder.newClient();
client.register(auth);
cypher = client.target(databaseUri);
if (! configuration.getString(CK_NEO4J_STARTUP_QUERY).isEmpty()) {
postQuery(parseQuery(configuration.getString(CK_NEO4J_STARTUP_QUERY)));
}
LOG.info("Neo4j output started!");
isRunning.set(true);
}

Expand All @@ -75,17 +70,7 @@ public boolean isRunning() {

@Override
public void write(Message message) throws Exception {
Iterator messageFields = message.getFields().entrySet().iterator();
Map<String, Object> model = new HashMap<>();
while (messageFields.hasNext()) {
Map.Entry pair = (Map.Entry) messageFields.next();
model.put(String.valueOf(pair.getKey()), String.valueOf(pair.getValue()));
}
Engine engine = new Engine();
String queryString = engine.transform(configuration.getString(CK_NEO4J_QUERY), model);
List<HashMap<String, String>> query = parseQuery(queryString);

postQuery(query);
transport.send(message);
}

@Override
Expand All @@ -96,42 +81,10 @@ public void write(List<Message> messages) throws Exception {

}

private List<HashMap<String, String>> parseQuery(String queryString) {
List<HashMap<String, String>> query = new ArrayList<>();

queryString = queryString.replace("\n", " ");
queryString = queryString.replace("\t", " ");
queryString = queryString.replace("\r", "");
queryString = queryString.replace(";", "");

HashMap<String, String> statement = new HashMap<>();
statement.put("statement", queryString);
query.add(statement);

return query;
}

private void postQuery(List<HashMap<String, String>> queries) {
HashMap<String, List<HashMap<String, String>>> payload = new HashMap<>();
payload.put("statements", queries);
Response response = cypher
.request(MediaType.APPLICATION_JSON)
.post(Entity.entity(payload, MediaType.APPLICATION_JSON), Response.class);

String result = String.format(
"POST [%s] to [%s], status code [%d], headers: %s, returned data: %s",
payload, configuration.getString(CK_NEO4J_URL), response.getStatus(), response.getHeaders(),
response);
if (response.getStatus() >= 400) {
LOG.info(result);
} else {
LOG.debug(result);
}
}

@Override
public void stop() {
LOG.info("Stopping Neo4j output");
transport.stop();
isRunning.set(false);
}

Expand All @@ -151,9 +104,18 @@ public static class Config extends MessageOutput.Config {
public ConfigurationRequest getRequestedConfiguration() {
final ConfigurationRequest configurationRequest = new ConfigurationRequest();

final Map<String, String> protocols = ImmutableMap.of(
"HTTP", "HTTP",
"Bolt", "Bolt");

configurationRequest.addField(new DropdownField(
CK_PROTOCOL, "Neo4J Protocol", "HTTP", protocols,
"The protocol used to connect to Neo4J",
ConfigurationField.Optional.NOT_OPTIONAL));

configurationRequest.addField(new TextField(
CK_NEO4J_URL, "Neo4j URL", "http://localhost:7474/db/data",
"URL to Neo4j",
"default for Bolt: bolt://localhost:7687/",
ConfigurationField.Optional.NOT_OPTIONAL)
);

Expand All @@ -167,7 +129,9 @@ public ConfigurationRequest getRequestedConfiguration() {
CK_NEO4J_QUERY, "Cypher query",
"MERGE (source:HOST { address: '${source}' })\n" +
"MERGE (user_id:USER { user_id: '${user_id}'})\nMERGE (source)-[:CONNECT]->(user_id)",
"Query will be executed on every log message. Use template substitutions to access message fields: ${took_ms}",
"Query will be executed on every log message.\n" +
"HTTP Mode: Use template substitutions to access message fields: ${took_ms}\n" +
"Bolt Mode: Use curly brackets only to access message fields: {took_ms}",
ConfigurationField.Optional.NOT_OPTIONAL, TextField.Attribute.TEXTAREA)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public String getName() {

@Override
public String getAuthor() {
return "Graylog, Inc.";
return "Graylog, Inc., adapted by Rainer Cording";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.graylog.plugins.outputs.neo4j.transport;

import org.graylog2.plugin.Message;

/**
* Created by dev on 16/08/16.
*/
public interface INeo4jTransport {
/**
* Sends the given message to the remote host. This <strong>blocks</strong> until there is sufficient capacity to
* process the message. It is not guaranteed that the message has been sent once the method call returns because
* a queue might be used to dispatch the message.
*
* @param message message to send to the remote host
* @throws InterruptedException
*/
public void send(Message message) throws InterruptedException;

/**
* Tries to send the given message to the remote host. It does <strong>not block</strong> if there is not enough
* capacity to process the message. It is not guaranteed that the message has been sent once the method call
* returns because a queue might be used to dispatch the message.
*
* @param message message to send to the remote host
* @return true if the message could be dispatched, false otherwise
*/
public boolean trySend(Message message);

/**
* Stops the transport. Should be used to gracefully shutdown the backend.
*/
public void stop();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package org.graylog.plugins.outputs.neo4j.transport;

import org.graylog.plugins.outputs.neo4j.Neo4jOutput;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.outputs.MessageOutputConfigurationException;
import org.neo4j.driver.v1.*;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Created by dev on 16/08/16.
*/
public class Neo4JBoltTransport implements INeo4jTransport {
private static final Logger LOG = LoggerFactory.getLogger(Neo4JBoltTransport.class);

private Driver driver;
private Configuration configuration;
private String parsedCreateQery = null;
List<String> fields;


public Neo4JBoltTransport(Configuration config) throws MessageOutputConfigurationException {


configuration = config;
fields = new LinkedList<String>();;
Session session = null;

try {
driver = GraphDatabase.driver( config.getString(Neo4jOutput.CK_NEO4J_URL),
AuthTokens.basic(config.getString(Neo4jOutput.CK_NEO4J_USER), config.getString(Neo4jOutput.CK_NEO4J_PASSWORD)) );
session = driver.session();

//run initialization query only once
String createQueryOnce = config.getString(Neo4jOutput.CK_NEO4J_STARTUP_QUERY);

if (createQueryOnce.length() > 0)
session.run(createQueryOnce);
}
catch (ClientException e){
throw new MessageOutputConfigurationException("Malformed neo4j configuration: " + e );
}
finally {
session.close();
}
//get message fields needed by cypher query
String createQuery = config.getString(Neo4jOutput.CK_NEO4J_QUERY);
LOG.debug("Bolt protocol, create query: " + createQuery);

Matcher m = Pattern.compile("\\{([^{}]*)\\}").matcher(createQuery);
while (m.find()) {
fields.add(m.group(1));
LOG.debug("Found field in cypher statement: " + m.group(1));
}
LOG.info("Identified " + fields.size() + " fields in graph create query.");

parsedCreateQery = parseQuery(createQuery);


}

private void postQuery(String query, Map<String, Object> mapping) {
Session session = null;
try {
session = driver.session();
session.run(query, mapping).consume();
}
catch (ClientException e) {
LOG.debug("Could not push message to Graph Database: " + e.getMessage());
}
finally{
if (session != null && session.isOpen())
session.close();
}
}

private String parseQuery(String queryString) {

queryString = queryString.replace("\n", " ");
queryString = queryString.replace("\t", " ");
queryString = queryString.replace("\r", "");
queryString = queryString.replace(";", "");

return queryString;
}

@Override
public void send(Message message) throws InterruptedException {

HashMap<String, Object> convertedFields = new HashMap<String, Object>(){};

for (String field : fields){
if (message.hasField(field)) {
Object valueForField = message.getField(field);
convertedFields.put(field, String.valueOf(valueForField));
}
else
{
convertedFields.put(field, null);
}
}
postQuery(parsedCreateQery, convertedFields);

}





@Override
public boolean trySend(Message message) {
return false;
}

@Override
public void stop() {
driver.close();
}
}
Loading

0 comments on commit 0a181c3

Please sign in to comment.