diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..36716aa --- /dev/null +++ b/.gitignore @@ -0,0 +1,14 @@ +*.iml +*.class +*.jar +*dependency-reduced-pom.xml +.classpath +.project +.settings/ +target/ +devenv +*.log* +*.iml +.idea/ +*.versionsBackup +.DS_Store \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/pom.xml b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/pom.xml new file mode 100644 index 0000000..152bc0c --- /dev/null +++ b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/pom.xml @@ -0,0 +1,39 @@ + + + + openmessaging-spring-boot + io.openmessaging + 1.0.0-beta-SNAPSHOT + + 4.0.0 + + openmessaging-spring-boot-autoconfigure + + + + io.openmessaging + openmessaging-spring-core + ${project.version} + + + org.springframework + spring-beans + + + org.springframework + spring-context + + + + + org.springframework.boot + spring-boot-autoconfigure + + + org.springframework.boot + spring-boot-configuration-processor + + + \ No newline at end of file diff --git a/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/OMSSpringBootConsts.java b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/OMSSpringBootConsts.java new file mode 100644 index 0000000..898c1e3 --- /dev/null +++ b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/OMSSpringBootConsts.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.boot; + +/** + * This class defined constants. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public class OMSSpringBootConsts { + + public static final String PREFIX = "spring.oms"; +} \ No newline at end of file diff --git a/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/adapter/BatchMessageListenerReflectAdapter.java b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/adapter/BatchMessageListenerReflectAdapter.java new file mode 100644 index 0000000..7460106 --- /dev/null +++ b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/adapter/BatchMessageListenerReflectAdapter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.boot.adapter; + +import io.openmessaging.consumer.BatchMessageListener; +import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.message.Message; + +import java.lang.reflect.Method; +import java.util.List; + +/** + * Adapter for the BatchMessageListener. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public class BatchMessageListenerReflectAdapter implements BatchMessageListener { + + private Object instance; + private Method method; + + public BatchMessageListenerReflectAdapter(Object instance, Method method) { + this.instance = instance; + this.method = method; + this.method.setAccessible(true); + } + + @Override + public void onReceived(List batchMessage, Context context) { + try { + method.invoke(instance, batchMessage, context); + } catch (Exception e) { + throw new OMSRuntimeException(-1, e.getMessage(), e); + } + } +} \ No newline at end of file diff --git a/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/adapter/MessageListenerReflectAdapter.java b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/adapter/MessageListenerReflectAdapter.java new file mode 100644 index 0000000..ead8f38 --- /dev/null +++ b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/adapter/MessageListenerReflectAdapter.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.boot.adapter; + +import io.openmessaging.consumer.MessageListener; +import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.message.Message; + +import java.lang.reflect.Method; + +/** + * Adapter for the MessageListener. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public class MessageListenerReflectAdapter implements MessageListener { + + private Object instance; + private Method method; + + public MessageListenerReflectAdapter(Object instance, Method method) { + this.instance = instance; + this.method = method; + this.method.setAccessible(true); + } + + @Override + public void onReceived(Message message, Context context) { + try { + method.invoke(instance, message, context); + } catch (Exception e) { + throw new OMSRuntimeException(-1, e.getMessage(), e); + } + } +} \ No newline at end of file diff --git a/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/annotation/OMSInterceptor.java b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/annotation/OMSInterceptor.java new file mode 100644 index 0000000..d5c22ba --- /dev/null +++ b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/annotation/OMSInterceptor.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.boot.annotation; + +import org.springframework.core.annotation.AliasFor; +import org.springframework.stereotype.Component; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Indicates that an annotated class is a ProducerInterceptor or ConsumerInterceptor. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +@Component +public @interface OMSInterceptor { + + @AliasFor(annotation = Component.class) + String value() default ""; +} \ No newline at end of file diff --git a/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/annotation/OMSMessageListener.java b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/annotation/OMSMessageListener.java new file mode 100644 index 0000000..6b55cee --- /dev/null +++ b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/annotation/OMSMessageListener.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.boot.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Indicates that an annotated class or method is a MessageListener. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD, ElementType.TYPE}) +public @interface OMSMessageListener { + + String queueName(); +} \ No newline at end of file diff --git a/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/annotation/OMSTransactionStateCheckListener.java b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/annotation/OMSTransactionStateCheckListener.java new file mode 100644 index 0000000..0e4d32b --- /dev/null +++ b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/annotation/OMSTransactionStateCheckListener.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.boot.annotation; + +import org.springframework.core.annotation.AliasFor; +import org.springframework.stereotype.Component; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Indicates that an annotated class is a TransactionStateCheckListener. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +@Component +public @interface OMSTransactionStateCheckListener { + + @AliasFor(annotation = Component.class) + String value() default ""; +} \ No newline at end of file diff --git a/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/config/KeyValueConverter.java b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/config/KeyValueConverter.java new file mode 100644 index 0000000..d63203b --- /dev/null +++ b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/config/KeyValueConverter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.boot.config; + +import io.openmessaging.KeyValue; +import io.openmessaging.OMS; + +import java.util.Map; + +/** + * Convert attributes to KeyValue. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public class KeyValueConverter { + + public static KeyValue convert(Map attributes) { + KeyValue result = OMS.newKeyValue(); + if (attributes != null && !attributes.isEmpty()) { + for (Map.Entry entry : attributes.entrySet()) { + result.put(entry.getKey(), entry.getValue()); + } + } + return result; + } +} diff --git a/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/config/OMSProperties.java b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/config/OMSProperties.java new file mode 100644 index 0000000..afd7bf2 --- /dev/null +++ b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/config/OMSProperties.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.boot.config; + +import io.openmessaging.spring.boot.OMSSpringBootConsts; +import org.springframework.boot.context.properties.ConfigurationProperties; + +import java.util.Map; + +/** + * Configuration properties for OpenMessaging. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +@ConfigurationProperties(prefix = OMSSpringBootConsts.PREFIX) +public class OMSProperties { + + private String url; + private Map attributes; + + public void setUrl(String url) { + this.url = url; + } + + public String getUrl() { + return url; + } + + public void setAttributes(Map attributes) { + this.attributes = attributes; + } + + public Map getAttributes() { + return attributes; + } +} \ No newline at end of file diff --git a/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/configuration/OMSAutoConfiguration.java b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/configuration/OMSAutoConfiguration.java new file mode 100644 index 0000000..e52c558 --- /dev/null +++ b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/configuration/OMSAutoConfiguration.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.boot.configuration; + +import io.openmessaging.KeyValue; +import io.openmessaging.producer.Producer; +import io.openmessaging.spring.boot.OMSSpringBootConsts; +import io.openmessaging.spring.boot.config.KeyValueConverter; +import io.openmessaging.spring.boot.config.OMSProperties; +import io.openmessaging.spring.boot.registrar.ConsumerRegistrar; +import io.openmessaging.spring.boot.registrar.InterceptorRegistrar; +import io.openmessaging.spring.boot.registrar.TransactionStateCheckListenerRegistrar; +import io.openmessaging.spring.support.AccessPointContainer; +import io.openmessaging.spring.support.ProducerContainer; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.util.Assert; + +/** + * Auto-configuration for OpenMessaging. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +@Configuration +@EnableConfigurationProperties(OMSProperties.class) +@ConditionalOnProperty(prefix = OMSSpringBootConsts.PREFIX, name = "url") +public class OMSAutoConfiguration { + + private OMSProperties properties; + + public OMSAutoConfiguration(OMSProperties properties) { + this.properties = properties; + } + + @Bean + @ConditionalOnMissingBean(AccessPointContainer.class) + public AccessPointContainer createContainer() { + String url = properties.getUrl(); + Assert.hasText(url, "url can not be blank"); + + KeyValue attributes = KeyValueConverter.convert(properties.getAttributes()); + return new AccessPointContainer(url, attributes); + } + + @Bean + @ConditionalOnBean(AccessPointContainer.class) + @ConditionalOnProperty(prefix = OMSSpringBootConsts.PREFIX + ".producer.transaction.check", name = "enable", matchIfMissing = true, havingValue = "true") + public TransactionStateCheckListenerRegistrar createTransactionStateCheckListenerRegistrar(AccessPointContainer accessPointContainer) { + return new TransactionStateCheckListenerRegistrar(accessPointContainer); + } + + @Bean + @ConditionalOnBean(AccessPointContainer.class) + @ConditionalOnProperty(prefix = OMSSpringBootConsts.PREFIX + ".interceptor", name = "enable", matchIfMissing = true, havingValue = "true") + public InterceptorRegistrar createInterceptorRegistrar(AccessPointContainer accessPointContainer) { + return new InterceptorRegistrar(accessPointContainer); + } + + @Bean + @ConditionalOnBean(AccessPointContainer.class) + @ConditionalOnMissingBean(Producer.class) + @ConditionalOnProperty(prefix = OMSSpringBootConsts.PREFIX + ".producer", name = "enable", matchIfMissing = true, havingValue = "true") + public ProducerContainer createProducer(AccessPointContainer accessPointContainer) { + return new ProducerContainer(accessPointContainer); + } + + @Bean + @ConditionalOnBean(AccessPointContainer.class) + @ConditionalOnProperty(prefix = OMSSpringBootConsts.PREFIX + ".consumer", name = "enable", matchIfMissing = true, havingValue = "true") + public ConsumerRegistrar createConsumerRegistrar(AccessPointContainer accessPointContainer) { + return new ConsumerRegistrar(accessPointContainer); + } +} \ No newline at end of file diff --git a/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/registrar/ConsumerRegistrar.java b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/registrar/ConsumerRegistrar.java new file mode 100644 index 0000000..48eb2c8 --- /dev/null +++ b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/registrar/ConsumerRegistrar.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.boot.registrar; + +import io.openmessaging.consumer.BatchMessageListener; +import io.openmessaging.consumer.MessageListener; +import io.openmessaging.message.Message; +import io.openmessaging.spring.OMSSpringConsts; +import io.openmessaging.spring.boot.adapter.BatchMessageListenerReflectAdapter; +import io.openmessaging.spring.boot.adapter.MessageListenerReflectAdapter; +import io.openmessaging.spring.boot.annotation.OMSMessageListener; +import io.openmessaging.spring.support.AccessPointContainer; +import io.openmessaging.spring.support.ConsumerContainer; +import org.springframework.aop.support.AopUtils; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.SmartInitializingSingleton; +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.BeanDefinitionRegistry; +import org.springframework.core.annotation.AnnotationUtils; + +import java.lang.reflect.Method; +import java.lang.reflect.Type; +import java.util.LinkedList; +import java.util.List; + +/** + * Register {@link OMSMessageListener} to BeanDefinitionRegistry. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public class ConsumerRegistrar implements BeanPostProcessor, BeanFactoryAware, SmartInitializingSingleton { + + private static final String CONSUMER_CONTAINER_ID = "%s.consumer.container.%s"; + + private int consumerSequence = 0; + + private AccessPointContainer accessPointContainer; + private BeanFactory beanFactory; + + private List consumerIds = new LinkedList<>(); + + public ConsumerRegistrar(AccessPointContainer accessPointContainer) { + this.accessPointContainer = accessPointContainer; + } + + @Override + public void setBeanFactory(BeanFactory beanFactory) { + this.beanFactory = beanFactory; + } + + @Override + public void afterSingletonsInstantiated() { + for (String consumerId : consumerIds) { + beanFactory.getBean(consumerId); + } + } + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + Class beanClass = AopUtils.getTargetClass(bean); + OMSMessageListener classMessageListenerAnnotation = findClassMessageListenerAnnotation(beanClass); + + if (classMessageListenerAnnotation != null) { + registerListener(classMessageListenerAnnotation, getMessageListener(classMessageListenerAnnotation, bean)); + } else { + for (Method method : beanClass.getDeclaredMethods()) { + OMSMessageListener methodMessageListenerAnnotation = findMethodMessageListenerAnnotation(method); + if (methodMessageListenerAnnotation == null) { + continue; + } + Object messageListener = getMethodMessageListener(classMessageListenerAnnotation, bean, method); + registerListener(methodMessageListenerAnnotation, messageListener); + } + } + + return bean; + } + + protected OMSMessageListener findClassMessageListenerAnnotation(Class beanClass) { + return AnnotationUtils.findAnnotation(beanClass, OMSMessageListener.class); + } + + protected OMSMessageListener findMethodMessageListenerAnnotation(Method method) { + return AnnotationUtils.findAnnotation(method, OMSMessageListener.class); + } + + protected Object getMessageListener(OMSMessageListener messageListenerAnnotation, Object bean) { + if (!(bean instanceof MessageListener) && !(bean instanceof BatchMessageListener)) { + throw new IllegalArgumentException("listener type error, need MessageListener or BatchMessageListener"); + } + return bean; + } + + protected Object getMethodMessageListener(OMSMessageListener messageListenerAnnotation, Object bean, Method method) { + Class[] parameterTypes = method.getParameterTypes(); + Type[] genericParameterTypes = method.getGenericParameterTypes(); + + boolean isListener = (parameterTypes.length == 2 + && parameterTypes[0].equals(Message.class) + && parameterTypes[1].equals(MessageListener.Context.class)); + + boolean isBatchListener = (parameterTypes.length == 2 + && genericParameterTypes[0].getTypeName().equals(String.format("%s<%s>", List.class.getName(), Message.class.getName())) + && parameterTypes[1].equals(BatchMessageListener.Context.class)); + + if (!isListener && !isBatchListener) { + throw new IllegalArgumentException("listener parameters error, need MessageListener.onReceived or BatchMessageListener.onReceived"); + } + + if (isListener) { + return new MessageListenerReflectAdapter(bean, method); + } else { + return new BatchMessageListenerReflectAdapter(bean, method); + } + } + + protected void registerListener(OMSMessageListener omsMessageListener, Object listenerBean) { + BeanDefinitionRegistry beanDefinitionRegistry = (BeanDefinitionRegistry) beanFactory; + String id = String.format(CONSUMER_CONTAINER_ID, OMSSpringConsts.BEAN_ID_PREFIX, consumerSequence++); + + BeanDefinition consumerBeanDefinition = BeanDefinitionBuilder.rootBeanDefinition(ConsumerContainer.class) + .addConstructorArgValue(omsMessageListener.queueName()) + .addConstructorArgValue(accessPointContainer) + .addConstructorArgValue(listenerBean) + .getBeanDefinition(); + + beanDefinitionRegistry.registerBeanDefinition(id, consumerBeanDefinition); + consumerIds.add(id); + } +} \ No newline at end of file diff --git a/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/registrar/InterceptorRegistrar.java b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/registrar/InterceptorRegistrar.java new file mode 100644 index 0000000..77fb017 --- /dev/null +++ b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/registrar/InterceptorRegistrar.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.boot.registrar; + +import io.openmessaging.interceptor.ConsumerInterceptor; +import io.openmessaging.interceptor.ProducerInterceptor; +import io.openmessaging.spring.boot.annotation.OMSInterceptor; +import io.openmessaging.spring.support.AccessPointContainer; +import io.openmessaging.spring.support.InterceptorContainer; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.support.GenericApplicationContext; + +import java.util.Map; + +/** + * Register {@link OMSInterceptor} to AccessPointContainer. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public class InterceptorRegistrar implements ApplicationContextAware, InitializingBean, BeanPostProcessor { + + private AccessPointContainer accessPointContainer; + private GenericApplicationContext applicationContext; + + public InterceptorRegistrar(AccessPointContainer accessPointContainer) { + this.accessPointContainer = accessPointContainer; + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = (GenericApplicationContext) applicationContext; + } + + @Override + public void afterPropertiesSet() { + Map interceptorMap = applicationContext.getBeansWithAnnotation(OMSInterceptor.class); + if (interceptorMap == null || interceptorMap.isEmpty()) { + return; + } + for (Map.Entry entry : interceptorMap.entrySet()) { + registerInterceptor(entry.getKey(), entry.getValue()); + } + } + + protected void registerInterceptor(String beanName, Object bean) { + if (!(bean instanceof ProducerInterceptor) && !(bean instanceof ConsumerInterceptor)) { + throw new IllegalArgumentException(String.format("%s (%s) is not ProducerInterceptor or ConsumerInterceptor", beanName, bean.getClass())); + } + + OMSInterceptor interceptorAnnotation = bean.getClass().getAnnotation(OMSInterceptor.class); + InterceptorContainer interceptorContainer = new InterceptorContainer(bean); + accessPointContainer.addInterceptorContainer(interceptorContainer); + } + +} \ No newline at end of file diff --git a/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/registrar/TransactionStateCheckListenerRegistrar.java b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/registrar/TransactionStateCheckListenerRegistrar.java new file mode 100644 index 0000000..08a07d6 --- /dev/null +++ b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/java/io/openmessaging/spring/boot/registrar/TransactionStateCheckListenerRegistrar.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.boot.registrar; + +import io.openmessaging.producer.TransactionStateCheckListener; +import io.openmessaging.spring.boot.annotation.OMSTransactionStateCheckListener; +import io.openmessaging.spring.support.AccessPointContainer; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.support.GenericApplicationContext; + +import java.util.Map; + +/** + * Register {@link OMSTransactionStateCheckListener} to AccessPointContainer. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public class TransactionStateCheckListenerRegistrar implements ApplicationContextAware, InitializingBean, BeanPostProcessor { + + private AccessPointContainer accessPointContainer; + private GenericApplicationContext applicationContext; + + public TransactionStateCheckListenerRegistrar(AccessPointContainer accessPointContainer) { + this.accessPointContainer = accessPointContainer; + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = (GenericApplicationContext) applicationContext; + } + + @Override + public void afterPropertiesSet() { + Map listenerMap = applicationContext.getBeansWithAnnotation(OMSTransactionStateCheckListener.class); + if (listenerMap == null || listenerMap.isEmpty()) { + return; + } + for (Map.Entry entry : listenerMap.entrySet()) { + registerListener(entry.getKey(), entry.getValue()); + } + } + + protected void registerListener(String beanName, Object bean) { + if (!(bean instanceof TransactionStateCheckListener)) { + throw new IllegalArgumentException(String.format("%s (%s) is not TransactionStateCheckListener", beanName, bean.getClass())); + } + if (accessPointContainer.getTransactionStateCheckListener() != null) { + throw new IllegalArgumentException(String.format("transactionStateCheckListener already exists, instance: %s", accessPointContainer.getTransactionStateCheckListener())); + } + + OMSTransactionStateCheckListener listenerAnnotation = bean.getClass().getAnnotation(OMSTransactionStateCheckListener.class); + accessPointContainer.setTransactionStateCheckListener((TransactionStateCheckListener) bean); + } +} \ No newline at end of file diff --git a/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/resources/META-INF/spring.factories b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000..9058c2d --- /dev/null +++ b/openmessaging-spring-boot/openmessaging-spring-boot-autoconfigure/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ +io.openmessaging.spring.boot.configuration.OMSAutoConfiguration \ No newline at end of file diff --git a/openmessaging-spring-boot/openmessaging-spring-boot-starter/pom.xml b/openmessaging-spring-boot/openmessaging-spring-boot-starter/pom.xml new file mode 100644 index 0000000..25680b9 --- /dev/null +++ b/openmessaging-spring-boot/openmessaging-spring-boot-starter/pom.xml @@ -0,0 +1,25 @@ + + + + openmessaging-spring-boot + io.openmessaging + 1.0.0-beta-SNAPSHOT + + 4.0.0 + + openmessaging-spring-boot-starter + + + + io.openmessaging + openmessaging-spring-boot-autoconfigure + ${project.version} + + + org.springframework.boot + spring-boot-starter + + + \ No newline at end of file diff --git a/openmessaging-spring-boot/openmessaging-spring-boot-starter/src/main/resources/META-INF/spring.provides b/openmessaging-spring-boot/openmessaging-spring-boot-starter/src/main/resources/META-INF/spring.provides new file mode 100644 index 0000000..bdb01f7 --- /dev/null +++ b/openmessaging-spring-boot/openmessaging-spring-boot-starter/src/main/resources/META-INF/spring.provides @@ -0,0 +1 @@ +provides: openmessaging-spring-boot-autoconfigure \ No newline at end of file diff --git a/openmessaging-spring-boot/pom.xml b/openmessaging-spring-boot/pom.xml new file mode 100644 index 0000000..70a98ff --- /dev/null +++ b/openmessaging-spring-boot/pom.xml @@ -0,0 +1,18 @@ + + + + openmessaging-spring + io.openmessaging + 1.0.0-beta-SNAPSHOT + + 4.0.0 + + openmessaging-spring-boot + pom + + openmessaging-spring-boot-starter + openmessaging-spring-boot-autoconfigure + + \ No newline at end of file diff --git a/openmessaging-spring-core/pom.xml b/openmessaging-spring-core/pom.xml new file mode 100644 index 0000000..c4e7a7b --- /dev/null +++ b/openmessaging-spring-core/pom.xml @@ -0,0 +1,24 @@ + + + + openmessaging-spring + io.openmessaging + 1.0.0-beta-SNAPSHOT + + 4.0.0 + + openmessaging-spring-core + + + + org.springframework + spring-beans + + + org.springframework + spring-context + + + \ No newline at end of file diff --git a/openmessaging-spring-core/src/main/java/io/openmessaging/spring/OMSSpringConsts.java b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/OMSSpringConsts.java new file mode 100644 index 0000000..d34b786 --- /dev/null +++ b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/OMSSpringConsts.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring; + +/** + * This class defined constants. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public class OMSSpringConsts { + + public static final String BEAN_ID_PREFIX = "oms"; + + public static final String DEFAULT_ACCESS_POINT_ID = BEAN_ID_PREFIX + ".defaultAccessPoint"; +} \ No newline at end of file diff --git a/openmessaging-spring-core/src/main/java/io/openmessaging/spring/config/AccessPointBeanDefinitionParser.java b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/config/AccessPointBeanDefinitionParser.java new file mode 100644 index 0000000..f6fd3e1 --- /dev/null +++ b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/config/AccessPointBeanDefinitionParser.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.config; + +import io.openmessaging.KeyValue; +import io.openmessaging.OMS; +import io.openmessaging.spring.OMSSpringConsts; +import io.openmessaging.spring.support.AccessPointContainer; +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.beans.factory.support.AbstractBeanDefinition; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.BeanDefinitionParser; +import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; +import org.springframework.util.xml.DomUtils; +import org.w3c.dom.Element; + +import java.util.List; + +/** + * Parser for the access-point element. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public class AccessPointBeanDefinitionParser implements BeanDefinitionParser { + + private static final String ATTRIBUTE_ID = "id"; + private static final String ATTRIBUTE_URL = "url"; + private static final String ELEMENT_ATTRIBUTE = "attribute"; + private static final String ELEMENT_ATTRIBUTE_KEY = "key"; + private static final String ELEMENT_ATTRIBUTE_VALUE = "value"; + + @Override + public BeanDefinition parse(Element element, ParserContext parserContext) { + String id = element.getAttribute(ATTRIBUTE_ID); + String url = element.getAttribute(ATTRIBUTE_URL); + + Assert.hasText(url, String.format("%s can not be blank", ATTRIBUTE_URL)); + + if (!StringUtils.hasText(id)) { + id = OMSSpringConsts.DEFAULT_ACCESS_POINT_ID; + } + + KeyValue attributes = parseAttributes(element, parserContext); + BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(AccessPointContainer.class) + .addConstructorArgValue(id) + .addConstructorArgValue(url) + .addConstructorArgValue(attributes); + + AbstractBeanDefinition beanDefinition = beanDefinitionBuilder.getBeanDefinition(); + parserContext.getRegistry().registerBeanDefinition(id, beanDefinition); + return beanDefinition; + } + + protected KeyValue parseAttributes(Element element, ParserContext parserContext) { + KeyValue attributes = OMS.newKeyValue(); + List attributeElements = DomUtils.getChildElementsByTagName(element, ELEMENT_ATTRIBUTE); + + for (Element attributeElement : attributeElements) { + String key = attributeElement.getAttribute(ELEMENT_ATTRIBUTE_KEY); + String value = attributeElement.getAttribute(ELEMENT_ATTRIBUTE_VALUE); + attributes.put(key, value); + } + return attributes; + } +} \ No newline at end of file diff --git a/openmessaging-spring-core/src/main/java/io/openmessaging/spring/config/ConsumerBeanDefinitionParser.java b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/config/ConsumerBeanDefinitionParser.java new file mode 100644 index 0000000..639b504 --- /dev/null +++ b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/config/ConsumerBeanDefinitionParser.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.config; + +import io.openmessaging.consumer.BatchMessageListener; +import io.openmessaging.consumer.MessageListener; +import io.openmessaging.spring.OMSSpringConsts; +import io.openmessaging.spring.support.ConsumerContainer; +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.BeanDefinitionParser; +import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; +import org.w3c.dom.Element; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Parser for the consumer element. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public class ConsumerBeanDefinitionParser implements BeanDefinitionParser { + + private static final String CONSUMER_CONTAINER_ID = "%s.consumer.container.%s"; + private static final String CONSUMER_ID = "%s.consumer.%s"; + + private static final String ATTRIBUTE_ID = "id"; + private static final String ATTRIBUTE_ACCESS_POINT = "access-point"; + private static final String ATTRIBUTE_QUEUE_NAME = "queue-name"; + private static final String ATTRIBUTE_LISTENER_CLASS_NAME = "listener"; + private static final String ATTRIBUTE_LISTENER_REF = "listener-ref"; + + private final AtomicInteger consumerSequence = new AtomicInteger(); + + @Override + public BeanDefinition parse(Element element, ParserContext parserContext) { + String id = element.getAttribute(ATTRIBUTE_ID); + String accessPoint = element.getAttribute(ATTRIBUTE_ACCESS_POINT); + String queueName = element.getAttribute(ATTRIBUTE_QUEUE_NAME); + + Assert.hasText(queueName, String.format("%s can not be blank", ATTRIBUTE_QUEUE_NAME)); + + if (!StringUtils.hasText(id)) { + id = String.format(CONSUMER_CONTAINER_ID, OMSSpringConsts.BEAN_ID_PREFIX, consumerSequence.getAndIncrement()); + } + if (!StringUtils.hasText(accessPoint)) { + accessPoint = OMSSpringConsts.DEFAULT_ACCESS_POINT_ID; + } + + String listenerBeanId = parseListener(element, parserContext); + + BeanDefinitionBuilder consumerBuilder = BeanDefinitionBuilder.rootBeanDefinition(ConsumerContainer.class) + .addConstructorArgValue(queueName) + .addConstructorArgReference(accessPoint) + .addConstructorArgReference(listenerBeanId); + + parserContext.getRegistry().registerBeanDefinition(id, consumerBuilder.getBeanDefinition()); + return consumerBuilder.getBeanDefinition(); + } + + protected String parseListener(Element element, ParserContext parserContext) { + String listenerClassName = element.getAttribute(ATTRIBUTE_LISTENER_CLASS_NAME); + String listenerRef = element.getAttribute(ATTRIBUTE_LISTENER_REF); + + Assert.isTrue(StringUtils.hasText(listenerClassName) || StringUtils.hasText(listenerRef), "listener must exist"); + + String listenerBeanId = listenerRef; + BeanDefinition listenerBeanDefinition = null; + Class listenerClass = null; + + if (StringUtils.hasText(listenerRef)) { + listenerBeanDefinition = parserContext.getRegistry().getBeanDefinition(listenerRef); + Assert.notNull(listenerBeanDefinition, String.format("listener not found, listenerName: %s", listenerRef)); + + try { + listenerClass = Class.forName(listenerBeanDefinition.getBeanClassName()); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException(String.format("listener class not found, className: %s", + listenerBeanDefinition.getBeanClassName()), e); + } + } else { + try { + listenerClass = Class.forName(listenerClassName); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException(String.format("listener class not found, className %s", listenerClassName), e); + } + listenerBeanId = String.format(CONSUMER_ID, OMSSpringConsts.BEAN_ID_PREFIX, consumerSequence.getAndIncrement()); + listenerBeanDefinition = BeanDefinitionBuilder.rootBeanDefinition(listenerClass).getBeanDefinition(); + parserContext.getRegistry().registerBeanDefinition(listenerBeanId, listenerBeanDefinition); + } + + if (!MessageListener.class.isAssignableFrom(listenerClass) && !BatchMessageListener.class.isAssignableFrom(listenerClass)) { + throw new IllegalArgumentException(String.format("%s type error, need MessageListener or BatchMessageListener", listenerClassName)); + } + + return listenerBeanId; + } +} \ No newline at end of file diff --git a/openmessaging-spring-core/src/main/java/io/openmessaging/spring/config/InterceptorBeanDefinitionParser.java b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/config/InterceptorBeanDefinitionParser.java new file mode 100644 index 0000000..9cda715 --- /dev/null +++ b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/config/InterceptorBeanDefinitionParser.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.config; + +import io.openmessaging.interceptor.ConsumerInterceptor; +import io.openmessaging.interceptor.ProducerInterceptor; +import io.openmessaging.spring.OMSSpringConsts; +import io.openmessaging.spring.support.InterceptorContainer; +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.BeanDefinitionParser; +import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; +import org.w3c.dom.Element; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Parser for the interceptor element. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public class InterceptorBeanDefinitionParser implements BeanDefinitionParser { + + private static final String INTERCEPTOR_CONTAINER_ID = "%s.interceptor.container.%s"; + private static final String INTERCEPTOR_ID = "%s.interceptor.%s"; + + private static final String ATTRIBUTE_ACCESS_POINT = "access-point"; + private static final String ATTRIBUTE_INTERCEPTOR_CLASS_NAME = "class"; + private static final String ATTRIBUTE_INTERCEPTOR_REF = "ref"; + + private final AtomicInteger interceptorSequence = new AtomicInteger(); + + @Override + public BeanDefinition parse(Element element, ParserContext parserContext) { + String id = String.format(INTERCEPTOR_CONTAINER_ID, OMSSpringConsts.BEAN_ID_PREFIX, interceptorSequence.getAndIncrement()); + String accessPoint = element.getAttribute(ATTRIBUTE_ACCESS_POINT); + + if (!StringUtils.hasText(accessPoint)) { + accessPoint = OMSSpringConsts.DEFAULT_ACCESS_POINT_ID; + } + + String interceptorBeanId = parseInterceptor(element, parserContext); + + BeanDefinitionBuilder consumerBuilder = BeanDefinitionBuilder.rootBeanDefinition(InterceptorContainer.class) + .addConstructorArgValue(accessPoint) + .addConstructorArgReference(interceptorBeanId); + + parserContext.getRegistry().registerBeanDefinition(id, consumerBuilder.getBeanDefinition()); + return consumerBuilder.getBeanDefinition(); + } + + protected String parseInterceptor(Element element, ParserContext parserContext) { + String interceptorClassName = element.getAttribute(ATTRIBUTE_INTERCEPTOR_CLASS_NAME); + String interceptorRef = element.getAttribute(ATTRIBUTE_INTERCEPTOR_REF); + + Assert.isTrue(StringUtils.hasText(interceptorClassName) || StringUtils.hasText(interceptorRef), "interceptor must exist"); + + String interceptorBeanId = interceptorRef; + BeanDefinition interceptorBeanDefinition = null; + Class interceptorClass = null; + + if (StringUtils.hasText(interceptorRef)) { + interceptorBeanDefinition = parserContext.getRegistry().getBeanDefinition(interceptorRef); + Assert.notNull(interceptorBeanDefinition, String.format("interceptor not found, interceptorName: %s", interceptorRef)); + + try { + interceptorClass = Class.forName(interceptorBeanDefinition.getBeanClassName()); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException(String.format("interceptor class not found, className: %s", + interceptorBeanDefinition.getBeanClassName()), e); + } + } else { + try { + interceptorClass = Class.forName(interceptorClassName); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException(String.format("interceptor class not found, className: %s", interceptorClassName), e); + } + interceptorBeanId = String.format(INTERCEPTOR_ID, OMSSpringConsts.BEAN_ID_PREFIX, interceptorSequence.getAndIncrement()); + interceptorBeanDefinition = BeanDefinitionBuilder.rootBeanDefinition(interceptorClass).getBeanDefinition(); + parserContext.getRegistry().registerBeanDefinition(interceptorBeanId, interceptorBeanDefinition); + } + + if (!ProducerInterceptor.class.isAssignableFrom(interceptorClass) && !ConsumerInterceptor.class.isAssignableFrom(interceptorClass)) { + throw new IllegalArgumentException(String.format("%s type error, need ProducerInterceptor or ConsumerInterceptor", interceptorClassName)); + } + + return interceptorBeanId; + } +} \ No newline at end of file diff --git a/openmessaging-spring-core/src/main/java/io/openmessaging/spring/config/OMSNamespaceHandler.java b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/config/OMSNamespaceHandler.java new file mode 100644 index 0000000..151ba10 --- /dev/null +++ b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/config/OMSNamespaceHandler.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.config; + +import org.springframework.beans.factory.xml.NamespaceHandlerSupport; + +/** + * {@code OMSNamespaceHandler} for the {@code oms} namespace. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public class OMSNamespaceHandler extends NamespaceHandlerSupport { + + @Override + public void init() { + registerBeanDefinitionParser("access-point", new AccessPointBeanDefinitionParser()); + registerBeanDefinitionParser("producer", new ProducerBeanDefinitionParser()); + registerBeanDefinitionParser("consumer", new ConsumerBeanDefinitionParser()); + registerBeanDefinitionParser("interceptor", new InterceptorBeanDefinitionParser()); + } +} \ No newline at end of file diff --git a/openmessaging-spring-core/src/main/java/io/openmessaging/spring/config/ProducerBeanDefinitionParser.java b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/config/ProducerBeanDefinitionParser.java new file mode 100644 index 0000000..723cd48 --- /dev/null +++ b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/config/ProducerBeanDefinitionParser.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.config; + +import io.openmessaging.producer.TransactionStateCheckListener; +import io.openmessaging.spring.OMSSpringConsts; +import io.openmessaging.spring.support.ProducerContainer; +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.beans.factory.support.AbstractBeanDefinition; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.AbstractBeanDefinitionParser; +import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; +import org.w3c.dom.Element; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Parser for the producer element. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public class ProducerBeanDefinitionParser extends AbstractBeanDefinitionParser { + + private static final String PRODUCER_TRANSACTION_LISTENER_ID = "%s.transaction.listener.%s"; + + private static final String ATTRIBUTE_ID = "id"; + private static final String ATTRIBUTE_ACCESS_POINT = "access-point"; + private static final String ATTRIBUTE_LISTENER_CLASS_NAME = "listener"; + private static final String ATTRIBUTE_LISTENER_REF = "listener-ref"; + + private final AtomicInteger producerSequence = new AtomicInteger(); + + @Override + protected AbstractBeanDefinition parseInternal(Element element, ParserContext parserContext) { + String id = element.getAttribute(ATTRIBUTE_ID); + String accessPoint = element.getAttribute(ATTRIBUTE_ACCESS_POINT); + + Assert.hasText(id, String.format("%s can not be blank", ATTRIBUTE_ID)); + + if (!StringUtils.hasText(accessPoint)) { + accessPoint = OMSSpringConsts.DEFAULT_ACCESS_POINT_ID; + } + + String listenerBeanId = parseListener(element, parserContext); + + BeanDefinitionBuilder producerBeanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(ProducerContainer.class) + .addConstructorArgReference(accessPoint); + + if (listenerBeanId != null) { + producerBeanDefinitionBuilder.addConstructorArgReference(listenerBeanId); + } + + return producerBeanDefinitionBuilder.getBeanDefinition(); + } + + protected String parseListener(Element element, ParserContext parserContext) { + String listenerClassName = element.getAttribute(ATTRIBUTE_LISTENER_CLASS_NAME); + String listenerRef = element.getAttribute(ATTRIBUTE_LISTENER_REF); + + if (!StringUtils.hasText(listenerClassName) && !StringUtils.hasText(listenerRef)) { + return null; + } + + String listenerBeanId = listenerRef; + BeanDefinition listenerBeanDefinition = null; + Class listenerClass = null; + + if (StringUtils.hasText(listenerRef)) { + listenerBeanDefinition = parserContext.getRegistry().getBeanDefinition(listenerRef); + Assert.notNull(listenerBeanDefinition, String.format("listener not found, listenerName: %s", listenerRef)); + + try { + listenerClass = Class.forName(listenerBeanDefinition.getBeanClassName()); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException(String.format("transactionCheckListener class not found, className: %s", + listenerBeanDefinition.getBeanClassName()), e); + } + } else { + try { + listenerClass = Class.forName(listenerClassName); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException(String.format("transactionCheckListener class not found, className: %s", listenerClassName), e); + } + listenerBeanId = String.format(PRODUCER_TRANSACTION_LISTENER_ID, OMSSpringConsts.BEAN_ID_PREFIX, producerSequence.getAndIncrement()); + listenerBeanDefinition = BeanDefinitionBuilder.rootBeanDefinition(listenerClass).getBeanDefinition(); + parserContext.getRegistry().registerBeanDefinition(listenerBeanId, listenerBeanDefinition); + } + + if (!TransactionStateCheckListener.class.isAssignableFrom(listenerClass)) { + throw new IllegalArgumentException(String.format("%s type error, need TransactionStateCheckListener", listenerClassName)); + } + + return listenerBeanId; + } +} \ No newline at end of file diff --git a/openmessaging-spring-core/src/main/java/io/openmessaging/spring/support/AccessPointContainer.java b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/support/AccessPointContainer.java new file mode 100644 index 0000000..dd73f51 --- /dev/null +++ b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/support/AccessPointContainer.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.support; + +import io.openmessaging.KeyValue; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.OMS; +import io.openmessaging.interceptor.ConsumerInterceptor; +import io.openmessaging.interceptor.ProducerInterceptor; +import io.openmessaging.producer.TransactionStateCheckListener; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.ListableBeanFactory; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Container for the accessPoint. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public class AccessPointContainer implements BeanFactoryAware, InitializingBean { + + private String id; + private String url; + private KeyValue attributes; + + private MessagingAccessPoint accessPoint; + private ListableBeanFactory beanFactory; + + private List interceptorContainers = new LinkedList<>(); + private TransactionStateCheckListener transactionStateCheckListener; + + public AccessPointContainer(String url, KeyValue attributes) { + this.url = url; + this.attributes = attributes; + } + + public AccessPointContainer(String id, String url, KeyValue attributes) { + this.id = id; + this.url = url; + this.attributes = attributes; + } + + @Override + public void setBeanFactory(BeanFactory beanFactory) throws BeansException { + this.beanFactory = (ListableBeanFactory) beanFactory; + } + + @Override + public void afterPropertiesSet() { + accessPoint = OMS.getMessagingAccessPoint(url, attributes); + } + + public List getProducerInterceptors() { + List result = new LinkedList<>(); + List interceptorContainers = new LinkedList<>(); + Map factoryInterceptorContainers = beanFactory.getBeansOfType(InterceptorContainer.class); + + if (!factoryInterceptorContainers.isEmpty()) { + interceptorContainers.addAll(factoryInterceptorContainers.values()); + } + + if (!this.interceptorContainers.isEmpty()) { + interceptorContainers.addAll(this.interceptorContainers); + } + + for (InterceptorContainer interceptorContainer : interceptorContainers) { + if (((id == null && interceptorContainer.getAccessPoint() == null) + || (id != null && id.equals(interceptorContainer.getAccessPoint()))) + && (interceptorContainer.getInterceptor() instanceof ProducerInterceptor)) { + result.add((ProducerInterceptor) interceptorContainer.getInterceptor()); + } + } + return result; + } + + public List getConsumerInterceptors() { + List result = new LinkedList<>(); + List interceptorContainers = new LinkedList<>(); + Map factoryInterceptorContainers = beanFactory.getBeansOfType(InterceptorContainer.class); + + if (!factoryInterceptorContainers.isEmpty()) { + interceptorContainers.addAll(factoryInterceptorContainers.values()); + } + + if (!this.interceptorContainers.isEmpty()) { + interceptorContainers.addAll(this.interceptorContainers); + } + + for (InterceptorContainer interceptorContainer : interceptorContainers) { + if (((id == null && interceptorContainer.getAccessPoint() == null) + || (id != null && id.equals(interceptorContainer.getAccessPoint()))) + && (interceptorContainer.getInterceptor() instanceof ConsumerInterceptor)) { + result.add((ConsumerInterceptor) interceptorContainer.getInterceptor()); + } + } + return result; + } + + public void setTransactionStateCheckListener(TransactionStateCheckListener transactionStateCheckListener) { + this.transactionStateCheckListener = transactionStateCheckListener; + } + + public TransactionStateCheckListener getTransactionStateCheckListener() { + return transactionStateCheckListener; + } + + public void addInterceptorContainer(InterceptorContainer interceptorContainer) { + interceptorContainers.add(interceptorContainer); + } + + public MessagingAccessPoint getAccessPoint() { + return accessPoint; + } + + public void setId(String id) { + this.id = id; + } + + public void setUrl(String url) { + this.url = url; + } + + public void setAttributes(KeyValue attributes) { + this.attributes = attributes; + } + + public String getId() { + return id; + } + + public String getUrl() { + return url; + } + + public KeyValue getAttributes() { + return attributes; + } +} \ No newline at end of file diff --git a/openmessaging-spring-core/src/main/java/io/openmessaging/spring/support/ConsumerContainer.java b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/support/ConsumerContainer.java new file mode 100644 index 0000000..87f3921 --- /dev/null +++ b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/support/ConsumerContainer.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.support; + +import io.openmessaging.consumer.BatchMessageListener; +import io.openmessaging.consumer.Consumer; +import io.openmessaging.consumer.MessageListener; +import io.openmessaging.interceptor.ConsumerInterceptor; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.FactoryBean; +import org.springframework.beans.factory.InitializingBean; + +/** + * Container for the consumer. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public class ConsumerContainer implements InitializingBean, DisposableBean, FactoryBean { + + private String queueName; + private AccessPointContainer accessPointContainer; + private Object messageListener; + + private Consumer consumer; + + public ConsumerContainer(String queueName, AccessPointContainer accessPointContainer, Object messageListener) { + this.queueName = queueName; + this.accessPointContainer = accessPointContainer; + this.messageListener = messageListener; + } + + @Override + public void afterPropertiesSet() { + Consumer consumer = accessPointContainer.getAccessPoint().createConsumer(); + + if (messageListener instanceof MessageListener) { + consumer.bindQueue(queueName, (MessageListener) messageListener); + } else if (messageListener instanceof BatchMessageListener) { + consumer.bindQueue(queueName, (BatchMessageListener) messageListener); + } else { + throw new IllegalArgumentException("listener type error, need MessageListener or BatchMessageListener"); + } + + for (ConsumerInterceptor interceptor : accessPointContainer.getConsumerInterceptors()) { + consumer.addInterceptor(interceptor); + } + + consumer.start(); + this.consumer = consumer; + } + + @Override + public void destroy() { + if (consumer != null) { + consumer.stop(); + } + } + + @Override + public Object getObject() throws Exception { + return consumer; + } + + @Override + public Class getObjectType() { + return Consumer.class; + } + + @Override + public boolean isSingleton() { + return true; + } +} \ No newline at end of file diff --git a/openmessaging-spring-core/src/main/java/io/openmessaging/spring/support/InterceptorContainer.java b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/support/InterceptorContainer.java new file mode 100644 index 0000000..495a25f --- /dev/null +++ b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/support/InterceptorContainer.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.support; + +/** + * Container for the interceptor. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public class InterceptorContainer { + + private String accessPoint; + private Object interceptor; + + public InterceptorContainer(Object interceptor) { + this.interceptor = interceptor; + } + + public InterceptorContainer(String accessPoint, Object interceptor) { + this.accessPoint = accessPoint; + this.interceptor = interceptor; + } + + public String getAccessPoint() { + return accessPoint; + } + + public Object getInterceptor() { + return interceptor; + } +} \ No newline at end of file diff --git a/openmessaging-spring-core/src/main/java/io/openmessaging/spring/support/ProducerContainer.java b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/support/ProducerContainer.java new file mode 100644 index 0000000..a5bbf4c --- /dev/null +++ b/openmessaging-spring-core/src/main/java/io/openmessaging/spring/support/ProducerContainer.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.spring.support; + +import io.openmessaging.interceptor.ProducerInterceptor; +import io.openmessaging.producer.Producer; +import io.openmessaging.producer.TransactionStateCheckListener; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.FactoryBean; +import org.springframework.beans.factory.InitializingBean; + +/** + * Container for the producer. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public class ProducerContainer implements InitializingBean, DisposableBean, FactoryBean { + + private AccessPointContainer accessPointContainer; + private Producer producer; + private TransactionStateCheckListener transactionStateCheckListener; + + public ProducerContainer(AccessPointContainer accessPointContainer) { + this(accessPointContainer, null); + } + + public ProducerContainer(AccessPointContainer accessPointContainer, TransactionStateCheckListener transactionStateCheckListener) { + this.accessPointContainer = accessPointContainer; + this.transactionStateCheckListener = transactionStateCheckListener; + } + + @Override + public void afterPropertiesSet() { + if (transactionStateCheckListener == null) { + transactionStateCheckListener = accessPointContainer.getTransactionStateCheckListener(); + } + if (transactionStateCheckListener == null) { + producer = accessPointContainer.getAccessPoint().createProducer(); + } else { + producer = accessPointContainer.getAccessPoint().createProducer(transactionStateCheckListener); + } + for (ProducerInterceptor interceptor : accessPointContainer.getProducerInterceptors()) { + producer.addInterceptor(interceptor); + } + producer.start(); + } + + @Override + public void destroy() { + if (producer != null) { + producer.stop(); + } + } + + @Override + public Class getObjectType() { + return Producer.class; + } + + @Override + public Object getObject() { + return producer; + } + + @Override + public boolean isSingleton() { + return true; + } +} \ No newline at end of file diff --git a/openmessaging-spring-core/src/main/resources/META-INF/spring.handlers b/openmessaging-spring-core/src/main/resources/META-INF/spring.handlers new file mode 100644 index 0000000..36ddd85 --- /dev/null +++ b/openmessaging-spring-core/src/main/resources/META-INF/spring.handlers @@ -0,0 +1 @@ +http\://openmessaging.io/schema=io.openmessaging.spring.config.OMSNamespaceHandler \ No newline at end of file diff --git a/openmessaging-spring-core/src/main/resources/META-INF/spring.schemas b/openmessaging-spring-core/src/main/resources/META-INF/spring.schemas new file mode 100644 index 0000000..a5e9579 --- /dev/null +++ b/openmessaging-spring-core/src/main/resources/META-INF/spring.schemas @@ -0,0 +1 @@ +http\://openmessaging.io/schema/oms.xsd=/io/openmessaging/schema/oms.xsd \ No newline at end of file diff --git a/openmessaging-spring-core/src/main/resources/io/openmessaging/schema/oms.xsd b/openmessaging-spring-core/src/main/resources/io/openmessaging/schema/oms.xsd new file mode 100644 index 0000000..199195e --- /dev/null +++ b/openmessaging-spring-core/src/main/resources/io/openmessaging/schema/oms.xsd @@ -0,0 +1,45 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/openmessaging-spring-samples/pom.xml b/openmessaging-spring-samples/pom.xml new file mode 100644 index 0000000..5933839 --- /dev/null +++ b/openmessaging-spring-samples/pom.xml @@ -0,0 +1,32 @@ + + + + openmessaging-spring + io.openmessaging + 1.0.0-beta-SNAPSHOT + + 4.0.0 + + openmessaging-spring-samples + + + + io.openmessaging + openmessaging-spring-core + ${project.version} + + + io.openmessaging + openmessaging-spring-boot-starter + ${project.version} + + + junit + junit + 4.11 + test + + + \ No newline at end of file diff --git a/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/spring/SimpleConsumerInterceptor.java b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/spring/SimpleConsumerInterceptor.java new file mode 100644 index 0000000..b690017 --- /dev/null +++ b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/spring/SimpleConsumerInterceptor.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.samples.spring; + +import io.openmessaging.interceptor.ConsumerInterceptor; +import io.openmessaging.interceptor.Context; +import io.openmessaging.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SimpleConsumerInterceptor implements ConsumerInterceptor { + + protected final Logger logger = LoggerFactory.getLogger(SimpleConsumerInterceptor.class); + + @Override + public void preReceive(Message message, Context attributes) { + logger.info("preReceive, message: {}", message); + } + + @Override + public void postReceive(Message message, Context attributes) { + logger.info("postReceive, message: {}", message); + } +} \ No newline at end of file diff --git a/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/spring/SimpleMessageListener.java b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/spring/SimpleMessageListener.java new file mode 100644 index 0000000..ea69eaf --- /dev/null +++ b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/spring/SimpleMessageListener.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.samples.spring; + +import io.openmessaging.consumer.MessageListener; +import io.openmessaging.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SimpleMessageListener implements MessageListener { + + protected final Logger logger = LoggerFactory.getLogger(SimpleMessageListener.class); + + @Override + public void onReceived(Message message, Context context) { + logger.info("onReceived, message: {}", message); + } +} \ No newline at end of file diff --git a/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/spring/SimpleProducerInterceptor.java b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/spring/SimpleProducerInterceptor.java new file mode 100644 index 0000000..5157845 --- /dev/null +++ b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/spring/SimpleProducerInterceptor.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.samples.spring; + +import io.openmessaging.interceptor.Context; +import io.openmessaging.interceptor.ProducerInterceptor; +import io.openmessaging.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SimpleProducerInterceptor implements ProducerInterceptor { + + protected final Logger logger = LoggerFactory.getLogger(SimpleProducerInterceptor.class); + + @Override + public void preSend(Message message, Context attributes) { + logger.info("preSend, message: {}", message); + } + + @Override + public void postSend(Message message, Context attributes) { + logger.info("postSend, message: {}", message); + } +} \ No newline at end of file diff --git a/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/spring/SpringMain.java b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/spring/SpringMain.java new file mode 100644 index 0000000..dd001b7 --- /dev/null +++ b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/spring/SpringMain.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.samples.spring; + +import org.springframework.context.support.ClassPathXmlApplicationContext; + +public class SpringMain { + + public static void main(String[] args) { + new ClassPathXmlApplicationContext("spring-sample.xml"); + } +} \ No newline at end of file diff --git a/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/springboot/SimpleConsumerInterceptor.java b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/springboot/SimpleConsumerInterceptor.java new file mode 100644 index 0000000..a070e3c --- /dev/null +++ b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/springboot/SimpleConsumerInterceptor.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.samples.springboot; + +import io.openmessaging.interceptor.ConsumerInterceptor; +import io.openmessaging.interceptor.Context; +import io.openmessaging.message.Message; +import io.openmessaging.spring.boot.annotation.OMSInterceptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@OMSInterceptor +public class SimpleConsumerInterceptor implements ConsumerInterceptor { + + protected final Logger logger = LoggerFactory.getLogger(SimpleConsumerInterceptor.class); + + @Override + public void preReceive(Message message, Context attributes) { + logger.info("preReceive, message: {}", message); + } + + @Override + public void postReceive(Message message, Context attributes) { + logger.info("postReceive, message: {}", message); + } +} \ No newline at end of file diff --git a/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/springboot/SimpleMessageListener.java b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/springboot/SimpleMessageListener.java new file mode 100644 index 0000000..5fd7807 --- /dev/null +++ b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/springboot/SimpleMessageListener.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.samples.springboot; + +import io.openmessaging.consumer.MessageListener; +import io.openmessaging.message.Message; +import io.openmessaging.spring.boot.annotation.OMSMessageListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@Component +@OMSMessageListener(queueName = "test_topic_2") +public class SimpleMessageListener implements MessageListener { + + protected final Logger logger = LoggerFactory.getLogger(SimpleMessageListener.class); + + @Override + public void onReceived(Message message, MessageListener.Context context) { + logger.info("receive, message: {}", message); + context.ack(); + } +} \ No newline at end of file diff --git a/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/springboot/SimpleMethodMessageListener.java b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/springboot/SimpleMethodMessageListener.java new file mode 100644 index 0000000..3221c51 --- /dev/null +++ b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/springboot/SimpleMethodMessageListener.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.samples.springboot; + +import io.openmessaging.consumer.BatchMessageListener; +import io.openmessaging.consumer.MessageListener; +import io.openmessaging.message.Message; +import io.openmessaging.spring.boot.annotation.OMSMessageListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Component +public class SimpleMethodMessageListener { + + protected final Logger logger = LoggerFactory.getLogger(SimpleMethodMessageListener.class); + + @OMSMessageListener(queueName = "test_topic_0") + public void onReceived1(Message message, MessageListener.Context context) { + logger.info("receive, message: {}", message); + context.ack(); + } + + @OMSMessageListener(queueName = "test_topic_1") + public void onReceived2(List messages, BatchMessageListener.Context context) { + for (Message message : messages) { + logger.info("receive, message: {}", message); + } + context.ack(); + } +} \ No newline at end of file diff --git a/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/springboot/SimpleProducerInterceptor.java b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/springboot/SimpleProducerInterceptor.java new file mode 100644 index 0000000..18a0153 --- /dev/null +++ b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/springboot/SimpleProducerInterceptor.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.samples.springboot; + +import io.openmessaging.interceptor.Context; +import io.openmessaging.interceptor.ProducerInterceptor; +import io.openmessaging.message.Message; +import io.openmessaging.spring.boot.annotation.OMSInterceptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@OMSInterceptor +public class SimpleProducerInterceptor implements ProducerInterceptor { + + protected final Logger logger = LoggerFactory.getLogger(SimpleProducerInterceptor.class); + + @Override + public void preSend(Message message, Context attributes) { + logger.info("preSend, message: {}", message); + } + + @Override + public void postSend(Message message, Context attributes) { + logger.info("postSend, message: {}", message); + } +} \ No newline at end of file diff --git a/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/springboot/SimpleTransactionStateCheckListener.java b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/springboot/SimpleTransactionStateCheckListener.java new file mode 100644 index 0000000..7f34d2c --- /dev/null +++ b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/springboot/SimpleTransactionStateCheckListener.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.samples.springboot; + +import io.openmessaging.message.Message; +import io.openmessaging.producer.TransactionStateCheckListener; +import io.openmessaging.spring.boot.annotation.OMSTransactionStateCheckListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@OMSTransactionStateCheckListener +public class SimpleTransactionStateCheckListener implements TransactionStateCheckListener { + + protected final Logger logger = LoggerFactory.getLogger(SimpleTransactionStateCheckListener.class); + + @Override + public void check(Message message, TransactionalContext context) { + logger.info("check transaction, message: {}", message); + context.commit(); + } +} \ No newline at end of file diff --git a/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/springboot/SpringBootMain.java b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/springboot/SpringBootMain.java new file mode 100644 index 0000000..a48467b --- /dev/null +++ b/openmessaging-spring-samples/src/main/java/io/openmessaging/samples/springboot/SpringBootMain.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.samples.springboot; + +import io.openmessaging.message.Message; +import io.openmessaging.producer.Producer; +import io.openmessaging.producer.SendResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.ComponentScan; + +import javax.annotation.Resource; + +@SpringBootApplication +@ComponentScan("io.openmessaging.samples.springboot") +public class SpringBootMain implements InitializingBean { + + protected final Logger logger = LoggerFactory.getLogger(SpringBootMain.class); + + @Resource + private Producer producer; + + public static void main(String[] args) { + SpringApplication.run(SpringBootMain.class); + } + + @Override + public void afterPropertiesSet() throws Exception { + for (int i = 0; i < 10; i++) { + Message message = producer.createMessage("test_topic_0", "test".getBytes()); + SendResult sendResult = producer.send(message); + logger.info("sendResult: {}", sendResult); + } + } +} \ No newline at end of file diff --git a/openmessaging-spring-samples/src/main/resources/application.properties b/openmessaging-spring-samples/src/main/resources/application.properties new file mode 100644 index 0000000..2e0574a --- /dev/null +++ b/openmessaging-spring-samples/src/main/resources/application.properties @@ -0,0 +1,7 @@ +spring.oms.url=oms:jmq://test_app@nameserver-jmq.jd.local/UNKNOWN +spring.oms.attributes[ACCOUNT_KEY]=test_token + +#spring.oms.consumer.enable=false +#spring.oms.producer.enable=false +#spring.oms.interceptor.enable=false +#spring.oms.producer.transaction.check.enable=false \ No newline at end of file diff --git a/openmessaging-spring-samples/src/main/resources/spring-sample.xml b/openmessaging-spring-samples/src/main/resources/spring-sample.xml new file mode 100644 index 0000000..34815dc --- /dev/null +++ b/openmessaging-spring-samples/src/main/resources/spring-sample.xml @@ -0,0 +1,23 @@ + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..3178bf7 --- /dev/null +++ b/pom.xml @@ -0,0 +1,70 @@ + + + + io.openmessaging + parent + 1.0.0-beta-SNAPSHOT + + 4.0.0 + + openmessaging-spring + 1.0.0-beta-SNAPSHOT + pom + + openmessaging-spring-core + openmessaging-spring-boot + openmessaging-spring-samples + + + + 1.0.0-beta-SNAPSHOT + 1.7.5 + 5.1.5.RELEASE + 2.1.3.RELEASE + + + + + io.openmessaging + openmessaging-api + ${openmessaging-api.version} + + + org.slf4j + slf4j-api + ${slf4j-version} + + + + + + + org.springframework.boot + spring-boot-starter + ${spring-boot.version} + + + org.springframework.boot + spring-boot-autoconfigure + ${spring-boot.version} + + + org.springframework.boot + spring-boot-configuration-processor + ${spring-boot.version} + + + org.springframework + spring-beans + ${spring.version} + + + org.springframework + spring-context + ${spring.version} + + + + \ No newline at end of file