Saturday, December 15, 2018

Mule 4 Integration With Cassandra

Introduction

The Mule 4 Cassandra Database Connector, currently at version 3.10, is a significant improvement over the previous version. Setting up an integration with a Cassandra database using the connector is fairly straightforward. This article introduces the connector and shows how to connect it to a Cassandra cluster.

Cassandra Cluster Configuration

I have created a two-node cluster, as shown in the following diagram. The details of how to set up Cassandra clustering will be covered in another post. Basically, I opened the native transport port 9042, which is the default value, on both nodes.
We also need to do some initial setup using cqlsh by running the following commands:
$ cqlsh -u cassandra -p cassandra
cassandra@cqlsh> create keyspace if not exists dev_keyspace with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 2};
The above command creates a keyspace named dev_keyspace. We can list the keyspaces with the following command:
cassandra@cqlsh> desc keyspaces;
You should see the following:
system_schema  system      system_distributed
system_auth    dev_keyspace  system_traces
The next step is to switch to dev_keyspace and create the emp table with the following commands:
cassandra@cqlsh> use dev_keyspace;
cassandra@cqlsh:dev_keyspace> create table emp (empid int primary key, emp_first varchar, emp_last varchar, emp_dept varchar);
And insert a row:
cassandra@cqlsh:dev_keyspace> insert into emp (empid, emp_first, emp_last, emp_dept) values (1, 'Gary','liu','consulting');
That is all; we are now ready to build the integration.

Integration Using Mule 4 Cassandra Connector

Add Cassandra Connector

First, we need to search Exchange and add the Cassandra Connector to the project, as shown in the following screenshot:

Create CassandraDB Config

Create a Mule configuration file named global-config.xml. Then create a CassandraDB Config as follows:
Enter the General settings as follows. Note: leave the Host empty, because we use the cluster configuration.
Now, click the "Advanced Settings" tab and enter the information as follows:

As you can see, we can enter the IP addresses of the nodes separated by commas. This gives us high availability from the client's point of view. Now test the connectivity. If port 9042 is exposed correctly, the test should succeed. I will explain how to make sure the native transport port is exposed correctly in the next article.
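For readers who prefer the XML view, the settings described above might look roughly like the following. This is only a sketch, not the author's actual configuration: the attribute names (clusterName, protocolVersion, clusterNodes), the address:port format of the node list, the placeholder IP addresses, and the property placeholders for the credentials are all assumptions that should be checked against the XML that Studio generates and against the connector reference. The host attribute is omitted to mirror the note about leaving Host empty.

```xml
<!-- Sketch only: attribute names and values are assumptions, verify them in Studio's XML view -->
<cassandra-db:config name="CassandraDB_Config_cluster" doc:name="CassandraDB Config">
    <cassandra-db:connection
        username="${cassandra.username}"
        password="${cassandra.password}"
        keyspace="dev_keyspace"
        clusterName="DevelopmentCluster"
        protocolVersion="V4"
        clusterNodes="192.0.2.11:9042,192.0.2.12:9042"/>
    <!-- 192.0.2.x are documentation-only addresses; replace them with the real node IPs -->
</cassandra-db:config>
```

Listing both nodes means the client can still reach the cluster when one of them is down, which is the high-availability point made above.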

Create Integration Flows

Read Flow

The read flow is very straightforward. The cassandra-db:select operation uses the payload as the whole query, so we just need to set the payload. In this case, it is "select * from emp;".

<flow name="select-objecsFlow" doc:id="8bdcc4fe-cf4e-49be-b2f9-2dfacbddaf4b">
    <http:listener doc:name="Listener" doc:id="ab57d8ab-2623-468b-b635-a0c6efd6c829" config-ref="HTTP_Listener_config" path="/cassandra/emp"/>
    <!-- The select operation reads the whole CQL query from the payload -->
    <set-payload value="select * from emp;" doc:name="Set Payload"/>
    <cassandra-db:select doc:name="Select" config-ref="CassandraDB_Config_cluster"/>
    <!-- Convert the query result to JSON for the HTTP response -->
    <ee:transform doc:name="Transform Message">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
        </ee:message>
    </ee:transform>
    <logger level="INFO" doc:name="Logger" doc:id="e6097436-1909-4d8b-8c81-27be82c11714" message="#[payload]"/>
</flow>

</mule>

Insert A Row

To insert a row, we use the insert operation as follows:

<flow name="insert-objecsFlow" doc:id="8bdcc4fe-cf4e-49be-b2f9-2dfacbddaf4b">
    <http:listener doc:name="Listener" doc:id="ab57d8ab-2623-468b-b635-a0c6efd6c829" config-ref="HTTP_Listener_config" path="/cassandra/emp/create"/>
    <!-- Convert the incoming request body to a Java object; the insert operation takes the row from the payload -->
    <ee:transform doc:name="Transform Message" doc:id="0d484493-fa23-4a56-9547-58f0bb54dbd1">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/java
---
payload]]></ee:set-payload>
        </ee:message>
    </ee:transform>
    <!-- Uses the keyspace and the cluster config created earlier in this article -->
    <cassandra-db:insert table="emp" doc:name="Insert" doc:id="71cb7584-981a-4475-a839-ea82ed3b9832" config-ref="CassandraDB_Config_cluster" keyspaceName="dev_keyspace"/>
    <ee:transform doc:name="Transform Message">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
        </ee:message>
    </ee:transform>
    <logger level="INFO" doc:name="Logger" doc:id="e6097436-1909-4d8b-8c81-27be82c11714" message="#[payload]"/>
</flow>

Currently, the insert operation handles only one row at a time. To insert multiple rows, we can use a For Each scope, or a batch job for large volumes.
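As an illustration of the For Each approach, a flow along the following lines could accept a JSON array of employees and insert them one by one. This is a minimal sketch under stated assumptions: the flow name and the /cassandra/emp/bulk path are hypothetical, the request body is assumed to be a JSON array whose objects use the emp column names as keys, and the config and keyspace names are the ones used elsewhere in this article.

```xml
<!-- Sketch only: flow name and path are hypothetical -->
<flow name="insert-multiple-objectsFlow">
    <http:listener doc:name="Listener" config-ref="HTTP_Listener_config" path="/cassandra/emp/bulk"/>
    <!-- Convert the JSON array to a Java list so For Each can iterate over it -->
    <ee:transform doc:name="Transform Message">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/java
---
payload]]></ee:set-payload>
        </ee:message>
    </ee:transform>
    <!-- Inside the scope, the payload is a single element of the list, that is, one row -->
    <foreach doc:name="For Each" collection="#[payload]">
        <cassandra-db:insert table="emp" doc:name="Insert" config-ref="CassandraDB_Config_cluster" keyspaceName="dev_keyspace"/>
    </foreach>
    <!-- After the scope, the payload is the original list again -->
    <set-payload value="#[output application/json --- {inserted: sizeOf(payload)}]" doc:name="Set Payload"/>
</flow>
```

Each iteration issues a separate insert, so this suits small lists; for large volumes a batch job is likely the better fit, as noted above.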

About CassandraDB Connector

Detailed information can be found in the following MuleSoft documentation:
https://docs.mulesoft.com/connectors/cassandra/cassandra-connector

Important Cassandra Information

When you use cqlsh to connect to the Cassandra cluster, you should see output like the following:

cqlsh -u cassandra -p cassandra
Connected to DevelopmentCluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.3 | CQL spec 3.4.4 | Native protocol v4]

Here, v4 is the native protocol version the cluster is using. This value is required when configuring the connector.
