From f980c9e8650b87120d31cf655d676d01dca9ed71 Mon Sep 17 00:00:00 2001 From: John Peters Date: Tue, 26 Dec 2023 21:30:08 -0600 Subject: [PATCH 1/8] "Added quickstart section" --- README.md | 70 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/README.md b/README.md index 27b40f8..82ba83d 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,76 @@ go get github.com/memphisdev/memphis.go import "github.com/memphisdev/memphis.go" ``` +### Quickstart - Producing and Consuming + +The most basic functionaly of memphis is the ability to produce messages to a station and to consume those messages. + +First, a connection to Memphis must be made: + +```js +const { memphis } = require('memphis-dev'); + +// Connecting to the broker +memphis = Memphis() + +let conn = await memphis.connect({ + host: "aws-us-east-1.cloud.memphis.dev", + username: "test_user", // (root/application type user) + accountId: process.env.memphis_account_id, //You can find it on the profile page in the Memphis UI. This field should be sent only on the cloud version of Memphis, otherwise it will be ignored + password: process.env.memphis_pass +}); +``` + +Then, to produce a message, call the `memphis.produce` function or create a producer and call its `producer.produce` function: + +```go +accountID, _ := strconv.Atoi(os.Getenv("memphis_account_id")) + +conn, err := memphis.Connect( + "aws-us-east-1.cloud.memphis.dev", + "test_user", + memphis.AccountId(accountID), + memphis.Password(os.Getenv("memphis_pass")), +) + +if err != nil{ + return +} + +defer conn.Close() +``` + +Lastly, to consume this message, call the `memphis.fetch_messages` function or create a consumer and call its `consumer.fetch` function: + +```go +messages, err := conn.FetchMessages( + "test_station", + "consumer", +) + +if err != nil{ + fmt.Print(err) + return +} + +var msg_map map[string]any +for _, message := range messages{ + err = json.Unmarshal(message.Data(), &msg_map) + if err != nil{ + fmt.Print(err) + continue + } + + // Do something with the message + + err = message.Ack() + if err != nil{ + fmt.Print(err) + continue + } +} +``` + ### Connecting to Memphis ```go c, err := memphis.Connect("", From 3b939a0eec4046f7448f64aeac62432389158ed2 Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 28 Dec 2023 19:31:45 -0600 Subject: [PATCH 2/8] "Updated examples" --- examples/consumer.go | 68 ++++++++++++++++++++++++++------------------ examples/producer.go | 55 ++++++++++++++++++----------------- 2 files changed, 69 insertions(+), 54 deletions(-) diff --git a/examples/consumer.go b/examples/consumer.go index 68b5e7a..610240f 100644 --- a/examples/consumer.go +++ b/examples/consumer.go @@ -1,47 +1,59 @@ package main import ( - "context" + "encoding/json" "fmt" "os" - "time" + "strconv" "github.com/memphisdev/memphis.go" ) -func main() { - conn, err := memphis.Connect("localhost", "root", memphis.ConnectionToken("")) - if err != nil { - fmt.Printf("Connection failed: %v", err) - os.Exit(1) +func main(){ + accountID, _ := strconv.Atoi(os.Getenv("memphis_account_id")) + + conn, err := memphis.Connect( + "aws-us-east-1.cloud.memphis.dev", + "test_user", + memphis.AccountId(accountID), + memphis.Password(os.Getenv("memphis_pass")), + ) + + if err != nil{ + fmt.Print(err) + return } + defer conn.Close() - consumer, err := conn.CreateConsumer("", "", memphis.PullInterval(15*time.Second)) + consumer, _ := conn.CreateConsumer("name", "name") + + consumer.Fetch() - if err != nil { - fmt.Printf("Consumer creation failed: %v\n", err) - os.Exit(1) + messages, err := conn.FetchMessages( + "test_station", + "consumer", + ) + + if err != nil{ + fmt.Print(err) + return } - handler := func(msgs []*memphis.Msg, err error, ctx context.Context) { - if err != nil { - fmt.Printf("Fetch failed: %v\n", err) - return + var msg_map map[string]any + for _, message := range messages{ + err = json.Unmarshal(message.Data(), &msg_map) + if err != nil{ + fmt.Print(err) + continue } - for _, msg := range msgs { - fmt.Println(string(msg.Data())) - msg.Ack() - headers := msg.GetHeaders() - fmt.Println(headers) + // Do something with the message + + err = message.Ack() + if err != nil{ + fmt.Print(err) + continue } } - - consumer.Consume(handler) - - // The program will close the connection after 30 seconds, - // the message handler may be called after the connection closed - // so the handler may receive a timeout error - time.Sleep(30 * time.Second) -} +} \ No newline at end of file diff --git a/examples/producer.go b/examples/producer.go index 2a4a525..d15ac51 100644 --- a/examples/producer.go +++ b/examples/producer.go @@ -1,39 +1,42 @@ package main import ( - "fmt" "os" - + "strconv" + "github.com/memphisdev/memphis.go" ) -func main() { - conn, err := memphis.Connect("localhost", "root", memphis.ConnectionToken("")) - if err != nil { - fmt.Printf("Connection failed: %v", err) - os.Exit(1) - } - defer conn.Close() - p, err := conn.CreateProducer("", "") +func main(){ + accountID, _ := strconv.Atoi(os.Getenv("memphis_account_id")) - if err != nil { - fmt.Printf("Create Producer failed: %v", err) - os.Exit(1) - } + conn, err := memphis.Connect( + "aws-us-east-1.cloud.memphis.dev", + "test_user", + memphis.AccountId(accountID), + memphis.Password(os.Getenv("memphis_pass")), + ) - hdrs := memphis.Headers{} - hdrs.New() - err = hdrs.Add("key", "value") - - if err != nil { - fmt.Printf("Header failed: %v", err) - os.Exit(1) + if err != nil{ + return } - err = p.Produce([]byte("You have a message!"), memphis.MsgHeaders(hdrs)) + defer conn.Close() - if err != nil { - fmt.Printf("Produce failed: %v", err) - os.Exit(1) + message := make(map[string]any) + + message["Hello"] = "World" + for i := 0; i < 3; i++{ + err = conn.Produce( + "test_station", + "producer", + message, + []memphis.ProducerOpt{}, + []memphis.ProduceOpt{}, + ) + } + + if err != nil{ + return } -} +} \ No newline at end of file From 9665a3a5a17fa80b9628ec79db9fc5167860ccc7 Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 28 Dec 2023 20:18:46 -0600 Subject: [PATCH 3/8] "Updated examples" --- examples/consumer.go | 9 ++------- examples/producer.go | 19 +++++++++++-------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/examples/consumer.go b/examples/consumer.go index 610240f..b5c5445 100644 --- a/examples/consumer.go +++ b/examples/consumer.go @@ -26,14 +26,9 @@ func main(){ defer conn.Close() - consumer, _ := conn.CreateConsumer("name", "name") + consumer, _ := conn.CreateConsumer("test_station", "consumer") - consumer.Fetch() - - messages, err := conn.FetchMessages( - "test_station", - "consumer", - ) + messages, err := consumer.Fetch() if err != nil{ fmt.Print(err) diff --git a/examples/producer.go b/examples/producer.go index d15ac51..de74a09 100644 --- a/examples/producer.go +++ b/examples/producer.go @@ -1,9 +1,10 @@ package main import ( + "fmt" "os" "strconv" - + "github.com/memphisdev/memphis.go" ) @@ -18,22 +19,24 @@ func main(){ ) if err != nil{ + fmt.Print(err) return } defer conn.Close() + producer, err := conn.CreateProducer("test_station", "producer") + + if err != nil{ + fmt.Print(err) + return + } + message := make(map[string]any) message["Hello"] = "World" for i := 0; i < 3; i++{ - err = conn.Produce( - "test_station", - "producer", - message, - []memphis.ProducerOpt{}, - []memphis.ProduceOpt{}, - ) + err = producer.Produce(message) } if err != nil{ From c97028f5b66f312b88732b27c2aed5801e85736b Mon Sep 17 00:00:00 2001 From: John Peters Date: Tue, 2 Jan 2024 09:27:22 -0600 Subject: [PATCH 4/8] "Added missing produce quickstart example" --- README.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/README.md b/README.md index 82ba83d..4a3e8ad 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,27 @@ if err != nil{ } defer conn.Close() + +if err != nil{ + fmt.Print(err) + return +} + +message := make(map[string]any) + +message["Hello"] = "World" + +err := conn.Produce( + "test_station", + "producer", + message, + []memphis.ProducerOpt{}, + []memphis.ProduceOpt{}, +) + +if err != nil{ + return +} ``` Lastly, to consume this message, call the `memphis.fetch_messages` function or create a consumer and call its `consumer.fetch` function: From 356583fd04867f6a3864dea7fd45ba4e4e84d23d Mon Sep 17 00:00:00 2001 From: John Peters Date: Tue, 2 Jan 2024 09:43:44 -0600 Subject: [PATCH 5/8] "Changed params to generic params and cleaned up the example to be a little smaller" --- README.md | 42 +++++++++++++----------------------------- 1 file changed, 13 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 4a3e8ad..cad9a8c 100644 --- a/README.md +++ b/README.md @@ -55,24 +55,11 @@ const { memphis } = require('memphis-dev'); // Connecting to the broker memphis = Memphis() -let conn = await memphis.connect({ - host: "aws-us-east-1.cloud.memphis.dev", - username: "test_user", // (root/application type user) - accountId: process.env.memphis_account_id, //You can find it on the profile page in the Memphis UI. This field should be sent only on the cloud version of Memphis, otherwise it will be ignored - password: process.env.memphis_pass -}); -``` - -Then, to produce a message, call the `memphis.produce` function or create a producer and call its `producer.produce` function: - -```go -accountID, _ := strconv.Atoi(os.Getenv("memphis_account_id")) - -conn, err := memphis.Connect( - "aws-us-east-1.cloud.memphis.dev", - "test_user", - memphis.AccountId(accountID), - memphis.Password(os.Getenv("memphis_pass")), +let conn, err := memphis.Connect( + "", + "", + memphis.AccountId(), //You can find it on the profile page in the Memphis UI. This field should be sent only on the cloud version of Memphis, otherwise it will be ignored + memphis.Password(), ) if err != nil{ @@ -81,18 +68,18 @@ if err != nil{ defer conn.Close() -if err != nil{ - fmt.Print(err) - return -} +``` +Then, to produce a message, call the `memphis.produce` function or create a producer and call its `producer.produce` function: + +```go message := make(map[string]any) message["Hello"] = "World" err := conn.Produce( - "test_station", - "producer", + "", + "", message, []memphis.ProducerOpt{}, []memphis.ProduceOpt{}, @@ -107,12 +94,11 @@ Lastly, to consume this message, call the `memphis.fetch_messages` function or c ```go messages, err := conn.FetchMessages( - "test_station", - "consumer", + "", + "", ) if err != nil{ - fmt.Print(err) return } @@ -120,7 +106,6 @@ var msg_map map[string]any for _, message := range messages{ err = json.Unmarshal(message.Data(), &msg_map) if err != nil{ - fmt.Print(err) continue } @@ -128,7 +113,6 @@ for _, message := range messages{ err = message.Ack() if err != nil{ - fmt.Print(err) continue } } From 8cc2d71b478b1ca43e127fea49a5e5cfec9f36df Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 4 Jan 2024 13:40:40 -0600 Subject: [PATCH 6/8] "Made examples use generic names" --- examples/consumer.go | 13 +++++-------- examples/producer.go | 13 +++++-------- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/examples/consumer.go b/examples/consumer.go index b5c5445..4107831 100644 --- a/examples/consumer.go +++ b/examples/consumer.go @@ -4,19 +4,16 @@ import ( "encoding/json" "fmt" "os" - "strconv" "github.com/memphisdev/memphis.go" ) func main(){ - accountID, _ := strconv.Atoi(os.Getenv("memphis_account_id")) - conn, err := memphis.Connect( - "aws-us-east-1.cloud.memphis.dev", - "test_user", - memphis.AccountId(accountID), - memphis.Password(os.Getenv("memphis_pass")), + "", + "", + memphis.AccountId(), + memphis.Password(), ) if err != nil{ @@ -26,7 +23,7 @@ func main(){ defer conn.Close() - consumer, _ := conn.CreateConsumer("test_station", "consumer") + consumer, _ := conn.CreateConsumer("", "") messages, err := consumer.Fetch() diff --git a/examples/producer.go b/examples/producer.go index de74a09..5f7f12f 100644 --- a/examples/producer.go +++ b/examples/producer.go @@ -3,19 +3,16 @@ package main import ( "fmt" "os" - "strconv" "github.com/memphisdev/memphis.go" ) func main(){ - accountID, _ := strconv.Atoi(os.Getenv("memphis_account_id")) - conn, err := memphis.Connect( - "aws-us-east-1.cloud.memphis.dev", - "test_user", - memphis.AccountId(accountID), - memphis.Password(os.Getenv("memphis_pass")), + "", + "", + memphis.AccountId(), + memphis.Password(), ) if err != nil{ @@ -25,7 +22,7 @@ func main(){ defer conn.Close() - producer, err := conn.CreateProducer("test_station", "producer") + producer, err := conn.CreateProducer("", "") if err != nil{ fmt.Print(err) From 6708c8ce04fc6fc1e6f3c73424567d93e0c68e8f Mon Sep 17 00:00:00 2001 From: John Peters Date: Tue, 9 Jan 2024 14:34:10 -0600 Subject: [PATCH 7/8] "Consume now uses a for true in the example" --- examples/consumer.go | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/examples/consumer.go b/examples/consumer.go index 4107831..717ca23 100644 --- a/examples/consumer.go +++ b/examples/consumer.go @@ -25,27 +25,33 @@ func main(){ consumer, _ := conn.CreateConsumer("", "") - messages, err := consumer.Fetch() + for true { + messages, err := consumer.Fetch() - if err != nil{ - fmt.Print(err) - return - } - - var msg_map map[string]any - for _, message := range messages{ - err = json.Unmarshal(message.Data(), &msg_map) - if err != nil{ - fmt.Print(err) + if len(messages) == 0 { continue } - // Do something with the message - - err = message.Ack() if err != nil{ fmt.Print(err) - continue + return + } + + var msg_map map[string]any + for _, message := range messages{ + err = json.Unmarshal(message.Data(), &msg_map) + if err != nil{ + fmt.Print(err) + continue + } + + // Do something with the message + + err = message.Ack() + if err != nil{ + fmt.Print(err) + continue + } } } } \ No newline at end of file From ca882124a10a1fae1395be2cfa08643510153c81 Mon Sep 17 00:00:00 2001 From: John Peters Date: Mon, 15 Jan 2024 19:35:10 -0600 Subject: [PATCH 8/8] "Printing and returning on createconsumer error" --- examples/consumer.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/examples/consumer.go b/examples/consumer.go index 717ca23..9b6281a 100644 --- a/examples/consumer.go +++ b/examples/consumer.go @@ -23,7 +23,12 @@ func main(){ defer conn.Close() - consumer, _ := conn.CreateConsumer("", "") + consumer, err := conn.CreateConsumer("", "") + + if err != nil{ + fmt.Println(err) + return + } for true { messages, err := consumer.Fetch()