Skip to content

Commit

Permalink
register a grpc-gateway to mux http request (#509)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Oct 13, 2023
1 parent 4481888 commit 55e7183
Show file tree
Hide file tree
Showing 20 changed files with 7,940 additions and 243 deletions.
6 changes: 6 additions & 0 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ managed:
enabled: true
go_package_prefix:
default: generated/protos
except:
- buf.build/googleapis/googleapis
plugins:
- plugin: buf.build/protocolbuffers/go:v1.31.0
out: flow/generated/protos
Expand All @@ -27,3 +29,7 @@ plugins:
opt:
- esModuleInterop=true
- outputServices=grpc-js
- plugin: buf.build/grpc-ecosystem/gateway:v2.18.0
out: flow/generated/protos
opt:
- paths=source_relative
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ services:
dockerfile: stacks/flow-api.Dockerfile
ports:
- 8112:8112
- 8113:8113
environment:
<<: [*catalog-config]
TEMPORAL_HOST_PORT: temporal:7233
Expand Down
44 changes: 44 additions & 0 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ import (
"context"
"fmt"
"net"
"net/http"
"time"

utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/reflection"

"go.temporal.io/sdk/client"
Expand All @@ -19,9 +23,37 @@ import (
type APIServerParams struct {
ctx context.Context
Port uint
GatewayPort uint
TemporalHostPort string
}

// setupGRPCGatewayServer sets up the grpc-gateway mux
func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) {
conn, err := grpc.DialContext(
context.Background(),
fmt.Sprintf("0.0.0.0:%d", args.Port),
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)

if err != nil {
return nil, fmt.Errorf("unable to dial grpc server: %w", err)
}

gwmux := runtime.NewServeMux()
err = protos.RegisterFlowServiceHandler(context.Background(), gwmux, conn)
if err != nil {
return nil, fmt.Errorf("unable to register gateway: %w", err)
}

server := &http.Server{
Addr: fmt.Sprintf(":%d", args.GatewayPort),
Handler: gwmux,
ReadHeaderTimeout: 5 * time.Minute,
}
return server, nil
}

func APIMain(args *APIServerParams) error {
ctx := args.ctx

Expand Down Expand Up @@ -58,6 +90,18 @@ func APIMain(args *APIServerParams) error {
}
}()

gateway, err := setupGRPCGatewayServer(args)
if err != nil {
return fmt.Errorf("unable to setup gateway server: %w", err)
}

log.Infof("Starting API gateway on port %d", args.GatewayPort)
go func() {
if err := gateway.ListenAndServe(); err != nil {
log.Fatalf("failed to serve http: %v", err)
}
}()

<-ctx.Done()

grpcServer.GracefulStop()
Expand Down
6 changes: 6 additions & 0 deletions flow/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ func main() {
Aliases: []string{"p"},
Value: 8110,
},
// gateway port is the port that the grpc-gateway listens on
&cli.UintFlag{
Name: "gateway-port",
Value: 8111,
},
temporalHostPortFlag,
},
Action: func(ctx *cli.Context) error {
Expand All @@ -119,6 +124,7 @@ func main() {
ctx: appCtx,
Port: ctx.Uint("port"),
TemporalHostPort: temporalHostPort,
GatewayPort: ctx.Uint("gateway-port"),
})
},
},
Expand Down
120 changes: 120 additions & 0 deletions flow/generated/protos/google/api/annotations.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 55e7183

Please sign in to comment.