Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
AsafMah committed Feb 14, 2023
2 parents 5c403ba + a99436b commit 72a353a
Show file tree
Hide file tree
Showing 14 changed files with 185 additions and 488 deletions.
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.11.28 h1:ndAExarwr5Y+GaHE6VCaY1kyS/HwwGGyuimVhWsHOEM=
github.com/Azure/go-autorest/autorest v0.11.28/go.mod h1:MrkzG3Y3AH668QyF9KRk5neJnGgmhQ6krbhR8Q5eMvA=
github.com/Azure/go-autorest/autorest/adal v0.9.18 h1:kLnPsRjzZZUF3K5REu/Kc+qMQrvuza2bwSnNdhmzLfQ=
github.com/Azure/go-autorest/autorest/adal v0.9.18/go.mod h1:XVVeme+LZwABT8K5Lc3hA4nAe8LDBVle26gTrguhhPQ=
github.com/Azure/go-autorest/autorest/adal v0.9.22 h1:/GblQdIudfEM3AWWZ0mrYJQSd7JS4S/Mbzh6F0ov0Xc=
github.com/Azure/go-autorest/autorest/adal v0.9.22/go.mod h1:XuAbAEUv2Tta//+voMI038TrJBqjKam0me7qR+L8Cmk=
github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw=
github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74=
github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
Expand Down
106 changes: 62 additions & 44 deletions kusto/conn.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package kusto

// conn.go holds the connection to the Kusto server and provides methods to do queries
// Conn.go holds the connection to the Kusto server and provides methods to do queries
// and receive Kusto frames back.

import (
Expand Down Expand Up @@ -33,18 +33,18 @@ var bufferPool = sync.Pool{
},
}

// conn provides connectivity to a Kusto instance.
type conn struct {
endpoint string
auth Authorization
endMgmt, endQuery, streamQuery *url.URL
client *http.Client
endpointValidated atomic.Bool
clientDetails *ClientDetails
// Conn provides connectivity to a Kusto instance.
type Conn struct {
endpoint string
auth Authorization
endMgmt, endQuery, endStreamIngest *url.URL
client *http.Client
endpointValidated atomic.Bool
clientDetails *ClientDetails
}

// newConn returns a new conn object with an injected http.Client
func newConn(endpoint string, auth Authorization, client *http.Client, clientDetails *ClientDetails) (*conn, error) {
// NewConn returns a new Conn object with an injected http.Client
func NewConn(endpoint string, auth Authorization, client *http.Client, clientDetails *ClientDetails) (*Conn, error) {
if !validURL.MatchString(endpoint) {
return nil, errors.ES(errors.OpServConn, errors.KClientArgs, "endpoint is not valid(%s), should be https://<cluster name>.*", endpoint).SetNoRetry()
}
Expand All @@ -54,13 +54,13 @@ func newConn(endpoint string, auth Authorization, client *http.Client, clientDet
return nil, errors.ES(errors.OpServConn, errors.KClientArgs, "could not parse the endpoint(%s): %s", endpoint, err).SetNoRetry()
}

c := &conn{
auth: auth,
endMgmt: &url.URL{Scheme: "https", Host: u.Host, Path: "/v1/rest/mgmt"},
endQuery: &url.URL{Scheme: "https", Host: u.Host, Path: "/v2/rest/query"},
streamQuery: &url.URL{Scheme: "https", Host: u.Host, Path: "/v1/rest/ingest/"},
client: client,
clientDetails: clientDetails,
c := &Conn{
auth: auth,
endMgmt: &url.URL{Scheme: "https", Host: u.Host, Path: "/v1/rest/mgmt"},
endQuery: &url.URL{Scheme: "https", Host: u.Host, Path: "/v2/rest/query"},
endStreamIngest: &url.URL{Scheme: "https", Host: u.Host, Path: "/v1/rest/ingest/"},
client: client,
clientDetails: clientDetails,
}

return c, nil
Expand All @@ -79,7 +79,7 @@ type connOptions struct {

// query makes a query for the purpose of extracting data from Kusto. Context can be used to set
// a timeout or cancel the query. Queries cannot take longer than 5 minutes.
func (c *conn) query(ctx context.Context, db string, query Stmt, options *queryOptions) (execResp, error) {
func (c *Conn) query(ctx context.Context, db string, query Stmt, options *queryOptions) (execResp, error) {
if strings.HasPrefix(strings.TrimSpace(query.String()), ".") {
return execResp{}, errors.ES(errors.OpQuery, errors.KClientArgs, "a Stmt to Query() cannot begin with a period(.), only Mgmt() calls can do that").SetNoRetry()
}
Expand All @@ -88,11 +88,11 @@ func (c *conn) query(ctx context.Context, db string, query Stmt, options *queryO
}

// mgmt is used to do management queries to Kusto.
func (c *conn) mgmt(ctx context.Context, db string, query Stmt, options *mgmtOptions) (execResp, error) {
func (c *Conn) mgmt(ctx context.Context, db string, query Stmt, options *mgmtOptions) (execResp, error) {
return c.execute(ctx, execMgmt, db, query, *options.requestProperties)
}

func (c *conn) queryToJson(ctx context.Context, db string, query Stmt, options *queryOptions) (string, error) {
func (c *Conn) queryToJson(ctx context.Context, db string, query Stmt, options *queryOptions) (string, error) {
_, _, _, body, e := c.doRequest(ctx, execQuery, db, query, *options.requestProperties)
if e != nil {
return "", e
Expand All @@ -114,7 +114,7 @@ type execResp struct {
frameCh chan frames.Frame
}

func (c *conn) execute(ctx context.Context, execType int, db string, query Stmt, properties requestProperties) (execResp, error) {
func (c *Conn) execute(ctx context.Context, execType int, db string, query Stmt, properties requestProperties) (execResp, error) {
op, reqHeader, respHeader, body, e := c.doRequest(ctx, execType, db, query, properties)
if e != nil {
return execResp{}, e
Expand All @@ -135,7 +135,7 @@ func (c *conn) execute(ctx context.Context, execType int, db string, query Stmt,
return execResp{reqHeader: reqHeader, respHeader: respHeader, frameCh: frameCh}, nil
}

func (c *conn) doRequest(ctx context.Context, execType int, db string, query Stmt, properties requestProperties) (errors.Op, http.Header, http.Header,
func (c *Conn) doRequest(ctx context.Context, execType int, db string, query Stmt, properties requestProperties) (errors.Op, http.Header, http.Header,
io.ReadCloser, error) {
err := c.validateEndpoint()
var op errors.Op
Expand All @@ -145,9 +145,8 @@ func (c *conn) doRequest(ctx context.Context, execType int, db string, query Stm
op = errors.OpMgmt
}

header := c.getHeaders(properties)

var endpoint *url.URL

buff := bufferPool.Get().(*bytes.Buffer)
buff.Reset()
defer bufferPool.Put(buff)
Expand All @@ -173,40 +172,53 @@ func (c *conn) doRequest(ctx context.Context, execType int, db string, query Stm
return 0, nil, nil, nil, errors.ES(op, errors.KInternal, "internal error: did not understand the type of execType: %d", execType)
}

headers := c.getHeaders(properties)
responseHeaders, closer, err := c.doRequestImpl(ctx, op, endpoint, io.NopCloser(buff), headers, fmt.Sprintf("With query: %s", query.String()))
return op, headers, responseHeaders, closer, err
}

func (c *Conn) doRequestImpl(
ctx context.Context,
op errors.Op,
endpoint *url.URL,
buff io.ReadCloser,
headers http.Header,
errorContext string) (http.Header, io.ReadCloser, error) {

if c.auth.TokenProvider != nil && c.auth.TokenProvider.AuthorizationRequired() {
c.auth.TokenProvider.SetHttp(c.client)
token, tokenType, tkerr := c.auth.TokenProvider.AcquireToken(ctx)
if tkerr != nil {
return 0, nil, nil, nil, errors.ES(op, errors.KInternal, "Error while getting token : %s", tkerr)
return nil, nil, errors.ES(op, errors.KInternal, "Error while getting token : %s", tkerr)
}
header.Add("Authorization", fmt.Sprintf("%s %s", tokenType, token))
headers.Add("Authorization", fmt.Sprintf("%s %s", tokenType, token))
}

req := &http.Request{
Method: http.MethodPost,
URL: endpoint,
Header: header,
Body: io.NopCloser(buff),
Header: headers,
Body: buff,
}

resp, err := c.client.Do(req.WithContext(ctx))
if err != nil {
// TODO(jdoak): We need a http error unwrap function that pulls out an *errors.Error.
return 0, nil, nil, nil, errors.E(op, errors.KHTTPError, fmt.Errorf("with query %q: %w", query.String(), err))
return nil, nil, errors.E(op, errors.KHTTPError, fmt.Errorf("%v, %w", errorContext, err))
}

body, err := response.TranslateBody(resp, op)
if err != nil {
return 0, nil, nil, nil, err
return nil, nil, err
}

if resp.StatusCode != http.StatusOK {
return 0, nil, nil, nil, errors.HTTP(op, resp.Status, resp.StatusCode, body, fmt.Sprintf("error from Kusto endpoint for query %q: ", query.String()))
return nil, nil, errors.HTTP(op, resp.Status, resp.StatusCode, body, fmt.Sprintf("error from Kusto endpoint, %v", errorContext))
}
return op, header, resp.Header, body, nil
return resp.Header, body, nil
}

func (c *conn) validateEndpoint() error {
func (c *Conn) validateEndpoint() error {
if !c.endpointValidated.Load() {
var err error
if cloud, err := GetMetadata(c.endpoint, c.client); err == nil {
Expand All @@ -222,36 +234,42 @@ func (c *conn) validateEndpoint() error {
return nil
}

func (c *conn) getHeaders(properties requestProperties) http.Header {
const ClientRequestIdHeader = "x-ms-client-request-id"
const ApplicationHeader = "x-ms-app"
const UserHeader = "x-ms-user"
const ClientVersionHeader = "x-ms-client-version"

func (c *Conn) getHeaders(properties requestProperties) http.Header {
header := http.Header{}
header.Add("Accept", "application/json")
header.Add("Accept-Encoding", "gzip")
header.Add("Accept-Encoding", "gzip, deflate")
header.Add("Content-Type", "application/json; charset=utf-8")
header.Add("Connection", "Keep-Alive")
header.Add("x-ms-version", "2019-02-13")

if properties.ClientRequestID != "" {
header.Add("x-ms-client-request-id", properties.ClientRequestID)
header.Add(ClientRequestIdHeader, properties.ClientRequestID)
} else {
header.Add("x-ms-client-request-id", "KGC.execute;"+uuid.New().String())
header.Add(ClientRequestIdHeader, "KGC.execute;"+uuid.New().String())
}

if properties.Application != "" {
header.Add("x-ms-app", properties.Application)
header.Add(ApplicationHeader, properties.Application)
} else {
header.Add("x-ms-app", c.clientDetails.ApplicationForTracing())
header.Add(ApplicationHeader, c.clientDetails.ApplicationForTracing())
}

if properties.User != "" {
header.Add("x-ms-user", properties.User)
header.Add(UserHeader, properties.User)
} else {
header.Add("x-ms-user", c.clientDetails.UserNameForTracing())
header.Add(UserHeader, c.clientDetails.UserNameForTracing())
}

header.Add("x-ms-client-version", c.clientDetails.ClientVersionForTracing())
header.Add(ClientVersionHeader, c.clientDetails.ClientVersionForTracing())
return header
}

func (c *conn) Close() error {
func (c *Conn) Close() error {
c.client.CloseIdleConnections()
return nil
}
61 changes: 61 additions & 0 deletions kusto/conn_streaming_ingest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package kusto

import (
"context"
"fmt"
"github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/google/uuid"
"io"
"net/url"
)

type DataFormatForStreaming interface {
CamelCase() string
KnownOrDefault() DataFormatForStreaming
}

func (c *Conn) StreamIngest(ctx context.Context, db, table string, payload io.Reader, format DataFormatForStreaming, mappingName string, clientRequestId string) error {
streamUrl, err := url.Parse(c.endStreamIngest.String())
if err != nil {
return errors.ES(errors.OpIngestStream, errors.KClientArgs, "could not parse the stream endpoint(%s): %s", c.endStreamIngest.String(), err).SetNoRetry()
}
path, err := url.JoinPath(streamUrl.Path, db, table)
if err != nil {
return errors.ES(errors.OpIngestStream, errors.KClientArgs, "could not join the stream endpoint(%s) with the db(%s) and table(%s): %s", c.endStreamIngest.String(), db, table, err).SetNoRetry()
}
streamUrl.Path = path

qv := url.Values{}
if mappingName != "" {
qv.Add("mappingName", mappingName)
}
qv.Add("streamFormat", format.KnownOrDefault().CamelCase())
streamUrl.RawQuery = qv.Encode()

var closeablePayload io.ReadCloser
var ok bool
if closeablePayload, ok = payload.(io.ReadCloser); !ok {
closeablePayload = io.NopCloser(payload)
}

if clientRequestId == "" {
clientRequestId = "KGC.executeStreaming;" + uuid.New().String()
}

properties := requestProperties{}
properties.ClientRequestID = clientRequestId
headers := c.getHeaders(properties)
headers.Del("Content-Type")
headers.Add("Content-Encoding", "gzip")

_, body, err := c.doRequestImpl(ctx, errors.OpIngestStream, streamUrl, closeablePayload, headers, fmt.Sprintf("With db: %s, table: %s, mappingName: %s, clientRequestId: %s", db, table, mappingName, clientRequestId))
if body != nil {
body.Close()
}

if err != nil {
return errors.ES(errors.OpIngestStream, errors.KHTTPError, "streaming ingestion failed: endpoint(%s): %s", streamUrl.String(), err)
}

return nil
}
2 changes: 1 addition & 1 deletion kusto/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestHeaders(t *testing.T) {
client, err := New(kcsb)
require.NoError(t, err)

headers := client.conn.(*conn).getHeaders(*opts.requestProperties)
headers := client.conn.(*Conn).getHeaders(*opts.requestProperties)

if tt.expectedApplication != "" {
assert.Equal(t, tt.expectedApplication, headers.Get("x-ms-app"))
Expand Down
11 changes: 11 additions & 0 deletions kusto/ingest/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package ingest

import (
"strings"
)

const ingestPrefix = "ingest-"

func removeIngestPrefix(s string) string {
return strings.Replace(s, ingestPrefix, "", 1)
}
8 changes: 4 additions & 4 deletions kusto/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"bytes"
"context"
"fmt"
"github.com/Azure/azure-kusto-go/kusto"
"io"
"sync"

"github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/properties"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/queued"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/resources"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/streaming_ingest"
"github.com/google/uuid"
)

Expand All @@ -32,7 +32,7 @@ type Ingestion struct {
fs queued.Queued

connMu sync.Mutex
streamConn *conn.Conn
streamConn streamIngestor

bufferSize int
maxBuffers int
Expand Down Expand Up @@ -225,15 +225,15 @@ func (i *Ingestion) Stream(ctx context.Context, payload []byte, format DataForma
return err
}

func (i *Ingestion) getStreamConn() (*conn.Conn, error) {
func (i *Ingestion) getStreamConn() (streamIngestor, error) {
i.connMu.Lock()
defer i.connMu.Unlock()

if i.streamConn != nil {
return i.streamConn, nil
}

sc, err := conn.New(i.client.Endpoint(), i.client.Auth(), i.client.HttpClient(), i.client.ClientDetails())
sc, err := kusto.NewConn(removeIngestPrefix(i.client.Endpoint()), i.client.Auth(), i.client.HttpClient(), i.client.ClientDetails())
if err != nil {
return nil, err
}
Expand Down
9 changes: 9 additions & 0 deletions kusto/ingest/internal/properties/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"github.com/Azure/azure-kusto-go/kusto"
"net/url"
"os"
"path/filepath"
Expand Down Expand Up @@ -174,6 +175,14 @@ func (d DataFormat) CamelCase() string {
return ""
}

func (d DataFormat) KnownOrDefault() kusto.DataFormatForStreaming {
if d == DFUnknown {
return CSV
}

return d
}

// MarshalJSON implements json.Marshaler.MarshalJSON.
func (d DataFormat) MarshalJSON() ([]byte, error) {
if d == 0 {
Expand Down
Loading

0 comments on commit 72a353a

Please sign in to comment.