From 43f702b0dd9e0ac174fa037c081a387381ff4763 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Luis=20L=C3=B3pez=20S=C3=A1nchez?= Date: Wed, 13 Nov 2024 09:53:02 +0100 Subject: [PATCH] sql_insert: support keyword options to add before the INTO clause of the query MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pedro Luis López Sánchez --- .../components/pages/outputs/sql_insert.adoc | 17 +++++++++++++++++ .../components/pages/processors/sql_insert.adoc | 17 +++++++++++++++++ internal/impl/sql/output_sql_insert.go | 13 +++++++++++++ internal/impl/sql/processor_sql_insert.go | 15 ++++++++++++++- 4 files changed, 61 insertions(+), 1 deletion(-) diff --git a/docs/modules/components/pages/outputs/sql_insert.adoc b/docs/modules/components/pages/outputs/sql_insert.adoc index 4a42df7d3d..108401e1a0 100644 --- a/docs/modules/components/pages/outputs/sql_insert.adoc +++ b/docs/modules/components/pages/outputs/sql_insert.adoc @@ -69,6 +69,7 @@ output: args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (required) prefix: "" # No default (optional) suffix: ON CONFLICT (name) DO NOTHING # No default (optional) + options: [] # No default (optional) max_in_flight: 64 init_files: [] # No default (optional) init_statement: | # No default (optional) @@ -276,6 +277,22 @@ An optional suffix to append to the insert query. suffix: ON CONFLICT (name) DO NOTHING ``` +=== `options` + +A list of keyword options to add before the INTO clause of the query. + + +*Type*: `array` + + +```yml +# Examples + +options: + - DELAYED + - IGNORE +``` + === `max_in_flight` The maximum number of inserts to run in parallel. diff --git a/docs/modules/components/pages/processors/sql_insert.adoc b/docs/modules/components/pages/processors/sql_insert.adoc index 87f3aa0618..45da5b2cd3 100644 --- a/docs/modules/components/pages/processors/sql_insert.adoc +++ b/docs/modules/components/pages/processors/sql_insert.adoc @@ -61,6 +61,7 @@ sql_insert: args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (required) prefix: "" # No default (optional) suffix: ON CONFLICT (name) DO NOTHING # No default (optional) + options: [] # No default (optional) init_files: [] # No default (optional) init_statement: | # No default (optional) CREATE TABLE IF NOT EXISTS some_table ( @@ -264,6 +265,22 @@ An optional suffix to append to the insert query. suffix: ON CONFLICT (name) DO NOTHING ``` +=== `options` + +A list of keyword options to add before the INTO clause of the query. + + +*Type*: `array` + + +```yml +# Examples + +options: + - DELAYED + - IGNORE +``` + === `init_files` An optional list of file paths containing SQL statements to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Glob patterns are supported, including super globs (double star). diff --git a/internal/impl/sql/output_sql_insert.go b/internal/impl/sql/output_sql_insert.go index 2f74d6cc8b..b08a95794f 100644 --- a/internal/impl/sql/output_sql_insert.go +++ b/internal/impl/sql/output_sql_insert.go @@ -55,6 +55,11 @@ func sqlInsertOutputConfig() *service.ConfigSpec { Optional(). Advanced(). Example("ON CONFLICT (name) DO NOTHING")). + Field(service.NewStringListField("options"). + Description("A list of keyword options to add before the INTO clause of the query."). + Optional(). + Advanced(). + Example([]string{"DELAYED", "IGNORE"})). Field(service.NewIntField("max_in_flight"). Description("The maximum number of inserts to run in parallel."). Default(64)) @@ -198,6 +203,14 @@ func newSQLInsertOutputFromConfig(conf *service.ParsedConfig, mgr *service.Resou s.builder = s.builder.Suffix(suffixStr) } + if conf.Contains("options") { + options, err := conf.FieldStringList("options") + if err != nil { + return nil, err + } + s.builder = s.builder.Options(options...) + } + if s.connSettings, err = connSettingsFromParsed(conf, mgr); err != nil { return nil, err } diff --git a/internal/impl/sql/processor_sql_insert.go b/internal/impl/sql/processor_sql_insert.go index 2798f1053e..05c5734a32 100644 --- a/internal/impl/sql/processor_sql_insert.go +++ b/internal/impl/sql/processor_sql_insert.go @@ -56,7 +56,12 @@ If the insert fails to execute then the message will still remain unchanged and Description("An optional suffix to append to the insert query."). Optional(). Advanced(). - Example("ON CONFLICT (name) DO NOTHING")) + Example("ON CONFLICT (name) DO NOTHING")). + Field(service.NewStringListField("options"). + Description("A list of keyword options to add before the INTO clause of the query."). + Optional(). + Advanced(). + Example([]string{"DELAYED", "IGNORE"})) for _, f := range connFields() { spec = spec.Field(f) @@ -187,6 +192,14 @@ func NewSQLInsertProcessorFromConfig(conf *service.ParsedConfig, mgr *service.Re s.builder = s.builder.Suffix(suffixStr) } + if conf.Contains("options") { + options, err := conf.FieldStringList("options") + if err != nil { + return nil, err + } + s.builder = s.builder.Options(options...) + } + connSettings, err := connSettingsFromParsed(conf, mgr) if err != nil { return nil, err