diff --git a/.github/workflows/publish-container-image.yml b/.github/workflows/publish-container-image.yml index 03ced126..7eeadcc9 100644 --- a/.github/workflows/publish-container-image.yml +++ b/.github/workflows/publish-container-image.yml @@ -15,6 +15,7 @@ jobs: env: CONTAINER_REGISTRY_USERNAME: ${GITHUB_ACTOR} CONTAINER_REGISTRY_TOKEN: ${{ secrets.GITHUB_TOKEN }} + SMARTCLIDE_CONTEXT_GITLAB_API_TOKEN: ${{ secrets.SMARTCLIDE_CONTEXT_GITLAB_API_TOKEN }} steps: - name: Checkout diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 2750a8e4..39392746 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -29,3 +29,5 @@ jobs: - name: Run Tests run: mvn --file pom.xml --batch-mode test + env: + SMARTCLIDE_CONTEXT_GITLAB_API_TOKEN: ${{ secrets.SMARTCLIDE_CONTEXT_GITLAB_API_TOKEN }} diff --git a/context-monitoring/src/main/java/de/atb/context/monitoring/config/MonitoringConfiguration.java b/context-monitoring/src/main/java/de/atb/context/monitoring/config/MonitoringConfiguration.java index 9a3d99a0..32cf3645 100644 --- a/context-monitoring/src/main/java/de/atb/context/monitoring/config/MonitoringConfiguration.java +++ b/context-monitoring/src/main/java/de/atb/context/monitoring/config/MonitoringConfiguration.java @@ -15,6 +15,12 @@ */ +import de.atb.context.common.Configuration; +import de.atb.context.common.exceptions.ConfigurationException; +import de.atb.context.monitoring.config.models.*; +import de.atb.context.tools.ontology.AmIMonitoringConfiguration; +import org.simpleframework.xml.core.Persister; + import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -22,18 +28,6 @@ import java.util.List; import java.util.Map; -import de.atb.context.monitoring.config.models.IMonitoringConfiguration; -import de.atb.context.monitoring.config.models.Index; -import org.simpleframework.xml.Serializer; -import org.simpleframework.xml.core.Persister; -import de.atb.context.tools.ontology.AmIMonitoringConfiguration; -import de.atb.context.common.Configuration; -import de.atb.context.common.exceptions.ConfigurationException; -import de.atb.context.monitoring.config.models.Config; -import de.atb.context.monitoring.config.models.DataSource; -import de.atb.context.monitoring.config.models.Interpreter; -import de.atb.context.monitoring.config.models.Monitor; - /** * Settings * @@ -42,35 +36,35 @@ */ public final class MonitoringConfiguration extends Configuration implements IMonitoringConfiguration { - private static volatile Map settings = new HashMap<>(); - private static final String DefaultFileName = "monitoring-config.xml"; + private static final Map SETTINGS = new HashMap<>(); + private static final String DEFAULT_FILE_NAME = "monitoring-config.xml"; public static MonitoringConfiguration getInstance() { - if (settings.get(DefaultFileName) == null) { - settings.put(DefaultFileName, new MonitoringConfiguration(DefaultFileName)); + if (SETTINGS.get(DEFAULT_FILE_NAME) == null) { + SETTINGS.put(DEFAULT_FILE_NAME, new MonitoringConfiguration(DEFAULT_FILE_NAME)); } - return settings.get(DefaultFileName); + return SETTINGS.get(DEFAULT_FILE_NAME); } public static MonitoringConfiguration getInstance(final AmIMonitoringConfiguration config) { - if (settings.get(config) == null) { - settings.put(config.getId(), new MonitoringConfiguration(config)); + if (SETTINGS.get(config) == null) { + SETTINGS.put(config.getId(), new MonitoringConfiguration(config)); } - return settings.get(config.getId()); + return SETTINGS.get(config.getId()); } public static MonitoringConfiguration getInstance(final String configFileName) { - if (settings.get(configFileName) == null) { - settings.put(configFileName, new MonitoringConfiguration(configFileName)); + if (SETTINGS.get(configFileName) == null) { + SETTINGS.put(configFileName, new MonitoringConfiguration(configFileName)); } - return settings.get(configFileName); + return SETTINGS.get(configFileName); } public static MonitoringConfiguration getInstance(final String configFileName, final String configFilePath) { - if (settings.get(configFileName) == null) { - settings.put(configFileName, new MonitoringConfiguration(configFileName, configFilePath)); + if (SETTINGS.get(configFileName) == null) { + SETTINGS.put(configFileName, new MonitoringConfiguration(configFileName, configFilePath)); } - return settings.get(configFileName); + return SETTINGS.get(configFileName); } private MonitoringConfiguration(final String givenName, final String givenPath) { @@ -88,23 +82,19 @@ private MonitoringConfiguration(final AmIMonitoringConfiguration config) { protected void readConfigurationFile() { InputStream is = null; try { - final Serializer serializer = new Persister(); - - String drmHandle = sysCaller.openDRMobject("monitoring-config.xml", configurationLookupPath,"read"); + final String drmHandle = sysCaller.openDRMobject(configurationFileName, configurationLookupPath, "read"); if (drmHandle != null) { - byte[] readConfig = sysCaller.getDRMobject("monitoring-config.xml", configurationLookupPath); + final byte[] readConfig = sysCaller.getDRMobject(configurationFileName, configurationLookupPath); if (readConfig != null) { is = new ByteArrayInputStream(readConfig); - this.configurationBean = serializer.read( - this.configurationClass, is); + this.configurationBean = new Persister().read(this.configurationClass, is); is.close(); - logger.info("" + this.configurationFileName + " loaded!"); + logger.info("{} loaded!", configurationFileName); } sysCaller.closeDRMobject(drmHandle); } } catch (final Exception e) { - logger.error("Could not serialize the " + configurationName - + " file: " + this.configurationFileName, e); + logger.error("Could not serialize the {} file {}", configurationName, configurationFileName, e); } finally { if (is != null) { try { @@ -170,15 +160,25 @@ public void checkConsistency() throws ConfigurationException { } if (getDataSource(monitor.getDataSourceId()) == null) { - throw new ConfigurationException("DataSource '%s' for Monitor '%s' is not configured", monitor.getDataSourceId(), - monitor.getId()); + throw new ConfigurationException( + "DataSource '%s' for Monitor '%s' is not configured", + monitor.getDataSourceId(), + monitor.getId() + ); } if (getInterpreter(monitor.getInterpreterId()) == null) { - throw new ConfigurationException("Interpreter '%s' for Monitor '%s' is not configured", monitor.getInterpreterId(), - monitor.getId()); + throw new ConfigurationException( + "Interpreter '%s' for Monitor '%s' is not configured", + monitor.getInterpreterId(), + monitor.getId() + ); } if (getIndex(monitor.getIndexId()) == null) { - throw new ConfigurationException("Index '%s' for Monitor '%s' is not configured", monitor.getIndexId(), monitor.getId()); + throw new ConfigurationException( + "Index '%s' for Monitor '%s' is not configured", + monitor.getIndexId(), + monitor.getId() + ); } } } diff --git a/context-monitoring/src/main/java/de/atb/context/monitoring/config/models/datasources/MessageBrokerDataSource.java b/context-monitoring/src/main/java/de/atb/context/monitoring/config/models/datasources/MessageBrokerDataSource.java index 0ac35321..3d532d2b 100644 --- a/context-monitoring/src/main/java/de/atb/context/monitoring/config/models/datasources/MessageBrokerDataSource.java +++ b/context-monitoring/src/main/java/de/atb/context/monitoring/config/models/datasources/MessageBrokerDataSource.java @@ -15,8 +15,6 @@ */ -import java.net.URI; - import de.atb.context.common.authentication.Credentials; import de.atb.context.monitoring.config.models.DataSource; import de.atb.context.monitoring.config.models.DataSourceType; @@ -24,83 +22,101 @@ import thewebsemantic.Namespace; import thewebsemantic.RdfType; +import java.io.Serializable; +import java.net.URI; + /** * MessageBrokerDataSource * * @author scholze * @version $LastChangedRevision: 156 $ - * */ @RdfType("MessageBrokerDataSource") @Namespace("http://atb-bremen.de/") public class MessageBrokerDataSource extends DataSource { - private static final long serialVersionUID = 3490943354053238739L; + public MessageBrokerDataSource() { + } + + public MessageBrokerDataSource(final DataSource base) { + this.id = base.getId(); + this.monitor = base.getMonitor(); + this.options = base.getOptions(); + this.type = base.getType().toString(); + this.uri = base.getUri(); + } + + @Override + public final DataSourceType getType() { + return DataSourceType.MessageBroker; + } + + public final String getMessageBrokerServer() { + return this.getOptionValue(MessageBrokerDataSourceOptions.MessageBrokerServer, true); + } - public MessageBrokerDataSource() { - } + public final Integer getMessageBrokerPort() { + return this.getOptionValue(MessageBrokerDataSourceOptions.MessageBrokerPort, true); + } - public MessageBrokerDataSource(final DataSource base) { - this.id = base.getId(); - this.monitor = base.getMonitor(); - this.options = base.getOptions(); - this.type = base.getType().toString(); - this.uri = base.getUri(); - } + public final String getUserName() { + return this.getOptionValue(MessageBrokerDataSourceOptions.UserName, true); + } - @Override - public final DataSourceType getType() { - return DataSourceType.MessageBroker; - } + public final String getPassword() { + return this.getOptionValue(MessageBrokerDataSourceOptions.Password, true); + } - public final String getMessageBrokerServer() { - return this.getOptionValue(MessageBrokerDataSourceOptions.MessageBrokerServer, true); - } + public final String getIncomingExchange() { + return this.getOptionValue(MessageBrokerDataSourceOptions.IncomingExchange, true); + } - public final Integer getMessageBrokerPort() { - return this.getOptionValue(MessageBrokerDataSourceOptions.MessageBrokerPort, true); + public final String getIncomingTopic() { + return this.getOptionValue(MessageBrokerDataSourceOptions.IncomingTopic, true); } - public final String getUserName() { - return this.getOptionValue(MessageBrokerDataSourceOptions.UserName, true); - } + public final boolean isIncomingDurable() { + final Serializable value = this.getOptionValue(MessageBrokerDataSourceOptions.IncomingDurable, false); + return value != null && (boolean) value; + } - public final String getPassword() { - return this.getOptionValue(MessageBrokerDataSourceOptions.Password, true); - } + public final String getOutgoingExchange() { + return this.getOptionValue(MessageBrokerDataSourceOptions.OutgoingExchange, true); + } - public final String getExchange() { - return this.getOptionValue(MessageBrokerDataSourceOptions.Exchange, true); + public final String getOutgoingTopic() { + return this.getOptionValue(MessageBrokerDataSourceOptions.OutgoingTopic, true); } - public final String getTopic() { - return this.getOptionValue(MessageBrokerDataSourceOptions.Topic, true); + public final String getOutgoingQueue() { + return this.getOptionValue(MessageBrokerDataSourceOptions.OutgoingQueue, true); } - public final String getDleTopic() { - return this.getOptionValue(MessageBrokerDataSourceOptions.DleTopic, true); + public final boolean isOutgoingDurable() { + final Serializable value = this.getOptionValue(MessageBrokerDataSourceOptions.OutgoingDurable, false); + return value != null && (boolean) value; } public final Credentials getCredentials() { - String userName = this.getUserName(); - String password = this.getPassword(); - return new Credentials(userName, password); - } + String userName = this.getUserName(); + String password = this.getPassword(); + return new Credentials(userName, password); + } - public final IMessageBroker toMessageBroker() { - final URI myUri = URI.create(uri); + public final IMessageBroker toMessageBroker() { + final URI myUri = URI.create(uri); final Credentials myCredentials = getCredentials(); - return new IMessageBroker() { + return new IMessageBroker() { - @Override - public URI getURI() { - return myUri; - } + @Override + public URI getURI() { + return myUri; + } @Override public Credentials getCredentials() { return myCredentials; } - }; - } + }; + } } diff --git a/context-monitoring/src/main/java/de/atb/context/monitoring/config/models/datasources/MessageBrokerDataSourceOptions.java b/context-monitoring/src/main/java/de/atb/context/monitoring/config/models/datasources/MessageBrokerDataSourceOptions.java index e50fa48e..83867d66 100644 --- a/context-monitoring/src/main/java/de/atb/context/monitoring/config/models/datasources/MessageBrokerDataSourceOptions.java +++ b/context-monitoring/src/main/java/de/atb/context/monitoring/config/models/datasources/MessageBrokerDataSourceOptions.java @@ -34,11 +34,19 @@ public enum MessageBrokerDataSourceOptions implements IDataSourceOptionValue { Password("password", String.class), - Exchange("exchange", String.class), + IncomingExchange("incoming-exchange", String.class), - Topic("topic", String.class), + IncomingTopic("incoming-topic", String.class), - DleTopic("dle-topic", String.class); + IncomingDurable("incoming-durable", Boolean.class), + + OutgoingExchange("outgoing-exchange", String.class), + + OutgoingTopic("outgoing-topic", String.class), + + OutgoingQueue("outgoing-queue", String.class), + + OutgoingDurable("outgoing-durable", Boolean.class); private final String key; private final Class valueType; diff --git a/context-monitoring/src/main/java/de/atb/context/monitoring/config/models/datasources/WebServiceDataSource.java b/context-monitoring/src/main/java/de/atb/context/monitoring/config/models/datasources/WebServiceDataSource.java index e8042093..61efca4b 100644 --- a/context-monitoring/src/main/java/de/atb/context/monitoring/config/models/datasources/WebServiceDataSource.java +++ b/context-monitoring/src/main/java/de/atb/context/monitoring/config/models/datasources/WebServiceDataSource.java @@ -15,14 +15,14 @@ */ -import java.net.URI; - +import de.atb.context.common.authentication.Credentials; import de.atb.context.monitoring.config.models.DataSource; import de.atb.context.monitoring.config.models.DataSourceType; import de.atb.context.monitoring.models.IWebService; import thewebsemantic.Namespace; import thewebsemantic.RdfType; -import de.atb.context.common.authentication.Credentials; + +import java.net.URI; /** * WebServiceDataSource @@ -68,6 +68,10 @@ public final String getPassword() { return this.getOptionValue(WebServiceDataSourceOptions.Password, true); } + public final Long getStartDelay() { + return this.getOptionValue(WebServiceDataSourceOptions.StartDelay, true); + } + public final String getMachineId() { return this.getOptionValue(WebServiceDataSourceOptions.MachineId, true); } @@ -78,10 +82,6 @@ public final Credentials getCredentials() { return new Credentials(userName, password); } - public final Long getStartDelay() { - return this.getOptionValue(WebServiceDataSourceOptions.StartDelay, true); - } - public final IWebService toWebService() { final URI myUri = URI.create(uri); final Credentials myCredentials = getCredentials(); diff --git a/context-monitoring/src/main/java/de/atb/context/monitoring/monitors/messagebroker/MessageBrokerMonitor.java b/context-monitoring/src/main/java/de/atb/context/monitoring/monitors/messagebroker/MessageBrokerMonitor.java index 0c7688fb..113c548d 100644 --- a/context-monitoring/src/main/java/de/atb/context/monitoring/monitors/messagebroker/MessageBrokerMonitor.java +++ b/context-monitoring/src/main/java/de/atb/context/monitoring/monitors/messagebroker/MessageBrokerMonitor.java @@ -14,8 +14,6 @@ * #L% */ -import java.nio.charset.StandardCharsets; - import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; @@ -34,6 +32,8 @@ import de.atb.context.monitoring.parser.messagebroker.MessageBrokerParser; import de.atb.context.tools.ontology.AmIMonitoringConfiguration; +import java.nio.charset.StandardCharsets; + /** * WebServiceMonitor * @@ -89,16 +89,32 @@ protected void doMonitor(final InterpreterConfiguration setting) throws Exceptio this.parser = getParser(setting); } - final Channel channel = MessageBrokerUtil.connectToTopicExchange(dataSource); - MessageBrokerUtil.registerListenerOnTopic(channel, dataSource, deliverCallback, cancelCallback); + final Channel channel = MessageBrokerUtil.connectToTopicExchange( + dataSource.getMessageBrokerServer(), + dataSource.getMessageBrokerPort(), + dataSource.getUserName(), + dataSource.getPassword(), + dataSource.getIncomingExchange(), + dataSource.isIncomingDurable() + ); + MessageBrokerUtil.registerListenerOnTopic( + channel, + dataSource.getIncomingExchange(), + dataSource.getIncomingTopic(), + dataSource.getId(), + deliverCallback, + cancelCallback + ); } } // FIXME: envelope is never used protected final void handleMessage(Envelope envelope, String message) { - this.logger.info("Handling message from exchange \"{}\" with routing key \"{}\" ...", - envelope.getExchange(), - envelope.getRoutingKey()); + this.logger.info( + "Handling message from exchange \"{}\" with routing key \"{}\" ...", + envelope.getExchange(), + envelope.getRoutingKey() + ); try { if ((this.dataSource.getUri() != null)) { MessageBrokerAnalyser analyser = (MessageBrokerAnalyser) parser.getAnalyser(); diff --git a/context-monitoring/src/main/java/de/atb/context/monitoring/monitors/messagebroker/util/MessageBrokerUtil.java b/context-monitoring/src/main/java/de/atb/context/monitoring/monitors/messagebroker/util/MessageBrokerUtil.java index 6ba3f4b8..f4106a35 100644 --- a/context-monitoring/src/main/java/de/atb/context/monitoring/monitors/messagebroker/util/MessageBrokerUtil.java +++ b/context-monitoring/src/main/java/de/atb/context/monitoring/monitors/messagebroker/util/MessageBrokerUtil.java @@ -14,11 +14,6 @@ * #L% */ -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.UUID; -import java.util.concurrent.TimeoutException; - import com.google.gson.Gson; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; @@ -26,11 +21,15 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; -import de.atb.context.monitoring.config.models.datasources.MessageBrokerDataSource; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.UUID; +import java.util.concurrent.TimeoutException; + /** * Helper class wrapping methods for interacting with message broker. */ @@ -49,6 +48,7 @@ public class MessageBrokerUtil { * @param userName the username to use when connecting to message broker - optional * @param password the password to use when connecting to message broker - optional * @param exchange the topic exchange's name + * @param durable whether the topic should be durable or not * @return a {@link Channel} object representing the established connection to the message broker * @throws IOException in case of error * @throws TimeoutException in case of error @@ -57,73 +57,53 @@ public static Channel connectToTopicExchange(final String host, final int port, final String userName, final String password, - final String exchange) throws IOException, TimeoutException { - LOGGER.info("Connecting to messagebroker {}:{} with user {}", host, port, userName != null ? userName : ""); - final ConnectionFactory factory = new ConnectionFactory(); - - factory.setHost(host); - factory.setPort(port); - - if (StringUtils.isNotBlank(userName)) { - factory.setUsername(userName); - } - if (StringUtils.isNotBlank(password)) { - factory.setPassword(password); - } - - factory.setAutomaticRecoveryEnabled(true); - - final Connection connection = factory.newConnection(); + final String exchange, + final boolean durable) throws IOException, TimeoutException { + final Connection connection = getConnection(host, port, userName, password); final Channel channel = connection.createChannel(); LOGGER.info("Creating topic exchange {}", exchange); - channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true); + channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, durable); return channel; } /** - * Connect to the message broker specified by {@code host} and {@code port}. - * Create the given {@code exchange} if it does not exist yet. + * Connect to the message broker specified by {@code host} and {@code port} + * with the credentials specified by {@code userName} and {@code password}. + * Create the given {@code queue} if it does not exist yet. * * @param host the host where the message broker is running * @param port the port where the message broker is listening - * @param exchange the topic exchange's name + * @param userName the username to use when connecting to message broker - optional + * @param password the password to use when connecting to message broker - optional + * @param queue the queue's name + * @param durable whether the queue should be durable or not * @return a {@link Channel} object representing the established connection to the message broker * @throws IOException in case of error * @throws TimeoutException in case of error - * @see MessageBrokerUtil#connectToTopicExchange(String, int, String, String, String) */ - public static Channel connectToTopicExchange(final String host, - final int port, - final String exchange) throws IOException, TimeoutException { - return connectToTopicExchange(host, port, null, null, exchange); - } + public static Channel connectToQueue(final String host, + final int port, + final String userName, + final String password, + final String queue, + final boolean durable) throws IOException, TimeoutException { + final Connection connection = getConnection(host, port, userName, password); - /** - * Connect to message broker. Message broker details and credentials are specified in the given {@code dataSource}. - * Create the given {@code exchange} if it does not exist yet. - * - * @param dataSource the {@link MessageBrokerDataSource} containing the message broker connection details - * @return a {@link Channel} object representing the established connection to the message broker - * @throws IOException in case of error - * @throws TimeoutException in case of error - * @see MessageBrokerUtil#connectToTopicExchange(String, int, String, String, String) - */ - public static Channel connectToTopicExchange(final MessageBrokerDataSource dataSource) - throws IOException, TimeoutException { - return connectToTopicExchange(dataSource.getMessageBrokerServer(), - dataSource.getMessageBrokerPort(), - dataSource.getUserName(), - dataSource.getPassword(), - dataSource.getExchange()); + final Channel channel = connection.createChannel(); + + LOGGER.info("Creating queue {}", queue); + channel.queueDeclare(queue, durable, false, false, null); + + return channel; } /** * Register the given callback functions to consume messages on the given {@code exchange} for the given {@code topic}. *

- * Use {@link MessageBrokerUtil#connectToTopicExchange(String, int, String, String, String)} or one of its overloads + * Use {@link MessageBrokerUtil#connectToTopicExchange(String, int, String, String, String, boolean)} * to create {@link Channel}. * * @param channel the {@link Channel} object representing the established connection to the message broker @@ -148,57 +128,70 @@ public static void registerListenerOnTopic(final Channel channel, channel.basicConsume(queue, true, deliverCallback, cancelCallback); } - /** - * Register the given callback functions to consume messages. - * The exchange and topic to register for are specified in the given {@code dataSource}. - *

- * Use {@link MessageBrokerUtil#connectToTopicExchange(String, int, String, String, String)} or one of its overloads - * to create {@link Channel}. - * - * @param channel the {@link Channel} object representing the established connection to the message broker - * @param dataSource the {@link MessageBrokerDataSource} containing the exchange and topic details - * @param deliverCallback callback function to handle received messages - * @param cancelCallback callback function to handle cancellation of the listener - * @throws IOException in case of error - */ - public static void registerListenerOnTopic(final Channel channel, - final MessageBrokerDataSource dataSource, - final DeliverCallback deliverCallback, - final CancelCallback cancelCallback) throws IOException { - registerListenerOnTopic(channel, - dataSource.getExchange(), - dataSource.getTopic(), - dataSource.getId(), - deliverCallback, - cancelCallback); - } - /** * Converts the given {@code payload} object to a JSON string * and sends it to the given {@code topic} on the given {@code exchange}. *

- * Use {@link MessageBrokerUtil#connectToTopicExchange(String, int, String, String, String)} or one of its overloads + * Use {@link MessageBrokerUtil#connectToTopicExchange(String, int, String, String, String, boolean)} * to create {@link Channel}. * * @param channel the {@link Channel} object representing the established connection to the message broker * @param exchange the topic exchange's name * @param topic the topic's name * @param payload the object to send - * @throws IOException in case of error */ public static void convertAndSendToTopic(final Channel channel, final String exchange, final String topic, - final Object payload) throws IOException { - final String jsonMessage = GSON.toJson(payload); - sendToTopic(channel, exchange, topic, jsonMessage); + final Object payload) { + try { + final String jsonMessage = GSON.toJson(payload); + LOGGER.info("Publishing message to topic {}/{}: {}", exchange, topic, jsonMessage); + channel.basicPublish(exchange, topic, null, jsonMessage.getBytes(StandardCharsets.UTF_8)); + } catch (Exception e) { + LOGGER.error("Failed to send {} to topic {}/{}", payload, exchange, topic, e); + } + } + + /** + * Converts the given {@code payload} object to a JSON string and sends it to the given {@code queue}. + *

+ * Use {@link MessageBrokerUtil#connectToQueue(String, int, String, String, String, boolean)} + * to create {@link Channel}. + * + * @param channel the {@link Channel} object representing the established connection to the message broker + * @param queue the queue's name + * @param payload the object to send + */ + public static void convertAndSendToQueue(final Channel channel, final String queue, final Object payload) { + try { + final String jsonMessage = GSON.toJson(payload); + LOGGER.info("Publishing message to queue {}: {}", queue, jsonMessage); + channel.basicPublish("", queue, null, jsonMessage.getBytes(StandardCharsets.UTF_8)); + } catch (Exception e) { + LOGGER.error("Failed to send {} to queue {}", payload, queue, e); + } } - private static void sendToTopic(final Channel channel, - final String exchange, - final String topic, - final String jsonMessage) throws IOException { - LOGGER.info("Publishing message to topic {}/{}: {}", exchange, topic, jsonMessage); - channel.basicPublish(exchange, topic, null, jsonMessage.getBytes(StandardCharsets.UTF_8)); + private static Connection getConnection(final String host, + final int port, + final String userName, + final String password) throws IOException, TimeoutException { + LOGGER.info("Connecting to messagebroker {}:{} with user {}", host, port, userName != null ? userName : ""); + final ConnectionFactory factory = new ConnectionFactory(); + + factory.setHost(host); + factory.setPort(port); + + if (StringUtils.isNotBlank(userName)) { + factory.setUsername(userName); + } + if (StringUtils.isNotBlank(password)) { + factory.setPassword(password); + } + + factory.setAutomaticRecoveryEnabled(true); + + return factory.newConnection(); } } diff --git a/context-monitoring/src/test/java/de/atb/context/monitoring/monitors/file/TestFilePairSystemMonitor.java b/context-monitoring/src/test/java/de/atb/context/monitoring/monitors/file/TestFilePairSystemMonitor.java index 0dd7c62d..6e60f1fd 100644 --- a/context-monitoring/src/test/java/de/atb/context/monitoring/monitors/file/TestFilePairSystemMonitor.java +++ b/context-monitoring/src/test/java/de/atb/context/monitoring/monitors/file/TestFilePairSystemMonitor.java @@ -39,9 +39,10 @@ public class TestFilePairSystemMonitor { @BeforeClass public static void beforeClass() throws ConfigurationException { - String absolutefilePath = Path.of("src", "test", "resources").toAbsolutePath().toString(); + String absoluteFilePath = + Path.of("src", "test", "resources", "filepairmonitor").toAbsolutePath().toString(); - config = MonitoringConfiguration.getInstance("monitoring-filepair-config.xml", absolutefilePath); + config = MonitoringConfiguration.getInstance("monitoring-config.xml", absoluteFilePath); monitor = config.getMonitor("monitor-dummy"); Assert.assertTrue("No monitors 'monitor-dummy' specified!", config.getMonitor("monitor-dummy") != null); diff --git a/context-monitoring/src/test/resources/monitoring-filepair-config.xml b/context-monitoring/src/test/resources/filepairmonitor/monitoring-config.xml similarity index 87% rename from context-monitoring/src/test/resources/monitoring-filepair-config.xml rename to context-monitoring/src/test/resources/filepairmonitor/monitoring-config.xml index 47228d83..71d82581 100644 --- a/context-monitoring/src/test/resources/monitoring-filepair-config.xml +++ b/context-monitoring/src/test/resources/filepairmonitor/monitoring-config.xml @@ -1,10 +1,10 @@ + xsi:schemaLocation="http://www.atb-bremen.de ../monitoring-config.xsd"> - + diff --git a/smartclide-monitoring/src/main/java/de/atb/context/monitoring/analyser/GitAnalyser.java b/smartclide-monitoring/src/main/java/de/atb/context/monitoring/analyser/GitAnalyser.java deleted file mode 100644 index 75a48db2..00000000 --- a/smartclide-monitoring/src/main/java/de/atb/context/monitoring/analyser/GitAnalyser.java +++ /dev/null @@ -1,59 +0,0 @@ -package de.atb.context.monitoring.analyser; - -/* - * #%L - * SmartCLIDE Monitoring - * %% - * Copyright (C) 2021 ATB – Institut für angewandte Systemtechnik Bremen GmbH - * %% - * This program and the accompanying materials are made - * available under the terms of the Eclipse Public License 2.0 - * which is available at https://www.eclipse.org/legal/epl-2.0/ - * - * SPDX-License-Identifier: EPL-2.0 - * #L% - */ - -import java.util.Date; -import java.util.List; - -import com.google.gson.Gson; -import de.atb.context.monitoring.analyser.messagebroker.MessageBrokerAnalyser; -import de.atb.context.monitoring.config.models.DataSource; -import de.atb.context.monitoring.config.models.InterpreterConfiguration; -import de.atb.context.monitoring.index.Indexer; -import de.atb.context.monitoring.models.GitDataModel; -import de.atb.context.monitoring.models.GitMessage; -import de.atb.context.tools.ontology.AmIMonitoringConfiguration; -import org.apache.lucene.document.Document; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class GitAnalyser extends MessageBrokerAnalyser { - - private static final Logger logger = LoggerFactory.getLogger(GitAnalyser.class); - - private static final Gson GSON = new Gson(); - - public GitAnalyser(final DataSource dataSource, - final InterpreterConfiguration interpreterConfiguration, - final Indexer indexer, - final Document document, - final AmIMonitoringConfiguration amiConfiguration) { - super(dataSource, interpreterConfiguration, indexer, document, amiConfiguration); - } - - @Override - public List analyseObject(final String input) { - try { - final GitMessage gitMessage = GSON.fromJson(input, GitMessage.class); - final GitDataModel model = new GitDataModel(); - model.addGitMessage(gitMessage); - model.setMonitoredAt(new Date()); - return List.of(model); - } catch (Exception e) { - logger.error("Error analysing input: {}", input); - return List.of(); - } - } -} diff --git a/smartclide-monitoring/src/main/java/de/atb/context/monitoring/analyser/GitlabCommitAnalyser.java b/smartclide-monitoring/src/main/java/de/atb/context/monitoring/analyser/GitlabCommitAnalyser.java new file mode 100644 index 00000000..873a5a74 --- /dev/null +++ b/smartclide-monitoring/src/main/java/de/atb/context/monitoring/analyser/GitlabCommitAnalyser.java @@ -0,0 +1,74 @@ +package de.atb.context.monitoring.analyser; + +/* + * #%L + * SmartCLIDE Monitoring + * %% + * Copyright (C) 2022 ATB – Institut für angewandte Systemtechnik Bremen GmbH + * %% + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * #L% + */ + +import de.atb.context.monitoring.analyser.webservice.WebServiceAnalyser; +import de.atb.context.monitoring.config.models.DataSource; +import de.atb.context.monitoring.config.models.InterpreterConfiguration; +import de.atb.context.monitoring.config.models.datasources.GitlabDataSource; +import de.atb.context.monitoring.index.Indexer; +import de.atb.context.monitoring.models.GitlabCommitDataModel; +import de.atb.context.monitoring.models.GitlabCommitMessage; +import de.atb.context.monitoring.models.IWebService; +import de.atb.context.tools.ontology.AmIMonitoringConfiguration; +import eu.smartclide.contexthandling.services.GitlabApiClient; +import org.apache.lucene.document.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.List; + +public class GitlabCommitAnalyser extends WebServiceAnalyser { + + private static final Logger logger = LoggerFactory.getLogger(GitlabCommitAnalyser.class); + + private final GitlabApiClient gitlabApiClient; + + public GitlabCommitAnalyser(final DataSource dataSource, + final InterpreterConfiguration interpreterConfiguration, + final Indexer indexer, + final Document document, + final AmIMonitoringConfiguration amiConfiguration) { + super(dataSource, interpreterConfiguration, indexer, document, amiConfiguration); + if (!(dataSource instanceof GitlabDataSource)) { + throw new IllegalArgumentException("Given dataSource must be of type GitlabDataSource!"); + } + gitlabApiClient = new GitlabApiClient( + ((GitlabDataSource) dataSource).getGitLabAccessToken(), + dataSource.getUri() + ); + } + + @Override + public List analyseObject(IWebService service) { + try { + final List gitlabCommitMessages = gitlabApiClient.getGitlabCommitMessages(); + + if (gitlabCommitMessages.isEmpty()) { + return List.of(); + } + + final GitlabCommitDataModel model = new GitlabCommitDataModel(); + model.setGitlabCommitMessages(gitlabCommitMessages); + model.setMonitoredAt(new Date()); + logger.info("Analysed {} GitlabCommitMessages", gitlabCommitMessages.size()); + return List.of(model); + } catch (Exception e) { + logger.error("Error analysing service: {}", service); + return List.of(); + } + } +} diff --git a/smartclide-monitoring/src/main/java/de/atb/context/monitoring/config/models/datasources/GitlabDataSource.java b/smartclide-monitoring/src/main/java/de/atb/context/monitoring/config/models/datasources/GitlabDataSource.java new file mode 100644 index 00000000..967ff246 --- /dev/null +++ b/smartclide-monitoring/src/main/java/de/atb/context/monitoring/config/models/datasources/GitlabDataSource.java @@ -0,0 +1,83 @@ +package de.atb.context.monitoring.config.models.datasources; + +/*- + * #%L + * SmartCLIDE Monitoring + * %% + * Copyright (C) 2015 - 2022 ATB – Institut für angewandte Systemtechnik Bremen GmbH + * %% + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * #L% + */ + +import de.atb.context.monitoring.config.models.DataSource; +import lombok.NoArgsConstructor; +import thewebsemantic.Namespace; +import thewebsemantic.RdfType; + +import java.io.Serializable; + +/** + * GitLabDataSource + * + * @author scholze + * @version $LastChangedRevision: 156 $ + */ +@RdfType("GitlabDataSource") +@Namespace("http://atb-bremen.de/") +@NoArgsConstructor +public class GitlabDataSource extends WebServiceDataSource { + + public static final IDataSourceOptionValue ACCESS_TOKEN_OPTION = new IDataSourceOptionValue() { + @Override + public String getKeyName() { + return "token"; + } + + @Override + public Class getValueType() { + return String.class; + } + }; + + public GitlabDataSource(final DataSource base) { + this.id = base.getId(); + this.monitor = base.getMonitor(); + this.options = base.getOptions(); + this.type = base.getType().toString(); + this.uri = base.getUri(); + } + + public final String getGitLabAccessToken() { + return this.getOptionValue(ACCESS_TOKEN_OPTION, true); + } + + public final String getMessageBrokerServer() { + return this.getOptionValue(MessageBrokerDataSourceOptions.MessageBrokerServer, true); + } + + public final Integer getMessageBrokerPort() { + return this.getOptionValue(MessageBrokerDataSourceOptions.MessageBrokerPort, true); + } + + public final String getOutgoingExchange() { + return this.getOptionValue(MessageBrokerDataSourceOptions.OutgoingExchange, true); + } + + public final String getOutgoingTopic() { + return this.getOptionValue(MessageBrokerDataSourceOptions.OutgoingTopic, true); + } + + public final String getOutgoingQueue() { + return this.getOptionValue(MessageBrokerDataSourceOptions.OutgoingQueue, true); + } + + public final boolean isOutgoingDurable() { + final Serializable value = this.getOptionValue(MessageBrokerDataSourceOptions.OutgoingDurable, false); + return value != null && (boolean) value; + } +} diff --git a/smartclide-monitoring/src/main/java/de/atb/context/monitoring/models/GitDataModel.java b/smartclide-monitoring/src/main/java/de/atb/context/monitoring/models/GitlabCommitDataModel.java similarity index 65% rename from smartclide-monitoring/src/main/java/de/atb/context/monitoring/models/GitDataModel.java rename to smartclide-monitoring/src/main/java/de/atb/context/monitoring/models/GitlabCommitDataModel.java index 1f7c0e57..969221f1 100644 --- a/smartclide-monitoring/src/main/java/de/atb/context/monitoring/models/GitDataModel.java +++ b/smartclide-monitoring/src/main/java/de/atb/context/monitoring/models/GitlabCommitDataModel.java @@ -4,7 +4,7 @@ * #%L * SmartCLIDE Monitoring * %% - * Copyright (C) 2021 ATB – Institut für angewandte Systemtechnik Bremen GmbH + * Copyright (C) 2022 ATB – Institut für angewandte Systemtechnik Bremen GmbH * %% * This program and the accompanying materials are made * available under the terms of the Eclipse Public License 2.0 @@ -14,59 +14,59 @@ * #L% */ -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.UUID; - -import org.apache.jena.rdf.model.Model; import de.atb.context.common.Version; import de.atb.context.common.util.ApplicationScenario; import de.atb.context.common.util.BusinessCase; -import de.atb.context.monitoring.config.models.datasources.MessageBrokerDataSource; +import de.atb.context.monitoring.config.models.datasources.WebServiceDataSource; import de.atb.context.monitoring.rdf.RdfHelper; import de.atb.context.persistence.ModelOutputLanguage; import lombok.Getter; import lombok.Setter; +import org.apache.jena.rdf.model.Model; import org.simpleframework.xml.Root; +import thewebsemantic.Id; import thewebsemantic.Namespace; import thewebsemantic.RdfType; -import thewebsemantic.Id; -@RdfType("GitDataModel") +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.UUID; + +@RdfType("GitlabCommitDataModel") @Namespace(BusinessCase.NS_DUMMY_URL) @Root @Getter @Setter -public class GitDataModel implements IMonitoringDataModel { +public class GitlabCommitDataModel implements IMonitoringDataModel { private Date monitoredAt; private String documentIndexId = "index/broker"; private String documentUri; @Id private String identifier; - private MessageBrokerDataSource dataSource; - private String implementingClassName = GitDataModel.class.getName(); + private WebServiceDataSource dataSource; + private String implementingClassName = GitlabCommitDataModel.class.getName(); private String monitoringDataVersion = Version.MONITORING_DATA.getVersionString(); - private List gitMessages = new ArrayList<>(); + private List gitlabCommitMessages = new ArrayList<>(); - public GitDataModel() { + public GitlabCommitDataModel() { this.identifier = UUID.randomUUID().toString(); } - @Override - public GitDataModel fromRdfModel(String rdfModel) { - return RdfHelper.createMonitoringData(rdfModel, GitDataModel.class); + public void addGitMessage(GitlabCommitMessage gitlabCommitMessage) { + if (!this.gitlabCommitMessages.contains(gitlabCommitMessage)) { + this.gitlabCommitMessages.add(gitlabCommitMessage); + } } - public void addGitMessage(GitMessage gitMessage) { - if (!this.gitMessages.contains(gitMessage)) { - this.gitMessages.add(gitMessage); - } + @Override + public GitlabCommitDataModel fromRdfModel(String rdfModel) { + return RdfHelper.createMonitoringData(rdfModel, GitlabCommitDataModel.class); } @Override - public GitDataModel fromRdfModel(Model model) { - return RdfHelper.createMonitoringData(model, GitDataModel.class); + public GitlabCommitDataModel fromRdfModel(Model model) { + return RdfHelper.createMonitoringData(model, GitlabCommitDataModel.class); } @Override @@ -85,7 +85,7 @@ public String getContextIdentifierClassName() { } @Override - public void setDataSource(MessageBrokerDataSource dataSource) { + public void setDataSource(WebServiceDataSource dataSource) { } @Override diff --git a/smartclide-monitoring/src/main/java/de/atb/context/monitoring/models/GitMessage.java b/smartclide-monitoring/src/main/java/de/atb/context/monitoring/models/GitlabCommitMessage.java similarity index 71% rename from smartclide-monitoring/src/main/java/de/atb/context/monitoring/models/GitMessage.java rename to smartclide-monitoring/src/main/java/de/atb/context/monitoring/models/GitlabCommitMessage.java index b4deda2b..b9cff37d 100644 --- a/smartclide-monitoring/src/main/java/de/atb/context/monitoring/models/GitMessage.java +++ b/smartclide-monitoring/src/main/java/de/atb/context/monitoring/models/GitlabCommitMessage.java @@ -17,6 +17,7 @@ import de.atb.context.common.util.BusinessCase; import lombok.AllArgsConstructor; import lombok.Builder; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; @@ -24,7 +25,7 @@ import thewebsemantic.Namespace; import thewebsemantic.RdfType; -@RdfType("GitMessage") +@RdfType("GitlabCommitMessage") @Namespace(BusinessCase.NS_DUMMY_URL) @Getter @Setter @@ -32,12 +33,14 @@ @NoArgsConstructor @AllArgsConstructor @ToString -public class GitMessage { - String timestamp; +@EqualsAndHashCode +public class GitlabCommitMessage { String user; String repository; String branch; - Integer noOfCommitsInBranch; - Integer noOfPushesInBranch; Integer noOfModifiedFiles; + // Has to be Integer because + // de.atb.context.persistence.monitoring.MonitoringDataRepository.persist(java.lang.String, java.lang.Class) + // cannot persist long values for some reason. + Integer timeSinceLastCommit; } diff --git a/smartclide-monitoring/src/main/java/de/atb/context/monitoring/monitors/GitMonitor.java b/smartclide-monitoring/src/main/java/de/atb/context/monitoring/monitors/GitMonitor.java deleted file mode 100644 index 443f9231..00000000 --- a/smartclide-monitoring/src/main/java/de/atb/context/monitoring/monitors/GitMonitor.java +++ /dev/null @@ -1,40 +0,0 @@ -package de.atb.context.monitoring.monitors; - -/* - * #%L - * SmartCLIDE Monitoring - * %% - * Copyright (C) 2021 ATB – Institut für angewandte Systemtechnik Bremen GmbH - * %% - * This program and the accompanying materials are made - * available under the terms of the Eclipse Public License 2.0 - * which is available at https://www.eclipse.org/legal/epl-2.0/ - * - * SPDX-License-Identifier: EPL-2.0 - * #L% - */ - -import java.io.IOException; -import java.util.concurrent.TimeoutException; - -import de.atb.context.monitoring.config.models.DataSource; -import de.atb.context.monitoring.config.models.Interpreter; -import de.atb.context.monitoring.config.models.Monitor; -import de.atb.context.monitoring.config.models.datasources.MessageBrokerDataSource; -import de.atb.context.monitoring.index.Indexer; -import de.atb.context.monitoring.monitors.messagebroker.MessageBrokerMonitor; -import de.atb.context.tools.ontology.AmIMonitoringConfiguration; -import eu.smartclide.contexthandling.dle.listener.DleGitMonitorProgressListener; - -public class GitMonitor extends MessageBrokerMonitor { - public GitMonitor(final DataSource dataSource, - final Interpreter interpreter, - final Monitor monitor, - final Indexer indexer, - final AmIMonitoringConfiguration configuration) throws IOException, TimeoutException { - super(dataSource, interpreter, monitor, indexer, configuration); - - // FIXME: this is a temporary workaround and should be removed! - addProgressListener(new DleGitMonitorProgressListener((MessageBrokerDataSource) dataSource)); - } -} diff --git a/smartclide-monitoring/src/main/java/de/atb/context/monitoring/monitors/GitlabCommitMonitor.java b/smartclide-monitoring/src/main/java/de/atb/context/monitoring/monitors/GitlabCommitMonitor.java new file mode 100644 index 00000000..75ccf904 --- /dev/null +++ b/smartclide-monitoring/src/main/java/de/atb/context/monitoring/monitors/GitlabCommitMonitor.java @@ -0,0 +1,42 @@ +package de.atb.context.monitoring.monitors; + +/* + * #%L + * SmartCLIDE Monitoring + * %% + * Copyright (C) 2022 ATB – Institut für angewandte Systemtechnik Bremen GmbH + * %% + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * #L% + */ + +import de.atb.context.monitoring.config.models.DataSource; +import de.atb.context.monitoring.config.models.Interpreter; +import de.atb.context.monitoring.config.models.Monitor; +import de.atb.context.monitoring.config.models.datasources.GitlabDataSource; +import de.atb.context.monitoring.index.Indexer; +import de.atb.context.monitoring.monitors.webservice.WebServiceMonitor; +import de.atb.context.tools.ontology.AmIMonitoringConfiguration; +import eu.smartclide.contexthandling.dle.listener.DleGitlabCommitMonitorProgressListener; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +public class GitlabCommitMonitor extends WebServiceMonitor { + public GitlabCommitMonitor(final DataSource dataSource, + final Interpreter interpreter, + final Monitor monitor, + final Indexer indexer, + final AmIMonitoringConfiguration configuration) throws IOException, TimeoutException { + super(dataSource, interpreter, monitor, indexer, configuration); + if (!(dataSource instanceof GitlabDataSource)) { + throw new IllegalArgumentException("Given dataSource must be of type GitlabDataSource!"); + } + this.addProgressListener(new DleGitlabCommitMonitorProgressListener((GitlabDataSource) dataSource)); + this.logger.info("Initialized GitlabCommitMonitor for uri: " + dataSource.getUri()); + } +} diff --git a/smartclide-monitoring/src/main/java/de/atb/context/monitoring/parser/GitParser.java b/smartclide-monitoring/src/main/java/de/atb/context/monitoring/parser/GitlabCommitParser.java similarity index 51% rename from smartclide-monitoring/src/main/java/de/atb/context/monitoring/parser/GitParser.java rename to smartclide-monitoring/src/main/java/de/atb/context/monitoring/parser/GitlabCommitParser.java index d0f2c252..4d5ea56b 100644 --- a/smartclide-monitoring/src/main/java/de/atb/context/monitoring/parser/GitParser.java +++ b/smartclide-monitoring/src/main/java/de/atb/context/monitoring/parser/GitlabCommitParser.java @@ -16,21 +16,26 @@ import de.atb.context.monitoring.config.models.DataSource; import de.atb.context.monitoring.config.models.InterpreterConfiguration; +import de.atb.context.monitoring.config.models.datasources.GitlabDataSource; import de.atb.context.monitoring.index.Indexer; -import de.atb.context.monitoring.parser.messagebroker.MessageBrokerParser; +import de.atb.context.monitoring.models.IWebService; +import de.atb.context.monitoring.parser.webservice.WebServiceParser; import de.atb.context.tools.ontology.AmIMonitoringConfiguration; import org.apache.lucene.document.Document; -public class GitParser extends MessageBrokerParser { - public GitParser(final DataSource dataSource, - final InterpreterConfiguration interpreterConfiguration, - final Indexer indexer, - final AmIMonitoringConfiguration amiConfiguration) { +public class GitlabCommitParser extends WebServiceParser { + public GitlabCommitParser(final DataSource dataSource, + final InterpreterConfiguration interpreterConfiguration, + final Indexer indexer, + final AmIMonitoringConfiguration amiConfiguration) { super(dataSource, interpreterConfiguration, indexer, amiConfiguration); + if (!(dataSource instanceof GitlabDataSource)) { + throw new IllegalArgumentException("Given dataSource must be of type GitlabDataSource!"); + } } @Override - protected boolean parseObject(final String message, final Document document) { + protected boolean parseObject(IWebService service, Document document) { return true; } } diff --git a/smartclide-monitoring/src/main/java/eu/smartclide/contexthandling/dle/listener/DleGitMonitorProgressListener.java b/smartclide-monitoring/src/main/java/eu/smartclide/contexthandling/dle/listener/DleGitMonitorProgressListener.java deleted file mode 100644 index 0b1ae8c4..00000000 --- a/smartclide-monitoring/src/main/java/eu/smartclide/contexthandling/dle/listener/DleGitMonitorProgressListener.java +++ /dev/null @@ -1,90 +0,0 @@ -package eu.smartclide.contexthandling.dle.listener; - -/*- - * #%L - * SmartCLIDE Monitoring - * %% - * Copyright (C) 2015 - 2021 ATB – Institut für angewandte Systemtechnik Bremen GmbH - * %% - * This program and the accompanying materials are made - * available under the terms of the Eclipse Public License 2.0 - * which is available at https://www.eclipse.org/legal/epl-2.0/ - * - * SPDX-License-Identifier: EPL-2.0 - * #L% - */ - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.TimeoutException; - -import com.rabbitmq.client.Channel; -import de.atb.context.monitoring.config.models.datasources.MessageBrokerDataSource; -import de.atb.context.monitoring.events.MonitoringProgressListener; -import de.atb.context.monitoring.models.GitDataModel; -import de.atb.context.monitoring.models.GitMessage; -import de.atb.context.monitoring.models.IMonitoringDataModel; -import de.atb.context.monitoring.monitors.messagebroker.util.MessageBrokerUtil; -import eu.smartclide.contexthandling.dle.model.CommitMessage; -import eu.smartclide.contexthandling.dle.model.DleMessage; -import org.apache.lucene.document.Document; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DleGitMonitorProgressListener implements MonitoringProgressListener> { - - private static final Logger LOGGER = LoggerFactory.getLogger(DleGitMonitorProgressListener.class); - - private final String topic; - private final String exchange; - private final Channel channel; - - public DleGitMonitorProgressListener(final MessageBrokerDataSource messageBrokerDataSource) - throws IOException, TimeoutException { - exchange = messageBrokerDataSource.getExchange(); - topic = messageBrokerDataSource.getDleTopic(); - channel = MessageBrokerUtil.connectToTopicExchange(messageBrokerDataSource); - } - - @Override - public void documentIndexed(final String indexId, final Document document) { - // noop - } - - @Override - public void documentParsed(final String parsed, final Document document) { - // noop - } - - @Override - public void documentAnalysed(final List> analysed, - final String parsed, - final Document document) { - analysed.stream() - .filter(iMonitoringDataModel -> iMonitoringDataModel instanceof GitDataModel) - .map(iMonitoringDataModel -> (GitDataModel) iMonitoringDataModel) - .flatMap(gitDataModel -> gitDataModel.getGitMessages().stream()) - .map(this::convertToDleMessage) - .forEach(this::send); - } - - private DleMessage convertToDleMessage(final GitMessage gitMessage) { - return DleMessage.builder() - .monitor(CommitMessage.builder() - .user(gitMessage.getUser()) - .branch(gitMessage.getBranch()) - .files(gitMessage.getNoOfModifiedFiles()) - .build()) - .build(); - } - - private void send(final DleMessage dleMessage) { - try { - // simulate that actual context-extraction will take some time - Thread.sleep(1000); - MessageBrokerUtil.convertAndSendToTopic(channel, exchange, topic, dleMessage); - } catch (Exception e) { - LOGGER.error("Failed to send {} to {}/{}", dleMessage, exchange, topic, e); - } - } -} diff --git a/smartclide-monitoring/src/main/java/eu/smartclide/contexthandling/dle/listener/DleGitlabCommitMonitorProgressListener.java b/smartclide-monitoring/src/main/java/eu/smartclide/contexthandling/dle/listener/DleGitlabCommitMonitorProgressListener.java new file mode 100644 index 00000000..bed10013 --- /dev/null +++ b/smartclide-monitoring/src/main/java/eu/smartclide/contexthandling/dle/listener/DleGitlabCommitMonitorProgressListener.java @@ -0,0 +1,132 @@ +package eu.smartclide.contexthandling.dle.listener; + +/*- + * #%L + * SmartCLIDE Monitoring + * %% + * Copyright (C) 2015 - 2022 ATB – Institut für angewandte Systemtechnik Bremen GmbH + * %% + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * #L% + */ + +import com.rabbitmq.client.Channel; +import de.atb.context.monitoring.config.models.datasources.GitlabDataSource; +import de.atb.context.monitoring.config.models.datasources.MessageBrokerDataSourceOptions; +import de.atb.context.monitoring.events.MonitoringProgressListener; +import de.atb.context.monitoring.models.GitlabCommitDataModel; +import de.atb.context.monitoring.models.GitlabCommitMessage; +import de.atb.context.monitoring.models.IMonitoringDataModel; +import de.atb.context.monitoring.models.IWebService; +import de.atb.context.monitoring.monitors.messagebroker.util.MessageBrokerUtil; +import eu.smartclide.contexthandling.dle.model.CommitMessage; +import eu.smartclide.contexthandling.dle.model.DleMessage; +import org.apache.commons.lang3.StringUtils; +import org.apache.lucene.document.Document; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeoutException; + +public class DleGitlabCommitMonitorProgressListener + implements MonitoringProgressListener> { + + private final Channel channel; + private final String exchange; + private final String topic; + private final String queue; + private final boolean useTopic; + + public DleGitlabCommitMonitorProgressListener(final GitlabDataSource dataSource) + throws IOException, TimeoutException { + exchange = dataSource.getOutgoingExchange(); + topic = dataSource.getOutgoingTopic(); + queue = dataSource.getOutgoingQueue(); + useTopic = isOutgoingTopic(); + channel = connectToMessageBroker( + dataSource.getMessageBrokerServer(), + dataSource.getMessageBrokerPort(), + dataSource.getUserName(), + dataSource.getPassword(), + dataSource.isOutgoingDurable() + ); + } + + @Override + public void documentIndexed(final String indexId, final Document document) { + // noop + } + + @Override + public void documentParsed(final IWebService parsed, final Document document) { + // noop + } + + @Override + public void documentAnalysed(final List> analysed, + final IWebService parsed, + final Document document) { + analysed.stream() + .filter(iMonitoringDataModel -> iMonitoringDataModel instanceof GitlabCommitDataModel) + .map(iMonitoringDataModel -> (GitlabCommitDataModel) iMonitoringDataModel) + .flatMap(gitlabCommitDataModel -> gitlabCommitDataModel.getGitlabCommitMessages().stream()) + .map(this::convertToDleMessage) + .forEach(this::send); + } + + private DleMessage convertToDleMessage(final GitlabCommitMessage gitlabCommitMessage) { + return DleMessage.builder() + .monitor(CommitMessage.builder() + .repoId(gitlabCommitMessage.getRepository()) + .user(gitlabCommitMessage.getUser()) + .branch(gitlabCommitMessage.getBranch()) + .timeSinceLastCommit(gitlabCommitMessage.getTimeSinceLastCommit()) + .numberOfFilesModified(gitlabCommitMessage.getNoOfModifiedFiles()) + .build()) + .build(); + } + + private void send(final DleMessage dleMessage) { + if (useTopic) { + MessageBrokerUtil.convertAndSendToTopic(channel, exchange, topic, dleMessage); + } else { + MessageBrokerUtil.convertAndSendToQueue(channel, queue, dleMessage); + } + } + + private Channel connectToMessageBroker(final String host, + final Integer port, + final String userName, + final String password, + final boolean durable) throws IOException, TimeoutException { + return useTopic + ? MessageBrokerUtil.connectToTopicExchange(host, port, userName, password, exchange, durable) + : MessageBrokerUtil.connectToQueue(host, port, userName, password, queue, durable); + } + + private boolean isOutgoingTopic() { + if (StringUtils.isBlank(topic) && StringUtils.isBlank(queue)) { + throw new IllegalArgumentException(String.format( + "Must specify either %s or %s!", + MessageBrokerDataSourceOptions.OutgoingTopic.getKeyName(), + MessageBrokerDataSourceOptions.OutgoingQueue.getKeyName() + )); + } + if (StringUtils.isNotBlank(topic)) { + if (StringUtils.isBlank(exchange)) { + throw new IllegalArgumentException(String.format( + "Must specify %s when connecting to topic %s!", + MessageBrokerDataSourceOptions.OutgoingExchange.getKeyName(), + MessageBrokerDataSourceOptions.OutgoingTopic.getKeyName() + )); + } + return true; + } else { + return false; + } + } +} diff --git a/smartclide-monitoring/src/main/java/eu/smartclide/contexthandling/dle/model/CommitMessage.java b/smartclide-monitoring/src/main/java/eu/smartclide/contexthandling/dle/model/CommitMessage.java index f13552be..5ba45455 100644 --- a/smartclide-monitoring/src/main/java/eu/smartclide/contexthandling/dle/model/CommitMessage.java +++ b/smartclide-monitoring/src/main/java/eu/smartclide/contexthandling/dle/model/CommitMessage.java @@ -14,6 +14,7 @@ * #L% */ +import com.google.gson.annotations.SerializedName; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Getter; @@ -32,7 +33,12 @@ public class CommitMessage { final String header = "new commit"; @Builder.Default final String state = "info"; + @SerializedName("repo_id") + String repoId; String user; String branch; - Integer files; + @SerializedName("time_since_last_commit") + Integer timeSinceLastCommit; + @SerializedName("number_of_files_modified") + Integer numberOfFilesModified; } diff --git a/smartclide-monitoring/src/main/java/eu/smartclide/contexthandling/services/GitlabApiClient.java b/smartclide-monitoring/src/main/java/eu/smartclide/contexthandling/services/GitlabApiClient.java new file mode 100644 index 00000000..d8f3f2fc --- /dev/null +++ b/smartclide-monitoring/src/main/java/eu/smartclide/contexthandling/services/GitlabApiClient.java @@ -0,0 +1,213 @@ +package eu.smartclide.contexthandling.services; + +/*- + * #%L + * SmartCLIDE Monitoring + * %% + * Copyright (C) 2015 - 2022 ATB – Institut für angewandte Systemtechnik Bremen GmbH + * %% + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * #L% + */ + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import de.atb.context.monitoring.models.GitlabCommitMessage; +import org.apache.http.HttpStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.util.LinkedList; +import java.util.List; + +public class GitlabApiClient { + + private static final Logger logger = LoggerFactory.getLogger(GitlabApiClient.class); + private static final DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME; + private static final String membershipParam = "&membership=true"; + private static final String paginationParam = "&per_page=100"; + private static final String refNameParam = "&ref_name="; + private static final String sinceParam = "&since="; + private static final ZonedDateTime initialSinceDate = ZonedDateTime.of(2022, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); + private static final String uriPartForBranches = "/repository/branches/"; + private static final String uriPartForCommits = "/repository/commits/"; + private static final String uriPartForDiff = "/diff/"; + private static final String uriPartForProjects = "/api/v4/projects/"; + private final String baseUri; + private final String uriParams; + private ZonedDateTime lastRun = null; + + public GitlabApiClient(String accessToken, String gitlabBaseUri) { + String accessTokenParam = "private_token=" + accessToken; + this.uriParams = "?" + accessTokenParam + membershipParam + paginationParam; + this.baseUri = gitlabBaseUri + uriPartForProjects; + } + + /** + * generates gitMessages for given user + * creates separate message for the commit in all the branches + * + * @return List + */ + public List getGitlabCommitMessages() { + // if we are running for the first time get all commits since `initialSinceDate` + // otherwise get all commits since last run + final ZonedDateTime nowAtUtc = ZonedDateTime.now(ZoneOffset.UTC); + final String sinceDateTime = (lastRun == null) + ? initialSinceDate.format(formatter) + : lastRun.format(formatter); + // adjust the time of last run + lastRun = nowAtUtc; + + // first get all user projects + JsonArray projects = getUserProjects(); + logger.info("Total {} user projects are found", projects.size()); + List gitlabCommitMessages = new LinkedList<>(); + for (JsonElement project : projects) { + JsonObject projectJsonObject = project.getAsJsonObject(); + // get project id + String projectId = projectJsonObject.get("id").getAsString(); + // get all branches for given project, create a new GitMessage + JsonArray branches = getAllBranchesForGivenProject(projectId); + logger.info("Total {} branches for project with given ID {} are found", branches.size(), projectId); + + for (JsonElement branch : branches) { + String branchName = branch.getAsJsonObject().get("name").getAsString(); + // get all commits for given branch + JsonArray commitsInBranch = getCommitsForGivenBranch(projectId, branchName, sinceDateTime); + logger.info("Total {} commits are found in a given branch with name '{}' since {}", + commitsInBranch.size(), branchName, sinceDateTime); + + for (JsonElement commit : commitsInBranch) { + JsonObject commitJsonObject = commit.getAsJsonObject(); + String commitId = commitJsonObject.get("id").getAsString(); + + GitlabCommitMessage gitlabCommitMessage = new GitlabCommitMessage(); + gitlabCommitMessage.setUser(commitJsonObject.get("author_name").getAsString()); + gitlabCommitMessage.setRepository(projectJsonObject.get("path_with_namespace").getAsString()); + gitlabCommitMessage.setBranch(branchName); + final int timeSinceLastCommit = calculateTimeSinceLastCommit(projectId, commitJsonObject); + gitlabCommitMessage.setTimeSinceLastCommit(timeSinceLastCommit); + JsonArray newCommitDiff = getCommitDiff(projectId, commitId); + gitlabCommitMessage.setNoOfModifiedFiles(newCommitDiff.size()); + gitlabCommitMessages.add(gitlabCommitMessage); + logger.info("Git commit message: " + gitlabCommitMessage); + } + } + } + return gitlabCommitMessages; + } + + private int calculateTimeSinceLastCommit(String projectId, JsonObject commit) { + int difference = 0; + // check if parent id exists for given commit + if (commit.get("parent_ids").getAsJsonArray().size() > 0) { + String parentCommitId = commit.get("parent_ids").getAsString(); + String commitCreationDateStr = commit.get("created_at").getAsString(); + JsonObject parentCommit = getCommitById(projectId, parentCommitId); + if (parentCommit.getAsJsonObject().has("created_at")) { + String parentCommitCreationDateStr = parentCommit.getAsJsonObject().get("created_at").getAsString(); + + try { + ZonedDateTime commitCreationDate = ZonedDateTime.parse(commitCreationDateStr, formatter); + ZonedDateTime parentCommitCreationDate = ZonedDateTime.parse(parentCommitCreationDateStr, formatter); + long longDifference = commitCreationDate.toInstant().getEpochSecond() - + parentCommitCreationDate.toInstant().getEpochSecond(); + if (longDifference <= (long) Integer.MAX_VALUE) { + difference = (int) longDifference; + } + } catch (DateTimeParseException e) { + logger.error("Failed to parse commit creation date", e); + } + } else { + logger.info("Could not get parent commit with ID: {}", parentCommitId); + } + } else { + logger.info("No parent commit exist for commit with ID: {}", commit.get("id").getAsString()); + } + return difference; + } + + private JsonArray getAllBranchesForGivenProject(String projectId) { + return parseHttpResponseToJsonArray(makeGetCallToGitlab(baseUri + projectId + uriPartForBranches + uriParams)); + } + + private JsonObject getCommitById(String projectId, String commitId) { + return parseHttpResponseToJsonObject(makeGetCallToGitlab(baseUri + projectId + + uriPartForCommits + commitId + uriParams)); + } + + private JsonArray getCommitDiff(String projectId, String commitId) { + return parseHttpResponseToJsonArray(makeGetCallToGitlab(baseUri + projectId + + uriPartForCommits + commitId + uriPartForDiff + uriParams)); + } + + private JsonArray getCommitsForGivenBranch(String projectId, String branchName, String sinceDateTime) { + return parseHttpResponseToJsonArray(makeGetCallToGitlab(baseUri + projectId + + uriPartForCommits + uriParams + refNameParam + branchName + sinceParam + sinceDateTime)); + } + + private JsonArray getUserProjects() { + return parseHttpResponseToJsonArray(makeGetCallToGitlab(baseUri + uriParams)); + } + + /** + * this method makes get call to gitlab server with given uri + * + * @param uri as string + * @return HttpResponse as response + */ + private HttpResponse makeGetCallToGitlab(String uri) { + + HttpResponse response = null; + try { + final HttpClient httpClient = HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_2) + .connectTimeout(Duration.ofMinutes(5)) + .build(); + + HttpRequest request = HttpRequest.newBuilder().GET().uri(URI.create(uri)).build(); + + // receive response from Gitlab + response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + + if (response.statusCode() != HttpStatus.SC_OK) { + logger.error("Http response error:" + response.statusCode() + response.body()); + return null; + } + } catch (IOException | InterruptedException e) { + logger.error("HTTP Client connection interruption exception", e); + } + return response; + } + + private JsonArray parseHttpResponseToJsonArray(HttpResponse response) { + if (response != null) { + return JsonParser.parseString(response.body()).getAsJsonArray(); + } + return new JsonArray(); + } + + private JsonObject parseHttpResponseToJsonObject(HttpResponse response) { + if (response != null) { + return JsonParser.parseString(response.body()).getAsJsonObject(); + } + return new JsonObject(); + } +} diff --git a/smartclide-monitoring/src/test/java/de/atb/context/monitoring/GitlabApiClientTest.java b/smartclide-monitoring/src/test/java/de/atb/context/monitoring/GitlabApiClientTest.java new file mode 100644 index 00000000..bfd666a2 --- /dev/null +++ b/smartclide-monitoring/src/test/java/de/atb/context/monitoring/GitlabApiClientTest.java @@ -0,0 +1,45 @@ +package de.atb.context.monitoring; + +import de.atb.context.monitoring.models.GitlabCommitMessage; +import eu.smartclide.contexthandling.services.GitlabApiClient; +import org.apache.commons.lang3.StringUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class GitlabApiClientTest { + + private static final String gitlabBaseUri = "https://gitlab.atb-bremen.de"; + private GitlabApiClient gitlabApiClient; + + @Before + public void setup() { + final String gitlabApiToken = System.getenv("SMARTCLIDE_CONTEXT_GITLAB_API_TOKEN"); + if (StringUtils.isBlank(gitlabApiToken)) { + throw new IllegalStateException("Did not find valid GitLab API token in \"SMARTCLIDE_CONTEXT_GITLAB_API_TOKEN\" environment variable!"); + } + gitlabApiClient = new GitlabApiClient(gitlabApiToken, gitlabBaseUri); + } + + @Test + public void testGetGitlabCommitMessages() { + List gitlabCommitMessages = gitlabApiClient.getGitlabCommitMessages(); + + assertFalse(gitlabCommitMessages.isEmpty()); + gitlabCommitMessages.forEach(gitlabCommitMessage -> { + assertTrue(StringUtils.isNotBlank(gitlabCommitMessage.getUser())); + assertTrue(StringUtils.isNotBlank(gitlabCommitMessage.getRepository())); + assertTrue(StringUtils.isNotBlank(gitlabCommitMessage.getBranch())); + assertTrue(gitlabCommitMessage.getNoOfModifiedFiles() > 0); + assertTrue(gitlabCommitMessage.getTimeSinceLastCommit() >= 0); + }); + + // run a second time, which should not produce any results + gitlabCommitMessages = gitlabApiClient.getGitlabCommitMessages(); + assertTrue(gitlabCommitMessages.isEmpty()); + } +} diff --git a/smartclide-monitoring/src/test/java/de/atb/context/monitoring/GitlabCommitMonitorTest.java b/smartclide-monitoring/src/test/java/de/atb/context/monitoring/GitlabCommitMonitorTest.java new file mode 100644 index 00000000..4aa43415 --- /dev/null +++ b/smartclide-monitoring/src/test/java/de/atb/context/monitoring/GitlabCommitMonitorTest.java @@ -0,0 +1,167 @@ +package de.atb.context.monitoring; + +import com.rabbitmq.client.Channel; +import de.atb.context.common.ContextPathUtils; +import de.atb.context.common.util.ApplicationScenario; +import de.atb.context.monitoring.analyser.FakeGitlabCommitAnalyser; +import de.atb.context.monitoring.config.models.Config; +import de.atb.context.monitoring.config.models.datasources.GitlabDataSource; +import de.atb.context.monitoring.config.models.datasources.MessageBrokerDataSourceOptions; +import de.atb.context.monitoring.models.GitlabCommitDataModel; +import de.atb.context.monitoring.models.GitlabCommitMessage; +import de.atb.context.monitoring.monitors.messagebroker.util.MessageBrokerUtil; +import de.atb.context.services.IAmIMonitoringDataRepositoryService; +import de.atb.context.services.manager.ServiceManager; +import de.atb.context.services.wrapper.AmIMonitoringDataRepositoryServiceWrapper; +import eu.smartclide.contexthandling.ServiceMain; +import org.apache.commons.lang3.StringUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.simpleframework.xml.core.Persister; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.RabbitMQContainer; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.*; + +/** + * TestMonitoringService + * + * @author scholze + * @version $LastChangedRevision: 577 $ + */ +public class GitlabCommitMonitorTest { + + private static final Logger logger = LoggerFactory.getLogger(GitlabCommitMonitorTest.class); + + private static final String RABBITMQ_3_ALPINE = "rabbitmq:3-alpine"; + private static final String QUEUE_NAME_DLE = "code_repo_recommendation_queue"; + private static final String DATASOURCE_GITLAB = "datasource-gitlab"; + private static final String MONITORING_CONFIG_FILE_NAME = "monitoring-config.xml"; + private static final String AMI_REPOSITORY_ID = "AmI-repository"; + + private AmIMonitoringDataRepositoryServiceWrapper monitoringDataRepository; + private Channel fakeDleChannel; + + private int fakeDleNumberOfReceivedMessages; + + // starts a new rabbitmq message broker in a docker container. + // @Rule must be final. + @Rule + public final RabbitMQContainer container = new RabbitMQContainer(RABBITMQ_3_ALPINE).withAdminPassword(null); + + @Before + public void setup() throws Exception { + final String gitlabApiToken = System.getenv("SMARTCLIDE_CONTEXT_GITLAB_API_TOKEN"); + if (StringUtils.isBlank(gitlabApiToken)) { + throw new IllegalStateException("Did not find valid GitLab API token in \"SMARTCLIDE_CONTEXT_GITLAB_API_TOKEN\" environment variable!"); + } + + // setup message broker + final String rabbitMQContainerHost = container.getHost(); + final Integer rabbitMQContainerAmqpPort = container.getAmqpPort(); + + // setup fake DLE + fakeDleNumberOfReceivedMessages = 0; + fakeDleChannel = createFakeDleListener(rabbitMQContainerHost, rabbitMQContainerAmqpPort); + + // write dynamically allocated message broker host and port to monitoring config file + final Path monitoringConfigFilePath = ContextPathUtils.getConfigDirPath().resolve(MONITORING_CONFIG_FILE_NAME); + updateDataSource(monitoringConfigFilePath, rabbitMQContainerHost, rabbitMQContainerAmqpPort, gitlabApiToken); + + // start service + ServiceMain.startService(); + + // get repository service + monitoringDataRepository = ServiceManager.getLSWServiceContainer().stream() + .filter(container -> container.getId().equals(AMI_REPOSITORY_ID)) + .findFirst() + .map(container -> { + final IAmIMonitoringDataRepositoryService repositoryService = + ServiceManager.getWebservice(container); + return new AmIMonitoringDataRepositoryServiceWrapper<>(repositoryService); + }) + .orElseThrow(() -> new RuntimeException("Could not setup AmI-repository!")); + } + + @After + public void tearDown() throws IOException, TimeoutException { + if (monitoringDataRepository != null) { + monitoringDataRepository.shutdown(); + } + + if (fakeDleChannel != null) { + fakeDleChannel.close(); + } + fakeDleNumberOfReceivedMessages = 0; + } + + @Test + public void testDoMonitor() throws InterruptedException { + // give services some time to start up and run monitor once + Thread.sleep(15000); + + // get the latest entry of monitored data from the repository + List data = monitoringDataRepository.getMonitoringData( + ApplicationScenario.getInstance(), + GitlabCommitDataModel.class, + 1 + ); + assertEquals(1, data.size()); + + final List gitlabCommitMessages = data.get(0).getGitlabCommitMessages(); + assertArrayEquals( + FakeGitlabCommitAnalyser.FAKE_GITLAB_COMMIT_MESSAGES.toArray(), + gitlabCommitMessages.toArray() + ); + + // assert that all GitLabCommitMessages have been received by fake DLE + assertEquals(gitlabCommitMessages.size(), fakeDleNumberOfReceivedMessages); + } + + private void updateDataSource(final Path monitoringConfig, + final String messageBrokerHost, + final Integer messageBrokerPort, + final String gitlabApiToken) throws Exception { + final Persister persister = new Persister(); + final Config config = persister.read(Config.class, new File(monitoringConfig.toString())); + final Map optionsMap = config.getDataSource(DATASOURCE_GITLAB).getOptionsMap(); + optionsMap.put(MessageBrokerDataSourceOptions.MessageBrokerServer.getKeyName(), messageBrokerHost); + optionsMap.put(MessageBrokerDataSourceOptions.MessageBrokerPort.getKeyName(), messageBrokerPort.toString()); + optionsMap.put(GitlabDataSource.ACCESS_TOKEN_OPTION.getKeyName(), gitlabApiToken); + config.getDataSource(DATASOURCE_GITLAB).setOptions(optionsMap); + persister.write(config, new File(monitoringConfig.toString())); + } + + private Channel createFakeDleListener(final String rabbitMQContainerHost, final Integer rabbitMQContainerAmqpPort) + throws IOException, TimeoutException { + final Channel channel = MessageBrokerUtil.connectToQueue( + rabbitMQContainerHost, + rabbitMQContainerAmqpPort, + null, + null, + QUEUE_NAME_DLE, + false + ); + channel.basicConsume( + QUEUE_NAME_DLE, + true, + (t, m) -> { + logger.info("DLE received message: {}", new String(m.getBody(), StandardCharsets.UTF_8)); + fakeDleNumberOfReceivedMessages++; + }, + (t) -> logger.info("cancelled!") + ); + return channel; + } +} diff --git a/smartclide-monitoring/src/test/java/de/atb/context/monitoring/TestDataRetrieval.java b/smartclide-monitoring/src/test/java/de/atb/context/monitoring/TestDataRetrieval.java deleted file mode 100644 index 7947667f..00000000 --- a/smartclide-monitoring/src/test/java/de/atb/context/monitoring/TestDataRetrieval.java +++ /dev/null @@ -1,159 +0,0 @@ -package de.atb.context.monitoring; - -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Path; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeoutException; - -import com.rabbitmq.client.Channel; -import de.atb.context.common.ContextPathUtils; -import de.atb.context.common.util.ApplicationScenario; -import de.atb.context.monitoring.config.models.Config; -import de.atb.context.monitoring.config.models.datasources.MessageBrokerDataSourceOptions; -import de.atb.context.monitoring.models.GitDataModel; -import de.atb.context.monitoring.models.GitMessage; -import de.atb.context.monitoring.monitors.messagebroker.util.MessageBrokerUtil; -import de.atb.context.services.IAmIMonitoringDataRepositoryService; -import de.atb.context.services.manager.ServiceManager; -import de.atb.context.services.wrapper.AmIMonitoringDataRepositoryServiceWrapper; -import eu.smartclide.contexthandling.ServiceMain; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.simpleframework.xml.core.Persister; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.RabbitMQContainer; - -import static org.junit.Assert.assertEquals; - -/** - * TestMonitoringService - * - * @author scholze - * @version $LastChangedRevision: 577 $ - */ -public class TestDataRetrieval { - - private static final Logger logger = LoggerFactory.getLogger(TestDataRetrieval.class); - - private static final String RABBITMQ_3_ALPINE = "rabbitmq:3-alpine"; - private static final String EXCHANGE_NAME = "smartclide-monitoring"; - private static final String ROUTING_KEY_MONITORING = "monitoring.git.commits"; - private static final String ROUTING_KEY_DLE = "dle.git.commits"; - private static final String QUEUE_PREFIX_DLE = "Fake-DLE"; - private static final String DATASOURCE_GIT = "datasource-git"; - private static final String MONITORING_CONFIG_FILE_NAME = "monitoring-config.xml"; - private static final String AMI_REPOSITORY_ID = "AmI-repository"; - - private AmIMonitoringDataRepositoryServiceWrapper monitoringDataRepository; - - private Channel channel; - - // starts a new rabbitmq message broker in a docker container. - // @Rule must be final. - @Rule - public final RabbitMQContainer container = new RabbitMQContainer(RABBITMQ_3_ALPINE).withAdminPassword(null); - - @Before - public void setup() throws Exception { - // setup message broker - final String rabbitMQContainerHost = container.getHost(); - final Integer rabbitMQContainerAmqpPort = container.getAmqpPort(); - channel = MessageBrokerUtil.connectToTopicExchange(rabbitMQContainerHost, rabbitMQContainerAmqpPort, EXCHANGE_NAME); - - // setup fake DLE - createFakeDleListener(); - - // write dynamically allocated message broker host and port to monitoring config file - final Path monitoringConfigFilePath = ContextPathUtils.getConfigDirPath().resolve(MONITORING_CONFIG_FILE_NAME); - updateMessageBrokerDataSource(monitoringConfigFilePath, rabbitMQContainerHost, rabbitMQContainerAmqpPort); - - // start service - ServiceMain.startService(); - - // get repository service - monitoringDataRepository = ServiceManager.getLSWServiceContainer().stream() - .filter(container -> container.getId().equals(AMI_REPOSITORY_ID)) - .findFirst() - .map(container -> { - final IAmIMonitoringDataRepositoryService repositoryService = - ServiceManager.getWebservice(container); - return new AmIMonitoringDataRepositoryServiceWrapper<>(repositoryService); - }) - .orElseThrow(() -> new RuntimeException("Could not setup AmI-repository!")); - } - - @After - public void tearDown() throws IOException, TimeoutException { - monitoringDataRepository.shutdown(); - - if (channel != null) { - channel.close(); - } - } - - @Test - public void testDataRetrieval() throws IOException, InterruptedException { - // wait for services to start - Thread.sleep(10000); - - final GitMessage gitMessage = GitMessage.builder() - .timestamp(ZonedDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME)) - .user("user@smartclide.eu") - .repository("git@github.com:eclipse-researchlabs/smartclide-context.git") - .branch("branch") - .noOfCommitsInBranch(42) - .noOfModifiedFiles(3) - .noOfPushesInBranch(17) - .build(); - MessageBrokerUtil.convertAndSendToTopic(channel, EXCHANGE_NAME, ROUTING_KEY_MONITORING, gitMessage); - - Thread.sleep(10000); - - // get the monitored data from the repository (the latest registry) - final List data = - monitoringDataRepository.getMonitoringData(ApplicationScenario.getInstance(), GitDataModel.class, 1); - - assertEquals(1, data.size()); - final List gitMessages = data.get(0).getGitMessages(); - assertEquals(1, gitMessages.size()); - final GitMessage fromRepo = gitMessages.get(0); - assertEquals(gitMessage.getTimestamp(), fromRepo.getTimestamp()); - assertEquals(gitMessage.getUser(), fromRepo.getUser()); - assertEquals(gitMessage.getRepository(), fromRepo.getRepository()); - assertEquals(gitMessage.getBranch(), fromRepo.getBranch()); - assertEquals(gitMessage.getNoOfCommitsInBranch(), fromRepo.getNoOfCommitsInBranch()); - assertEquals(gitMessage.getNoOfModifiedFiles(), fromRepo.getNoOfModifiedFiles()); - assertEquals(gitMessage.getNoOfPushesInBranch(), fromRepo.getNoOfPushesInBranch()); - } - - private void updateMessageBrokerDataSource(final Path monitoringConfig, final String host, final Integer port) - throws Exception { - final Persister persister = new Persister(); - final Config config = persister.read(Config.class, new File(monitoringConfig.toString())); - final Map optionsMap = config.getDataSource(DATASOURCE_GIT).getOptionsMap(); - optionsMap.put(MessageBrokerDataSourceOptions.MessageBrokerServer.getKeyName(), host); - optionsMap.put(MessageBrokerDataSourceOptions.MessageBrokerPort.getKeyName(), port.toString()); - config.getDataSource(DATASOURCE_GIT).setOptions(optionsMap); - persister.write(config, new File(monitoringConfig.toString())); - } - - private void createFakeDleListener() throws IOException { - MessageBrokerUtil.registerListenerOnTopic( - channel, - EXCHANGE_NAME, - ROUTING_KEY_DLE, - QUEUE_PREFIX_DLE, - (t, m) -> logger.info("DLE received message: {}", new String(m.getBody(), StandardCharsets.UTF_8)), - (t) -> logger.info("cancelled!") - ); - } - -} diff --git a/smartclide-monitoring/src/test/java/de/atb/context/monitoring/analyser/FakeGitlabCommitAnalyser.java b/smartclide-monitoring/src/test/java/de/atb/context/monitoring/analyser/FakeGitlabCommitAnalyser.java new file mode 100644 index 00000000..8fbd5b21 --- /dev/null +++ b/smartclide-monitoring/src/test/java/de/atb/context/monitoring/analyser/FakeGitlabCommitAnalyser.java @@ -0,0 +1,62 @@ +package de.atb.context.monitoring.analyser; + +import de.atb.context.monitoring.config.models.DataSource; +import de.atb.context.monitoring.config.models.InterpreterConfiguration; +import de.atb.context.monitoring.index.Indexer; +import de.atb.context.monitoring.models.GitlabCommitDataModel; +import de.atb.context.monitoring.models.GitlabCommitMessage; +import de.atb.context.monitoring.models.IWebService; +import de.atb.context.tools.ontology.AmIMonitoringConfiguration; +import org.apache.lucene.document.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.List; + +public class FakeGitlabCommitAnalyser extends GitlabCommitAnalyser { + + public static final List FAKE_GITLAB_COMMIT_MESSAGES = List.of( + GitlabCommitMessage.builder() + .user("John Doe") + .repository("smartclide/foo") + .branch("main") + .noOfModifiedFiles(29) + .timeSinceLastCommit(0) + .build(), + GitlabCommitMessage.builder() + .user("John Doe") + .repository("smartclide/bar") + .branch("fix/4711/npe-in-constructor") + .noOfModifiedFiles(4) + .timeSinceLastCommit(766) + .build(), + GitlabCommitMessage.builder() + .user("Jane Doe") + .repository("smartclide/bar") + .branch("main") + .noOfModifiedFiles(17) + .timeSinceLastCommit(98438) + .build() + ); + + private static final Logger logger = LoggerFactory.getLogger(GitlabCommitAnalyser.class); + + public FakeGitlabCommitAnalyser(final DataSource dataSource, + final InterpreterConfiguration interpreterConfiguration, + final Indexer indexer, + final Document document, + final AmIMonitoringConfiguration amiConfiguration) { + super(dataSource, interpreterConfiguration, indexer, document, amiConfiguration); + } + + @Override + public List analyseObject(final IWebService service) { + final List gitlabCommitMessages = FAKE_GITLAB_COMMIT_MESSAGES; + final GitlabCommitDataModel model = new GitlabCommitDataModel(); + model.setGitlabCommitMessages(gitlabCommitMessages); + model.setMonitoredAt(new Date()); + logger.info("Analysed {} GitlabCommitMessages", gitlabCommitMessages.size()); + return List.of(model); + } +} diff --git a/smartclide-monitoring/src/test/java/de/atb/context/monitoring/config/models/datasources/GitlabDataSourceTest.java b/smartclide-monitoring/src/test/java/de/atb/context/monitoring/config/models/datasources/GitlabDataSourceTest.java new file mode 100644 index 00000000..3161d710 --- /dev/null +++ b/smartclide-monitoring/src/test/java/de/atb/context/monitoring/config/models/datasources/GitlabDataSourceTest.java @@ -0,0 +1,55 @@ +package de.atb.context.monitoring.config.models.datasources; + +import de.atb.context.monitoring.config.MonitoringConfiguration; +import de.atb.context.monitoring.config.models.DataSource; +import org.hamcrest.core.IsInstanceOf; +import org.junit.Test; + +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.util.Objects; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.hamcrest.core.IsNull.notNullValue; + +public class GitlabDataSourceTest { + + @Test + public void configFileShouldBeDeserializedToCorrectGitlabDataSource() throws URISyntaxException { + final String expectedId = "datasource-gitlab"; + final String expectedMonitor = "de.atb.context.monitoring.monitors.GitlabCommitMonitor"; + final String expectedUri = "https://gitlab.example.com"; + final String expectedAccessToken = "s3cr3t"; + final String expectedMessageBrokerHost = "localhost"; + final int expectedMessageBrokerPort = 5672; + final String expectedUsername = "username"; + final String expectedPassword = "password"; + final String expectedOutgoingQueue = "code_repo_recommendation_queue"; + final URI uri = Objects.requireNonNull(this.getClass().getResource("/config/gitlab-monitoring")).toURI(); + final String configDirPath = Path.of(uri).toAbsolutePath().toString(); + + final String configFileName = "monitoring-config.xml"; + + final MonitoringConfiguration config = MonitoringConfiguration.getInstance(configFileName, configDirPath); + + assertThat(config, is(notNullValue())); + assertThat(config.getDataSources(), is(notNullValue())); + assertThat(config.getDataSources().size(), equalTo(1)); + final DataSource dataSource = config.getDataSources().get(0); + assertThat(dataSource, is(IsInstanceOf.instanceOf(GitlabDataSource.class))); + final GitlabDataSource gitlabDataSource = (GitlabDataSource) dataSource; + assertThat(gitlabDataSource.getId(), equalTo(expectedId)); + assertThat(gitlabDataSource.getMonitor(), equalTo(expectedMonitor)); + assertThat(gitlabDataSource.getUri(), equalTo(expectedUri)); + assertThat(gitlabDataSource.getGitLabAccessToken(), equalTo(expectedAccessToken)); + assertThat(gitlabDataSource.getMessageBrokerServer(), equalTo(expectedMessageBrokerHost)); + assertThat(gitlabDataSource.getMessageBrokerPort(), equalTo(expectedMessageBrokerPort)); + assertThat(gitlabDataSource.getUserName(), equalTo(expectedUsername)); + assertThat(gitlabDataSource.getPassword(), equalTo(expectedPassword)); + assertThat(gitlabDataSource.getOutgoingQueue(), equalTo(expectedOutgoingQueue)); + assertThat(gitlabDataSource.isOutgoingDurable(), equalTo(false)); + } +} diff --git a/smartclide-monitoring/src/test/resources/config/gitlab-monitoring/monitoring-config.xml b/smartclide-monitoring/src/test/resources/config/gitlab-monitoring/monitoring-config.xml new file mode 100644 index 00000000..c09465bc --- /dev/null +++ b/smartclide-monitoring/src/test/resources/config/gitlab-monitoring/monitoring-config.xml @@ -0,0 +1,34 @@ + + + + + + + + + + + + + + + + + + + + + + diff --git a/smartclide-monitoring/src/test/resources/config/monitoring-config.xml b/smartclide-monitoring/src/test/resources/config/monitoring-config.xml index ee36fae9..720ba342 100644 --- a/smartclide-monitoring/src/test/resources/config/monitoring-config.xml +++ b/smartclide-monitoring/src/test/resources/config/monitoring-config.xml @@ -4,26 +4,30 @@ xsi:schemaLocation="http://www.atb-bremen.de monitoring-config.xsd"> - + - + - + + parser="de.atb.context.monitoring.parser.GitlabCommitParser" + analyser="de.atb.context.monitoring.analyser.FakeGitlabCommitAnalyser"/> - +