Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add tls support on port 10248 #56

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 54 additions & 24 deletions control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,23 @@ import (
"os"
"sync"

"github.com/alibaba/sentinel-golang/util"

"github.com/opensergo/opensergo-control-plane/pkg/controller"
"github.com/opensergo/opensergo-control-plane/pkg/model"
trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1"
transport "github.com/opensergo/opensergo-control-plane/pkg/transport/grpc"
"github.com/pkg/errors"
)

type ControlPlane struct {
operator *controller.KubernetesOperator
server *transport.Server
operator *controller.KubernetesOperator
server *transport.Server
secureServer *transport.Server

protoDesc *trpb.ControlPlaneDesc

mux sync.RWMutex
ch chan error
}

func NewControlPlane() (*ControlPlane, error) {
Expand All @@ -44,6 +47,8 @@ func NewControlPlane() (*ControlPlane, error) {
}

cp.server = transport.NewServer(uint32(10246), []model.SubscribeRequestHandler{cp.handleSubscribeRequest})
// On port 10248, it can use tls transport
cp.secureServer = transport.NewSecureServer(uint32(10248), []model.SubscribeRequestHandler{cp.handleSubscribeRequest})
cp.operator = operator

hostname, herr := os.Hostname()
Expand All @@ -62,20 +67,49 @@ func (c *ControlPlane) Start() error {
if err != nil {
return err
}
// Run the transport server
err = c.server.Run()
if err != nil {
return err
}

return nil
go util.RunWithRecover(func() {
// Run the transport server
log.Println("Starting grpc server on port 10246!")
err = c.server.Run()
if err != nil {
c.ch <- err
log.Fatal("Failed to run the grpc server")
}
})

go util.RunWithRecover(func() {
// Run the secure transport server
log.Println("Starting secure grpc server on port 10248!")
err = c.secureServer.Run()
if err != nil {
c.ch <- err
log.Fatal("Failed to run the secure grpc server")
}
})
err = <-c.ch
return err
}

func (c *ControlPlane) sendMessage(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error {
connections, exists := c.server.ConnectionManager().Get(namespace, app, kind)
var connections []*transport.Connection
var exists bool
scs, exists := c.secureServer.ConnectionManager().Get(namespace, app, kind)
if !exists || connections == nil {
return errors.New("There is no connection for this kind")
log.Printf("There is no secure connection for app %s kind %s in ns %s", app, kind, namespace)
} else {
connections = append(connections, scs...)
}
cs, exists := c.server.ConnectionManager().Get(namespace, app, kind)
if !exists || connections == nil {
log.Printf("There is no connection for app %s kind %s in ns %s", app, kind, namespace)
} else {
connections = append(connections, cs...)
}
return c.innerSendMessage(namespace, app, kind, dataWithVersion, status, respId, connections)
}

func (c *ControlPlane) innerSendMessage(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string, connections []*transport.Connection) error {
for _, connection := range connections {
if connection == nil || !connection.IsValid() {
// TODO: log.Debug
Expand Down Expand Up @@ -106,22 +140,13 @@ func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream
})
}

func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error {
// var labels []model.LabelKV
// if request.Target.Labels != nil {
// for _, label := range request.Target.Labels {
// labels = append(labels, model.LabelKV{
// Key: label.Key,
// Value: label.Value,
// })
// }
// }
func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream, isSecure bool) error {
for _, kind := range request.Target.Kinds {
crdWatcher, err := c.operator.RegisterWatcher(model.SubscribeTarget{
Namespace: request.Target.Namespace,
AppName: request.Target.App,
Kind: kind,
})
}, isSecure)
if err != nil {
status := &trpb.Status{
Code: transport.RegisterWatcherError,
Expand All @@ -135,8 +160,13 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent
}
continue
}
_ = c.server.ConnectionManager().Add(request.Target.Namespace, request.Target.App, kind, transport.NewConnection(clientIdentifier, stream))
// send if the watcher cache is not empty

if isSecure {
_ = c.secureServer.ConnectionManager().Add(request.Target.Namespace, request.Target.App, kind, transport.NewConnection(clientIdentifier, stream))
} else {
_ = c.server.ConnectionManager().Add(request.Target.Namespace, request.Target.App, kind, transport.NewConnection(clientIdentifier, stream))
}

rules, version := crdWatcher.GetRules(model.NamespacedApp{
Namespace: request.Target.Namespace,
App: request.Target.App,
Expand Down
30 changes: 30 additions & 0 deletions pkg/cert/cert_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cert

import "crypto/tls"

var provider CertProvider

// Use EnvCertProvider by default
func init() {
provider = &EnvCertProvider{
certEnvKey: "OPENSERGO_10248_CERT",
pkEnvKey: "OPENSERGO_10248_KEY",
}
}

func GetCertificate(info *tls.ClientHelloInfo) (*tls.Certificate, error) {
return provider.GetCert(info)
}
58 changes: 58 additions & 0 deletions pkg/cert/cert_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cert

import (
"crypto/tls"
"os"
"strings"
"sync"

"github.com/pkg/errors"
)

var certMU sync.RWMutex

type CertProvider interface {
GetCert(info *tls.ClientHelloInfo) (*tls.Certificate, error)
}

// EnvCertProvider reads cert and secret from ENV
type EnvCertProvider struct {
certEnvKey string
pkEnvKey string
}

func (e *EnvCertProvider) GetCert(info *tls.ClientHelloInfo) (*tls.Certificate, error) {
certMU.Lock()
defer certMU.Unlock()
c, s := os.Getenv(e.certEnvKey), os.Getenv(e.pkEnvKey)
if c == "" || s == "" {
return nil, errors.New("Read empty certificate or secret from env")
}
// In environment variable, the \n is replaced by whitespace character
// So we need to replace the whitespace with \n character in the first and last line of PEM file
// If not, the X509KeyPair func can not recognize the string from environment variable
c, s = e.polishCert(c, s)
keyPair, err := tls.X509KeyPair([]byte(c), []byte(s))
return &keyPair, err
}

func (e *EnvCertProvider) polishCert(c, s string) (string, string) {
c = strings.Replace(c, "----- ", "-----\n", -1)
c = strings.Replace(c, " -----", "\n-----", -1)
s = strings.Replace(s, "----- ", "-----\n", -1)
s = strings.Replace(s, " -----", "\n-----", -1)
return c, s
}
1 change: 1 addition & 0 deletions pkg/controller/crd_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ func (r *CRDWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
if err != nil {
logger.Error(err, "Failed to send rules", "kind", r.kind)
}

return ctrl.Result{}, nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/k8s_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func NewKubernetesOperator(sendDataHandler model.DataEntirePushHandler) (*Kubern
return k, nil
}

func (k *KubernetesOperator) RegisterControllersAndStart(info model.SubscribeTarget) error {
_, err := k.RegisterWatcher(info)
func (k *KubernetesOperator) RegisterControllersAndStart(info model.SubscribeTarget, isSecure bool) error {
_, err := k.RegisterWatcher(info, isSecure)
if err != nil {
return err
}
Expand All @@ -121,7 +121,7 @@ func (k *KubernetesOperator) RegisterControllersAndStart(info model.SubscribeTar

// RegisterWatcher registers given CRD type and CRD name.
// For each CRD type, it can be registered only once.
func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRDWatcher, error) {
func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget, isSecure bool) (*CRDWatcher, error) {
k.controllerMux.Lock()
defer k.controllerMux.Unlock()

Expand Down Expand Up @@ -160,7 +160,7 @@ func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRD
return k.controllers[target.Kind], nil
}

func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error {
func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget, isSecure bool) error {
k.controllerMux.Lock()
defer k.controllerMux.Unlock()

Expand Down
1 change: 1 addition & 0 deletions pkg/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ func main() {
if err != nil {
log.Fatal(err)
}

}
2 changes: 1 addition & 1 deletion pkg/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ type ClientIdentifier string

type OpenSergoTransportStream = trpb.OpenSergoUniversalTransportService_SubscribeConfigServer

type SubscribeRequestHandler func(ClientIdentifier, *trpb.SubscribeRequest, OpenSergoTransportStream) error
type SubscribeRequestHandler func(ClientIdentifier, *trpb.SubscribeRequest, OpenSergoTransportStream, bool) error

type DataEntirePushHandler func(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error
30 changes: 27 additions & 3 deletions pkg/transport/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,20 @@
package grpc

import (
"crypto/tls"
"fmt"
"io"
"log"
"net"

"github.com/opensergo/opensergo-control-plane/pkg/cert"

"github.com/opensergo/opensergo-control-plane/pkg/model"
trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1"
"github.com/opensergo/opensergo-control-plane/pkg/util"
"go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

const (
Expand All @@ -42,10 +46,27 @@ type Server struct {
started *atomic.Bool
}

func NewSecureServer(port uint32, subscribeHandlers []model.SubscribeRequestHandler) *Server {
connectionManager := NewConnectionManager()
cfg := &tls.Config{
GetCertificate: cert.GetCertificate,
ClientAuth: tls.VerifyClientCertIfGiven,
MinVersion: tls.VersionTLS12,
}
tlsCreds := credentials.NewTLS(cfg)
return &Server{
transportServer: newTransportServer(connectionManager, subscribeHandlers, true),
port: port,
grpcServer: grpc.NewServer(grpc.Creds(tlsCreds)),
started: atomic.NewBool(false),
connectionManager: connectionManager,
}
}

func NewServer(port uint32, subscribeHandlers []model.SubscribeRequestHandler) *Server {
connectionManager := NewConnectionManager()
return &Server{
transportServer: newTransportServer(connectionManager, subscribeHandlers),
transportServer: newTransportServer(connectionManager, subscribeHandlers, false),
port: port,
grpcServer: grpc.NewServer(),
started: atomic.NewBool(false),
Expand Down Expand Up @@ -84,6 +105,8 @@ type TransportServer struct {
connectionManager *ConnectionManager

subscribeHandlers []model.SubscribeRequestHandler
// whether the server use tls
isSecure bool
}

const (
Expand Down Expand Up @@ -144,7 +167,7 @@ func (s *TransportServer) SubscribeConfig(stream trpb.OpenSergoUniversalTranspor
}

for _, handler := range s.subscribeHandlers {
err = handler(clientIdentifier, recvData, stream)
err = handler(clientIdentifier, recvData, stream, s.isSecure)
if err != nil {
// TODO: handle error
log.Printf("Failed to handle SubscribeRequest, err=%s\n", err.Error())
Expand All @@ -155,9 +178,10 @@ func (s *TransportServer) SubscribeConfig(stream trpb.OpenSergoUniversalTranspor
}
}

func newTransportServer(connectionManager *ConnectionManager, subscribeHandlers []model.SubscribeRequestHandler) *TransportServer {
func newTransportServer(connectionManager *ConnectionManager, subscribeHandlers []model.SubscribeRequestHandler, isSecure bool) *TransportServer {
return &TransportServer{
connectionManager: connectionManager,
subscribeHandlers: subscribeHandlers,
isSecure: isSecure,
}
}