Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update channel subscription to match docs and fix doubles #21

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions realtime/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (c *Client) GetChannelSubscriptions() ([]models.ChannelSubscription, error)
for _, sub := range channelSubs {
channelSubscription := models.ChannelSubscription{
ID: stringOrZero(sub.Path("_id").Data()),
RoomId: stringOrZero(sub.Path("rid").Data()),
Alert: sub.Path("alert").Data().(bool),
Name: stringOrZero(sub.Path("name").Data()),
DisplayName: stringOrZero(sub.Path("fname").Data()),
Expand Down
14 changes: 7 additions & 7 deletions realtime/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,16 @@ func (c *Client) UnPinMessage(message *models.Message) error {
// Returns a buffered channel
//
// https://rocket.chat/docs/developer-guides/realtime-api/subscriptions/stream-room-messages/
func (c *Client) SubscribeToMessageStream(channel *models.Channel, msgChannel chan models.Message) error {
func (c *Client) SubscribeToMessageStream(channel *models.Channel) (chan models.Message, error) {
Copy link
Collaborator

@geekgonecrazy geekgonecrazy Feb 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Being able to pass the channel to use in is actually one of the benefits here. It would let you set up a loop to handle all messages.

Kinda like the example I sent in #golangsdk

ch := make(chan models.Message)
for _, channel := range channels {
	_ = client.SubscribeToMessageStream(&channel, ch)

	fmt.Println("Started listening channel: " + channel.ID)
}

go func () {
      for msg := range ch {
			fmt.Printf("Message [%s]: %+v\n", msg.RoomID, msg)
      }
}()

Here that loop handles all messages from all channels. Of course you could choose to divide it up and handle different rooms separately if you wished.

Copy link
Author

@CthulhuDen CthulhuDen Feb 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd personally be fine with this either way, because it's no great task to merge channels anyway, it's just that returning channel seemed ergonomically better and also matched function documentation.

If the other change (to the extractor part) makes sense to you I can resubmit these changes without returning channel.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think the better thing here is to keep passing it in so we don't break existing that are using it this way


if err := c.ddp.Sub("stream-room-messages", channel.ID, send_added_event); err != nil {
return err
return nil, err
}

//msgChannel := make(chan models.Message, default_buffer_size)
c.ddp.CollectionByName("stream-room-messages").AddUpdateListener(messageExtractor{msgChannel, "update"})
msgChannel := make(chan models.Message, default_buffer_size)
c.ddp.CollectionByName("stream-room-messages").AddUpdateListener(messageExtractor{msgChannel, channel.ID})

return nil
return msgChannel, nil
}

func getMessagesFromUpdateEvent(update ddp.Update) []models.Message {
Expand Down Expand Up @@ -232,11 +232,11 @@ func stringOrZero(i interface{}) string {

type messageExtractor struct {
messageChannel chan models.Message
operation string
channelId string
}

func (u messageExtractor) CollectionUpdate(collection, operation, id string, doc ddp.Update) {
if operation == u.operation {
if operation == "update" && doc["eventName"] == u.channelId {
msgs := getMessagesFromUpdateEvent(doc)
for _, m := range msgs {
u.messageChannel <- m
Expand Down
6 changes: 2 additions & 4 deletions realtime/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ func TestClient_SubscribeToMessageStream(t *testing.T) {

general := models.Channel{ID: "GENERAL"}
textToSend := "RealtimeTest"
messageChannel := make(chan models.Message, 1)

err := c.SubscribeToMessageStream(&general, messageChannel)
messageChannel, err := c.SubscribeToMessageStream(&general)

assert.Nil(t, err, "Function returned error")
assert.NotNil(t, messageChannel, "Function didn't returned general")
Expand Down Expand Up @@ -52,9 +51,8 @@ func TestClient_SubscribeToMessageStream_UnknownChannel(t *testing.T) {

c := getLoggedInClient(t)
channel := models.Channel{ID: "unknown"}
messageChannel := make(chan models.Message, 1)

err := c.SubscribeToMessageStream(&channel, messageChannel)
_, err := c.SubscribeToMessageStream(&channel)

assert.NotNil(t, err, "Function didn't return error")
}