Skip to content

Commit

Permalink
enable user token reading
Browse files Browse the repository at this point in the history
Signed-off-by: mteodor <[email protected]>
  • Loading branch information
mteodor committed Oct 6, 2021
1 parent 1518187 commit c56c919
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 8 deletions.
49 changes: 45 additions & 4 deletions cmd/cassandra-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/gocql/gocql"
"github.com/mainflux/mainflux"
authapi "github.com/mainflux/mainflux/auth/api/grpc"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/readers"
"github.com/mainflux/mainflux/readers/api"
Expand Down Expand Up @@ -48,6 +49,7 @@ const (
defJaegerURL = ""
defThingsAuthURL = "localhost:8181"
defThingsAuthTimeout = "1s"
defUsersAuthTimeout = "1s"

envLogLevel = "MF_CASSANDRA_READER_LOG_LEVEL"
envPort = "MF_CASSANDRA_READER_PORT"
Expand All @@ -63,6 +65,7 @@ const (
envJaegerURL = "MF_JAEGER_URL"
envThingsAuthURL = "MF_THINGS_AUTH_GRPC_URL"
envThingsAuthTimeout = "MF_THINGS_AUTH_GRPC_TIMEOUT"
envUsersAuthTimeout = "MF_AUTH_GRPC_TIMEOUT"
)

type config struct {
Expand All @@ -75,7 +78,9 @@ type config struct {
serverKey string
jaegerURL string
thingsAuthURL string
usersAuthURL string
thingsAuthTimeout time.Duration
usersAuthTimeout time.Duration
}

func main() {
Expand All @@ -96,11 +101,21 @@ func main() {
defer thingsCloser.Close()

tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)
authTracer, authCloser := initJaeger("auth", cfg.jaegerURL, logger)
defer authCloser.Close()

authConn := connectToAuth(cfg, logger)
defer authConn.Close()

auth := authapi.NewClient(authTracer, authConn, cfg.usersAuthTimeout)

authReader := readers.NewAuthService(tc, auth)

repo := newService(session, logger)

errs := make(chan error, 2)

go startHTTPServer(repo, tc, cfg, errs, logger)
go startHTTPServer(repo, authReader, cfg, errs, logger)

go func() {
c := make(chan os.Signal)
Expand All @@ -112,6 +127,32 @@ func main() {
logger.Error(fmt.Sprintf("Cassandra reader service terminated: %s", err))
}

func connectToAuth(cfg config, logger logger.Logger) *grpc.ClientConn {
var opts []grpc.DialOption
logger.Info("connecting to auth via gRPC")
if cfg.clientTLS {
if cfg.caCerts != "" {
tpc, err := credentials.NewClientTLSFromFile(cfg.caCerts, "")
if err != nil {
logger.Error(fmt.Sprintf("Failed to create tls credentials: %s", err))
os.Exit(1)
}
opts = append(opts, grpc.WithTransportCredentials(tpc))
}
} else {
opts = append(opts, grpc.WithInsecure())
logger.Info("gRPC communication is not encrypted")
}

conn, err := grpc.Dial(cfg.usersAuthURL, opts...)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to auth service: %s", err))
os.Exit(1)
}

return conn
}

func loadConfig() config {
dbPort, err := strconv.Atoi(mainflux.Env(envDBPort, defDBPort))
if err != nil {
Expand Down Expand Up @@ -230,14 +271,14 @@ func newService(session *gocql.Session, logger logger.Logger) readers.MessageRep
return repo
}

func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, cfg config, errs chan error, logger logger.Logger) {
func startHTTPServer(repo readers.MessageRepository, auth readers.Auth, cfg config, errs chan error, logger logger.Logger) {
p := fmt.Sprintf(":%s", cfg.port)
if cfg.serverCert != "" || cfg.serverKey != "" {
logger.Info(fmt.Sprintf("Cassandra reader service started using https on port %s with cert %s key %s",
cfg.port, cfg.serverCert, cfg.serverKey))
errs <- http.ListenAndServeTLS(p, cfg.serverCert, cfg.serverKey, api.MakeHandler(repo, tc, "cassandra-reader"))
errs <- http.ListenAndServeTLS(p, cfg.serverCert, cfg.serverKey, api.MakeHandler(repo, auth, "cassandra-reader"))
return
}
logger.Info(fmt.Sprintf("Cassandra reader service started, exposed port %s", cfg.port))
errs <- http.ListenAndServe(p, api.MakeHandler(repo, tc, "cassandra-reader"))
errs <- http.ListenAndServe(p, api.MakeHandler(repo, auth, "cassandra-reader"))
}
1 change: 0 additions & 1 deletion cmd/influxdb-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/mainflux/mainflux/readers/api"
"github.com/mainflux/mainflux/readers/influxdb"
thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc"

opentracing "github.com/opentracing/opentracing-go"
stdprometheus "github.com/prometheus/client_golang/prometheus"
jconfig "github.com/uber/jaeger-client-go/config"
Expand Down
48 changes: 45 additions & 3 deletions cmd/postgres-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/jmoiron/sqlx"
"github.com/mainflux/mainflux"
authapi "github.com/mainflux/mainflux/auth/api/grpc"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/readers"
"github.com/mainflux/mainflux/readers/api"
Expand Down Expand Up @@ -50,6 +51,7 @@ const (
defJaegerURL = ""
defThingsAuthURL = "localhost:8181"
defThingsAuthTimeout = "1s"
defUsersAuthTimeout = "1s"

envLogLevel = "MF_POSTGRES_READER_LOG_LEVEL"
envPort = "MF_POSTGRES_READER_PORT"
Expand All @@ -67,6 +69,8 @@ const (
envJaegerURL = "MF_JAEGER_URL"
envThingsAuthURL = "MF_THINGS_AUTH_GRPC_URL"
envThingsAuthTimeout = "MF_THINGS_AUTH_GRPC_TIMEOUT"
envAuthURL = "MF_AUTH_GRPC_URL"
envUsersAuthTimeout = "MF_AUTH_GRPC_TIMEOUT"
)

type config struct {
Expand All @@ -77,7 +81,9 @@ type config struct {
dbConfig postgres.Config
jaegerURL string
thingsAuthURL string
usersAuthURL string
thingsAuthTimeout time.Duration
usersAuthTimeout time.Duration
}

func main() {
Expand All @@ -96,14 +102,24 @@ func main() {

tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)

authTracer, authCloser := initJaeger("auth", cfg.jaegerURL, logger)
defer authCloser.Close()

authConn := connectToAuth(cfg, logger)
defer authConn.Close()

auth := authapi.NewClient(authTracer, authConn, cfg.usersAuthTimeout)

authReader := readers.NewAuthService(tc, auth)

db := connectToDB(cfg.dbConfig, logger)
defer db.Close()

repo := newService(db, logger)

errs := make(chan error, 2)

go startHTTPServer(repo, tc, cfg.port, logger, errs)
go startHTTPServer(repo, authReader, cfg.port, logger, errs)

go func() {
c := make(chan os.Signal)
Expand All @@ -115,6 +131,32 @@ func main() {
logger.Error(fmt.Sprintf("Postgres reader service terminated: %s", err))
}

func connectToAuth(cfg config, logger logger.Logger) *grpc.ClientConn {
var opts []grpc.DialOption
logger.Info("connecting to auth via gRPC")
if cfg.clientTLS {
if cfg.caCerts != "" {
tpc, err := credentials.NewClientTLSFromFile(cfg.caCerts, "")
if err != nil {
logger.Error(fmt.Sprintf("Failed to create tls credentials: %s", err))
os.Exit(1)
}
opts = append(opts, grpc.WithTransportCredentials(tpc))
}
} else {
opts = append(opts, grpc.WithInsecure())
logger.Info("gRPC communication is not encrypted")
}

conn, err := grpc.Dial(cfg.usersAuthURL, opts...)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to auth service: %s", err))
os.Exit(1)
}

return conn
}

func loadConfig() config {
dbConfig := postgres.Config{
Host: mainflux.Env(envDBHost, defDBHost),
Expand Down Expand Up @@ -229,8 +271,8 @@ func newService(db *sqlx.DB, logger logger.Logger) readers.MessageRepository {
return svc
}

func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, port string, logger logger.Logger, errs chan error) {
func startHTTPServer(repo readers.MessageRepository, auth readers.Auth, port string, logger logger.Logger, errs chan error) {
p := fmt.Sprintf(":%s", port)
logger.Info(fmt.Sprintf("Postgres reader service started, exposed port %s", port))
errs <- http.ListenAndServe(p, api.MakeHandler(repo, tc, svcName))
errs <- http.ListenAndServe(p, api.MakeHandler(repo, auth, svcName))
}

0 comments on commit c56c919

Please sign in to comment.