Skip to content

Commit

Permalink
ISSUE#76 Add support for message stream with multiple subscription in…
Browse files Browse the repository at this point in the history
… courier service interface (#77)
  • Loading branch information
deepanshu42 authored Sep 14, 2023
1 parent 84784b2 commit 3ca9fb1
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 19 deletions.
1 change: 1 addition & 0 deletions app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ dependencies {

implementation project(':courier-stream-adapter-rxjava2')
implementation project(':courier-message-adapter-gson')
implementation project(':courier-message-adapter-text')
implementation project(':courier-message-adapter-moshi')
implementation project(':courier-message-adapter-protobuf')
implementation project(':adaptive-keep-alive')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import com.gojek.courier.annotation.Data
import com.gojek.courier.annotation.Path
import com.gojek.courier.annotation.Send
import com.gojek.courier.annotation.Subscribe
import com.gojek.courier.annotation.SubscribeMultiple
import com.gojek.courier.annotation.TopicMap
import com.gojek.courier.annotation.Unsubscribe
import com.gojek.courier.app.data.network.model.Message
import com.gojek.courier.callback.SendMessageCallback
Expand All @@ -16,8 +18,11 @@ interface CourierService {
fun publish(@Path("topic") topic: String, @Data message: Message, @Callback callback: SendMessageCallback)

@Subscribe(topic = "{topic}")
fun subscribe(@Path("topic") topic: String): Observable<Message>
fun subscribe(@Path("topic") topic: String): Observable<String>

@Unsubscribe(topics = ["{topic}"])
fun unsubscribe(@Path("topic") topic: String)

@SubscribeMultiple
fun subscribeAll(@TopicMap topicMap: Map<String, QoS>): Observable<String>
}
15 changes: 13 additions & 2 deletions app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import com.gojek.chuckmqtt.external.MqttChuckConfig
import com.gojek.chuckmqtt.external.MqttChuckInterceptor
import com.gojek.chuckmqtt.external.Period
import com.gojek.courier.Courier
import com.gojek.courier.QoS
import com.gojek.courier.QoS.ZERO
import com.gojek.courier.app.R
import com.gojek.courier.app.data.network.CourierService
import com.gojek.courier.app.data.network.model.Message
import com.gojek.courier.callback.SendMessageCallback
import com.gojek.courier.logging.ILogger
import com.gojek.courier.messageadapter.gson.GsonMessageAdapterFactory
import com.gojek.courier.messageadapter.text.TextMessageAdapterFactory
import com.gojek.courier.streamadapter.rxjava2.RxJava2StreamAdapterFactory
import com.gojek.mqtt.auth.Authenticator
import com.gojek.mqtt.client.MqttClient
Expand Down Expand Up @@ -102,7 +105,15 @@ class MainActivity : AppCompatActivity() {
}

subscribe.setOnClickListener {
courierService.subscribe(topic = topic.text.toString())
val topics = topic.text.toString().split(",")
val stream = if (topics.size == 1) {
courierService.subscribe(topic = topics[0])
} else {
val topicMap = mutableMapOf<String, QoS>()
for (topic in topics) { topicMap[topic] = ZERO }
courierService.subscribeAll(topicMap = topicMap)
}
stream.subscribe { Log.d("Courier", "Message received: $it") }
}

unsubscribe.setOnClickListener {
Expand Down Expand Up @@ -161,7 +172,7 @@ class MainActivity : AppCompatActivity() {
val configuration = Courier.Configuration(
client = mqttClient,
streamAdapterFactories = listOf(RxJava2StreamAdapterFactory()),
messageAdapterFactories = listOf(GsonMessageAdapterFactory()),
messageAdapterFactories = listOf(TextMessageAdapterFactory(), GsonMessageAdapterFactory()),
logger = getLogger()
)
val courier = Courier(configuration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,28 +137,40 @@ internal class MethodAnnotationsParser(
}

private fun parseSubscribeAllMethodAnnotations(method: Method) {
method.requireReturnTypeIsOneOf(Void.TYPE) {
"SubscribeAll method must return Void: $method"
}
method.requireParameterTypes(ParameterizedType::class.java) {
"Only one argument with parameterized type is allowed"
method.requireReturnTypeIsOneOf(Void.TYPE, ParameterizedType::class.java) {
"Subscribe method must return Void or ParameterizedType: $method"
}
val annotations = method.parameterAnnotations[0].filter { it.isParameterAnnotation() }
require(annotations.size == 1) {
"A parameter must have one and only one parameter annotation"
}
require(method.parameterTypes[0] == Map::class.java) {
"Parameter should be of Map<String, Qos> type $method"
"Parameter should be of Map<String, QoS> type $method"
}
val actualTypeArguments =
(method.genericParameterTypes[0] as ParameterizedType).actualTypeArguments
require(actualTypeArguments[0] == String::class.java) {
"Parameter should be of Map<String, Qos> type $method"
"Parameter should be of Map<String, QoS> type $method"
}
require(actualTypeArguments[1].getRawType() == QoS::class.java) {
"Parameter should be of Map<String, Qos> type $method"
"Parameter should be of Map<String, QoS> type $method"
}

if (method.genericReturnType == Void.TYPE) {
stubMethod = StubMethod.SubscribeAll
} else {
method.requireReturnTypeIsResolvable {
"Method return type must not include a type variable or wildcard: ${method.genericReturnType}"
}

val streamType = method.genericReturnType as ParameterizedType
val messageType = streamType.getFirstTypeArgument()

val streamAdapter = streamAdapterResolver.resolve(streamType)
val messageAdapter = messageAdapterResolver.resolve(messageType, method.annotations)

stubMethod = StubMethod.SubscribeAllWithStream(messageAdapter, streamAdapter)
}
stubMethod = StubMethod.SubscribeAll
}

private fun parseUnsubscribeMethodAnnotations(method: Method) {
Expand Down
38 changes: 38 additions & 0 deletions courier/src/main/java/com/gojek/courier/coordinator/Coordinator.kt
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,44 @@ internal class Coordinator(
return status
}

override fun subscribeAllWithStream(stubMethod: StubMethod.SubscribeAllWithStream, args: Array<Any>): Any {
logger.d("Coordinator", "Subscribe method invoked for multiple topics")
val topicList = (args[0] as Map<String, QoS>).toList()
if (topicList.size == 1) {
client.subscribe(topicList[0])
} else {
client.subscribe(topicList[0], *topicList.toTypedArray().sliceArray(IntRange(1, topicList.size - 1)))
}
logger.d("Coordinator", "Subscribed topics: $topicList")
val flowable = Flowable.create(
FlowableOnSubscribe<MqttMessage> { emitter ->
val listener = object : MessageListener {
override fun onMessageReceived(mqttMessage: MqttMessage) {
if (emitter.isCancelled.not()) {
emitter.onNext(mqttMessage)
}
}
}
for (topic in topicList) {
client.addMessageListener(topic.first, listener)
emitter.setCancellable { client.removeMessageListener(topic.first, listener) }
}
},
BackpressureStrategy.BUFFER
)

val stream = flowable
.observeOn(Schedulers.computation())
.flatMap { mqttMessage ->
mqttMessage.message.adapt(
mqttMessage.topic,
stubMethod.messageAdapter
)?.let { Flowable.just(it) } ?: Flowable.empty()
}
.toStream()
return stubMethod.streamAdapter.adapt(stream)
}

override fun getEventStream(): Stream<MqttEvent> {
return object : Stream<MqttEvent> {
override fun start(observer: Observer<MqttEvent>): Disposable {
Expand Down
4 changes: 4 additions & 0 deletions courier/src/main/java/com/gojek/courier/stub/StubInterface.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ internal class StubInterface(
is StubMethod.SubscribeAll -> {
callback.subscribeAll(stubMethod, args)
}
is StubMethod.SubscribeAllWithStream -> {
callback.subscribeAllWithStream(stubMethod, args)
}
is StubMethod.Unsubscribe -> {
callback.unsubscribe(stubMethod, args)
}
Expand All @@ -41,6 +44,7 @@ internal class StubInterface(
fun subscribeWithStream(stubMethod: StubMethod.SubscribeWithStream, args: Array<Any>): Any
fun unsubscribe(stubMethod: StubMethod.Unsubscribe, args: Array<Any>): Any
fun subscribeAll(stubMethod: StubMethod.SubscribeAll, args: Array<Any>): Any
fun subscribeAllWithStream(stubMethod: StubMethod.SubscribeAllWithStream, args: Array<Any>): Any
fun getEventStream(): Stream<MqttEvent>
fun getConnectionState(): ConnectionState
}
Expand Down
5 changes: 5 additions & 0 deletions courier/src/main/java/com/gojek/courier/stub/StubMethod.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ internal sealed class StubMethod {

object SubscribeAll : StubMethod()

class SubscribeAllWithStream(
val messageAdapter: MessageAdapter<Any>,
val streamAdapter: StreamAdapter<Any, Any>
) : StubMethod()

class Unsubscribe(
val argumentProcessor: UnsubscriptionArgumentProcessor
) : StubMethod()
Expand Down
15 changes: 11 additions & 4 deletions docs/docs/CourierService.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Courier provides the functionalities like Send, Receive, Subscribe, Unsubscribe

### Usage

Declare a service interface for various actions like Send, Receive, Subscribe, Unsubscribe.
Declare a service interface for various actions like Send, Receive, Subscribe, SubscribeMultiple, Unsubscribe.

~~~ kotlin
interface MessageService {
Expand All @@ -16,12 +16,17 @@ interface MessageService {

@Subscribe(topic = "topic/{id}/receive", qos = QoS.ONE)
fun subscribe(@Path("id") identifier: String): Observable<Message>

@SubscribeMultiple
fun subscribe(@TopicMap topicMap: Map<String, QoS>): Observable<Message>

@Unsubscribe(topics = ["topic/{id}/receive"])
fun unsubscribe(@Path("id") identifier: String)
}
~~~



Use Courier to create an implementation of service interface.

~~~ kotlin
Expand All @@ -44,12 +49,14 @@ Following annotations are supported for service interface.

- **@Subscribe** : A method annotation used for subscribing a single topic over the MQTT connection.

- **@SubscribeMultiple** : A method annotation used for subscribing multiple topic over the MQTT connection.
- **@SubscribeMultiple** : A method annotation used for subscribing multiple topics over the MQTT connection.

- **@Unsubscribe** : A method annotation used for unsubscribing a single topic over the MQTT connection.
- **@Unsubscribe** : A method annotation used for unsubscribing topics over the MQTT connection.

- **@Path** : A parameter annotation used for specifying a path variable in an MQTT topic.

- **@Data** : A parameter annotation used for specifying the message object while sending a message over the MQTT connection.

- **@TopicMap** : A parameter annotation used for specifying a topic map. It is always used while subscribing multiple topics.
- **@TopicMap** : A parameter annotation used for specifying a topic map. It is always used while subscribing multiple topics.

**Note** : While subscribing topics using `@SubscribeMultiple` along with a stream, make sure that messages received on all topics follow same format or a message adapter is added for handling different format.
8 changes: 5 additions & 3 deletions docs/docs/SubscribeUnsubscribe.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ interface MessageService {
fun subscribe(@Path("id") identifier: String): Observable<Message>

@SubscribeMultiple
fun subscribeMultiple(@TopicMap topics: Map<String, QoS>)
fun subscribeMultiple(@TopicMap topics: Map<String, QoS>): Observable<Message>

@Unsubscribe(topics = ["topic/{id}/receive"])
fun unsubscribe(@Path("id") identifier: String)
Expand All @@ -22,7 +22,9 @@ messageService.subscribe("user-id").subscribe { message ->
print(message)
}

messageService.subscribeMultiple(mapOf("topic1" to QoS.ONE, "topic2" to QoS.TWO))
messageService.subscribeMultiple(mapOf("topic1" to QoS.ONE, "topic2" to QoS.TWO)).subscribe { message ->
print(message)
}

messageService.unsubscribe("user-id")
~~~
Expand All @@ -35,6 +37,6 @@ mqttClient.subscribe("topic1" to QoS.ZERO, "topic2" to QoS.ONE)
mqttClient.unsubscribe("topic1", "topic2")
~~~


**Note** : While subscribing topics using `@SubscribeMultiple` along with a stream, make sure that messages received on all topics follow same format or a message adapter is added for handling different format.


0 comments on commit 3ca9fb1

Please sign in to comment.