Skip to content

Commit

Permalink
onRecord: support for multiple returns
Browse files Browse the repository at this point in the history
onRecord: support for specifying non-value fields on record
ui: add partitioner option

also remove dofile/loadfile
  • Loading branch information
serprex committed Mar 20, 2024
1 parent 8ab3fd2 commit ff10e12
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 37 deletions.
109 changes: 96 additions & 13 deletions flow/connectors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log/slog"
"strings"
"sync"
"unsafe"

"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/plain"
Expand All @@ -31,12 +32,37 @@ type KafkaConnector struct {
logger log.Logger
}

func unsafeFastStringToReadOnlyBytes(s string) []byte {
return unsafe.Slice(unsafe.StringData(s), len(s))
}

func LVAsReadOnlyBytes(ls *lua.LState, v lua.LValue) ([]byte, error) {
str, err := LVAsStringOrNil(ls, v)
if err != nil {
return nil, err
} else if str == "" {
return nil, nil
} else {
return unsafeFastStringToReadOnlyBytes(str), nil
}
}

func LVAsStringOrNil(ls *lua.LState, v lua.LValue) (string, error) {
if lstr, ok := v.(lua.LString); ok {
return string(lstr), nil
} else if v == lua.LNil {
return "", nil
} else {
return "", fmt.Errorf("invalid bytes, must be nil or string: %s", v)
}
}

func NewKafkaConnector(
ctx context.Context,
config *protos.KafkaConfig,
) (*KafkaConnector, error) {
optionalOpts := append(
make([]kgo.Opt, 0, 6),
make([]kgo.Opt, 0, 7),
kgo.SeedBrokers(config.Servers...),
kgo.AllowAutoTopicCreation(),
kgo.WithLogger(kslog.New(slog.Default())), // TODO use logger.LoggerFromCtx
Expand All @@ -45,6 +71,18 @@ func NewKafkaConnector(
if !config.DisableTls {
optionalOpts = append(optionalOpts, kgo.DialTLSConfig(&tls.Config{MinVersion: tls.VersionTLS13}))
}
switch config.Partitioner {
case "LeastBackup":
optionalOpts = append(optionalOpts, kgo.RecordPartitioner(kgo.LeastBackupPartitioner()))
case "Manual":
optionalOpts = append(optionalOpts, kgo.RecordPartitioner(kgo.ManualPartitioner()))
case "RoundRobin":
optionalOpts = append(optionalOpts, kgo.RecordPartitioner(kgo.RoundRobinPartitioner()))
case "StickyKey":
optionalOpts = append(optionalOpts, kgo.RecordPartitioner(kgo.StickyKeyPartitioner(nil)))
case "Sticky":
optionalOpts = append(optionalOpts, kgo.RecordPartitioner(kgo.StickyPartitioner()))
}
if config.Username != "" {
switch config.Sasl {
case "PLAIN":
Expand Down Expand Up @@ -194,24 +232,69 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords
}
ls.Push(fn)
ls.Push(pua.LuaRecord.New(ls, record))
err := ls.PCall(1, 1, nil)
err := ls.PCall(1, -1, nil)
if err != nil {
return nil, fmt.Errorf("script failed: %w", err)
}
value := ls.Get(-1)
ls.SetTop(0)
if value != lua.LNil {
lstr, ok := value.(lua.LString)
if !ok {
return nil, fmt.Errorf("script returned non-nil non-string: %s", value)
args := ls.GetTop()
for i := range args {
value := ls.Get(i - args)
var kr *kgo.Record
switch v := value.(type) {
case lua.LString:
kr = kgo.StringRecord(string(v))
case *lua.LTable:
key, err := LVAsReadOnlyBytes(ls, ls.GetField(v, "key"))
if err != nil {
return nil, fmt.Errorf("invalid key, %w", err)
}
value, err := LVAsReadOnlyBytes(ls, ls.GetField(v, "value"))
if err != nil {
return nil, fmt.Errorf("invalid value, %w", err)
}
topic, err := LVAsStringOrNil(ls, ls.GetField(v, "topic"))
if err != nil {
return nil, fmt.Errorf("invalid topic, %w", err)
}
partition := int32(lua.LVAsNumber(ls.GetField(v, "partition")))
kr = &kgo.Record{
Key: key,
Value: value,
Topic: topic,
Partition: partition,
}
lheaders := ls.GetField(v, "headers")
if headers, ok := lheaders.(*lua.LTable); ok {
headers.ForEach(func(k, v lua.LValue) {
kstr := k.String()
vbytes, err := LVAsReadOnlyBytes(ls, v)
if err != nil {
vbytes = unsafeFastStringToReadOnlyBytes(err.Error())
}
kr.Headers = append(kr.Headers, kgo.RecordHeader{
Key: kstr,
Value: vbytes,
})
})
} else if lua.LVAsBool(lheaders) {
return nil, fmt.Errorf("invalid headers, must be nil or table: %s", lheaders)
}
case *lua.LNilType:
default:
return nil, fmt.Errorf("script returned invalid value: %s", value)
}
if kr != nil {
if kr.Topic == "" {
kr.Topic = record.GetDestinationTableName()
}

wg.Add(1)
topic := record.GetDestinationTableName()
c.client.Produce(wgCtx, &kgo.Record{Topic: topic, Value: []byte(lstr)}, produceCb)
numRecords += 1
tableNameRowsMapping[topic] += 1
wg.Add(1)
c.client.Produce(wgCtx, kr, produceCb)
numRecords += 1
tableNameRowsMapping[kr.Topic] += 1
}
}
ls.SetTop(0)
}

waitChan := make(chan struct{})
Expand Down
3 changes: 3 additions & 0 deletions flow/pua/peerdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ var (
)

func RegisterTypes(ls *lua.LState) {
ls.Env.RawSetString("loadfile", lua.LNil)
ls.Env.RawSetString("dofile", lua.LNil)

// gopher-lua provides 2 loaders {preload, file}
// overwrite file loader with one retrieving scripts from database
loaders := ls.G.Registry.RawGetString("_LOADERS").(*lua.LTable)
Expand Down
14 changes: 11 additions & 3 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -830,15 +830,23 @@ fn parse_db_options(
.collect::<Vec<_>>(),
username: opts
.get("user")
.context("no username specified")?
.cloned()
.unwrap_or_default()
.to_string(),
password: opts
.get("password")
.context("no password specified")?
.cloned()
.unwrap_or_default()
.to_string(),
sasl: opts
.get("sasl_mechanism")
.context("no sasl mechanism specified")?
.cloned()
.unwrap_or_default()
.to_string(),
partitioner: opts
.get("sasl_mechanism")
.cloned()
.unwrap_or_default()
.to_string(),
disable_tls: opts
.get("disable_tls")
Expand Down
1 change: 1 addition & 0 deletions protos/peers.proto
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ message KafkaConfig {
string password = 3;
string sasl = 4;
bool disable_tls = 5;
string partitioner = 6;
}

enum DBType {
Expand Down
2 changes: 2 additions & 0 deletions ui/app/peers/create/[peerType]/helpers/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ export interface PeerSetting {
tips?: string;
helpfulLink?: string;
default?: string | number;
placeholder?: string;
options?: { value: string; label: string }[];
}

export const getBlankSetting = (dbType: string): PeerConfig => {
Expand Down
23 changes: 23 additions & 0 deletions ui/app/peers/create/[peerType]/helpers/ka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,30 @@ export const kaSetting: PeerSetting[] = [
stateHandler: (value, setter) =>
setter((curr) => ({ ...curr, sasl: value as string })),
type: 'select',
placeholder: 'Select a mechanism',
helpfulLink:
'https://docs.redpanda.com/current/manage/security/authentication/#scram',
options: [
{ value: 'PLAIN', label: 'PLAIN' },
{ value: 'SCRAM-SHA-256', label: 'SCRAM-SHA-256' },
{ value: 'SCRAM-SHA-512', label: 'SCRAM-SHA-512' },
],
},
{
label: 'Partitioner',
stateHandler: (value, setter) =>
setter((curr) => ({ ...curr, partitioner: value as string })),
type: 'select',
placeholder: 'Select a partitioner',
helpfulLink:
'https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo#Partitioner',
options: [
{ value: 'LeastBackup', label: 'Least Backup' },
{ value: 'Manual', label: 'Manual' },
{ value: 'RoundRobin', label: 'Round Robin' },
{ value: 'StickyKey', label: 'Sticky Key' },
{ value: 'Sticky', label: 'Sticky' },
],
},
{
label: 'Disable TLS?',
Expand All @@ -46,5 +68,6 @@ export const blankKaSetting: KafkaConfig = {
username: '',
password: '',
sasl: 'PLAIN',
partitioner: '',
disableTls: false,
};
27 changes: 15 additions & 12 deletions ui/app/peers/create/[peerType]/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,18 +288,21 @@ export const kaSchema = z.object({
username: z.string().optional(),
password: z.string().optional(),
sasl: z
.union(
[
z.literal('PLAIN'),
z.literal('SCRAM-SHA-256'),
z.literal('SCRAM-SHA-512'),
],
{
errorMap: () => ({
message: 'One of PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 is required.',
}),
}
)
.union([
z.literal('PLAIN'),
z.literal('SCRAM-SHA-256'),
z.literal('SCRAM-SHA-512'),
])
.optional(),
partitioner: z
.union([
z.literal('Default'),
z.literal('LeastBackup'),
z.literal('Manual'),
z.literal('RoundRobin'),
z.literal('StickyKey'),
z.literal('Sticky'),
])
.optional(),
disableTls: z.boolean().optional(),
});
Expand Down
12 changes: 3 additions & 9 deletions ui/components/PeerForms/KafkaConfig.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@ interface KafkaProps {
setter: PeerSetter;
}

const saslOptions = [
{ value: 'PLAIN', label: 'PLAIN' },
{ value: 'SCRAM-SHA-256', label: 'SCRAM-SHA-256' },
{ value: 'SCRAM-SHA-512', label: 'SCRAM-SHA-512' },
];

const KafkaForm = ({ setter }: KafkaProps) => {
return (
<div>
Expand Down Expand Up @@ -55,15 +49,15 @@ const KafkaForm = ({ setter }: KafkaProps) => {
/>
) : setting.type === 'select' ? (
<RowWithSelect
label={<Label>SASL Mechanism</Label>}
label={<Label>{setting.label}</Label>}
action={
<ReactSelect
key={index}
placeholder='Select a mechanism'
placeholder={setting.placeholder}
onChange={(val) =>
val && setting.stateHandler(val.value, setter)
}
options={saslOptions}
options={setting.options}
theme={SelectTheme}
/>
}
Expand Down

0 comments on commit ff10e12

Please sign in to comment.