diff --git a/docker-compose.yml b/docker-compose.yml index 915d96c69c..d9d9a97fc0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,4 @@ -version: '3.7' +version: '3' services: nakadi: @@ -58,4 +58,4 @@ services: KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_BROKER_ID: 0 volumes: - - /var/run/docker.sock:/var/run/docker.sock \ No newline at end of file + - /var/run/docker.sock:/var/run/docker.sock diff --git a/src/main/java/org/zalando/nakadi/controller/EventTypeController.java b/src/main/java/org/zalando/nakadi/controller/EventTypeController.java index d5625e1ee1..d9a3965460 100644 --- a/src/main/java/org/zalando/nakadi/controller/EventTypeController.java +++ b/src/main/java/org/zalando/nakadi/controller/EventTypeController.java @@ -88,7 +88,7 @@ public ResponseEntity create(@Valid @RequestBody final EventTypeBase eventTyp throw new ValidationException(errors); } - eventTypeService.create(eventType); + eventTypeService.create(eventType, true); return ResponseEntity.status(HttpStatus.CREATED).headers(generateWarningHeaders(eventType)).build(); } diff --git a/src/main/java/org/zalando/nakadi/controller/VersionController.java b/src/main/java/org/zalando/nakadi/controller/VersionController.java index 8c7473cb1a..bad47a87ab 100644 --- a/src/main/java/org/zalando/nakadi/controller/VersionController.java +++ b/src/main/java/org/zalando/nakadi/controller/VersionController.java @@ -9,6 +9,7 @@ import org.springframework.web.bind.annotation.RestController; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -90,6 +91,8 @@ public VersionInfo getVersion() { private static ScmSource loadScmSource(final ObjectMapper objectMapper) { try (InputStream in = new FileInputStream(SCM_SOURCE_FILE)) { return objectMapper.readValue(in, ScmSource.class); + } catch (FileNotFoundException ex) { + LOG.warn("Failed to read scm-source.json file from " + SCM_SOURCE_FILE + ", file not found"); } catch (IOException ex) { LOG.warn("Failed to read scm-source.json file from " + SCM_SOURCE_FILE, ex); } diff --git a/src/main/java/org/zalando/nakadi/service/EventTypeService.java b/src/main/java/org/zalando/nakadi/service/EventTypeService.java index 16a07d2745..64bba0764c 100644 --- a/src/main/java/org/zalando/nakadi/service/EventTypeService.java +++ b/src/main/java/org/zalando/nakadi/service/EventTypeService.java @@ -135,7 +135,7 @@ public List list() { return eventTypeRepository.list(); } - public void create(final EventTypeBase eventType) + public void create(final EventTypeBase eventType, final boolean checkAuth) throws TopicCreationException, InternalNakadiException, NoSuchPartitionStrategyException, @@ -158,7 +158,9 @@ public void create(final EventTypeBase eventType) validateCompaction(eventType); enrichment.validate(eventType); partitionResolver.validate(eventType); - authorizationValidator.validateAuthorization(eventType.asBaseResource()); + if (checkAuth) { + authorizationValidator.validateAuthorization(eventType.asBaseResource()); + } eventTypeRepository.saveEventType(eventType); diff --git a/src/main/java/org/zalando/nakadi/service/NakadiAuditLogInitialization.java b/src/main/java/org/zalando/nakadi/service/NakadiAuditLogInitialization.java index f59b6e5eb6..ce3ca18d5f 100644 --- a/src/main/java/org/zalando/nakadi/service/NakadiAuditLogInitialization.java +++ b/src/main/java/org/zalando/nakadi/service/NakadiAuditLogInitialization.java @@ -1,9 +1,5 @@ package org.zalando.nakadi.service; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Charsets; -import com.google.common.io.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -11,19 +7,17 @@ import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; -import org.zalando.nakadi.domain.EventTypeBase; -import org.zalando.nakadi.exceptions.runtime.DuplicatedEventTypeNameException; -import org.zalando.nakadi.exceptions.runtime.NakadiBaseException; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; @Component @ConfigurationProperties(prefix = "nakadi.audit") public class NakadiAuditLogInitialization { private static final Logger LOG = LoggerFactory.getLogger(NakadiAuditLogInitialization.class); - private final ObjectMapper objectMapper; - private final EventTypeService eventTypeService; + private final SystemEventTypeInitializer systemEventTypeInitializer; private final FeatureToggleService featureToggleService; private String eventType; @@ -32,10 +26,10 @@ public class NakadiAuditLogInitialization { private String authValue; @Autowired - public NakadiAuditLogInitialization(final ObjectMapper objectMapper, final EventTypeService eventTypeService, - final FeatureToggleService featureToggleService) { - this.objectMapper = objectMapper; - this.eventTypeService = eventTypeService; + public NakadiAuditLogInitialization( + final SystemEventTypeInitializer systemEventTypeInitializer, + final FeatureToggleService featureToggleService) { + this.systemEventTypeInitializer = systemEventTypeInitializer; this.featureToggleService = featureToggleService; } @@ -45,28 +39,13 @@ public void onApplicationEvent(final ContextRefreshedEvent event) throws IOExcep LOG.debug("Audit log collection is disabled, skip creation of audit log event type"); return; } + final Map replacements = new HashMap<>(); + replacements.put("event_type_name_placeholder", eventType); + replacements.put("owning_application_placeholder", owningApplication); + replacements.put("auth_data_type_placeholder", authDataType); + replacements.put("auth_value_placeholder", authValue); - LOG.debug("Initializing Audit log event type"); - - String auditEventTypeString = Resources - .toString(Resources.getResource("audit_event_type.json"), Charsets.UTF_8); - - auditEventTypeString = auditEventTypeString.replaceAll("event_type_name_placeholder", eventType); - auditEventTypeString = auditEventTypeString.replaceAll("owning_application_placeholder", owningApplication); - auditEventTypeString = auditEventTypeString.replaceAll("auth_data_type_placeholder", authDataType); - auditEventTypeString = auditEventTypeString.replaceAll("auth_value_placeholder", authValue); - - final TypeReference typeReference = new TypeReference() { - }; - final EventTypeBase eventType = objectMapper.readValue(auditEventTypeString, typeReference); - - try { - eventTypeService.create(eventType); - } catch (final DuplicatedEventTypeNameException e) { - LOG.debug("Audit event type already exists " + eventType.getName()); - } catch (final NakadiBaseException e) { - LOG.debug("Problem creating audit event type " + eventType.getName(), e); - } + systemEventTypeInitializer.createEventTypesFromResource("audit_event_types.json", replacements); } public String getEventType() { diff --git a/src/main/java/org/zalando/nakadi/service/NakadiKpiInitialization.java b/src/main/java/org/zalando/nakadi/service/NakadiKpiInitialization.java index d989277825..dbab95c9a7 100644 --- a/src/main/java/org/zalando/nakadi/service/NakadiKpiInitialization.java +++ b/src/main/java/org/zalando/nakadi/service/NakadiKpiInitialization.java @@ -1,9 +1,5 @@ package org.zalando.nakadi.service; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Charsets; -import com.google.common.io.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -11,13 +7,9 @@ import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; -import org.zalando.nakadi.domain.EventTypeBase; -import org.zalando.nakadi.exceptions.runtime.DuplicatedEventTypeNameException; -import org.zalando.nakadi.exceptions.runtime.NakadiBaseException; import java.io.IOException; import java.util.HashMap; -import java.util.List; import java.util.Map; @Component @@ -25,8 +17,7 @@ public class NakadiKpiInitialization { private static final Logger LOG = LoggerFactory.getLogger(NakadiKpiInitialization.class); - private final ObjectMapper objectMapper; - private final EventTypeService eventTypeService; + private final SystemEventTypeInitializer systemEventTypeInitializer; private final FeatureToggleService featureToggleService; private String nakadiAccessLog; @@ -37,10 +28,10 @@ public class NakadiKpiInitialization { private String nakadiEventTypeLog; @Autowired - public NakadiKpiInitialization(final ObjectMapper objectMapper, final EventTypeService eventTypeService, - final FeatureToggleService featureToggleService) { - this.objectMapper = objectMapper; - this.eventTypeService = eventTypeService; + public NakadiKpiInitialization( + final SystemEventTypeInitializer systemEventTypeInitializer, + final FeatureToggleService featureToggleService) { + this.systemEventTypeInitializer = systemEventTypeInitializer; this.featureToggleService = featureToggleService; } @@ -50,12 +41,6 @@ public void onApplicationEvent(final ContextRefreshedEvent event) throws IOExcep LOG.debug("KPI collection is disabled, skip creation of kpi event types"); return; } - - LOG.debug("Initializing KPI event types"); - - String kpiEventTypesString = Resources - .toString(Resources.getResource("kpi_event_types.json"), Charsets.UTF_8); - final Map replacements = new HashMap<>(); replacements.put("nakadi.event.type.log", nakadiEventTypeLog); replacements.put("nakadi.subscription.log", nakadiSubscriptionLog); @@ -64,24 +49,7 @@ public void onApplicationEvent(final ContextRefreshedEvent event) throws IOExcep replacements.put("nakadi.access.log", nakadiAccessLog); replacements.put("owning_application_placeholder", owningApplication); - for (final Map.Entry entry : replacements.entrySet()) { - kpiEventTypesString = kpiEventTypesString.replaceAll(entry.getKey(), entry.getValue()); - } - - final TypeReference> typeReference = new TypeReference>() { - }; - final List eventTypes = objectMapper.readValue(kpiEventTypesString, typeReference); - - - eventTypes.forEach(et -> { - try { - eventTypeService.create(et); - } catch (final DuplicatedEventTypeNameException e) { - LOG.debug("KPI event type already exists " + et.getName()); - } catch (final NakadiBaseException e) { - LOG.debug("Problem creating KPI event type " + et.getName(), e); - } - }); + systemEventTypeInitializer.createEventTypesFromResource("kpi_event_types.json", replacements); } public String getNakadiAccessLog() { diff --git a/src/main/java/org/zalando/nakadi/service/SystemEventTypeInitializer.java b/src/main/java/org/zalando/nakadi/service/SystemEventTypeInitializer.java new file mode 100644 index 0000000000..68f1e40101 --- /dev/null +++ b/src/main/java/org/zalando/nakadi/service/SystemEventTypeInitializer.java @@ -0,0 +1,61 @@ +package org.zalando.nakadi.service; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Charsets; +import com.google.common.io.Resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.zalando.nakadi.domain.EventTypeBase; +import org.zalando.nakadi.exceptions.runtime.DuplicatedEventTypeNameException; +import org.zalando.nakadi.exceptions.runtime.NakadiBaseException; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@Component +public class SystemEventTypeInitializer { + private final ObjectMapper objectMapper; + private final EventTypeService eventTypeService; + private static final Logger LOG = LoggerFactory.getLogger(SystemEventTypeInitializer.class); + + @Autowired + public SystemEventTypeInitializer( + final ObjectMapper objectMapper, + final EventTypeService eventTypeService) { + this.objectMapper = objectMapper; + this.eventTypeService = eventTypeService; + } + + public void createEventTypesFromResource( + final String resourceName, + final Map nameReplacements) throws IOException { + LOG.debug("Initializing event types from {}", resourceName); + String eventTypesString = Resources.toString(Resources.getResource(resourceName), Charsets.UTF_8); + for (final Map.Entry entry : nameReplacements.entrySet()) { + eventTypesString = eventTypesString.replaceAll( + Pattern.quote(entry.getKey()), + Matcher.quoteReplacement(entry.getValue())); + } + + final TypeReference> typeReference = new TypeReference>() { + }; + final List eventTypes = objectMapper.readValue(eventTypesString, typeReference); + + eventTypes.forEach(et -> { + try { + eventTypeService.create(et, false); + } catch (final DuplicatedEventTypeNameException e) { + LOG.debug("Event type {} from {} already exists", et.getName(), resourceName); + } catch (final NakadiBaseException e) { + LOG.debug("Problem creating event type {} from {}", et.getName(), resourceName, e); + } + }); + + } +} diff --git a/src/main/resources/audit_event_type.json b/src/main/resources/audit_event_type.json deleted file mode 100644 index 5554f7f116..0000000000 --- a/src/main/resources/audit_event_type.json +++ /dev/null @@ -1,50 +0,0 @@ -{ - "name": "event_type_name_placeholder", - "owning_application": "owning_application_placeholder", - "category": "data", - "enrichment_strategies": [ - "metadata_enrichment" - ], - "partition_strategy": "hash", - "partition_key_fields": [ - "resource_id" - ], - "cleanup_policy": "delete", - "ordering_key_fields": [], - "ordering_instance_ids": [], - "schema": { - "type": "json_schema", - "schema": "{\"properties\": {\"previous_object\": { \"type\": \"object\", \"description\": \"When modifying an already existent entity, its value is captured in this field as a JSON object. So, for example, when changing an Event Type attribute, this field contains the entire state before the changes are applied\"},\"previous_text\": { \"type\": \"string\", \"description\": \"Contains the same information as the field `previous_object` but as text, since the data lake stores a flat map of all the fields in the object, destroying information about its structure. Storing the text makes sure that the original data is not lost by any transformation that the data lake may apply on the data\"},\"new_object\": { \"type\": \"object\", \"description\": \"New value submitted by the user\"},\"new_text\": { \"type\": \"string\", \"description\": \"New value submitted by the user as text, in order to preserve the structure, if needed\"},\"resource_type\": { \"x-extensible-enum\": [ \"event_type\", \"subscription\", \"timeline\", \"storage\", \"feature\", \"admins\", \"cursors\", \"blacklist_entry\" ], \"type\":\"string\" },\"resource_id\": { \"description\": \"Resource identifier. Together with `resource_type` allows for the selection of a resource\", \"type\": \"string\"},\"user\": { \"description\": \"User or service that requested the changes\", \"type\": \"string\"},\"user_hash\": { \"description\": \"User hashed\", \"type\": \"string\"}},\"required\": [\"user\", \"user_hash\", \"resource_id\", \"resource_type\"]}" - }, - "default_statistic": { - "messages_per_minute": 100, - "message_size": 100, - "read_parallelism": 10, - "write_parallelism": 10 - }, - "options": { - "retention_time": 345600000 - }, - "compatibility_mode": "forward", - "audience": "company-internal", - "authorization": { - "admins": [ - { - "data_type": "auth_data_type_placeholder", - "value": "auth_value_placeholder" - } - ], - "readers": [ - { - "data_type": "auth_data_type_placeholder", - "value": "auth_value_placeholder" - } - ], - "writers": [ - { - "data_type": "auth_data_type_placeholder", - "value": "auth_value_placeholder" - } - ] - } -} \ No newline at end of file diff --git a/src/main/resources/audit_event_types.json b/src/main/resources/audit_event_types.json new file mode 100644 index 0000000000..d5854f08a3 --- /dev/null +++ b/src/main/resources/audit_event_types.json @@ -0,0 +1,52 @@ +[ + { + "name": "event_type_name_placeholder", + "owning_application": "owning_application_placeholder", + "category": "data", + "enrichment_strategies": [ + "metadata_enrichment" + ], + "partition_strategy": "hash", + "partition_key_fields": [ + "resource_id" + ], + "cleanup_policy": "delete", + "ordering_key_fields": [], + "ordering_instance_ids": [], + "schema": { + "type": "json_schema", + "schema": "{\"properties\": {\"previous_object\": { \"type\": \"object\", \"description\": \"When modifying an already existent entity, its value is captured in this field as a JSON object. So, for example, when changing an Event Type attribute, this field contains the entire state before the changes are applied\"},\"previous_text\": { \"type\": \"string\", \"description\": \"Contains the same information as the field `previous_object` but as text, since the data lake stores a flat map of all the fields in the object, destroying information about its structure. Storing the text makes sure that the original data is not lost by any transformation that the data lake may apply on the data\"},\"new_object\": { \"type\": \"object\", \"description\": \"New value submitted by the user\"},\"new_text\": { \"type\": \"string\", \"description\": \"New value submitted by the user as text, in order to preserve the structure, if needed\"},\"resource_type\": { \"x-extensible-enum\": [ \"event_type\", \"subscription\", \"timeline\", \"storage\", \"feature\", \"admins\", \"cursors\", \"blacklist_entry\" ], \"type\":\"string\" },\"resource_id\": { \"description\": \"Resource identifier. Together with `resource_type` allows for the selection of a resource\", \"type\": \"string\"},\"user\": { \"description\": \"User or service that requested the changes\", \"type\": \"string\"},\"user_hash\": { \"description\": \"User hashed\", \"type\": \"string\"}},\"required\": [\"user\", \"user_hash\", \"resource_id\", \"resource_type\"]}" + }, + "default_statistic": { + "messages_per_minute": 100, + "message_size": 100, + "read_parallelism": 10, + "write_parallelism": 10 + }, + "options": { + "retention_time": 345600000 + }, + "compatibility_mode": "forward", + "audience": "company-internal", + "authorization": { + "admins": [ + { + "data_type": "auth_data_type_placeholder", + "value": "auth_value_placeholder" + } + ], + "readers": [ + { + "data_type": "auth_data_type_placeholder", + "value": "auth_value_placeholder" + } + ], + "writers": [ + { + "data_type": "auth_data_type_placeholder", + "value": "auth_value_placeholder" + } + ] + } + } +] \ No newline at end of file diff --git a/src/test/java/org/zalando/nakadi/service/EventTypeServiceTest.java b/src/test/java/org/zalando/nakadi/service/EventTypeServiceTest.java index b40d4ba684..1aae7b107f 100644 --- a/src/test/java/org/zalando/nakadi/service/EventTypeServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventTypeServiceTest.java @@ -135,23 +135,23 @@ public void testFeatureToggleAllowsDeleteEventTypeWithSubscriptions() throws Exc } @Test(expected = FeatureNotAvailableException.class) - public void testFeatureToggleDisableLogCompaction() throws Exception { + public void testFeatureToggleDisableLogCompaction() { final EventType eventType = buildDefaultEventType(); eventType.setCleanupPolicy(CleanupPolicy.COMPACT); when(featureToggleService.isFeatureEnabled(FeatureToggleService.Feature.DISABLE_LOG_COMPACTION)) .thenReturn(true); - eventTypeService.create(eventType); + eventTypeService.create(eventType, true); } @Test - public void shouldRemoveEventTypeWhenTimelineCreationFails() throws Exception { + public void shouldRemoveEventTypeWhenTimelineCreationFails() { final EventType eventType = buildDefaultEventType(); when(timelineService.createDefaultTimeline(any(), anyInt())) .thenThrow(new TopicCreationException("Failed to create topic")); try { - eventTypeService.create(eventType); + eventTypeService.create(eventType, true); fail("should throw TopicCreationException"); } catch (final TopicCreationException e) { // expected @@ -161,9 +161,9 @@ public void shouldRemoveEventTypeWhenTimelineCreationFails() throws Exception { } @Test - public void whenEventTypeCreatedThenKPIEventSubmitted() throws Exception { + public void whenEventTypeCreatedThenKPIEventSubmitted() { final EventType et = buildDefaultEventType(); - eventTypeService.create(et); + eventTypeService.create(et, true); checkKPIEventSubmitted(nakadiKpiPublisher, KPI_ET_LOG_EVENT_TYPE, new JSONObject() .put("event_type", et.getName())