java消息隊列ActiveMQ的簡單用

  • 時間:2018-09-13 22:42 作者:Java架構解析 來源:Java架構解析 閱讀:142
  • 掃一掃,手機訪問
摘要:activeMQ是學習java消息隊列的實現項目,用jfinal + jfinal-ext + activeMQ + quartz快速構建。1.消息隊列消息隊列,其實是一種基于數據結構實現的服務。而java語言中的實現,有apache的activeMQ,比較主流。2.環境調試首先去apache的官網

activeMQ

是學習java消息隊列的實現項目,用jfinal + jfinal-ext + activeMQ + quartz快速構建。

1.消息隊列

消息隊列,其實是一種基于數據結構實現的服務。而java語言中的實現,有apache的activeMQ,比較主流。

2.環境調試

首先去apache的官網下載apache-activeMQ-...-.zip的包,解壓后,運行bin中的activeMQ服務。
在瀏覽器中輸入http://localhost:8186/admin,出現登陸界面輸入admin/admin登陸就可。


java消息隊列ActiveMQ的簡單用


而后創立一個FirstQueue隊列(給后面的實例提供服務)。

3.activeMQ原始操作

記住activeMQ服務肯定要一直開啟,發送者和接收者都會通過tcp協議去鏈接服務器,以獲得消息隊列中的消息體。
如下圖是我的服務器cmd截圖:


java消息隊列ActiveMQ的簡單用


3.1.首先建立發送者Sender.java

package com.mg.demo;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sender {
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
// ConnectionFactory :連接工廠,JMS 使用它創立連接
ConnectionFactory connectionFactory;
// Connection :JMS 用戶端到JMS
// Provider 的連接
Connection connection = null;
// Session: 一個發送或者接收消息的線程
Session session;
// Destination :消息的目的地;消息發送給誰.
Destination destination;
// MessageProducer:消息發送者
MessageProducer producer;
// TextMessage message;
// 構造ConnectionFactory實例對象,此處采使用ActiveMq的實現jar
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
try { // 構造從工廠得到連接對象
connection = connectionFactory.createConnection();
// 啟動
connection.start();
// 獲取操作連接
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
// 得到消息生成者【發送者】
producer = session.createProducer(destination);
// 設置不持久化,此處學習,實際根據項目決定
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 構造消息,此處寫死,項目就是參數,或者者方法獲取
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
public static void sendMessage(Session session, MessageProducer producer) throws Exception {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session.createTextMessage("ActiveMq 發送的消息" + i);
// 發送消息到目的地方
System.out.println("發送消息:" + "ActiveMq 發送的消息" + i);
producer.send(message);
}
}

3.2.再創立接收者Receiver.java

package com.mg.demo;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Receiver{
public static void main(String[] args) {
// ConnectionFactory :連接工廠,JMS 使用它創立連接
ConnectionFactory connectionFactory;
// Connection :JMS 用戶端到JMS Provider 的連接
Connection connection = null;
// Session: 一個發送或者接收消息的線程
Session session;
// Destination :消息的目的地;消息發送給誰.
Destination destination;
// 消費者,消息接收者
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
try {
// 構造從工廠得到連接對象
connection = connectionFactory.createConnection();
// 啟動
connection.start();
// 獲取操作連接
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
consumer = session.createConsumer(destination);
while (true) {
// 設置接收者接收消息的時間,為了便于測試,這里誰定為100s
TextMessage message = (TextMessage) consumer.receive(100000);
if (null != message) {
System.out.println("收到消息" + message.getText());
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}

3.3.測試結果

先運行接收者Receiver.java,在運行Sender.java。
得到結果如下圖:(2個控制臺都會輸出如下圖數據)


java消息隊列ActiveMQ的簡單用


4.用jfinal-ext中的jms插件操作activeMQ

整合quartz任務調度框架,實現每10秒發送一次消息到隊列。

4.1.核心代碼

public static void main(String[] args) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
JmsPlugin jp = new JmsPlugin("jms.properties");
jp.start();
PropertyConfig pc = PropertyConfig.me();
pc.loadPropertyFile("job.properties");
QuartzPlugin qp = new QuartzPlugin();
if (pc.getPropertyToBoolean("a.enable")) {
qp.add(pc.getProperty("a.cron"), (Job) Class.forName(pc.getProperty("a.job")).newInstance());
}
qp.start();
}

4.2.配置文件jms.properties

################################
# server info #
################################
# jms服務器地址
serverUrl=tcp://localhost:61616
username=admin
password=admin
################################
# queue info #
################################
# 發送的隊列名字,使用“,”號分隔
sendQueues=firstMQ
# 接受的隊列的名字,使用“,”號分隔
receiveQueues=firstMQ
# 隊列firstMQ上消息名字為a的消息號
queue.firstMQ.a=10000
#接受到隊列q1上消息名字為a的消息的時候調使用的解決器
queue.firstMQ.a.resolver=com.mg.jfinal.ext.demo.resolver.MGResolver

4.3.配置文件job.properties

#JobA
a.job=com.mg.jfinal.task.JobA
a.cron=*/10 * * * * ?
a.enable=true

4.4.運行結果

如圖:


java消息隊列ActiveMQ的簡單用


最后:大家可以關注我私信我:“資料”就可領取Java架構,Dubbo、Redis、Netty、zookeeper Spring cloud、分布式、高并發、性能調優、微服務等架構技術的視頻資料及源碼。

  • 全部評論(0)
最新發布的資訊信息
【系統環境|】淘碼庫,據消息稱已被調查。淘碼庫源碼網,已經無法訪問!(2020-01-14 04:13)
【系統環境|服務器應用】Discuz隱藏后臺admin.php網址修改路徑(2019-12-16 16:48)
【系統環境|服務器應用】2020新網站如何讓百度快速收錄網站首頁最新方法,親測有用!免費(2019-12-16 16:46)
【系統環境|服務器應用】Discuz發布帖子時默認顯示第一個主題分類的修改方法(2019-12-09 00:13)
【系統環境|軟件環境】Android | App內存優化 之 內存泄漏 要點概述 以及 處理實戰(2019-12-04 14:27)
【系統環境|軟件環境】MySQL InnoDB 事務(2019-12-04 14:26)
【系統環境|軟件環境】vue-router(單頁面應用控制中心)常見用法(2019-12-04 14:26)
【系統環境|軟件環境】Linux中的Kill命令(2019-12-04 14:26)
【系統環境|軟件環境】Linux 入門時必學60個文件解決命令(2019-12-04 14:26)
【系統環境|軟件環境】更新版ThreeJS 3D粒子波浪動畫(2019-12-04 14:26)
手機二維碼手機訪問領取大禮包
返回頂部
双色球号码300期遗传走势图