add BigTable example

himoacs committed Apr 5, 2024
1 parent bf7d857 commit 67b5904
Showing 3 changed files with 92 additions and 81 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -30,7 +30,7 @@
<properties>
<repoName>SolaceProducts</repoName>

<beam.version>2.35.0</beam.version>
<beam.version>2.55.0</beam.version>
<jcsmp.version>10.13.0</jcsmp.version>
<slf4j.version>1.7.25</slf4j.version>
<pmd.version>6.37.0</pmd.version>
42 changes: 25 additions & 17 deletions solace-apache-beam-samples/pom.xml
@@ -30,8 +30,8 @@
<description>Samples for the Apache Beam I/O Component for Solace PubSub+</description>

<properties>
<beam.version>2.35.0</beam.version>
<solace-beam.version>1.3.0-SNAPSHOT</solace-beam.version>
<beam.version>2.55.0</beam.version>
<solace-beam.version>1.2.0</solace-beam.version>

<maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
<maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
@@ -61,19 +61,6 @@
<artifactId>beam-sdks-java-io-solace</artifactId>
<version>${solace-beam.version}</version>
</dependency>

<dependency>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-hbase-beam</artifactId>
<version>2.12.0</version>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>2.37.0</version>
</dependency>

</dependencies>
</dependencyManagement>

@@ -122,6 +109,18 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-hbase-beam</artifactId>
<version>2.12.0</version>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>2.37.0</version>
</dependency>

<dependency>
<groupId>com.solace.test.integration</groupId>
<artifactId>pubsubplus-testcontainer</artifactId>
@@ -146,6 +145,17 @@
<version>3.12.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-vendor-guava-26_0-jre</artifactId>
<version>0.1</version>
<scope>compile</scope>
</dependency>
</dependencies>

<profiles>
@@ -159,7 +169,6 @@
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<scope>import</scope>
</dependency>
</dependencies>
</profile>
@@ -171,7 +180,6 @@
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<scope>import</scope>
</dependency>
</dependencies>
</profile>
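Since this change moves the bigtable-hbase-beam and google-cloud-bigtable dependencies into the samples' own <dependencies> section, a quick standalone smoke test against the local Bigtable emulator is one way to confirm they resolve and work. The sketch below is not part of this commit; the emulator port, project, instance, table, and row key are placeholder assumptions, and only the "stats" column family matches what the sample writes.

// Sketch only (not part of this commit): exercise the google-cloud-bigtable
// client added above against a local Bigtable emulator. Port, project,
// instance, table, and row key are placeholder assumptions; "stats" is the
// column family the sample writes to.
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowMutation;

public class BigtableEmulatorSmokeTest {

    public static void main(String[] args) throws Exception {
        // Assumed: emulator started with `gcloud beta emulators bigtable start --host-port=localhost:8086`
        int emulatorPort = 8086;

        // Create the table and the "stats" column family in the emulator.
        BigtableTableAdminSettings adminSettings = BigtableTableAdminSettings
                .newBuilderForEmulator(emulatorPort)
                .setProjectId("test-project")
                .setInstanceId("test-instance")
                .build();
        try (BigtableTableAdminClient admin = BigtableTableAdminClient.create(adminSettings)) {
            admin.createTable(CreateTableRequest.of("solace-messages").addFamily("stats"));
        }

        // Write one cell and read it back with the data client.
        BigtableDataSettings dataSettings = BigtableDataSettings
                .newBuilderForEmulator(emulatorPort)
                .setProjectId("test-project")
                .setInstanceId("test-instance")
                .build();
        try (BigtableDataClient client = BigtableDataClient.create(dataSettings)) {
            client.mutateRow(RowMutation.create("solace-messages", "smoke-test-row")
                    .setCell("stats", "", "hello from the emulator"));
            Row row = client.readRow("solace-messages", "smoke-test-row");
            System.out.println("read back: "
                    + row.getCells("stats").get(0).getValue().toStringUtf8());
        }
    }
}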
129 changes: 66 additions & 63 deletions SolaceBeamBigTable.java
@@ -2,9 +2,9 @@

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -43,63 +43,7 @@ public class SolaceBeamBigTable {

private static final Logger LOG = LoggerFactory.getLogger(SolaceRecordTest.class);

public static void main(String[] args) {

BigtableOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);

List<String> queues = Arrays.asList(options.getSql().split(","));
boolean useSenderMsgId = options.getSmi();

/** Create pipeline **/
Pipeline p = Pipeline.create(options);

/** Set Solace connection properties **/
JCSMPProperties jcsmpProperties = new JCSMPProperties();
jcsmpProperties.setProperty(JCSMPProperties.HOST, options.getCip());
jcsmpProperties.setProperty(JCSMPProperties.VPN_NAME, options.getVpn());
jcsmpProperties.setProperty(JCSMPProperties.USERNAME, options.getCu());
jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, options.getCp());

/** Create object for BigTable table configuration to be used later to run the pipeline **/
CloudBigtableTableConfiguration bigtableTableConfig =
new CloudBigtableTableConfiguration.Builder()
.withProjectId(options.getBigtableProjectId())
.withInstanceId(options.getBigtableInstanceId())
.withTableId(options.getBigtableTableId())
.build();

/* The pipeline consists of three components:
* 1. Reading message from Solace queue
* 2. Doing any necessary transformation and creating a BigTable row
* 3. Writing the row to BigTable
*/
p.apply(SolaceIO.read(jcsmpProperties, queues, SolaceTextRecord.getCoder(), SolaceTextRecord.getMapper())
.withUseSenderTimestamp(options.getSts())
.withAdvanceTimeoutInMillis(options.getTimeout()))
.apply(ParDo.of(
new DoFn<SolaceTextRecord, Mutation>() {
@ProcessElement
public void processElement(ProcessContext c) {

String uniqueID = UUID.randomUUID().toString();

Put row = new Put(Bytes.toBytes(uniqueID));

/** Create row that will be written to BigTable **/
row.addColumn(
Bytes.toBytes("stats"),
null,
c.element().getPayload().getBytes(StandardCharsets.UTF_8));
c.output(row);
}
}))
.apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

p.run().waitUntilFinish();
}

public interface BigtableOptions extends DataflowPipelineOptions {
public interface Options extends DataflowPipelineOptions {

@Description("IP and port of the client appliance. (e.g. -cip=192.168.160.101)")
String getCip();
@@ -164,16 +108,75 @@ public interface BigtableOptions extends DataflowPipelineOptions {
void setBigtableTableId(String bigtableTableId);
}

public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
BigtableOptions options) {
private static void WriteToBigTable(Options options) throws Exception {

List<String> queues = Arrays.asList(options.getSql().split(","));
boolean useSenderMsgId = options.getSmi();

/** Create pipeline **/
Pipeline pipeline = Pipeline.create(options);

/** Set Solace connection properties **/
JCSMPProperties jcsmpProperties = new JCSMPProperties();
jcsmpProperties.setProperty(JCSMPProperties.HOST, options.getCip());
jcsmpProperties.setProperty(JCSMPProperties.VPN_NAME, options.getVpn());
jcsmpProperties.setProperty(JCSMPProperties.USERNAME, options.getCu());
jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, options.getCp());

/** Create object for BigTable table configuration to be used later to run the pipeline **/
CloudBigtableTableConfiguration bigtableTableConfig =
new CloudBigtableTableConfiguration.Builder()
.withProjectId(options.getBigtableProjectId())
.withInstanceId(options.getBigtableInstanceId())
.withTableId(options.getBigtableTableId())
.withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
"true")
.build();
return bigtableTableConfig;

/* The pipeline consists of three components:
* 1. Reading message from Solace queue
* 2. Doing any necessary transformation and creating a BigTable row
* 3. Writing the row to BigTable
*/
pipeline.apply(SolaceIO.read(jcsmpProperties, queues, SolaceTextRecord.getCoder(), SolaceTextRecord.getMapper())
.withUseSenderTimestamp(options.getSts())
.withAdvanceTimeoutInMillis(options.getTimeout()))
.apply("Map to BigTable row",
ParDo.of(
new DoFn<SolaceTextRecord, Mutation>() {
@ProcessElement
public void processElement(ProcessContext c) {

String uniqueID = UUID.randomUUID().toString();

Put row = new Put(Bytes.toBytes(uniqueID));

/** Create row that will be written to BigTable **/
row.addColumn(
Bytes.toBytes("stats"),
null,
c.element().getPayload().getBytes(StandardCharsets.UTF_8));
c.output(row);
}
}))
.apply("Write to BigTable",
CloudBigtableIO.writeToTable(bigtableTableConfig));

PipelineResult result = pipeline.run();

try {
result.waitUntilFinish();
} catch (Exception exc) {
result.cancel();
}

}

public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(SolaceBeamBigTable.Options.class);

try {
WriteToBigTable(options);
} catch (Exception e) {
e.printStackTrace();
}
}
}
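Because each row key in the sample is a random UUID, an easy way to confirm the pipeline is actually writing data is to scan a few rows back with the google-cloud-bigtable data client pulled in by the samples pom. The sketch below is not part of this commit; "my-project", "my-instance", and "my-table" are placeholders for the values passed via --bigtableProjectId, --bigtableInstanceId, and --bigtableTableId.

// Sketch only (not part of this commit): read back a handful of rows written
// by the pipeline. The IDs below are placeholders for the values passed to the
// pipeline options.
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowCell;

public class VerifyBigtableRows {

    public static void main(String[] args) throws Exception {
        try (BigtableDataClient client = BigtableDataClient.create("my-project", "my-instance")) {
            // Each cell in the "stats" family holds one Solace message payload.
            for (Row row : client.readRows(Query.create("my-table").limit(5))) {
                for (RowCell cell : row.getCells("stats")) {
                    System.out.println(row.getKey().toStringUtf8() + " -> "
                            + cell.getValue().toStringUtf8());
                }
            }
        }
    }
}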
