diff --git a/realtime/channels.go b/realtime/channels.go index 57669cd..9d1c86d 100644 --- a/realtime/channels.go +++ b/realtime/channels.go @@ -69,6 +69,7 @@ func (c *Client) GetChannelSubscriptions() ([]models.ChannelSubscription, error) for _, sub := range channelSubs { channelSubscription := models.ChannelSubscription{ ID: stringOrZero(sub.Path("_id").Data()), + RoomId: stringOrZero(sub.Path("rid").Data()), Alert: sub.Path("alert").Data().(bool), Name: stringOrZero(sub.Path("name").Data()), DisplayName: stringOrZero(sub.Path("fname").Data()), diff --git a/realtime/messages.go b/realtime/messages.go index 2022656..d6294e3 100644 --- a/realtime/messages.go +++ b/realtime/messages.go @@ -158,16 +158,16 @@ func (c *Client) UnPinMessage(message *models.Message) error { // Returns a buffered channel // // https://rocket.chat/docs/developer-guides/realtime-api/subscriptions/stream-room-messages/ -func (c *Client) SubscribeToMessageStream(channel *models.Channel, msgChannel chan models.Message) error { +func (c *Client) SubscribeToMessageStream(channel *models.Channel) (chan models.Message, error) { if err := c.ddp.Sub("stream-room-messages", channel.ID, send_added_event); err != nil { - return err + return nil, err } - //msgChannel := make(chan models.Message, default_buffer_size) - c.ddp.CollectionByName("stream-room-messages").AddUpdateListener(messageExtractor{msgChannel, "update"}) + msgChannel := make(chan models.Message, default_buffer_size) + c.ddp.CollectionByName("stream-room-messages").AddUpdateListener(messageExtractor{msgChannel, channel.ID}) - return nil + return msgChannel, nil } func getMessagesFromUpdateEvent(update ddp.Update) []models.Message { @@ -232,11 +232,11 @@ func stringOrZero(i interface{}) string { type messageExtractor struct { messageChannel chan models.Message - operation string + channelId string } func (u messageExtractor) CollectionUpdate(collection, operation, id string, doc ddp.Update) { - if operation == u.operation { + if operation == "update" && doc["eventName"] == u.channelId { msgs := getMessagesFromUpdateEvent(doc) for _, m := range msgs { u.messageChannel <- m diff --git a/realtime/messages_test.go b/realtime/messages_test.go index 9e1b875..77a380c 100644 --- a/realtime/messages_test.go +++ b/realtime/messages_test.go @@ -12,9 +12,8 @@ func TestClient_SubscribeToMessageStream(t *testing.T) { general := models.Channel{ID: "GENERAL"} textToSend := "RealtimeTest" - messageChannel := make(chan models.Message, 1) - err := c.SubscribeToMessageStream(&general, messageChannel) + messageChannel, err := c.SubscribeToMessageStream(&general) assert.Nil(t, err, "Function returned error") assert.NotNil(t, messageChannel, "Function didn't returned general") @@ -52,9 +51,8 @@ func TestClient_SubscribeToMessageStream_UnknownChannel(t *testing.T) { c := getLoggedInClient(t) channel := models.Channel{ID: "unknown"} - messageChannel := make(chan models.Message, 1) - err := c.SubscribeToMessageStream(&channel, messageChannel) + _, err := c.SubscribeToMessageStream(&channel) assert.NotNil(t, err, "Function didn't return error") }