Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: RDO-6-1-liner-sdk-examples #150

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
75 changes: 75 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,81 @@ go get github.com/memphisdev/memphis.go
import "github.com/memphisdev/memphis.go"
```

### Quickstart - Producing and Consuming

The most basic functionaly of memphis is the ability to produce messages to a station and to consume those messages.

First, a connection to Memphis must be made:

```js
const { memphis } = require('memphis-dev');

// Connecting to the broker
memphis = Memphis()

let conn, err := memphis.Connect(
"<memphis-host>",
"<memphis-username>",
memphis.AccountId(<memphis-accountid>), //You can find it on the profile page in the Memphis UI. This field should be sent only on the cloud version of Memphis, otherwise it will be ignored
memphis.Password(<memphis-password>),
)

if err != nil{
return
}

defer conn.Close()

```

Then, to produce a message, call the `memphis.produce` function or create a producer and call its `producer.produce` function:

```go
message := make(map[string]any)

message["Hello"] = "World"

err := conn.Produce(
"<station-name>",
"<producer-name>",
message,
[]memphis.ProducerOpt{},
[]memphis.ProduceOpt{},
)

if err != nil{
return
}
```

Lastly, to consume this message, call the `memphis.fetch_messages` function or create a consumer and call its `consumer.fetch` function:

```go
messages, err := conn.FetchMessages(
"<station-name>",
"<consumer-name>",
)

if err != nil{
return
}

var msg_map map[string]any
for _, message := range messages{
err = json.Unmarshal(message.Data(), &msg_map)
if err != nil{
continue
}

// Do something with the message

err = message.Ack()
if err != nil{
continue
}
}
```

### Connecting to Memphis
```go
c, err := memphis.Connect("<memphis-host>",
Expand Down
71 changes: 43 additions & 28 deletions examples/consumer.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,62 @@
package main

import (
"context"
"encoding/json"
"fmt"
"os"
"time"

"github.com/memphisdev/memphis.go"
)

func main() {
conn, err := memphis.Connect("localhost", "root", memphis.ConnectionToken("<broker-token>"))
if err != nil {
fmt.Printf("Connection failed: %v", err)
os.Exit(1)
func main(){
conn, err := memphis.Connect(
"<memphis-host>",
"<memphis-username>",
memphis.AccountId(<memphis-accountid>),
memphis.Password(<mempis-password>),
)

if err != nil{
fmt.Print(err)
return
}

defer conn.Close()

consumer, err := conn.CreateConsumer("<station-name>", "<consumer-name>", memphis.PullInterval(15*time.Second))
consumer, err := conn.CreateConsumer("<station-name>", "<consumer-name>")

if err != nil {
fmt.Printf("Consumer creation failed: %v\n", err)
os.Exit(1)
if err != nil{
fmt.Println(err)
return
}

handler := func(msgs []*memphis.Msg, err error, ctx context.Context) {
if err != nil {
fmt.Printf("Fetch failed: %v\n", err)
return
for true {
messages, err := consumer.Fetch()

if len(messages) == 0 {
continue
}

for _, msg := range msgs {
fmt.Println(string(msg.Data()))
msg.Ack()
headers := msg.GetHeaders()
fmt.Println(headers)
if err != nil{
fmt.Print(err)
return
}

var msg_map map[string]any
for _, message := range messages{
err = json.Unmarshal(message.Data(), &msg_map)
if err != nil{
fmt.Print(err)
continue
}

// Do something with the message

err = message.Ack()
if err != nil{
fmt.Print(err)
continue
}
}
}

consumer.Consume(handler)

// The program will close the connection after 30 seconds,
// the message handler may be called after the connection closed
// so the handler may receive a timeout error
time.Sleep(30 * time.Second)
}
}
47 changes: 25 additions & 22 deletions examples/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,36 @@ import (
"github.com/memphisdev/memphis.go"
)

func main() {
conn, err := memphis.Connect("localhost", "root", memphis.ConnectionToken("<broker-token>"))
if err != nil {
fmt.Printf("Connection failed: %v", err)
os.Exit(1)
func main(){
conn, err := memphis.Connect(
"<memphis-host>",
"<memphis-username>",
memphis.AccountId(<memphis-accountID>),
memphis.Password(<memphis-password>),
)

if err != nil{
fmt.Print(err)
return
}
defer conn.Close()
p, err := conn.CreateProducer("<station-name>", "<producer-name>")

if err != nil {
fmt.Printf("Create Producer failed: %v", err)
os.Exit(1)
}
defer conn.Close()

hdrs := memphis.Headers{}
hdrs.New()
err = hdrs.Add("key", "value")
producer, err := conn.CreateProducer("<station-name>", "<producer-name>")

if err != nil {
fmt.Printf("Header failed: %v", err)
os.Exit(1)
if err != nil{
fmt.Print(err)
return
}

err = p.Produce([]byte("You have a message!"), memphis.MsgHeaders(hdrs))
message := make(map[string]any)

if err != nil {
fmt.Printf("Produce failed: %v", err)
os.Exit(1)
message["Hello"] = "World"
for i := 0; i < 3; i++{
err = producer.Produce(message)
}

if err != nil{
return
}
}
}
Loading