Introduction
In this blog, I am going to explain how to use Mule ESB to perform integration with database and messaging brokers. I use MySQL as database engine and ActiveMQ as message broker.The youtube video now available at https://www.youtube.com/watch?v=DAsFv045nlw&feature=youtu.be
Here is the use case:
- A user sends HTTP GET request to retrieve customer information based on company name
- A Mule flow will retrieve all the records from database table named Customer
- Send each record to JMS Queue
- Return use the counter of total records
- handle exceptions
Infrastructure Setup
In order achieve this, we need to setup MySQL database. Also we need to setup ActiveMQ.Setup MySQL
Here is the procedures to setup MySQL on MacBook Pro:- Down load MySQL dmg
- Install it
- Start the mysqld by run: sudo $MYSQL_INSTALLATION/mysql/support-files/mysql.server start
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | $mysql -u root mysql> create user 'mule' @ 'localhost' identified by 'mule' ; mysql> create database dataformule; $mysql -u mule -p dataformule mysql> grant all privileges on * . * to 'mule' @ 'localhost' ; mysql> show databases; mysql> select user from mysql.user; mysql -u mule -p dataformule |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | use dataformule; show tables; create table Customer ( id INT NOT NULL AUTO_INCREMENT, company_name char(30) NOT NULL, contact_name char (30) NOT NULL, phone char(30), company_address char(30), PRIMARY KEY (id) ); mysql> desc Customer; +-----------------+----------+------+-----+---------+----------------+ | Field | Type | Null | Key | Default | Extra | +-----------------+----------+------+-----+---------+----------------+ | id | int(11) | NO | PRI | NULL | auto_increment | | company_name | char(30) | NO | | NULL | | | contact_name | char(30) | NO | | NULL | | | phone | char(30) | YES | | NULL | | | company_address | char(30) | YES | | NULL | | +-----------------+----------+------+-----+---------+----------------+ 5 rows in set (0.01 sec) select * from dataformule.Customer; INSERT INTO dataformule.Customer (id, company_name, contact_name, phone, company_address) VALUES (102, 'Company_B' , 'Contact_B' , '999-888-7778' , 'Address_B' ); INSERT INTO dataformule.Customer (id, company_name, contact_name, phone, company_address) VALUES (103, 'Company_C' , 'Contact_C' , '999-888-7779' , 'Address_C' ); INSERT INTO dataformule.Customer (id, company_name, contact_name, phone, company_address) VALUES (104, 'Company_D' , 'Contact_D' , '999-888-7781' , 'Address_D' ); INSERT INTO dataformule.Customer (id, company_name, contact_name, phone, company_address) VALUES (105, 'Company_E' , 'Contact_E' , '999-888-7782' , 'Address_E' ); mysql> select * from customer; +-----+--------------+--------------+--------------+-----------------+ | id | company_name | contact_name | phone | company_address | +-----+--------------+--------------+--------------+-----------------+ | 102 | Company_B | Contact_B | 999-888-7778 | Address_B | | 103 | Company_C | Contact_C | 999-888-7779 | Address_C | | 104 | Company_D | Contact_D | 999-888-7781 | Address_D | | 105 | Company_E | Contact_E | 999-888-7782 | Address_E | +-----+--------------+--------------+--------------+-----------------+ 4 rows in set (0.01 sec) |
The Code
The complete code is available at GitHub at:https://github.com/garyliu1119/Mule-Development/tree/master/ActiveMQ-Messaging
The mule flow code list as the following:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 | < mule version = "EE-3.7.0" xmlns:db = "http://www.mulesoft.org/schema/mule/db" xmlns:doc = "http://www.mulesoft.org/schema/mule/documentation" xmlns:http = "http://www.mulesoft.org/schema/mule/http" xmlns:jms = "http://www.mulesoft.org/schema/mule/jms" xmlns:json = "http://www.mulesoft.org/schema/mule/json" xmlns:scripting = "http://www.mulesoft.org/schema/mule/scripting" xmlns:spring = "http://www.springframework.org/schema/beans" xmlns:tracking = "http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" xmlns = "http://www.mulesoft.org/schema/mule/core" xsi:schemalocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd"> < http:listener-config doc:name = "HTTP Listener Configuration" host = "0.0.0.0" name = "HTTP_Listener_Configuration" port = "8081" > < jms:activemq-connector brokerurl = "tcp://localhost:61616" doc:name = "Active MQ" name = "Active_MQ" validateconnections = "true" > < reconnect count = "10" frequency = "15000" > </ reconnect ></ jms:activemq-connector > < configuration defaultexceptionstrategy-ref = "globlal_Exception_Strategy" doc:name = "Configuration" > < db:mysql-config database = "dataformule" doc:name = "MySQL Configuration" host = "localhost" name = "MySQL_Configuration" password = "mule" port = "3306" user = "mule" > < db:template-query doc:name = "Template Query" name = "Template_Query_By_Company_Name" > < db:parameterized-query > <![CDATA[SELECT * FROM Customer WHERE company_name = :companyName;]]> </ db:parameterized-query > < db:in-param defaultvalue = "#[message.inboundProperties.'http.query.params'.companyName];" name = "companyName" > </ db:in-param ></ db:template-query > < db:template-query doc:name = "Template Query" name = "Template_Query_By_Phone_Number" > < db:parameterized-query > <![CDATA[SELECT * FROM CUSTOMER WHERE phone = :phoneNumber;]]> </ db:parameterized-query > < db:in-param defaultvalue = "#[message.inboundProperties.'http.query.params'.phoneNumber]" name = "phoneNumber" type = "CHAR" > </ db:in-param ></ db:template-query > < flow name = "Mule-Use-Case-A-MainFlow" > < http:listener allowedmethods = "GET,POST,DELETE, UPDATE" config-ref = "HTTP_Listener_Configuration" doc:name = "HTTP" path = "/json-jms" > < choice doc:name = "Choice" > < when expression = "#[message.inboundProperties.'http.query.params'.companyName != null]" > < flow-ref doc:name = "Data Retrieval by Company" name = "activemq-messaging-datd-retrieval-by-company-Sub_Flow" > </ flow-ref ></ when > < when expression = "#[message.inboundProperties.'http.query.params'.phoneNumber != null]" > < flow-ref doc:name = "Data Retrieval By Phone Number" name = "activemq-messaging-datd-retrieval-by-phone-Sub_Flow" > </ flow-ref ></ when > < otherwise > < scripting:component doc:name = "Groovy" > < scripting:script engine = "Groovy" > <![CDATA[throw new IllegalArgumentException('Paramenter Not Acceptable')]]> </ scripting:script > </ scripting:component > </ otherwise > </ choice > < set-variable doc:name = "Set Message Counter to 0" value = "0" variablename = "messageCounter" > < flow-ref doc:name = "activemq-messagiing-send-Sub_Flow" name = "activemq-messagiing-send-Sub_Flow" > < set-payload doc:name = "Set Payload" value = "{"count": #[flowVars.messageCounter]}" > < set-property doc:name = "Property" propertyname = "Content-Type" value = "application/json" > < choice-exception-strategy doc:name = "Choice Exception Strategy" > < catch-exception-strategy doc:name = "Catch Exception Strategy" when = "#[exception.causedBy(org.mule.api.transformer.TransformerException)]" > < set-payload doc:name = "Set Payload" value = "#[payload]" > < jms:outbound-endpoint connector-ref = "Active_MQ" doc:name = "JMS" queue = "Dead.Letter.Invalid.Data" > </ jms:outbound-endpoint ></ set-payload ></ catch-exception-strategy > < catch-exception-strategy doc:name = "Catch Database Connection Exception" when = "#[exception.causedBy(java.sql.SQLException)]" > < set-payload doc:name = "Set Payload" value = "#[payload]" > < jms:outbound-endpoint connector-ref = "Active_MQ" doc:name = "JMS" queue = "Dead.Letter.Invalid.Data" > </ jms:outbound-endpoint ></ set-payload ></ catch-exception-strategy > < catch-exception-strategy doc:name = "Catch ActiveMQ Connection Exception" when = "#[exception.causedBy(java.lang.Exception)]" > < set-payload doc:name = "Set Payload" value = "The request cannot be processed, the error is #[exception.getSummaryMessage()] " > < logger doc:name = "Logger" level = "INFO" message = "Unexpected Exception: #[payload]" > </ logger ></ set-payload ></ catch-exception-strategy > </ choice-exception-strategy > </ set-property ></ set-payload ></ flow-ref ></ set-variable ></ http:listener ></ flow > < sub-flow name = "activemq-messagiing-send-Sub_Flow" > < foreach collection = "#[payload]" doc:name = "Loop List Of Messages" > < logger doc:name = "Log Single message" level = "INFO" message = "#[payload]" > < json:object-to-json-transformer doc:name = "Object to JSON" > < set-variable doc:name = "Set Count Variable" value = "#[flowVars.counter]" variablename = "messageCounter" > < jms:outbound-endpoint connector-ref = "Active_MQ" doc:name = "Send To JMS" queue = "Customer.Information" > </ jms:outbound-endpoint ></ set-variable ></ json:object-to-json-transformer ></ logger ></ foreach > </ sub-flow > < sub-flow name = "activemq-messaging-datd-retrieval-by-company-Sub_Flow" > < db:select config-ref = "MySQL_Configuration" doc:name = "Database" > < db:template-query-ref name = "Template_Query_By_Company_Name" > < db:in-param name = "companyName" value = "#[message.inboundProperties.'http.query.params'.companyName]" > </ db:in-param ></ db:template-query-ref ></ db:select > < json:object-to-json-transformer doc:name = "Object to JSON" > < json:json-to-object-transformer doc:name = "JSON to Object" returnclass = "java.util.List" > </ json:json-to-object-transformer ></ json:object-to-json-transformer ></ sub-flow > < sub-flow name = "activemq-messaging-datd-retrieval-by-phone-Sub_Flow" > < db:select config-ref = "MySQL_Configuration" doc:name = "Database" > < db:template-query-ref name = "Template_Query_By_Phone_Number" > < db:in-param name = "phoneNumber" type = "CHAR" value = "#[message.inboundProperties.'http.query.params'.phoneNumber]" > </ db:in-param ></ db:template-query-ref ></ db:select > < json:object-to-json-transformer doc:name = "Object to JSON" > < json:json-to-object-transformer doc:name = "JSON to Object" returnclass = "java.util.List" > </ json:json-to-object-transformer ></ json:object-to-json-transformer ></ sub-flow > < catch-exception-strategy name = "globlal_Exception_Strategy" > < set-payload doc:name = "Set Payload" value = "#[payload]" > < logger doc:name = "Logger" level = "INFO" message = "GLOBAL Exception Handller: #[payload]" > </ logger ></ set-payload ></ catch-exception-strategy > </ db:mysql-config ></ configuration ></ http:listener-config ></ mule > |
Detailed Explanation
You can view the video for detailed explanation about this application at http://youtu.be/DAsFv045nlwHow To Stop mysqld
Recently, I find I could not stop the mysqld process on my MacBook Pro. After some research, trial and errors, I found the following command works:
1 | sudo launchctl unload -w /Library/LaunchDaemons/com.oracle.oss.mysql.mysqld.plist |