From 471bb711bc654cc8ac9c4a12645b00586bda67fe Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 3 Apr 2024 14:47:51 -0400 Subject: [PATCH 1/3] Adding example file for BigTable --- .../beam/examples/SolaceBeamBigTable.java | 179 ++++++++++++++++++ 1 file changed, 179 insertions(+) create mode 100644 solace-apache-beam-samples/src/main/java/com/solace/connector/beam/examples/SolaceBeamBigTable.java diff --git a/solace-apache-beam-samples/src/main/java/com/solace/connector/beam/examples/SolaceBeamBigTable.java b/solace-apache-beam-samples/src/main/java/com/solace/connector/beam/examples/SolaceBeamBigTable.java new file mode 100644 index 0000000..7870c45 --- /dev/null +++ b/solace-apache-beam-samples/src/main/java/com/solace/connector/beam/examples/SolaceBeamBigTable.java @@ -0,0 +1,179 @@ +package com.solace.connector.beam.examples; + +import com.google.cloud.bigtable.beam.CloudBigtableIO; +import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration; +import com.google.cloud.bigtable.hbase.BigtableOptionsFactory; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; + +import com.solace.connector.beam.SolaceIO; +import com.solace.connector.beam.examples.common.SolaceTextRecord; +import com.solacesystems.jcsmp.JCSMPProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +/** + * An example that binds to a Solace queue, consumes messages, and then writes them to BigTable. + * + * You will need to make sure there is a BigTable table with appropriate schema already created. + * + *
+ * <p>By default, the examples will run with the {@code DirectRunner}. To run the pipeline on
+ * Google Dataflow, specify:
+ *
+ * <pre>{@code
+ * --runner=DataflowRunner
+ * }</pre>
+ *
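+ * <p>The pipeline writes each message payload into the {@code stats} column family, so the
+ * target table must already contain that family; a one-time setup sketch is included at the
+ * end of this patch series.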
+ */ + +public class SolaceBeamBigTable { + + private static final Logger LOG = LoggerFactory.getLogger(SolaceRecordTest.class); + + public static void main(String[] args) { + + BigtableOptions options = + PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class); + + List queues = Arrays.asList(options.getSql().split(",")); + boolean useSenderMsgId = options.getSmi(); + + /** Create pipeline **/ + Pipeline p = Pipeline.create(options); + + /** Set Solace connection properties **/ + JCSMPProperties jcsmpProperties = new JCSMPProperties(); + jcsmpProperties.setProperty(JCSMPProperties.HOST, options.getCip()); + jcsmpProperties.setProperty(JCSMPProperties.VPN_NAME, options.getVpn()); + jcsmpProperties.setProperty(JCSMPProperties.USERNAME, options.getCu()); + jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, options.getCp()); + + /** Create object for BigTable table configuration to be used later to run the pipeline **/ + CloudBigtableTableConfiguration bigtableTableConfig = + new CloudBigtableTableConfiguration.Builder() + .withProjectId(options.getBigtableProjectId()) + .withInstanceId(options.getBigtableInstanceId()) + .withTableId(options.getBigtableTableId()) + .build(); + + /* The pipeline consists of three components: + * 1. Reading message from Solace queue + * 2. Doing any necessary transformation and creating a BigTable row + * 3. Writing the row to BigTable + */ + p.apply(SolaceIO.read(jcsmpProperties, queues, SolaceTextRecord.getCoder(), SolaceTextRecord.getMapper()) + .withUseSenderTimestamp(options.getSts()) + .withAdvanceTimeoutInMillis(options.getTimeout())) + .apply(ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + + String uniqueID = UUID.randomUUID().toString(); + + Put row = new Put(Bytes.toBytes(uniqueID)); + + /** Create row that will be written to BigTable **/ + row.addColumn( + Bytes.toBytes("stats"), + null, + c.element().getPayload().getBytes(StandardCharsets.UTF_8)); + c.output(row); + } + })) + .apply(CloudBigtableIO.writeToTable(bigtableTableConfig)); + + p.run().waitUntilFinish(); + } + + public interface BigtableOptions extends DataflowPipelineOptions { + + @Description("IP and port of the client appliance. (e.g. 
-cip=192.168.160.101)") + String getCip(); + + void setCip(String value); + + @Description("VPN name") + String getVpn(); + + void setVpn(String value); + + @Description("Client username") + String getCu(); + + void setCu(String value); + + @Description("Client password (default '')") + @Default.String("") + String getCp(); + + void setCp(String value); + + @Description("List of queues for subscribing") + String getSql(); + + void setSql(String value); + + @Description("Enable reading sender timestamp to determine freshness of data") + @Default.Boolean(false) + boolean getSts(); + + void setSts(boolean value); + + @Description("Enable reading sender sequence number to determine duplication of data") + @Default.Boolean(false) + boolean getSmi(); + + void setSmi(boolean value); + + @Description("The timeout in milliseconds while try to receive a messages from Solace broker") + @Default.Integer(100) + int getTimeout(); + + void setTimeout(int timeoutInMillis); + + @Description("The Bigtable project ID, this can be different than your Dataflow project") + @Default.String("bigtable-project") + String getBigtableProjectId(); + + void setBigtableProjectId(String bigtableProjectId); + + @Description("The Bigtable instance ID") + @Default.String("bigtable-instance") + String getBigtableInstanceId(); + + void setBigtableInstanceId(String bigtableInstanceId); + + @Description("The Bigtable table ID in the instance.") + @Default.String("bigtable-table") + String getBigtableTableId(); + + void setBigtableTableId(String bigtableTableId); + } + + public static CloudBigtableTableConfiguration batchWriteFlowControlExample( + BigtableOptions options) { + CloudBigtableTableConfiguration bigtableTableConfig = + new CloudBigtableTableConfiguration.Builder() + .withProjectId(options.getBigtableProjectId()) + .withInstanceId(options.getBigtableInstanceId()) + .withTableId(options.getBigtableTableId()) + .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL, + "true") + .build(); + return bigtableTableConfig; + } +} \ No newline at end of file From bf7d85792ec052398ae29a86d73c17df7ac8703c Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 3 Apr 2024 15:44:22 -0400 Subject: [PATCH 2/3] Updating pom.xml for BigTable --- solace-apache-beam-samples/pom.xml | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/solace-apache-beam-samples/pom.xml b/solace-apache-beam-samples/pom.xml index e828c10..14041cf 100644 --- a/solace-apache-beam-samples/pom.xml +++ b/solace-apache-beam-samples/pom.xml @@ -61,6 +61,19 @@ beam-sdks-java-io-solace ${solace-beam.version} + + + com.google.cloud.bigtable + bigtable-hbase-beam + 2.12.0 + + + + com.google.cloud + google-cloud-bigtable + 2.37.0 + + @@ -146,7 +159,7 @@ org.apache.beam beam-runners-direct-java - runtime + import @@ -158,7 +171,7 @@ org.apache.beam beam-runners-google-cloud-dataflow-java - runtime + import From 67b59046eccd64916a142d16c8bddaa764504d12 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Fri, 5 Apr 2024 12:59:18 -0400 Subject: [PATCH 3/3] add BigTable example --- pom.xml | 2 +- solace-apache-beam-samples/pom.xml | 42 +++--- .../beam/examples/SolaceBeamBigTable.java | 129 +++++++++--------- 3 files changed, 92 insertions(+), 81 deletions(-) diff --git a/pom.xml b/pom.xml index 762cc41..f637ab5 100644 --- a/pom.xml +++ b/pom.xml @@ -30,7 +30,7 @@ SolaceProducts - 2.35.0 + 2.55.0 10.13.0 1.7.25 6.37.0 diff --git a/solace-apache-beam-samples/pom.xml b/solace-apache-beam-samples/pom.xml index 
14041cf..edc0655 100644 --- a/solace-apache-beam-samples/pom.xml +++ b/solace-apache-beam-samples/pom.xml @@ -30,8 +30,8 @@ Samples for the Apache Beam I/O Component for Solace PubSub+ - 2.35.0 - 1.3.0-SNAPSHOT + 2.55.0 + 1.2.0 1.6.0 3.0.2 @@ -61,19 +61,6 @@ beam-sdks-java-io-solace ${solace-beam.version} - - - com.google.cloud.bigtable - bigtable-hbase-beam - 2.12.0 - - - - com.google.cloud - google-cloud-bigtable - 2.37.0 - - @@ -122,6 +109,18 @@ + + com.google.cloud.bigtable + bigtable-hbase-beam + 2.12.0 + + + + com.google.cloud + google-cloud-bigtable + 2.37.0 + + com.solace.test.integration pubsubplus-testcontainer @@ -146,6 +145,17 @@ 3.12.0 test + + org.junit.jupiter + junit-jupiter-api + test + + + org.apache.beam + beam-vendor-guava-26_0-jre + 0.1 + compile + @@ -159,7 +169,6 @@ org.apache.beam beam-runners-direct-java - import @@ -171,7 +180,6 @@ org.apache.beam beam-runners-google-cloud-dataflow-java - import diff --git a/solace-apache-beam-samples/src/main/java/com/solace/connector/beam/examples/SolaceBeamBigTable.java b/solace-apache-beam-samples/src/main/java/com/solace/connector/beam/examples/SolaceBeamBigTable.java index 7870c45..7a5f380 100644 --- a/solace-apache-beam-samples/src/main/java/com/solace/connector/beam/examples/SolaceBeamBigTable.java +++ b/solace-apache-beam-samples/src/main/java/com/solace/connector/beam/examples/SolaceBeamBigTable.java @@ -2,9 +2,9 @@ import com.google.cloud.bigtable.beam.CloudBigtableIO; import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration; -import com.google.cloud.bigtable.hbase.BigtableOptionsFactory; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -43,63 +43,7 @@ public class SolaceBeamBigTable { private static final Logger LOG = LoggerFactory.getLogger(SolaceRecordTest.class); - public static void main(String[] args) { - - BigtableOptions options = - PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class); - - List queues = Arrays.asList(options.getSql().split(",")); - boolean useSenderMsgId = options.getSmi(); - - /** Create pipeline **/ - Pipeline p = Pipeline.create(options); - - /** Set Solace connection properties **/ - JCSMPProperties jcsmpProperties = new JCSMPProperties(); - jcsmpProperties.setProperty(JCSMPProperties.HOST, options.getCip()); - jcsmpProperties.setProperty(JCSMPProperties.VPN_NAME, options.getVpn()); - jcsmpProperties.setProperty(JCSMPProperties.USERNAME, options.getCu()); - jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, options.getCp()); - - /** Create object for BigTable table configuration to be used later to run the pipeline **/ - CloudBigtableTableConfiguration bigtableTableConfig = - new CloudBigtableTableConfiguration.Builder() - .withProjectId(options.getBigtableProjectId()) - .withInstanceId(options.getBigtableInstanceId()) - .withTableId(options.getBigtableTableId()) - .build(); - - /* The pipeline consists of three components: - * 1. Reading message from Solace queue - * 2. Doing any necessary transformation and creating a BigTable row - * 3. 
Writing the row to BigTable - */ - p.apply(SolaceIO.read(jcsmpProperties, queues, SolaceTextRecord.getCoder(), SolaceTextRecord.getMapper()) - .withUseSenderTimestamp(options.getSts()) - .withAdvanceTimeoutInMillis(options.getTimeout())) - .apply(ParDo.of( - new DoFn() { - @ProcessElement - public void processElement(ProcessContext c) { - - String uniqueID = UUID.randomUUID().toString(); - - Put row = new Put(Bytes.toBytes(uniqueID)); - - /** Create row that will be written to BigTable **/ - row.addColumn( - Bytes.toBytes("stats"), - null, - c.element().getPayload().getBytes(StandardCharsets.UTF_8)); - c.output(row); - } - })) - .apply(CloudBigtableIO.writeToTable(bigtableTableConfig)); - - p.run().waitUntilFinish(); - } - - public interface BigtableOptions extends DataflowPipelineOptions { + public interface Options extends DataflowPipelineOptions { @Description("IP and port of the client appliance. (e.g. -cip=192.168.160.101)") String getCip(); @@ -164,16 +108,75 @@ public interface BigtableOptions extends DataflowPipelineOptions { void setBigtableTableId(String bigtableTableId); } - public static CloudBigtableTableConfiguration batchWriteFlowControlExample( - BigtableOptions options) { + private static void WriteToBigTable(Options options) throws Exception { + + List queues = Arrays.asList(options.getSql().split(",")); + boolean useSenderMsgId = options.getSmi(); + + /** Create pipeline **/ + Pipeline pipeline = Pipeline.create(options); + + /** Set Solace connection properties **/ + JCSMPProperties jcsmpProperties = new JCSMPProperties(); + jcsmpProperties.setProperty(JCSMPProperties.HOST, options.getCip()); + jcsmpProperties.setProperty(JCSMPProperties.VPN_NAME, options.getVpn()); + jcsmpProperties.setProperty(JCSMPProperties.USERNAME, options.getCu()); + jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, options.getCp()); + + /** Create object for BigTable table configuration to be used later to run the pipeline **/ CloudBigtableTableConfiguration bigtableTableConfig = new CloudBigtableTableConfiguration.Builder() .withProjectId(options.getBigtableProjectId()) .withInstanceId(options.getBigtableInstanceId()) .withTableId(options.getBigtableTableId()) - .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL, - "true") .build(); - return bigtableTableConfig; + + /* The pipeline consists of three components: + * 1. Reading message from Solace queue + * 2. Doing any necessary transformation and creating a BigTable row + * 3. 
Writing the row to BigTable + */ + pipeline.apply(SolaceIO.read(jcsmpProperties, queues, SolaceTextRecord.getCoder(), SolaceTextRecord.getMapper()) + .withUseSenderTimestamp(options.getSts()) + .withAdvanceTimeoutInMillis(options.getTimeout())) + .apply("Map to BigTable row", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + + String uniqueID = UUID.randomUUID().toString(); + + Put row = new Put(Bytes.toBytes(uniqueID)); + + /** Create row that will be written to BigTable **/ + row.addColumn( + Bytes.toBytes("stats"), + null, + c.element().getPayload().getBytes(StandardCharsets.UTF_8)); + c.output(row); + } + })) + .apply("Write to BigTable", + CloudBigtableIO.writeToTable(bigtableTableConfig)); + + PipelineResult result = pipeline.run(); + + try { + result.waitUntilFinish(); + } catch (Exception exc) { + result.cancel(); + } + + } + + public static void main(String[] args) { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(SolaceBeamBigTable.Options.class); + + try { + WriteToBigTable(options); + } catch (Exception e) { + e.printStackTrace(); + } } } \ No newline at end of file
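
Setup note: the example assumes the destination Bigtable table and its "stats" column
family already exist before the pipeline runs. Below is a minimal one-time setup sketch
using the google-cloud-bigtable admin client that this patch series adds to the samples
pom. The class name and the three IDs are illustrative placeholders (they mirror the
@Default values in the Options interface), not part of the patch itself:

    package com.solace.connector.beam.examples;

    import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
    import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;

    /** One-time setup: creates the table and the "stats" column family the pipeline writes to. */
    public class CreateBigtableTableExample {

        public static void main(String[] args) throws Exception {
            // Placeholder IDs; they mirror the @Default values in SolaceBeamBigTable.Options.
            String projectId = "bigtable-project";
            String instanceId = "bigtable-instance";
            String tableId = "bigtable-table";

            // The admin client is separate from the data path CloudBigtableIO uses at runtime.
            try (BigtableTableAdminClient admin =
                    BigtableTableAdminClient.create(projectId, instanceId)) {
                if (!admin.exists(tableId)) {
                    // "stats" must match the column family named in the DoFn's row.addColumn(...) call.
                    admin.createTable(CreateTableRequest.of(tableId).addFamily("stats"));
                }
            }
        }
    }

The same result can be achieved with the cbt CLI (cbt createtable / cbt createfamily); the
Java route is sketched only because the google-cloud-bigtable dependency is already on the
classpath.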
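Run note: once the table exists, the finished example can be launched on Dataflow along the
following lines, assuming the exec-maven-plugin is available to the samples module. Beam
derives each flag from the corresponding getter in the Options interface (getCip becomes
--cip, getSql becomes --sql, and so on), and every value shown is a placeholder:

    mvn compile exec:java \
        -Dexec.mainClass=com.solace.connector.beam.examples.SolaceBeamBigTable \
        -Dexec.args="--cip=192.168.160.101 --vpn=default --cu=default --cp=password \
            --sql=Q/tutorial --bigtableProjectId=bigtable-project \
            --bigtableInstanceId=bigtable-instance --bigtableTableId=bigtable-table \
            --runner=DataflowRunner --project=my-gcp-project --region=us-central1 \
            --gcpTempLocation=gs://my-bucket/tmp"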