From c56c919936e8ac33355c157c113dbfab01e86185 Mon Sep 17 00:00:00 2001 From: mteodor Date: Wed, 6 Oct 2021 14:41:56 +0200 Subject: [PATCH] enable user token reading Signed-off-by: mteodor --- cmd/cassandra-reader/main.go | 49 +++++++++++++++++++++++++++++++++--- cmd/influxdb-reader/main.go | 1 - cmd/postgres-reader/main.go | 48 ++++++++++++++++++++++++++++++++--- 3 files changed, 90 insertions(+), 8 deletions(-) diff --git a/cmd/cassandra-reader/main.go b/cmd/cassandra-reader/main.go index 34adafee1bb..830d6c2a0e0 100644 --- a/cmd/cassandra-reader/main.go +++ b/cmd/cassandra-reader/main.go @@ -19,6 +19,7 @@ import ( kitprometheus "github.com/go-kit/kit/metrics/prometheus" "github.com/gocql/gocql" "github.com/mainflux/mainflux" + authapi "github.com/mainflux/mainflux/auth/api/grpc" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/readers" "github.com/mainflux/mainflux/readers/api" @@ -48,6 +49,7 @@ const ( defJaegerURL = "" defThingsAuthURL = "localhost:8181" defThingsAuthTimeout = "1s" + defUsersAuthTimeout = "1s" envLogLevel = "MF_CASSANDRA_READER_LOG_LEVEL" envPort = "MF_CASSANDRA_READER_PORT" @@ -63,6 +65,7 @@ const ( envJaegerURL = "MF_JAEGER_URL" envThingsAuthURL = "MF_THINGS_AUTH_GRPC_URL" envThingsAuthTimeout = "MF_THINGS_AUTH_GRPC_TIMEOUT" + envUsersAuthTimeout = "MF_AUTH_GRPC_TIMEOUT" ) type config struct { @@ -75,7 +78,9 @@ type config struct { serverKey string jaegerURL string thingsAuthURL string + usersAuthURL string thingsAuthTimeout time.Duration + usersAuthTimeout time.Duration } func main() { @@ -96,11 +101,21 @@ func main() { defer thingsCloser.Close() tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout) + authTracer, authCloser := initJaeger("auth", cfg.jaegerURL, logger) + defer authCloser.Close() + + authConn := connectToAuth(cfg, logger) + defer authConn.Close() + + auth := authapi.NewClient(authTracer, authConn, cfg.usersAuthTimeout) + + authReader := readers.NewAuthService(tc, auth) + repo := newService(session, logger) errs := make(chan error, 2) - go startHTTPServer(repo, tc, cfg, errs, logger) + go startHTTPServer(repo, authReader, cfg, errs, logger) go func() { c := make(chan os.Signal) @@ -112,6 +127,32 @@ func main() { logger.Error(fmt.Sprintf("Cassandra reader service terminated: %s", err)) } +func connectToAuth(cfg config, logger logger.Logger) *grpc.ClientConn { + var opts []grpc.DialOption + logger.Info("connecting to auth via gRPC") + if cfg.clientTLS { + if cfg.caCerts != "" { + tpc, err := credentials.NewClientTLSFromFile(cfg.caCerts, "") + if err != nil { + logger.Error(fmt.Sprintf("Failed to create tls credentials: %s", err)) + os.Exit(1) + } + opts = append(opts, grpc.WithTransportCredentials(tpc)) + } + } else { + opts = append(opts, grpc.WithInsecure()) + logger.Info("gRPC communication is not encrypted") + } + + conn, err := grpc.Dial(cfg.usersAuthURL, opts...) + if err != nil { + logger.Error(fmt.Sprintf("Failed to connect to auth service: %s", err)) + os.Exit(1) + } + + return conn +} + func loadConfig() config { dbPort, err := strconv.Atoi(mainflux.Env(envDBPort, defDBPort)) if err != nil { @@ -230,14 +271,14 @@ func newService(session *gocql.Session, logger logger.Logger) readers.MessageRep return repo } -func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, cfg config, errs chan error, logger logger.Logger) { +func startHTTPServer(repo readers.MessageRepository, auth readers.Auth, cfg config, errs chan error, logger logger.Logger) { p := fmt.Sprintf(":%s", cfg.port) if cfg.serverCert != "" || cfg.serverKey != "" { logger.Info(fmt.Sprintf("Cassandra reader service started using https on port %s with cert %s key %s", cfg.port, cfg.serverCert, cfg.serverKey)) - errs <- http.ListenAndServeTLS(p, cfg.serverCert, cfg.serverKey, api.MakeHandler(repo, tc, "cassandra-reader")) + errs <- http.ListenAndServeTLS(p, cfg.serverCert, cfg.serverKey, api.MakeHandler(repo, auth, "cassandra-reader")) return } logger.Info(fmt.Sprintf("Cassandra reader service started, exposed port %s", cfg.port)) - errs <- http.ListenAndServe(p, api.MakeHandler(repo, tc, "cassandra-reader")) + errs <- http.ListenAndServe(p, api.MakeHandler(repo, auth, "cassandra-reader")) } diff --git a/cmd/influxdb-reader/main.go b/cmd/influxdb-reader/main.go index 809f27a5e79..ac09be184d5 100644 --- a/cmd/influxdb-reader/main.go +++ b/cmd/influxdb-reader/main.go @@ -21,7 +21,6 @@ import ( "github.com/mainflux/mainflux/readers/api" "github.com/mainflux/mainflux/readers/influxdb" thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc" - opentracing "github.com/opentracing/opentracing-go" stdprometheus "github.com/prometheus/client_golang/prometheus" jconfig "github.com/uber/jaeger-client-go/config" diff --git a/cmd/postgres-reader/main.go b/cmd/postgres-reader/main.go index f94acbbfc61..c41479c5060 100644 --- a/cmd/postgres-reader/main.go +++ b/cmd/postgres-reader/main.go @@ -18,6 +18,7 @@ import ( kitprometheus "github.com/go-kit/kit/metrics/prometheus" "github.com/jmoiron/sqlx" "github.com/mainflux/mainflux" + authapi "github.com/mainflux/mainflux/auth/api/grpc" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/readers" "github.com/mainflux/mainflux/readers/api" @@ -50,6 +51,7 @@ const ( defJaegerURL = "" defThingsAuthURL = "localhost:8181" defThingsAuthTimeout = "1s" + defUsersAuthTimeout = "1s" envLogLevel = "MF_POSTGRES_READER_LOG_LEVEL" envPort = "MF_POSTGRES_READER_PORT" @@ -67,6 +69,8 @@ const ( envJaegerURL = "MF_JAEGER_URL" envThingsAuthURL = "MF_THINGS_AUTH_GRPC_URL" envThingsAuthTimeout = "MF_THINGS_AUTH_GRPC_TIMEOUT" + envAuthURL = "MF_AUTH_GRPC_URL" + envUsersAuthTimeout = "MF_AUTH_GRPC_TIMEOUT" ) type config struct { @@ -77,7 +81,9 @@ type config struct { dbConfig postgres.Config jaegerURL string thingsAuthURL string + usersAuthURL string thingsAuthTimeout time.Duration + usersAuthTimeout time.Duration } func main() { @@ -96,6 +102,16 @@ func main() { tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout) + authTracer, authCloser := initJaeger("auth", cfg.jaegerURL, logger) + defer authCloser.Close() + + authConn := connectToAuth(cfg, logger) + defer authConn.Close() + + auth := authapi.NewClient(authTracer, authConn, cfg.usersAuthTimeout) + + authReader := readers.NewAuthService(tc, auth) + db := connectToDB(cfg.dbConfig, logger) defer db.Close() @@ -103,7 +119,7 @@ func main() { errs := make(chan error, 2) - go startHTTPServer(repo, tc, cfg.port, logger, errs) + go startHTTPServer(repo, authReader, cfg.port, logger, errs) go func() { c := make(chan os.Signal) @@ -115,6 +131,32 @@ func main() { logger.Error(fmt.Sprintf("Postgres reader service terminated: %s", err)) } +func connectToAuth(cfg config, logger logger.Logger) *grpc.ClientConn { + var opts []grpc.DialOption + logger.Info("connecting to auth via gRPC") + if cfg.clientTLS { + if cfg.caCerts != "" { + tpc, err := credentials.NewClientTLSFromFile(cfg.caCerts, "") + if err != nil { + logger.Error(fmt.Sprintf("Failed to create tls credentials: %s", err)) + os.Exit(1) + } + opts = append(opts, grpc.WithTransportCredentials(tpc)) + } + } else { + opts = append(opts, grpc.WithInsecure()) + logger.Info("gRPC communication is not encrypted") + } + + conn, err := grpc.Dial(cfg.usersAuthURL, opts...) + if err != nil { + logger.Error(fmt.Sprintf("Failed to connect to auth service: %s", err)) + os.Exit(1) + } + + return conn +} + func loadConfig() config { dbConfig := postgres.Config{ Host: mainflux.Env(envDBHost, defDBHost), @@ -229,8 +271,8 @@ func newService(db *sqlx.DB, logger logger.Logger) readers.MessageRepository { return svc } -func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, port string, logger logger.Logger, errs chan error) { +func startHTTPServer(repo readers.MessageRepository, auth readers.Auth, port string, logger logger.Logger, errs chan error) { p := fmt.Sprintf(":%s", port) logger.Info(fmt.Sprintf("Postgres reader service started, exposed port %s", port)) - errs <- http.ListenAndServe(p, api.MakeHandler(repo, tc, svcName)) + errs <- http.ListenAndServe(p, api.MakeHandler(repo, auth, svcName)) }