diff --git a/app/build.gradle b/app/build.gradle index e0dd5ef..68537ee 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -62,6 +62,7 @@ dependencies { implementation project(':courier-stream-adapter-rxjava2') implementation project(':courier-message-adapter-gson') + implementation project(':courier-message-adapter-text') implementation project(':courier-message-adapter-moshi') implementation project(':courier-message-adapter-protobuf') implementation project(':adaptive-keep-alive') diff --git a/app/src/main/java/com/gojek/courier/app/data/network/CourierService.kt b/app/src/main/java/com/gojek/courier/app/data/network/CourierService.kt index 7e296dc..40b0654 100644 --- a/app/src/main/java/com/gojek/courier/app/data/network/CourierService.kt +++ b/app/src/main/java/com/gojek/courier/app/data/network/CourierService.kt @@ -6,6 +6,8 @@ import com.gojek.courier.annotation.Data import com.gojek.courier.annotation.Path import com.gojek.courier.annotation.Send import com.gojek.courier.annotation.Subscribe +import com.gojek.courier.annotation.SubscribeMultiple +import com.gojek.courier.annotation.TopicMap import com.gojek.courier.annotation.Unsubscribe import com.gojek.courier.app.data.network.model.Message import com.gojek.courier.callback.SendMessageCallback @@ -16,8 +18,11 @@ interface CourierService { fun publish(@Path("topic") topic: String, @Data message: Message, @Callback callback: SendMessageCallback) @Subscribe(topic = "{topic}") - fun subscribe(@Path("topic") topic: String): Observable + fun subscribe(@Path("topic") topic: String): Observable @Unsubscribe(topics = ["{topic}"]) fun unsubscribe(@Path("topic") topic: String) + + @SubscribeMultiple + fun subscribeAll(@TopicMap topicMap: Map): Observable } \ No newline at end of file diff --git a/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt b/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt index 68848d1..2c4e524 100644 --- a/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt +++ b/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt @@ -7,12 +7,15 @@ import com.gojek.chuckmqtt.external.MqttChuckConfig import com.gojek.chuckmqtt.external.MqttChuckInterceptor import com.gojek.chuckmqtt.external.Period import com.gojek.courier.Courier +import com.gojek.courier.QoS +import com.gojek.courier.QoS.ZERO import com.gojek.courier.app.R import com.gojek.courier.app.data.network.CourierService import com.gojek.courier.app.data.network.model.Message import com.gojek.courier.callback.SendMessageCallback import com.gojek.courier.logging.ILogger import com.gojek.courier.messageadapter.gson.GsonMessageAdapterFactory +import com.gojek.courier.messageadapter.text.TextMessageAdapterFactory import com.gojek.courier.streamadapter.rxjava2.RxJava2StreamAdapterFactory import com.gojek.mqtt.auth.Authenticator import com.gojek.mqtt.client.MqttClient @@ -102,7 +105,15 @@ class MainActivity : AppCompatActivity() { } subscribe.setOnClickListener { - courierService.subscribe(topic = topic.text.toString()) + val topics = topic.text.toString().split(",") + val stream = if (topics.size == 1) { + courierService.subscribe(topic = topics[0]) + } else { + val topicMap = mutableMapOf() + for (topic in topics) { topicMap[topic] = ZERO } + courierService.subscribeAll(topicMap = topicMap) + } + stream.subscribe { Log.d("Courier", "Message received: $it") } } unsubscribe.setOnClickListener { @@ -161,7 +172,7 @@ class MainActivity : AppCompatActivity() { val configuration = Courier.Configuration( client = mqttClient, streamAdapterFactories = listOf(RxJava2StreamAdapterFactory()), - messageAdapterFactories = listOf(GsonMessageAdapterFactory()), + messageAdapterFactories = listOf(TextMessageAdapterFactory(), GsonMessageAdapterFactory()), logger = getLogger() ) val courier = Courier(configuration) diff --git a/courier/src/main/java/com/gojek/courier/annotation/parser/MethodAnnotationsParser.kt b/courier/src/main/java/com/gojek/courier/annotation/parser/MethodAnnotationsParser.kt index 401135f..42b5afa 100644 --- a/courier/src/main/java/com/gojek/courier/annotation/parser/MethodAnnotationsParser.kt +++ b/courier/src/main/java/com/gojek/courier/annotation/parser/MethodAnnotationsParser.kt @@ -137,28 +137,40 @@ internal class MethodAnnotationsParser( } private fun parseSubscribeAllMethodAnnotations(method: Method) { - method.requireReturnTypeIsOneOf(Void.TYPE) { - "SubscribeAll method must return Void: $method" - } - method.requireParameterTypes(ParameterizedType::class.java) { - "Only one argument with parameterized type is allowed" + method.requireReturnTypeIsOneOf(Void.TYPE, ParameterizedType::class.java) { + "Subscribe method must return Void or ParameterizedType: $method" } val annotations = method.parameterAnnotations[0].filter { it.isParameterAnnotation() } require(annotations.size == 1) { "A parameter must have one and only one parameter annotation" } require(method.parameterTypes[0] == Map::class.java) { - "Parameter should be of Map type $method" + "Parameter should be of Map type $method" } val actualTypeArguments = (method.genericParameterTypes[0] as ParameterizedType).actualTypeArguments require(actualTypeArguments[0] == String::class.java) { - "Parameter should be of Map type $method" + "Parameter should be of Map type $method" } require(actualTypeArguments[1].getRawType() == QoS::class.java) { - "Parameter should be of Map type $method" + "Parameter should be of Map type $method" + } + + if (method.genericReturnType == Void.TYPE) { + stubMethod = StubMethod.SubscribeAll + } else { + method.requireReturnTypeIsResolvable { + "Method return type must not include a type variable or wildcard: ${method.genericReturnType}" + } + + val streamType = method.genericReturnType as ParameterizedType + val messageType = streamType.getFirstTypeArgument() + + val streamAdapter = streamAdapterResolver.resolve(streamType) + val messageAdapter = messageAdapterResolver.resolve(messageType, method.annotations) + + stubMethod = StubMethod.SubscribeAllWithStream(messageAdapter, streamAdapter) } - stubMethod = StubMethod.SubscribeAll } private fun parseUnsubscribeMethodAnnotations(method: Method) { diff --git a/courier/src/main/java/com/gojek/courier/coordinator/Coordinator.kt b/courier/src/main/java/com/gojek/courier/coordinator/Coordinator.kt index 453f980..38e75f8 100644 --- a/courier/src/main/java/com/gojek/courier/coordinator/Coordinator.kt +++ b/courier/src/main/java/com/gojek/courier/coordinator/Coordinator.kt @@ -148,6 +148,44 @@ internal class Coordinator( return status } + override fun subscribeAllWithStream(stubMethod: StubMethod.SubscribeAllWithStream, args: Array): Any { + logger.d("Coordinator", "Subscribe method invoked for multiple topics") + val topicList = (args[0] as Map).toList() + if (topicList.size == 1) { + client.subscribe(topicList[0]) + } else { + client.subscribe(topicList[0], *topicList.toTypedArray().sliceArray(IntRange(1, topicList.size - 1))) + } + logger.d("Coordinator", "Subscribed topics: $topicList") + val flowable = Flowable.create( + FlowableOnSubscribe { emitter -> + val listener = object : MessageListener { + override fun onMessageReceived(mqttMessage: MqttMessage) { + if (emitter.isCancelled.not()) { + emitter.onNext(mqttMessage) + } + } + } + for (topic in topicList) { + client.addMessageListener(topic.first, listener) + emitter.setCancellable { client.removeMessageListener(topic.first, listener) } + } + }, + BackpressureStrategy.BUFFER + ) + + val stream = flowable + .observeOn(Schedulers.computation()) + .flatMap { mqttMessage -> + mqttMessage.message.adapt( + mqttMessage.topic, + stubMethod.messageAdapter + )?.let { Flowable.just(it) } ?: Flowable.empty() + } + .toStream() + return stubMethod.streamAdapter.adapt(stream) + } + override fun getEventStream(): Stream { return object : Stream { override fun start(observer: Observer): Disposable { diff --git a/courier/src/main/java/com/gojek/courier/stub/StubInterface.kt b/courier/src/main/java/com/gojek/courier/stub/StubInterface.kt index 35918cb..9d14201 100644 --- a/courier/src/main/java/com/gojek/courier/stub/StubInterface.kt +++ b/courier/src/main/java/com/gojek/courier/stub/StubInterface.kt @@ -28,6 +28,9 @@ internal class StubInterface( is StubMethod.SubscribeAll -> { callback.subscribeAll(stubMethod, args) } + is StubMethod.SubscribeAllWithStream -> { + callback.subscribeAllWithStream(stubMethod, args) + } is StubMethod.Unsubscribe -> { callback.unsubscribe(stubMethod, args) } @@ -41,6 +44,7 @@ internal class StubInterface( fun subscribeWithStream(stubMethod: StubMethod.SubscribeWithStream, args: Array): Any fun unsubscribe(stubMethod: StubMethod.Unsubscribe, args: Array): Any fun subscribeAll(stubMethod: StubMethod.SubscribeAll, args: Array): Any + fun subscribeAllWithStream(stubMethod: StubMethod.SubscribeAllWithStream, args: Array): Any fun getEventStream(): Stream fun getConnectionState(): ConnectionState } diff --git a/courier/src/main/java/com/gojek/courier/stub/StubMethod.kt b/courier/src/main/java/com/gojek/courier/stub/StubMethod.kt index 783d0d9..0d44215 100644 --- a/courier/src/main/java/com/gojek/courier/stub/StubMethod.kt +++ b/courier/src/main/java/com/gojek/courier/stub/StubMethod.kt @@ -40,6 +40,11 @@ internal sealed class StubMethod { object SubscribeAll : StubMethod() + class SubscribeAllWithStream( + val messageAdapter: MessageAdapter, + val streamAdapter: StreamAdapter + ) : StubMethod() + class Unsubscribe( val argumentProcessor: UnsubscriptionArgumentProcessor ) : StubMethod() diff --git a/docs/docs/CourierService.md b/docs/docs/CourierService.md index 2885eca..72e5677 100644 --- a/docs/docs/CourierService.md +++ b/docs/docs/CourierService.md @@ -4,7 +4,7 @@ Courier provides the functionalities like Send, Receive, Subscribe, Unsubscribe ### Usage -Declare a service interface for various actions like Send, Receive, Subscribe, Unsubscribe. +Declare a service interface for various actions like Send, Receive, Subscribe, SubscribeMultiple, Unsubscribe. ~~~ kotlin interface MessageService { @@ -16,12 +16,17 @@ interface MessageService { @Subscribe(topic = "topic/{id}/receive", qos = QoS.ONE) fun subscribe(@Path("id") identifier: String): Observable + + @SubscribeMultiple + fun subscribe(@TopicMap topicMap: Map): Observable @Unsubscribe(topics = ["topic/{id}/receive"]) fun unsubscribe(@Path("id") identifier: String) } ~~~ + + Use Courier to create an implementation of service interface. ~~~ kotlin @@ -44,12 +49,14 @@ Following annotations are supported for service interface. - **@Subscribe** : A method annotation used for subscribing a single topic over the MQTT connection. -- **@SubscribeMultiple** : A method annotation used for subscribing multiple topic over the MQTT connection. +- **@SubscribeMultiple** : A method annotation used for subscribing multiple topics over the MQTT connection. -- **@Unsubscribe** : A method annotation used for unsubscribing a single topic over the MQTT connection. +- **@Unsubscribe** : A method annotation used for unsubscribing topics over the MQTT connection. - **@Path** : A parameter annotation used for specifying a path variable in an MQTT topic. - **@Data** : A parameter annotation used for specifying the message object while sending a message over the MQTT connection. -- **@TopicMap** : A parameter annotation used for specifying a topic map. It is always used while subscribing multiple topics. \ No newline at end of file +- **@TopicMap** : A parameter annotation used for specifying a topic map. It is always used while subscribing multiple topics. + +**Note** : While subscribing topics using `@SubscribeMultiple` along with a stream, make sure that messages received on all topics follow same format or a message adapter is added for handling different format. \ No newline at end of file diff --git a/docs/docs/SubscribeUnsubscribe.md b/docs/docs/SubscribeUnsubscribe.md index f20ab07..1b2649e 100644 --- a/docs/docs/SubscribeUnsubscribe.md +++ b/docs/docs/SubscribeUnsubscribe.md @@ -10,7 +10,7 @@ interface MessageService { fun subscribe(@Path("id") identifier: String): Observable @SubscribeMultiple - fun subscribeMultiple(@TopicMap topics: Map) + fun subscribeMultiple(@TopicMap topics: Map): Observable @Unsubscribe(topics = ["topic/{id}/receive"]) fun unsubscribe(@Path("id") identifier: String) @@ -22,7 +22,9 @@ messageService.subscribe("user-id").subscribe { message -> print(message) } -messageService.subscribeMultiple(mapOf("topic1" to QoS.ONE, "topic2" to QoS.TWO)) +messageService.subscribeMultiple(mapOf("topic1" to QoS.ONE, "topic2" to QoS.TWO)).subscribe { message -> + print(message) +} messageService.unsubscribe("user-id") ~~~ @@ -35,6 +37,6 @@ mqttClient.subscribe("topic1" to QoS.ZERO, "topic2" to QoS.ONE) mqttClient.unsubscribe("topic1", "topic2") ~~~ - +**Note** : While subscribing topics using `@SubscribeMultiple` along with a stream, make sure that messages received on all topics follow same format or a message adapter is added for handling different format.