Monday, August 31, 2015

Mule Application Development Using MySQL and ActiveMQ Connectors

Introduction

In this post, I explain how to use Mule ESB to integrate with a database and a message broker. I use MySQL as the database engine and ActiveMQ as the message broker.
A YouTube video walkthrough is available at https://www.youtube.com/watch?v=DAsFv045nlw&feature=youtu.be






Here is the use case:
  1. A user sends an HTTP GET request to retrieve customer information by company name
  2. A Mule flow retrieves all matching records from the database table named Customer
  3. The flow sends each record to a JMS queue
  4. The flow returns the total record count to the user
  5. The flow handles exceptions
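
Step 1 of the use case can be tried from a terminal. The following is a minimal sketch, assuming the application runs locally and that the listener uses port 8081 and the path /json-jms as in the flow configuration. The helper names customer_url and request_customers are hypothetical, and the company name is assumed to contain no characters that need URL encoding.

```shell
#!/bin/sh
# Base URL of the Mule HTTP listener (assumed local; port and path from the flow configuration).
BASE_URL="${BASE_URL:-http://localhost:8081/json-jms}"

# Build the request URL for a lookup by company name.
customer_url() {
    printf '%s?companyName=%s' "$BASE_URL" "$1"
}

# Send the GET request and print the response body returned by the flow.
request_customers() {
    curl --silent --show-error "$(customer_url "$1")"
}
```

With the sample data loaded and the application running, request_customers Company_B should print a small JSON document containing the record count.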

Infrastructure Setup

To achieve this, we need to set up a MySQL database and an ActiveMQ broker.

Setup MySQL

Here is the procedure to set up MySQL on a MacBook Pro:
  • Download the MySQL dmg
  • Install it
  • Start mysqld by running: sudo $MYSQL_INSTALLATION/mysql/support-files/mysql.server start
After starting the MySQL server, you need to do the following:
$ mysql -u root
 
mysql> create user 'mule'@'localhost' identified by 'mule';
 
mysql> create database dataformule;
 
mysql> grant all privileges on *.* to 'mule'@'localhost';
 
mysql> show databases;
 
mysql> select user from mysql.user;
 
mysql> exit
 
$ mysql -u mule -p dataformule
use dataformule;
show tables;
 
create table Customer (
     id INT NOT NULL AUTO_INCREMENT,
     company_name char(30) NOT NULL,
     contact_name char (30) NOT NULL,
     phone char(30),
     company_address char(30),
     PRIMARY KEY (id)
);
 
mysql> desc Customer;
+-----------------+----------+------+-----+---------+----------------+
| Field           | Type     | Null | Key | Default | Extra          |
+-----------------+----------+------+-----+---------+----------------+
| id              | int(11)  | NO   | PRI | NULL    | auto_increment |
| company_name    | char(30) | NO   |     | NULL    |                |
| contact_name    | char(30) | NO   |     | NULL    |                |
| phone           | char(30) | YES  |     | NULL    |                |
| company_address | char(30) | YES  |     | NULL    |                |
+-----------------+----------+------+-----+---------+----------------+
5 rows in set (0.01 sec)
 
 
 
select * from dataformule.Customer;
 
INSERT INTO dataformule.Customer (id, company_name, contact_name, phone, company_address) VALUES (102, 'Company_B', 'Contact_B', '999-888-7778', 'Address_B');
INSERT INTO dataformule.Customer (id, company_name, contact_name, phone, company_address) VALUES (103, 'Company_C', 'Contact_C', '999-888-7779', 'Address_C');
INSERT INTO dataformule.Customer (id, company_name, contact_name, phone, company_address) VALUES (104, 'Company_D', 'Contact_D', '999-888-7781', 'Address_D');
INSERT INTO dataformule.Customer (id, company_name, contact_name, phone, company_address) VALUES (105, 'Company_E', 'Contact_E', '999-888-7782', 'Address_E');
 
 
mysql> select * from Customer;
+-----+--------------+--------------+--------------+-----------------+
| id  | company_name | contact_name | phone        | company_address |
+-----+--------------+--------------+--------------+-----------------+
| 102 | Company_B    | Contact_B    | 999-888-7778 | Address_B       |
| 103 | Company_C    | Contact_C    | 999-888-7779 | Address_C       |
| 104 | Company_D    | Contact_D    | 999-888-7781 | Address_D       |
| 105 | Company_E    | Contact_E    | 999-888-7782 | Address_E       |
+-----+--------------+--------------+--------------+-----------------+
4 rows in set (0.01 sec)
To set up ActiveMQ with SSL, see my previous blog post, where I documented the setup in detail.
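
Before starting the Mule application, it can help to confirm that both services accept connections. The following is a rough sketch, assuming nc (netcat) with the -z option is installed and that MySQL and ActiveMQ listen on ports 3306 and 61616, as in the Mule configuration used in this post. The function names port_open, check_service and check_services are hypothetical.

```shell
#!/bin/sh
# Return success when a TCP port on localhost accepts connections.
port_open() {
    nc -z localhost "$1" >/dev/null 2>&1
}

# Print a one-line status for a named service and its port.
check_service() {
    if port_open "$2"; then
        echo "$1 is reachable on port $2"
    else
        echo "$1 is NOT reachable on port $2"
    fi
}

# Check both services used by the flow (ports assumed from the Mule configuration).
check_services() {
    check_service MySQL 3306
    check_service ActiveMQ 61616
}
```

Running check_services prints one status line per service. A port that accepts connections only shows that something is listening there, not that the credentials or queues are set up correctly.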

The Code

The complete code is available on GitHub at:
https://github.com/garyliu1119/Mule-Development/tree/master/ActiveMQ-Messaging
The Mule flow code is listed below:
<mule>
    <!-- Namespace (xmlns) declarations on the root element are omitted here; see the GitHub repository for the complete file. -->
    <http:listener-config doc:name="HTTP Listener Configuration" host="0.0.0.0" name="HTTP_Listener_Configuration" port="8081"/>
    <jms:activemq-connector brokerURL="tcp://localhost:61616" doc:name="Active MQ" name="Active_MQ" validateConnections="true">
        <reconnect count="10" frequency="15000"/>
    </jms:activemq-connector>
    <configuration defaultExceptionStrategy-ref="globlal_Exception_Strategy" doc:name="Configuration"/>
    <db:mysql-config database="dataformule" doc:name="MySQL Configuration" host="localhost" name="MySQL_Configuration" password="mule" port="3306" user="mule"/>
    <!-- Reusable parameterized queries; the in-param values are supplied by the sub-flows below. -->
    <db:template-query doc:name="Template Query" name="Template_Query_By_Company_Name">
        <db:parameterized-query><![CDATA[SELECT * FROM Customer WHERE company_name = :companyName;]]></db:parameterized-query>
        <db:in-param defaultValue="#[message.inboundProperties.'http.query.params'.companyName]" name="companyName"/>
    </db:template-query>
    <db:template-query doc:name="Template Query" name="Template_Query_By_Phone_Number">
        <db:parameterized-query><![CDATA[SELECT * FROM Customer WHERE phone = :phoneNumber;]]></db:parameterized-query>
        <db:in-param defaultValue="#[message.inboundProperties.'http.query.params'.phoneNumber]" name="phoneNumber" type="CHAR"/>
    </db:template-query>
    <flow name="Mule-Use-Case-A-MainFlow">
        <http:listener allowedMethods="GET,POST,DELETE, UPDATE" config-ref="HTTP_Listener_Configuration" doc:name="HTTP" path="/json-jms"/>
        <!-- Route on the query parameter that is present; reject requests that carry neither. -->
        <choice doc:name="Choice">
            <when expression="#[message.inboundProperties.'http.query.params'.companyName != null]">
                <flow-ref doc:name="Data Retrieval by Company" name="activemq-messaging-datd-retrieval-by-company-Sub_Flow"/>
            </when>
            <when expression="#[message.inboundProperties.'http.query.params'.phoneNumber != null]">
                <flow-ref doc:name="Data Retrieval By Phone Number" name="activemq-messaging-datd-retrieval-by-phone-Sub_Flow"/>
            </when>
            <otherwise>
                <scripting:component doc:name="Groovy">
                    <scripting:script engine="Groovy"><![CDATA[throw new IllegalArgumentException('Paramenter Not Acceptable')]]></scripting:script>
                </scripting:component>
            </otherwise>
        </choice>
        <!-- Send every record to JMS, then return the record count as JSON. -->
        <set-variable doc:name="Set Message Counter to 0" value="0" variableName="messageCounter"/>
        <flow-ref doc:name="activemq-messagiing-send-Sub_Flow" name="activemq-messagiing-send-Sub_Flow"/>
        <set-payload doc:name="Set Payload" value="{&quot;count&quot;: #[flowVars.messageCounter]}"/>
        <set-property doc:name="Property" propertyName="Content-Type" value="application/json"/>
        <choice-exception-strategy doc:name="Choice Exception Strategy">
            <catch-exception-strategy doc:name="Catch Exception Strategy" when="#[exception.causedBy(org.mule.api.transformer.TransformerException)]">
                <set-payload doc:name="Set Payload" value="#[payload]"/>
                <jms:outbound-endpoint connector-ref="Active_MQ" doc:name="JMS" queue="Dead.Letter.Invalid.Data"/>
            </catch-exception-strategy>
            <catch-exception-strategy doc:name="Catch Database Connection Exception" when="#[exception.causedBy(java.sql.SQLException)]">
                <set-payload doc:name="Set Payload" value="#[payload]"/>
                <jms:outbound-endpoint connector-ref="Active_MQ" doc:name="JMS" queue="Dead.Letter.Invalid.Data"/>
            </catch-exception-strategy>
            <catch-exception-strategy doc:name="Catch ActiveMQ Connection Exception" when="#[exception.causedBy(java.lang.Exception)]">
                <set-payload doc:name="Set Payload" value="The request cannot be processed, the error is #[exception.getSummaryMessage()] "/>
                <logger doc:name="Logger" level="INFO" message="Unexpected Exception: #[payload]"/>
            </catch-exception-strategy>
        </choice-exception-strategy>
    </flow>
    <sub-flow name="activemq-messagiing-send-Sub_Flow">
        <!-- foreach exposes the 1-based index of the current element as flowVars.counter. -->
        <foreach collection="#[payload]" doc:name="Loop List Of Messages">
            <logger doc:name="Log Single message" level="INFO" message="#[payload]"/>
            <json:object-to-json-transformer doc:name="Object to JSON"/>
            <set-variable doc:name="Set Count Variable" value="#[flowVars.counter]" variableName="messageCounter"/>
            <jms:outbound-endpoint connector-ref="Active_MQ" doc:name="Send To JMS" queue="Customer.Information"/>
        </foreach>
    </sub-flow>
    <sub-flow name="activemq-messaging-datd-retrieval-by-company-Sub_Flow">
        <db:select config-ref="MySQL_Configuration" doc:name="Database">
            <db:template-query-ref name="Template_Query_By_Company_Name"/>
            <db:in-param name="companyName" value="#[message.inboundProperties.'http.query.params'.companyName]"/>
        </db:select>
        <json:object-to-json-transformer doc:name="Object to JSON"/>
        <json:json-to-object-transformer doc:name="JSON to Object" returnClass="java.util.List"/>
    </sub-flow>
    <sub-flow name="activemq-messaging-datd-retrieval-by-phone-Sub_Flow">
        <db:select config-ref="MySQL_Configuration" doc:name="Database">
            <db:template-query-ref name="Template_Query_By_Phone_Number"/>
            <db:in-param name="phoneNumber" type="CHAR" value="#[message.inboundProperties.'http.query.params'.phoneNumber]"/>
        </db:select>
        <json:object-to-json-transformer doc:name="Object to JSON"/>
        <json:json-to-object-transformer doc:name="JSON to Object" returnClass="java.util.List"/>
    </sub-flow>
    <catch-exception-strategy name="globlal_Exception_Strategy">
        <set-payload doc:name="Set Payload" value="#[payload]"/>
        <logger doc:name="Logger" level="INFO" message="GLOBAL Exception Handller: #[payload]"/>
    </catch-exception-strategy>
</mule>
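
The main flow ends by setting the payload to a JSON document with a single count field. A script that calls the endpoint might want just the number. The following is a small sketch using sed, assuming the response body has exactly that shape; the function name extract_count is hypothetical.

```shell
#!/bin/sh
# Read a response body such as {"count": 4} on stdin and print only the number.
# Prints nothing when the input does not contain a numeric "count" field.
extract_count() {
    sed -n 's/.*"count"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p'
}
```

A response body can be piped straight into extract_count. For anything beyond this single field, a real JSON parser would be a safer choice than a regular expression.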

Detailed Explanation

For a detailed explanation of this application, watch the video at http://youtu.be/DAsFv045nlw


How To Stop mysqld

Recently, I found that I could not stop the mysqld process on my MacBook Pro. After some research and trial and error, I found that the following command works:

sudo launchctl unload -w /Library/LaunchDaemons/com.oracle.oss.mysql.mysqld.plist


Anypoint Studio Error: The project is missing MUnit library to run tests

Anypoint Studio 7.9 has a bug. Even if we follow the article: https://help.mulesoft.com/s/article/The-project-is-missing-MUnit-libraries-...