Introduction
In this post, I explain how to use Mule ESB to integrate with a database and a message broker. I use MySQL as the database engine and ActiveMQ as the message broker. A YouTube video walkthrough is available at https://www.youtube.com/watch?v=DAsFv045nlw&feature=youtu.be
Here is the use case:
- A user sends an HTTP GET request to retrieve customer information by company name
- A Mule flow retrieves all matching records from the database table named Customer
- The flow sends each record to a JMS queue
- The flow returns the total record count to the user
- The flow handles exceptions
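From the client's side, the use case above starts with a single GET request. The sketch below, written in Python purely for illustration, builds that request URL. The port 8081 and the /json-jms path come from the flow configuration later in this post. The host localhost, the helper name build_request_url, and the choice of Python are my assumptions, and Company_B is one of the sample rows inserted below.

```python
from urllib.parse import urlencode

# Assumed endpoint: the Mule HTTP listener in this post binds to port 8081
# and serves the path /json-jms. Change the host if Mule runs elsewhere.
BASE_URL = "http://localhost:8081/json-jms"


def build_request_url(company_name: str, base_url: str = BASE_URL) -> str:
    """Return the GET URL that asks the flow for one company's records."""
    # urlencode escapes spaces and other special characters in the value.
    return f"{base_url}?{urlencode({'companyName': company_name})}"


if __name__ == "__main__":
    print(build_request_url("Company_B"))
```

With Mule running, fetching that URL (with curl or a browser, for example) should return a small JSON body that carries the record count.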
Infrastructure Setup
To achieve this, we need to set up a MySQL database and ActiveMQ.
Setup MySQL
Here are the steps to set up MySQL on a MacBook Pro:
- Download the MySQL dmg
- Install it
- Start mysqld by running: sudo $MYSQL_INSTALLATION/mysql/support-files/mysql.server start
Create the user and database as root, grant the privileges, then log in as the new user:

    $ mysql -u root
    mysql> create user 'mule'@'localhost' identified by 'mule';
    mysql> create database dataformule;
    mysql> grant all privileges on *.* to 'mule'@'localhost';
    mysql> show databases;
    mysql> select user from mysql.user;
    mysql> exit

    $ mysql -u mule -p dataformule
Then create the Customer table and insert some sample rows:

    use dataformule;
    show tables;

    create table Customer (
        id INT NOT NULL AUTO_INCREMENT,
        company_name char(30) NOT NULL,
        contact_name char(30) NOT NULL,
        phone char(30),
        company_address char(30),
        PRIMARY KEY (id)
    );

    mysql> desc Customer;
    +-----------------+----------+------+-----+---------+----------------+
    | Field           | Type     | Null | Key | Default | Extra          |
    +-----------------+----------+------+-----+---------+----------------+
    | id              | int(11)  | NO   | PRI | NULL    | auto_increment |
    | company_name    | char(30) | NO   |     | NULL    |                |
    | contact_name    | char(30) | NO   |     | NULL    |                |
    | phone           | char(30) | YES  |     | NULL    |                |
    | company_address | char(30) | YES  |     | NULL    |                |
    +-----------------+----------+------+-----+---------+----------------+
    5 rows in set (0.01 sec)

    select * from dataformule.Customer;

    INSERT INTO dataformule.Customer (id, company_name, contact_name, phone, company_address)
    VALUES (102, 'Company_B', 'Contact_B', '999-888-7778', 'Address_B');

    INSERT INTO dataformule.Customer (id, company_name, contact_name, phone, company_address)
    VALUES (103, 'Company_C', 'Contact_C', '999-888-7779', 'Address_C');

    INSERT INTO dataformule.Customer (id, company_name, contact_name, phone, company_address)
    VALUES (104, 'Company_D', 'Contact_D', '999-888-7781', 'Address_D');

    INSERT INTO dataformule.Customer (id, company_name, contact_name, phone, company_address)
    VALUES (105, 'Company_E', 'Contact_E', '999-888-7782', 'Address_E');

    mysql> select * from customer;
    +-----+--------------+--------------+--------------+-----------------+
    | id  | company_name | contact_name | phone        | company_address |
    +-----+--------------+--------------+--------------+-----------------+
    | 102 | Company_B    | Contact_B    | 999-888-7778 | Address_B       |
    | 103 | Company_C    | Contact_C    | 999-888-7779 | Address_C       |
    | 104 | Company_D    | Contact_D    | 999-888-7781 | Address_D       |
    | 105 | Company_E    | Contact_E    | 999-888-7782 | Address_E       |
    +-----+--------------+--------------+--------------+-----------------+
    4 rows in set (0.01 sec)
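To sanity-check the table layout and the lookup by company name without a running MySQL server, a rough sketch like the following may help. It is not part of the original project. It uses Python's built-in sqlite3 module as a stand-in for MySQL, so the id column is declared in SQLite's style rather than with AUTO_INCREMENT. The helper name find_by_company is hypothetical. The query uses the same :companyName named-parameter style as the Mule template query shown later.

```python
import sqlite3

# Assumption: an in-memory SQLite database stands in for MySQL here.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE Customer (
        id INTEGER PRIMARY KEY,
        company_name CHAR(30) NOT NULL,
        contact_name CHAR(30) NOT NULL,
        phone CHAR(30),
        company_address CHAR(30)
    )
    """
)

# The same four sample rows that the INSERT statements above create.
sample_rows = [
    (102, "Company_B", "Contact_B", "999-888-7778", "Address_B"),
    (103, "Company_C", "Contact_C", "999-888-7779", "Address_C"),
    (104, "Company_D", "Contact_D", "999-888-7781", "Address_D"),
    (105, "Company_E", "Contact_E", "999-888-7782", "Address_E"),
]
conn.executemany("INSERT INTO Customer VALUES (?, ?, ?, ?, ?)", sample_rows)


def find_by_company(connection, company_name):
    """Return all Customer rows whose company_name matches exactly."""
    # The value is bound as a named parameter, never concatenated into SQL.
    cursor = connection.execute(
        "SELECT * FROM Customer WHERE company_name = :companyName",
        {"companyName": company_name},
    )
    return cursor.fetchall()


matches = find_by_company(conn, "Company_C")
print(matches)
```

Binding the value as a parameter instead of building the SQL string by hand is what keeps a user-supplied company name from being interpreted as SQL.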
The Code
The complete code is available on GitHub at: https://github.com/garyliu1119/Mule-Development/tree/master/ActiveMQ-Messaging
The Mule flow code is listed below:
    <mule xmlns="http://www.mulesoft.org/schema/mule/core"
          xmlns:db="http://www.mulesoft.org/schema/mule/db"
          xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
          xmlns:http="http://www.mulesoft.org/schema/mule/http"
          xmlns:jms="http://www.mulesoft.org/schema/mule/jms"
          xmlns:json="http://www.mulesoft.org/schema/mule/json"
          xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting"
          xmlns:spring="http://www.springframework.org/schema/beans"
          xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          version="EE-3.7.0"
          xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
    http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
    http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
    http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
    http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd
    http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd
    http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
    http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd">

        <!-- Global configuration: HTTP listener, ActiveMQ connector, default exception strategy, MySQL -->
        <http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration"/>
        <jms:activemq-connector name="Active_MQ" brokerURL="tcp://localhost:61616" validateConnections="true" doc:name="Active MQ">
            <reconnect frequency="15000" count="10"/>
        </jms:activemq-connector>
        <configuration defaultExceptionStrategy-ref="globlal_Exception_Strategy" doc:name="Configuration"/>
        <db:mysql-config name="MySQL_Configuration" host="localhost" port="3306" user="mule" password="mule" database="dataformule" doc:name="MySQL Configuration"/>

        <!-- Parameterized queries shared by the retrieval sub-flows -->
        <db:template-query name="Template_Query_By_Company_Name" doc:name="Template Query">
            <db:parameterized-query><![CDATA[SELECT * FROM Customer WHERE company_name = :companyName;]]></db:parameterized-query>
            <db:in-param name="companyName" defaultValue="#[message.inboundProperties.'http.query.params'.companyName]"/>
        </db:template-query>
        <db:template-query name="Template_Query_By_Phone_Number" doc:name="Template Query">
            <db:parameterized-query><![CDATA[SELECT * FROM CUSTOMER WHERE phone = :phoneNumber;]]></db:parameterized-query>
            <db:in-param name="phoneNumber" type="CHAR" defaultValue="#[message.inboundProperties.'http.query.params'.phoneNumber]"/>
        </db:template-query>

        <!-- Main flow: route on the query parameter, send the records to JMS, return the count -->
        <flow name="Mule-Use-Case-A-MainFlow">
            <http:listener config-ref="HTTP_Listener_Configuration" path="/json-jms" allowedMethods="GET,POST,DELETE, UPDATE" doc:name="HTTP"/>
            <choice doc:name="Choice">
                <when expression="#[message.inboundProperties.'http.query.params'.companyName != null]">
                    <flow-ref name="activemq-messaging-datd-retrieval-by-company-Sub_Flow" doc:name="Data Retrieval by Company"/>
                </when>
                <when expression="#[message.inboundProperties.'http.query.params'.phoneNumber != null]">
                    <flow-ref name="activemq-messaging-datd-retrieval-by-phone-Sub_Flow" doc:name="Data Retrieval By Phone Number"/>
                </when>
                <otherwise>
                    <scripting:component doc:name="Groovy">
                        <scripting:script engine="Groovy"><![CDATA[throw new IllegalArgumentException('Paramenter Not Acceptable')]]></scripting:script>
                    </scripting:component>
                </otherwise>
            </choice>
            <set-variable variableName="messageCounter" value="0" doc:name="Set Message Counter to 0"/>
            <flow-ref name="activemq-messagiing-send-Sub_Flow" doc:name="activemq-messagiing-send-Sub_Flow"/>
            <set-payload value="{&quot;count&quot;: #[flowVars.messageCounter]}" doc:name="Set Payload"/>
            <set-property propertyName="Content-Type" value="application/json" doc:name="Property"/>
            <choice-exception-strategy doc:name="Choice Exception Strategy">
                <catch-exception-strategy when="#[exception.causedBy(org.mule.api.transformer.TransformerException)]" doc:name="Catch Exception Strategy">
                    <set-payload value="#[payload]" doc:name="Set Payload"/>
                    <jms:outbound-endpoint queue="Dead.Letter.Invalid.Data" connector-ref="Active_MQ" doc:name="JMS"/>
                </catch-exception-strategy>
                <catch-exception-strategy when="#[exception.causedBy(java.sql.SQLException)]" doc:name="Catch Database Connection Exception">
                    <set-payload value="#[payload]" doc:name="Set Payload"/>
                    <jms:outbound-endpoint queue="Dead.Letter.Invalid.Data" connector-ref="Active_MQ" doc:name="JMS"/>
                </catch-exception-strategy>
                <catch-exception-strategy when="#[exception.causedBy(java.lang.Exception)]" doc:name="Catch ActiveMQ Connection Exception">
                    <set-payload value="The request cannot be processed, the error is #[exception.getSummaryMessage()] " doc:name="Set Payload"/>
                    <logger message="Unexpected Exception: #[payload]" level="INFO" doc:name="Logger"/>
                </catch-exception-strategy>
            </choice-exception-strategy>
        </flow>

        <!-- Sends each record in the payload to the Customer.Information queue and tracks the count -->
        <sub-flow name="activemq-messagiing-send-Sub_Flow">
            <foreach collection="#[payload]" doc:name="Loop List Of Messages">
                <logger message="#[payload]" level="INFO" doc:name="Log Single message"/>
                <json:object-to-json-transformer doc:name="Object to JSON"/>
                <set-variable variableName="messageCounter" value="#[flowVars.counter]" doc:name="Set Count Variable"/>
                <jms:outbound-endpoint queue="Customer.Information" connector-ref="Active_MQ" doc:name="Send To JMS"/>
            </foreach>
        </sub-flow>

        <sub-flow name="activemq-messaging-datd-retrieval-by-company-Sub_Flow">
            <db:select config-ref="MySQL_Configuration" doc:name="Database">
                <db:template-query-ref name="Template_Query_By_Company_Name"/>
                <db:in-param name="companyName" value="#[message.inboundProperties.'http.query.params'.companyName]"/>
            </db:select>
            <json:object-to-json-transformer doc:name="Object to JSON"/>
            <json:json-to-object-transformer returnClass="java.util.List" doc:name="JSON to Object"/>
        </sub-flow>

        <sub-flow name="activemq-messaging-datd-retrieval-by-phone-Sub_Flow">
            <db:select config-ref="MySQL_Configuration" doc:name="Database">
                <db:template-query-ref name="Template_Query_By_Phone_Number"/>
                <db:in-param name="phoneNumber" type="CHAR" value="#[message.inboundProperties.'http.query.params'.phoneNumber]"/>
            </db:select>
            <json:object-to-json-transformer doc:name="Object to JSON"/>
            <json:json-to-object-transformer returnClass="java.util.List" doc:name="JSON to Object"/>
        </sub-flow>

        <!-- Default exception strategy referenced by the <configuration> element above -->
        <catch-exception-strategy name="globlal_Exception_Strategy">
            <set-payload value="#[payload]" doc:name="Set Payload"/>
            <logger message="GLOBAL Exception Handller: #[payload]" level="INFO" doc:name="Logger"/>
        </catch-exception-strategy>
    </mule>
Detailed Explanation
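In short, the main flow routes on the query parameter, sends each matching record to the JMS queue as JSON, and returns the count as a JSON body. The Python sketch below models only that control flow in memory. It is not Mule code: a plain list stands in for the Customer.Information queue, a list of dictionaries stands in for the Customer table, and the function name run_main_flow is hypothetical.

```python
import json


def run_main_flow(query_params, customers, queue):
    """Mimic the main flow: route on the parameter, queue each row, count."""
    # Choice router: companyName is checked first, then phoneNumber.
    if query_params.get("companyName") is not None:
        wanted = query_params["companyName"]
        records = [c for c in customers if c["company_name"] == wanted]
    elif query_params.get("phoneNumber") is not None:
        wanted = query_params["phoneNumber"]
        records = [c for c in customers if c["phone"] == wanted]
    else:
        # Stands in for the exception the Groovy <otherwise> branch throws.
        raise ValueError("Parameter not acceptable")

    # The flow sets messageCounter to 0 before calling the send sub-flow.
    message_counter = 0
    # foreach scope: serialize each record and put it on the queue.
    for counter, record in enumerate(records, start=1):
        queue.append(json.dumps(record))
        message_counter = counter  # same role as flowVars.counter

    return json.dumps({"count": message_counter})


# Assumed sample data: two of the rows inserted earlier, trimmed for brevity.
customers = [
    {"id": 102, "company_name": "Company_B", "phone": "999-888-7778"},
    {"id": 103, "company_name": "Company_C", "phone": "999-888-7779"},
]
customer_queue = []
response = run_main_flow({"companyName": "Company_B"}, customers, customer_queue)
print(response)
```

In the real flow, an exception like the one raised in the last branch is handled by the choice exception strategy rather than propagating to the caller. The sketch leaves that part out.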
You can watch the video for a detailed explanation of this application at http://youtu.be/DAsFv045nlw
How To Stop mysqld
Recently, I found that I could not stop the mysqld process on my MacBook Pro. After some research and trial and error, I found that the following command works:

    sudo launchctl unload -w /Library/LaunchDaemons/com.oracle.oss.mysql.mysqld.plist