`
谷超
  • 浏览: 163372 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

ActiveMQ

    博客分类:
  • J2EE
阅读更多

最近学习了一段时间的ActiveMQ,apache的强劲的消息总线服务。学习过程参考了ActiveMQ in Action和whitesock的javaeye博客。使用消息中间件来进行消息传递的原理如下图

 

 

与大家分享2个最简单的消息通信的例子。生产者和消费者,发布者和订阅者

生产者

package cn.adcc.activemq.point2point;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TestProducer {
	private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private boolean transacted;//默认为false
    private String subject = "TOOL.DEFAULT";
    private Destination destination;//消息目的地
    private boolean persistent = true;
    
	public static void main(String[] args) throws JMSException {
		new TestProducer().run();
	}
	
	public void run() throws JMSException{
		Connection connection = null;
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
        connection = connectionFactory.createConnection();
        connection.start();
        
        // Create the session
        Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(subject);
        
        // Create the producer.
        MessageProducer producer = session.createProducer(destination);
              
        TextMessage message = session.createTextMessage("hello");

        producer.send(message);
        System.exit(0);
	}
}

 

消费者

package cn.adcc.activemq.point2point;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TestConsumer implements MessageListener, ExceptionListener {
	 private String user = ActiveMQConnection.DEFAULT_USER;
	 private String password = ActiveMQConnection.DEFAULT_PASSWORD;
	 private String url = ActiveMQConnection.DEFAULT_BROKER_URL; 
		 //ActiveMQConnection.DEFAULT_BROKER_URL;
	 private Session session;
	 private boolean transacted;
	 private int ackMode = Session.AUTO_ACKNOWLEDGE;
	 
	 private String subject = "TOOL.DEFAULT"; 
		 //"TOOL.DEFAULT";//主题
	 private Destination destination = null;
	 private MessageProducer replyProducer;
	 private boolean persistent = true;
	 
	//回调方法
	public void onMessage(Message message) {
		try {
			if (message instanceof TextMessage) {
				
				TextMessage txtMsg = (TextMessage) message;
				String msg = "";

				msg = txtMsg.getText();

				System.out.println("Received: " + msg);
			}
		} catch (JMSException e) {
			e.printStackTrace();
		} 
	}

	public static void main(String[] args) {
		new TestConsumer().run();
	}
	
	public void run(){
		 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
         try {
			Connection connection = connectionFactory.createConnection();
			connection.setExceptionListener(this);
            connection.start();
            
            session = connection.createSession(transacted, ackMode);
            destination = session.createQueue(subject);
                        
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(this);
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
         
	}

	@Override
	public void onException(JMSException arg0) {
		// TODO Auto-generated method stub
		
	}

}

 

发布者

package cn.adcc.activemq.publisher.mypackage;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicPublisher {
	private String url = "tcp://localhost:61616";
	private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
	private String topicName = "testtopic";
	
	private void run() throws JMSException{
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user,password,url);
		Connection connection = factory.createConnection();
		connection.start();
		
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		
		Topic topic = session.createTopic(topicName);
		MessageProducer producer = session.createProducer(topic);
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		Message message = session.createTextMessage("hello world1!");
		producer.send(message);
		System.exit(0);
	}
	
	public static void main(String[] args) throws JMSException{
		new TopicPublisher().run();
	}
	
}

 

订阅者

package cn.adcc.activemq.publisher.mypackage;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.RemoveInfo;

public class TopicSubsribe implements MessageListener{
	private String url = "tcp://localhost:61616";
	private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
	private String topicName = "testtopic";
	
	public static void main(String[] args) throws JMSException{
		new TopicSubsribe().run();
	}
	
	private void run() throws JMSException{
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user,password,url);
		Connection connection = factory.createConnection();
		connection.start();
		
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Destination destination = session.createTopic(topicName);
	
		MessageConsumer consumer = session.createConsumer(destination);
		
		consumer.setMessageListener(this);
	}
	
	@Override
	public void onMessage(Message message) {
		if (message instanceof TextMessage) {
			
			TextMessage txtMsg = (TextMessage) message;
			String msg = "";

			try {
				msg = txtMsg.getText();
				System.out.println("Received: " + msg);
			} catch (JMSException e) {
				e.printStackTrace();
			} 
			
		}
	}

}

 

分享到:
评论

相关推荐

    springboot-nettysocketio +netty+activeMq在线客服系统

    springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot +netty+activeMq在线客服系统springboot...

    spring 整合activemq实现自定义动态消息队列

    百度spring整合activemq 发现几乎都只是在xml文件配置固定的消息队列而且太麻烦。并没有根据需求进行动态生成主题和队列。本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和...

    zabbix 3.4 监控 Activemq 自动发现模板

    用zabbix 自动发现实现activemq 监控pending consumers activemq_scan.sh #!/bin/bash activemq() { MQ_IP=(10.10.11.208:8161) for g in ${MQ_IP[@]} do port=($(curl -uadmin:admin http://${g}/admin/queues.jsp...

    activemq-core-5.7.0-API文档-中文版.zip

    赠送jar包:activemq-core-5.7.0.jar; 赠送原API文档:activemq-core-5.7.0-javadoc.jar; 赠送源代码:activemq-core-5.7.0-sources.jar; 包含翻译后的API文档:activemq-core-5.7.0-javadoc-API文档-中文...

    apache-activemq-5.15.0-bin.tar.7z

    MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。 特点: 1、支持多种...

    ActiveMQ高并发处理方案

    ActiveMQ高并发处理方案ActiveMQ高并发处理方案 超级字数补丁超级字数补丁

    activeMQ示例 activeMQ demo,java分布式技术

    本教程旨在帮助activeMQ初学者入门,通过本示例,能完全理解activeMQ的基本概念,为分布式应用打下基础。 本示例中,使用maven管理,完美解决各种依赖问题,不需要自行配置,导入项目等待eclipse自行下载jar包后即可...

    activeMQ收发工具.rar

    activeMQ的测试工具,用于发送和接收activeMQ消息,jar包形式的,安装完jdk之后用java -jar xxx.jar命令运行

    MQ之ActiveMQ.mmap

    自己做的尚硅谷周阳老师ActiveMQ课程脑图,其中自己所用做案例的环境搭建都是基于docker与老师课程不一样。脑图内容涵盖视频的99%的笔记,含有自己编写的代码文件,外加了自己对一些问题的测试与回答。 消息中间件...

    ActiveMQ 之Spring结合实例

    包括1、ActiveMQ java实例 2、ActiveMQ Spring结合实例 3、代码亲测,无问题。 4、资源分5分绝对值 注意:请先安装ActiveMQ 服务。

    activemq-protobuf-1.1-API文档-中文版.zip

    赠送jar包:activemq-protobuf-1.1.jar; 赠送原API文档:activemq-protobuf-1.1-javadoc.jar; 赠送源代码:activemq-protobuf-1.1-sources.jar; 包含翻译后的API文档:activemq-protobuf-1.1-javadoc-API文档-...

    apache-activemq-5.11.2

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的...

    apache-activemq Linux版本

    apache-activemq Linux版本

    ActiveMQ部署方案分析对比

    构建高可用的ActiveMQ系统在生产环境中是非常重要的,单点的ActiveMQ作为企业应用无法满足高可用和集群的需求,所以ActiveMQ提供 了master-slave、broker cluster等多种部署方式,但通过分析多种部署方式之后我认为...

    ActiveMQ in Action pdf英文版+源代码

    ActiveMQ in Action pdf英文原版加源代码压缩包。 Apache ActiveMQ in Action is a thorough, practical guide to implementing message-oriented systems in Java using ActiveMQ. The book lays out the core of ...

    activemq, Apache ActiveMQ镜像.zip

    activemq, Apache ActiveMQ镜像 欢迎来到 Apache ActiveMQis是一个高性能的Apache 2.0许可以消息代理和 JMS 1.1实现。正在启动要帮助你入门,请尝试以下链接:入门http://activemq.apache.org/version-

    Spring集成ActiveMQ配置

    Spring 集 成ActiveMQ 配置 异步RPC框架 Missian ActiveMq-JMS简单实例使用tomcat

    activemq-protobuf-1.1-API文档-中英对照版.zip

    赠送jar包:activemq-protobuf-1.1.jar; 赠送原API文档:activemq-protobuf-1.1-javadoc.jar; 赠送源代码:activemq-protobuf-1.1-sources.jar; 包含翻译后的API文档:activemq-protobuf-1.1-javadoc-API文档-...

    springboot集成activemq实现消息接收demo

    springboot集成activemq实现消息接收demo

Global site tag (gtag.js) - Google Analytics