最近学习了一段时间的ActiveMQ,apache的强劲的消息总线服务。学习过程参考了ActiveMQ in Action和whitesock的javaeye博客。使用消息中间件来进行消息传递的原理如下图
与大家分享2个最简单的消息通信的例子。生产者和消费者,发布者和订阅者
生产者
package cn.adcc.activemq.point2point;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TestProducer {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private boolean transacted;//默认为false
private String subject = "TOOL.DEFAULT";
private Destination destination;//消息目的地
private boolean persistent = true;
public static void main(String[] args) throws JMSException {
new TestProducer().run();
}
public void run() throws JMSException{
Connection connection = null;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
connection = connectionFactory.createConnection();
connection.start();
// Create the session
Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
// Create the producer.
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("hello");
producer.send(message);
System.exit(0);
}
}
消费者
package cn.adcc.activemq.point2point;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TestConsumer implements MessageListener, ExceptionListener {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
//ActiveMQConnection.DEFAULT_BROKER_URL;
private Session session;
private boolean transacted;
private int ackMode = Session.AUTO_ACKNOWLEDGE;
private String subject = "TOOL.DEFAULT";
//"TOOL.DEFAULT";//主题
private Destination destination = null;
private MessageProducer replyProducer;
private boolean persistent = true;
//回调方法
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = "";
msg = txtMsg.getText();
System.out.println("Received: " + msg);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new TestConsumer().run();
}
public void run(){
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
try {
Connection connection = connectionFactory.createConnection();
connection.setExceptionListener(this);
connection.start();
session = connection.createSession(transacted, ackMode);
destination = session.createQueue(subject);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void onException(JMSException arg0) {
// TODO Auto-generated method stub
}
}
发布者
package cn.adcc.activemq.publisher.mypackage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TopicPublisher {
private String url = "tcp://localhost:61616";
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String topicName = "testtopic";
private void run() throws JMSException{
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user,password,url);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Message message = session.createTextMessage("hello world1!");
producer.send(message);
System.exit(0);
}
public static void main(String[] args) throws JMSException{
new TopicPublisher().run();
}
}
订阅者
package cn.adcc.activemq.publisher.mypackage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.RemoveInfo;
public class TopicSubsribe implements MessageListener{
private String url = "tcp://localhost:61616";
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String topicName = "testtopic";
public static void main(String[] args) throws JMSException{
new TopicSubsribe().run();
}
private void run() throws JMSException{
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user,password,url);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topicName);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
}
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = "";
try {
msg = txtMsg.getText();
System.out.println("Received: " + msg);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
分享到:
相关推荐
springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot...
百度spring整合activemq 发现几乎都只是在xml文件配置固定的消息队列而且太麻烦。并没有根据需求进行动态生成主题和队列。本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和...
用zabbix 自动发现实现activemq 监控pending consumers activemq_scan.sh #!/bin/bash activemq() { MQ_IP=(10.10.11.208:8161) for g in ${MQ_IP[@]} do port=($(curl -uadmin:admin http://${g}/admin/queues.jsp...
赠送jar包:activemq-core-5.7.0.jar; 赠送原API文档:activemq-core-5.7.0-javadoc.jar; 赠送源代码:activemq-core-5.7.0-sources.jar; 包含翻译后的API文档:activemq-core-5.7.0-javadoc-API文档-中文...
MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 特点: 1、支持多种...
ActiveMQ高并发处理方案ActiveMQ高并发处理方案 超级字数补丁超级字数补丁
本教程旨在帮助activeMQ初学者入门,通过本示例,能完全理解activeMQ的基本概念,为分布式应用打下基础。 本示例中,使用maven管理,完美解决各种依赖问题,不需要自行配置,导入项目等待eclipse自行下载jar包后即可...
activeMQ的测试工具,用于发送和接收activeMQ消息,jar包形式的,安装完jdk之后用java -jar xxx.jar命令运行
自己做的尚硅谷周阳老师ActiveMQ课程脑图,其中自己所用做案例的环境搭建都是基于docker与老师课程不一样。脑图内容涵盖视频的99%的笔记,含有自己编写的代码文件,外加了自己对一些问题的测试与回答。 消息中间件...
包括1、ActiveMQ java实例 2、ActiveMQ Spring结合实例 3、代码亲测,无问题。 4、资源分5分绝对值 注意:请先安装ActiveMQ 服务。
赠送jar包:activemq-protobuf-1.1.jar; 赠送原API文档:activemq-protobuf-1.1-javadoc.jar; 赠送源代码:activemq-protobuf-1.1-sources.jar; 包含翻译后的API文档:activemq-protobuf-1.1-javadoc-API文档-...
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的...
apache-activemq Linux版本
构建高可用的ActiveMQ系统在生产环境中是非常重要的,单点的ActiveMQ作为企业应用无法满足高可用和集群的需求,所以ActiveMQ提供 了master-slave、broker cluster等多种部署方式,但通过分析多种部署方式之后我认为...
ActiveMQ in Action pdf英文原版加源代码压缩包。 Apache ActiveMQ in Action is a thorough, practical guide to implementing message-oriented systems in Java using ActiveMQ. The book lays out the core of ...
activemq, Apache ActiveMQ镜像 欢迎来到 Apache ActiveMQis是一个高性能的Apache 2.0许可以消息代理和 JMS 1.1实现。正在启动要帮助你入门,请尝试以下链接:入门http://activemq.apache.org/version-
Spring 集 成ActiveMQ 配置 异步RPC框架 Missian ActiveMq-JMS简单实例使用tomcat
赠送jar包:activemq-protobuf-1.1.jar; 赠送原API文档:activemq-protobuf-1.1-javadoc.jar; 赠送源代码:activemq-protobuf-1.1-sources.jar; 包含翻译后的API文档:activemq-protobuf-1.1-javadoc-API文档-...
springboot集成activemq实现消息接收demo