Skip to content

Commit

Permalink
feat: inject topics/subscriptions in go
Browse files Browse the repository at this point in the history
  • Loading branch information
worstell committed Nov 15, 2024
1 parent e7b0ee2 commit e747065
Show file tree
Hide file tree
Showing 50 changed files with 857 additions and 383 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,16 @@ type Event struct {
//
// Used to test encryption of topic_events and async_calls tables

var Topic = ftl.Topic[Event]("topic")
var _ = ftl.Subscription(Topic, "subscription")
type Topic = ftl.TopicHandle[Event]
type Subscription = ftl.SubscriptionHandle[Topic, ConsumeClient, Event]

//ftl:verb
func Publish(ctx context.Context, e Event) error {
func Publish(ctx context.Context, e Event, topic Topic) error {
fmt.Printf("Publishing event: %s\n", e.Name)
return Topic.Publish(ctx, e)
return topic.Publish(ctx, e)
}

//ftl:verb
//ftl:subscribe subscription
func Consume(ctx context.Context, e Event) error {
fmt.Printf("Received event: %s\n", e.Name)
if e.Name != "AliceInWonderland" {
Expand Down
27 changes: 27 additions & 0 deletions backend/controller/encryption/testdata/go/encryption/types.ftl.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions backend/controller/pubsub/testdata/go/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ import (
)

//ftl:export
var TestTopic = ftl.Topic[PubSubEvent]("testTopic")
type TestTopic = ftl.TopicHandle[PubSubEvent]

type PubSubEvent struct {
Time time.Time
}

//ftl:verb
func PublishTen(ctx context.Context) error {
func PublishTen(ctx context.Context, topic TestTopic) error {
logger := ftl.LoggerFromContext(ctx)
for i := 0; i < 10; i++ {
t := time.Now()
logger.Infof("Publishing %v", t)
err := TestTopic.Publish(ctx, PubSubEvent{Time: t})
err := topic.Publish(ctx, PubSubEvent{Time: t})
if err != nil {
return err
}
Expand All @@ -30,20 +30,20 @@ func PublishTen(ctx context.Context) error {
}

//ftl:verb
func PublishOne(ctx context.Context) error {
func PublishOne(ctx context.Context, topic TestTopic) error {
logger := ftl.LoggerFromContext(ctx)
t := time.Now()
logger.Infof("Publishing %v", t)
return TestTopic.Publish(ctx, PubSubEvent{Time: t})
return topic.Publish(ctx, PubSubEvent{Time: t})
}

//ftl:export
var Topic2 = ftl.Topic[PubSubEvent]("topic2")
type Topic2 = ftl.TopicHandle[PubSubEvent]

//ftl:verb
func PublishOneToTopic2(ctx context.Context) error {
func PublishOneToTopic2(ctx context.Context, topic Topic2) error {
logger := ftl.LoggerFromContext(ctx)
t := time.Now()
logger.Infof("Publishing to topic_2 %v", t)
return Topic2.Publish(ctx, PubSubEvent{Time: t})
return topic.Publish(ctx, PubSubEvent{Time: t})
}
27 changes: 27 additions & 0 deletions backend/controller/pubsub/testdata/go/publisher/types.ftl.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 4 additions & 5 deletions backend/controller/pubsub/testdata/go/slow/slow.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK.
)

var Topic = ftl.Topic[Event]("topic")
var _ = ftl.Subscription(Topic, "slowSubscription")
type Topic = ftl.TopicHandle[Event]
type SlowSubscription = ftl.SubscriptionHandle[Topic, ConsumeClient, Event]

type Event struct {
Duration int
Expand All @@ -21,9 +21,9 @@ type PublishRequest struct {
}

//ftl:verb
func Publish(ctx context.Context, req PublishRequest) error {
func Publish(ctx context.Context, req PublishRequest, topic Topic) error {
for _, duration := range req.Durations {
err := Topic.Publish(ctx, Event{Duration: duration})
err := topic.Publish(ctx, Event{Duration: duration})
if err != nil {
return err
}
Expand All @@ -32,7 +32,6 @@ func Publish(ctx context.Context, req PublishRequest) error {
}

//ftl:verb
//ftl:subscribe slowSubscription
func Consume(ctx context.Context, event Event) error {
for i := range event.Duration {
select {
Expand Down
22 changes: 22 additions & 0 deletions backend/controller/pubsub/testdata/go/slow/types.ftl.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 8 additions & 13 deletions backend/controller/pubsub/testdata/go/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,43 @@ package subscriber
import (
"context"
"fmt"
"strings"
"time"

"ftl/builtin"
"ftl/publisher"
"strings"
"time"

"github.com/TBD54566975/ftl/go-runtime/ftl"
"github.com/alecthomas/atomic"

"github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK.
//"github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK.
)

var _ = ftl.Subscription(publisher.TestTopic, "testTopicSubscription")
var _ = ftl.Subscription(publisher.Topic2, "doomedSubscription")
var _ = ftl.Subscription(publisher.Topic2, "doomedSubscription2")
type TestTopicSubscription = ftl.SubscriptionHandle[publisher.TestTopic, ConsumeClient, publisher.PubSubEvent]
type DoomedSubscription = ftl.SubscriptionHandle[publisher.Topic2, ConsumeButFailAndRetryClient, publisher.PubSubEvent]
type DoomedSubscription2 = ftl.SubscriptionHandle[publisher.Topic2, PublishToExternalModuleClient, publisher.PubSubEvent]

var catchCount atomic.Value[int]

//ftl:verb
//ftl:subscribe testTopicSubscription
func Consume(ctx context.Context, req publisher.PubSubEvent) error {
ftl.LoggerFromContext(ctx).Infof("Subscriber is consuming %v", req.Time)
return nil
}

//ftl:verb
//ftl:subscribe doomedSubscription
//ftl:retry 2 1s 1s catch catch
func ConsumeButFailAndRetry(ctx context.Context, req publisher.PubSubEvent) error {
return fmt.Errorf("always error: event %v", req.Time)
}

//ftl:verb
//ftl:subscribe doomedSubscription2
//ftl:retry 1 1s 1s catch catchAny
func ConsumeButFailAndCatchAny(ctx context.Context, req publisher.PubSubEvent) error {
return fmt.Errorf("always error: event %v", req.Time)
}

//ftl:verb
func PublishToExternalModule(ctx context.Context) error {
func PublishToExternalModule(ctx context.Context, topic publisher.TestTopic) error {
// Get around compile-time checks
var topic = publisher.TestTopic
return topic.Publish(ctx, publisher.PubSubEvent{Time: time.Now()})
}

Expand Down
44 changes: 44 additions & 0 deletions backend/controller/pubsub/testdata/go/subscriber/types.ftl.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion docs/content/docs/reference/retries.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ Subscribers can have a retry policy. For example:
<!-- go -->

```go
//ftl:subscribe exampleSubscription
//ftl:retry 5 1s catch recoverPaymentProcessing
func ProcessPayment(ctx context.Context, payment Payment) error {
...
Expand Down
7 changes: 3 additions & 4 deletions examples/go/pubsub/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,20 @@ type OrderPizzaResponse struct {
}

//ftl:verb export
func OrderPizza(ctx context.Context, req OrderPizzaRequest) (OrderPizzaResponse, error) {
func OrderPizza(ctx context.Context, req OrderPizzaRequest, topic NewOrderTopic) (OrderPizzaResponse, error) {
randomID := rand.Intn(1000)
p := Pizza{
ID: randomID,
Type: req.Type.Default("cheese"),
Customer: req.Customer,
}
ftl.LoggerFromContext(ctx).Infof("Ordering pizza with ID: %d", randomID)
NewOrderTopic.Publish(ctx, p)
topic.Publish(ctx, p)
return OrderPizzaResponse{ID: randomID}, nil
}

var _ = ftl.Subscription(PizzaReadyTopic, "deliverPizzaSub")
type DeliverPizzaSub = ftl.SubscriptionHandle[PizzaReadyTopic, DeliverPizzaClient, Pizza]

//ftl:subscribe deliverPizzaSub
func DeliverPizza(ctx context.Context, pizza Pizza) error {
ftl.LoggerFromContext(ctx).Infof("Delivering pizza: %v", pizza)
return nil
Expand Down
11 changes: 5 additions & 6 deletions examples/go/pubsub/kitchen.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,20 @@ import (
)

//ftl:export
var NewOrderTopic = ftl.Topic[Pizza]("newOrderTopic")
type NewOrderTopic = ftl.TopicHandle[Pizza]

//ftl:export
var PizzaReadyTopic = ftl.Topic[Pizza]("pizzaReadyTopic")
type PizzaReadyTopic = ftl.TopicHandle[Pizza]

var _ = ftl.Subscription(NewOrderTopic, "cookPizzaSub")
type CookPizzaSub = ftl.SubscriptionHandle[NewOrderTopic, CookPizzaClient, Pizza]

type Pizza struct {
ID int
Type string
Customer string
}

//ftl:subscribe cookPizzaSub
func CookPizza(ctx context.Context, pizza Pizza) error {
func CookPizza(ctx context.Context, pizza Pizza, topic PizzaReadyTopic) error {
ftl.LoggerFromContext(ctx).Infof("Cooking pizza: %v", pizza)
return PizzaReadyTopic.Publish(ctx, pizza)
return topic.Publish(ctx, pizza)
}
12 changes: 11 additions & 1 deletion examples/go/time/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,27 @@ package time
import (
"context"
"time"

"github.com/TBD54566975/ftl/go-runtime/ftl"
)

type Message struct {
Content string
}

type TimeRequest struct{}
type TimeResponse struct {
Time time.Time
}

type T = ftl.TopicHandle[Message]
type E = ftl.SubscriptionHandle[T, TimeClient, Message]

// Time returns the current time.
//
//ftl:verb export
func Time(ctx context.Context, req TimeRequest, ic InternalClient) (TimeResponse, error) {
func Time(ctx context.Context, req TimeRequest, ic InternalClient, topic ftl.TopicHandle[Message]) (TimeResponse, error) {
topic.Publish(ctx, Message{Content: "Time was requested"})
internalTime, err := ic(ctx, req)
if err != nil {
return TimeResponse{}, err
Expand Down
2 changes: 1 addition & 1 deletion examples/go/time/types.ftl.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion frontend/cli/testdata/go/echo/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func Echo(ctx context.Context, req EchoRequest, tc time.TimeClient) (EchoRespons

var _ = ftl.Subscription(time.Invoices, "emailInvoices")

//ftl:subscribe emailInvoices
func SendInvoiceEmail(ctx context.Context, in time.Invoice) error {
if in.Amount == 10 {
return fmt.Errorf("can't process $10 invoices")
Expand Down
Loading

0 comments on commit e747065

Please sign in to comment.