Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dubboctl] add docker api client logic #573

Merged
merged 1 commit into from
Feb 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions dubboctl/pkg/builder/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package builder

import (
"context"
"github.com/apache/dubbo-kubernetes/dubboctl/pkg/sdk/dubbo"
)

type Builder struct{}

func NewBuilder() *Builder {
return &Builder{}
}

func (b Builder) Build(ctx context.Context, f *dubbo.DubboConfig) error {
cli, _, err := docker.NewClient(client.DefaultDockerHost)
if err != nil {
return err
}
buildOpts := types.ImageBuildOptions{
Dockerfile: "Dockerfile",
Tags: []string{f.Image},
}

buildCtx, _ := archive.TarWithOptions(f.Root, &archive.TarOptions{})
resp, err := cli.ImageBuild(ctx, buildCtx, buildOpts)
if err != nil {
return err
}
defer resp.Body.Close()
termFd, isTerm := term.GetFdInfo(os.Stderr)
err = jsonmessage.DisplayJSONMessagesStream(resp.Body, os.Stderr, termFd, isTerm, nil)
if err != nil {
return err
}
return nil
}
190 changes: 190 additions & 0 deletions dubboctl/pkg/mirror/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package mirror

import (
"crypto/tls"
"crypto/x509"
"errors"
fnssh "github.com/apache/dubbo-kubernetes/dubboctl/pkg/mirror/ssh"
"github.com/docker/cli/cli/config"

Check failure on line 8 in dubboctl/pkg/mirror/client.go

View workflow job for this annotation

GitHub Actions / Go Test

missing go.sum entry for module providing package github.com/docker/cli/cli/config (imported by github.com/apache/dubbo-kubernetes/dubboctl/pkg/mirror); to add:
"github.com/docker/docker/client"
"github.com/docker/go-connections/tlsconfig"
"io"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"runtime"
"strconv"
"time"
)

var NoDockerAPIError = errors.New("docker API not available")

func NewClient(defaultHost string) (dockerClient client.CommonAPIClient, dockerHostInRemote string, err error) {
var u *url.URL

dockerHost := os.Getenv("DOCKER_HOST")
dockerHostSSHIdentity := os.Getenv("DOCKER_HOST_SSH_IDENTITY")
hostKeyCallback := fnssh.NewHostKeyCbk()

if dockerHost == "" {
u, err = url.Parse(defaultHost)
if err != nil {
return
}
_, err = os.Stat(u.Path)
switch {
case err == nil:
dockerHost = defaultHost
case err != nil && !os.IsNotExist(err):
return
}
}

if dockerHost == "" {
return nil, "", NoDockerAPIError
}

dockerHostInRemote = dockerHost

u, err = url.Parse(dockerHost)
isSSH := err == nil && u.Scheme == "ssh"
isTCP := err == nil && u.Scheme == "tcp"
isNPipe := err == nil && u.Scheme == "npipe"
isUnix := err == nil && u.Scheme == "unix"

if isTCP || isNPipe {
// With TCP or npipe, it's difficult to determine how to expose the daemon socket to lifecycle containers,
// so we are defaulting to standard docker location by returning empty string.
// This should work well most of the time.
dockerHostInRemote = ""
}

if isUnix && runtime.GOOS == "darwin" {
// A unix socket on macOS is most likely tunneled from VM,
// so it cannot be mounted under that path.
dockerHostInRemote = ""
}

if !isSSH {
opts := []client.Opt{client.FromEnv, client.WithAPIVersionNegotiation()}
if isTCP {
if httpClient := newHttpClient(); httpClient != nil {
opts = append(opts, client.WithHTTPClient(httpClient))
}
}
dockerClient, err = client.NewClientWithOpts(opts...)
return
}

credentialsConfig := fnssh.Config{
Identity: dockerHostSSHIdentity,
PassPhrase: os.Getenv("DOCKER_HOST_SSH_IDENTITY_PASSPHRASE"),
PasswordCallback: fnssh.NewPasswordCbk(),
PassPhraseCallback: fnssh.NewPassPhraseCbk(),
HostKeyCallback: hostKeyCallback,
}
contextDialer, dockerHostInRemote, err := fnssh.NewDialContext(u, credentialsConfig)
if err != nil {
return
}

httpClient := &http.Client{
// No tls
// No proxy
Transport: &http.Transport{
DialContext: contextDialer.DialContext,
},
}

dockerClient, err = client.NewClientWithOpts(
client.WithAPIVersionNegotiation(),
client.WithHTTPClient(httpClient),
client.WithHost("tcp://placeholder/"))

if closer, ok := contextDialer.(io.Closer); ok {
dockerClient = clientWithAdditionalCleanup{
CommonAPIClient: dockerClient,
cleanUp: func() {
closer.Close()
},
}
}

return dockerClient, dockerHostInRemote, err
}

type clientWithAdditionalCleanup struct {
client.CommonAPIClient
cleanUp func()
}

func (w clientWithAdditionalCleanup) Close() error {
defer w.cleanUp()
return w.CommonAPIClient.Close()
}

func newHttpClient() *http.Client {
tlsVerifyStr, tlsVerifyChanged := os.LookupEnv("DOCKER_TLS_VERIFY")

if !tlsVerifyChanged {
return nil
}

var tlsOpts []func(*tls.Config)

tlsVerify := true
if b, err := strconv.ParseBool(tlsVerifyStr); err == nil {
tlsVerify = b
}

if !tlsVerify {
tlsOpts = append(tlsOpts, func(t *tls.Config) {
t.InsecureSkipVerify = true
})
}

dockerCertPath := os.Getenv("DOCKER_CERT_PATH")
if dockerCertPath == "" {
dockerCertPath = config.Dir()
}

// Set root CA.
caData, err := os.ReadFile(filepath.Join(dockerCertPath, "ca.pem"))
if err == nil {
certPool := x509.NewCertPool()
if certPool.AppendCertsFromPEM(caData) {
tlsOpts = append(tlsOpts, func(t *tls.Config) {
t.RootCAs = certPool
})
}
}

// Set client certificate.
certData, certErr := os.ReadFile(filepath.Join(dockerCertPath, "cert.pem"))
keyData, keyErr := os.ReadFile(filepath.Join(dockerCertPath, "key.pem"))
if certErr == nil && keyErr == nil {
cliCert, err := tls.X509KeyPair(certData, keyData)
if err == nil {
tlsOpts = append(tlsOpts, func(cfg *tls.Config) {
cfg.Certificates = []tls.Certificate{cliCert}
})
}
}

dialer := &net.Dialer{
KeepAlive: 30 * time.Second,
Timeout: 30 * time.Second,
}

tlsConfig := tlsconfig.ClientDefault(tlsOpts...)

return &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
DialContext: dialer.DialContext,
},
CheckRedirect: client.CheckRedirect,
}
}
38 changes: 38 additions & 0 deletions dubboctl/pkg/mirror/ssh/dialer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package ssh

import (
"context"
"golang.org/x/crypto/ssh"
"net"
)

type (
PasswordCallback func() (string, error)
PassPhraseCallback func() (string, error)
HostKeyCallback func(hostPort string, pubKey ssh.PublicKey) error
)

type Config struct {
Identity string
PassPhrase string
PasswordCallback PasswordCallback
PassPhraseCallback PassPhraseCallback
HostKeyCallback HostKeyCallback
}

type dialer struct {
sshClient *ssh.Client
network string
addr string
}

type ContextDialer interface {
DialContext(ctx context.Context, network, address string) (net.Conn, error)
}
type DialContextFn = func(ctx context.Context, network, addr string) (net.Conn, error)

type contextDialerFn DialContextFn

func (n contextDialerFn) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
return n(ctx, network, address)
}
111 changes: 111 additions & 0 deletions dubboctl/pkg/mirror/ssh/terminal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package ssh

import (
"bufio"
"bytes"
"encoding/base64"
"errors"
"fmt"
"golang.org/x/crypto/ssh"
"golang.org/x/term"
"io"
"os"
"strings"
)

func readSecret(prompt string) (pw []byte, err error) {
fd := int(os.Stdin.Fd())
if term.IsTerminal(fd) {
fmt.Fprint(os.Stderr, prompt)
pw, err = term.ReadPassword(fd)
fmt.Fprintln(os.Stderr)
return
}
var b [1]byte
for {
n, err := os.Stdin.Read(b[:])
if n > 0 && b[0] != '\r' {
if b[0] == '\n' {
return pw, nil
}
pw = append(pw, b[0])
if len(pw) > 1024 {
err = errors.New("password too long, 1024 byte limit")
}
}
if err != nil {
if errors.Is(err, io.EOF) && len(pw) > 0 {
err = nil
}
return pw, err
}
}

}

func NewPasswordCbk() PasswordCallback {
var pwdSet bool
var pwd string
return func() (string, error) {
if pwdSet {
return pwd, nil
}

p, err := readSecret("please enter password:")
if err != nil {
return "", err
}
pwdSet = true
pwd = string(p)

return pwd, err
}
}

func NewPassPhraseCbk() PassPhraseCallback {
var pwdSet bool
var pwd string
return func() (string, error) {
if pwdSet {
return pwd, nil
}

p, err := readSecret("please enter passphrase to private key:")
if err != nil {
return "", err
}
pwdSet = true
pwd = string(p)

return pwd, err
}
}

func NewHostKeyCbk() HostKeyCallback {
var trust []byte
return func(hostPort string, pubKey ssh.PublicKey) error {
if bytes.Equal(trust, pubKey.Marshal()) {
return nil
}
msg := `The authenticity of host %s cannot be established.
%s key fingerprint is %s
Are you sure you want to continue connecting (yes/no)? `
fmt.Fprintf(os.Stderr, msg, hostPort, pubKey.Type(), ssh.FingerprintSHA256(pubKey))
reader := bufio.NewReader(os.Stdin)
answer, err := reader.ReadString('\n')
if err != nil {
return err
}
answer = strings.TrimRight(answer, "\r\n")
answer = strings.ToLower(answer)

if answer == "yes" || answer == "y" {
trust = pubKey.Marshal()
fmt.Fprintf(os.Stderr, "To avoid this in future add following line into your ~/.ssh/known_hosts:\n%s %s %s\n",
hostPort, pubKey.Type(), base64.StdEncoding.EncodeToString(trust))
return nil
}

return errors.New("key rejected")
}
}
Loading
Loading