`
yaozhiqiang109
  • 浏览: 117363 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

jms Spring+ActiveMQ 5.4.2

阅读更多

 

 

Java Message Service(JMS)是 SUN 提出的旨在统一各种 MOM 系统接口的规范,它包含点对点(Point to Point,PTP)和发布/订阅(Publish/Subscribe,pub/sub)两种消息模型,提供可靠消息传输、事务和消息过滤等机制。 

 

JMS 支持两种截然不同的消息传送模型:PTP(即点对点模型)和Pub/Sub(即发布/订阅模型),分别称作:PTP Domain 和Pub/Sub Domain.

 

PTP(使用Queue 即队列目标)     消息从一个生产者传送至一个消费者.在此传送模型中,目标是一个队列。

 

每条消息只能发送至、并由一个消费者成功使用。如果没有已经向队列目标注册的消费者,队列将保留它收到的消息,并在某个消费者向该队列进行注册时将消息传送给该消费者.

 

Pub/Sub(使用 Topic即主题目标)消息从一个生产者传送至任意数量的消费者。在此传送模型中,目标是一个主题。每个消息可以发送至任意数量的订阅消费者。主题目标也支持持久订阅的概念。持久订阅表示消费者已向主题目标进行注册,

但在消息传送时此消费者可以处于非活动状态。当此消费者再次处于活动状态时,它将接收此信息。如果没有已经向主题目标注册的消费者,主题不保留其接收到的消息,除非有非活动消费者注册了持久订阅。

 

贴下发送和接收消息的xml配置,完整的代码从附件下载

 

发送消息xml配置

 

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:amq="http://activemq.apache.org/schema/core"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
		http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://activemq.apache.org/schema/core
		http://activemq.apache.org/schema/core/activemq-core-5.4.2.xsd" default-autowire="byName">

	<!-- 连接ActiveMQ -->
	<amq:connectionFactory id="defaultFactory" brokerURL="tcp://127.0.0.1:61616"></amq:connectionFactory>

	<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
		<property name="targetConnectionFactory" ref="defaultFactory"></property>
		<property name="sessionCacheSize" value="10"></property>
	</bean>
		
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="cachingConnectionFactory"></property>
		<property name="explicitQosEnabled" value="true" /><!-- 使 deliveryMode, priority, timeToLive设置生效-->
		<property name="deliveryPersistent" value="false" />
	</bean>
	<!-- 队列发送消息 -->
	<bean id="ptpMessageProducer" class="com.jms.producer.PtpMessageProducer">
		<property name="jmsTemplate" ref="jmsTemplate"></property>
		<property name="queue" ref="ptpQueue"></property>
	</bean>
	<!-- 主题发送消息 -->
	<bean id="publisherMessageProducer" class="com.jms.producer.PublisherMessageProducer">
		<property name="jmsTemplate" ref="jmsTemplate"></property>
		<property name="topic" ref="topicQueue"></property>
	</bean>
	
	<amq:queue name="ptpQueue" physicalName="ptp.queue.message"></amq:queue>
	
	<amq:topic name="topicQueue" physicalName="topic.queue.message"></amq:topic>
	
</beans>

 接收消息xml配置

 

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:amq="http://activemq.apache.org/schema/core"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
		http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://activemq.apache.org/schema/core
		http://activemq.apache.org/schema/core/activemq-core-5.4.2.xsd" default-autowire="byName">
	<description>jms 配置</description>

	<amq:connectionFactory id="defaultFactory" brokerURL="tcp://127.0.0.1:61616"></amq:connectionFactory>
											 
	<bean id="cacheConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
		<property name="targetConnectionFactory" ref="defaultFactory"></property>
		<property name="sessionCacheSize" value="10"></property>
	</bean>
	   <!--异步调用消息 -->
	<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="cacheConnectionFactory"></property>
		<property name="destination" ref="ptpQueue"></property>
		<!--<property name="destination" ref="topicQueue"></property>-->
		<property name="messageListener" ref="jmsReciveListener"></property>
		<property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE"/>
	</bean>
	<!-- 主题 -->
	<amq:topic id="topicQueue" physicalName="topic.queue.message"></amq:topic>
	<!-- 队列 -->
	<amq:queue id="ptpQueue" physicalName="ptp.queue.message"></amq:queue>														
	
	<bean id="jmsReciveListener" class="com.jms.receive.JmsReceiveListener"></bean>
	
</beans>

 

 

 private Destination queue ;
 private final ExecutorService threadPool = Executors.newFixedThreadPool(2);

public void sendMessage(final Object obj){
		Runnable task=new Runnable(){

			@Override
			public void run() {
				try {
					jmsTemplate.send(queue,  new MessageCreator(){

						@Override
						public Message createMessage(Session session) throws JMSException {
							ObjectMessage message = session.createObjectMessage();
				                	message.setObject(obj);
								return message;
						}
						
					});
				}catch(Exception e){
					logger.error("异常",e);
				}
			}
			
		};
		threadPool.execute(task);
	}

       public Destination getQueue() {
		return queue;
	}


	public void setQueue(Destination queue) {
		this.queue = queue;
	}

 

 

1
0
分享到:
评论
1 楼 solomon 2012-03-12  
  学习了。。。

相关推荐

Global site tag (gtag.js) - Google Analytics