本文共 8121 字,大约阅读时间需要 27 分钟。
-----------------ActiveMQ----------------- 一、ActiveMQ核心概念 1、ActiveMQ是消息队列技术,为解决高并发问题而生! 2、ActiveMQ生产者消费者模型(生产者和消费者可以跨平台、跨系统) 有中间平台 3、ActiveMQ支持两种消息传输方式 1)Queue,队列模式,生产者生产了一个消息,只能由一个消费者进行消费 2)Topic,发布/订阅模式,生产者生产了一个消息,可以由多个消费者进行消费 参考"ActiveMQ补充.docx" 二、ActiveMQ消息队列安装使用 1、官网:http://activemq.apache.org/ 2、安装成功访问地址:http://localhost:8161用户名和密码都是admin 三、ActiveMQ整合Spring实现生产者 1、activeMQ_spring工程的pom.xml导入坐标 spring-context spring-test junit activemq-all spring-jms 2、src/main/resources/applicationContext-mq.xml配置生产者 <!-- 扫描包 --> <context:component-scan base-package="cn.itcast.activemq" /> <!-- ActiveMQ 连接工厂 --> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码--> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" /> <!-- Spring Caching连接工厂 --> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <!-- 同上,同理 --> <!-- <constructor-arg ref="amqConnectionFactory" /> --> <!-- Session缓存数量 --> <property name="sessionCacheSize" value="100" /> </bean> <!-- Spring JmsTemplate 的消息生产者 start--> <!-- 定义JmsTemplate的Queue类型 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <constructor-arg ref="connectionFactory" /> <!-- 非pub/sub模型(发布/订阅),即队列模式 queue --> <property name="pubSubDomain" value="false" /> </bean> <!-- 定义JmsTemplate的Topic类型 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <constructor-arg ref="connectionFactory" /> <!-- pub/sub模型(发布/订阅) topic --> <property name="pubSubDomain" value="true" /> </bean> <!--Spring JmsTemplate 的消息生产者 end--> 3、写生产者类 1)基于Topic模式的生产者类,发布/订阅模式 @Service public class TopicSender { // 注入jmsTemplate @Autowired @Qualifier("jmsTopicTemplate") private JmsTemplate jmsTemplate; public void send(String topicName, final String message) { jmsTemplate.send(topicName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } } 2)基于Queue模式的生产者类,队列模式 @Service public class QueueSender { // 注入jmsTemplate @Autowired @Qualifier("jmsQueueTemplate") private JmsTemplate jmsTemplate; public void send(String queueName, final String message) { jmsTemplate.send(queueName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } } 4、src/test/java/cn/itcast/activemq/producer/test下写测试类ProducerTest @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:applicationContext-mq.xml") public class ProducerTest { @Autowired private QueueSender queueSender; @Autowired private TopicSender topicSender; @Test public void testSendMessage() { for (int i = 0; i < 200; i++) { queueSender.send("spring_queue", "queue你好"); topicSender.send("spring_topic", "topic你好"); } } } 四、ActiveMQ整合Spring实现消费者 1、配置/src/main/resources/applicationContext-mq-consumer.xml <!-- 扫描包 --> <context:component-scan base-package="cn.itcast.activemq.consumer" /> <!-- ActiveMQ 连接工厂 --> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码--> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" /> <!-- Spring Caching连接工厂 --> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <!-- 同上,同理 --> <!-- <constructor-arg ref="amqConnectionFactory" /> --> <!-- Session缓存数量 --> <property name="sessionCacheSize" value="100" /> </bean> <!-- 消息消费者 start--> <!-- 定义Queue监听器 --> <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <!-- 默认注册bean名称,应该是类名首字母小写 --> <jms:listener destination="spring_queue" ref="abc1"/> <jms:listener destination="spring_queue" ref="queueConsumer2"/> <jms:listener destination="spring_queue" ref="queueConsumer3"/> </jms:listener-container> <!-- 定义Topic监听器 --> <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <jms:listener destination="spring_topic" ref="topicConsumer1"/> <jms:listener destination="spring_topic" ref="topicConsumer2"/> </jms:listener-container> <!-- 消息消费者 end --> 2、写消费者类 1)对应Queue模式的消费者类QueueConsumer1、QueueConsumer2 @Service("abc1") //这里与<jms:listener destination="spring_queue" ref="abc1"/>的ref对应,默认名为类名首字母小写,即ref="queueConsumer1" public class QueueConsumer1 implements MessageListener { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out .println("消费者QueueConsumer1获取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } 2)对应Topic模式的消费者类TopicConsumer1、TopicConsumer2 @Service public class TopicConsumer1 implements MessageListener { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out .println("消费者TopicConsumer1获取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } 3、src/test/java/cn/itcast/activemq/producer/test下写测试类ConsumerTest @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:applicationContext-mq-consumer.xml") public class ConsumerTest { @Test public void testConsumerMessage() { while (true) { // junit退出,防止进程死掉 } } } 五、生产者消费者开发注意点(重点) 1)中间平台brokerURL,生产者和消费者配置要一样,连接同一个平台 2)destination生产者和消费者配置要一样 3)Queue的生产者与Queue的消费者对应、Topic的生产者与Topic的消费者对应 4)生产者和消费者配置对比 生产者 a.包扫描,扫描到生产者那个类 b.mq连接工厂 c.spring封装mq连接工厂 d.提供模板,关联spring封装mq连接工厂 <property name="pubSubDomain" value="false" /> false队列模式queue true订阅模式topic 消费者 a.包扫描,扫描到消费者那个类 b.mq连接工厂 c.spring封装mq连接工厂 d.配置监听 关联spring封装mq连接工厂 <jms:listener destination="与生产者发起的消息名字对应" ref="与消费者类上的@Service的名字对应"/> 六、基于ActiveMQ完成发短信 在common_parent的pom.xml导入坐标activemq-all 1)bos_fore作为生产者,发消息 a.配置生产者bos_fore/src/main/resources/applicationContext-mq.xml b.注入生产者模板,Queue模式 @Autowired @Qualifier("jmsQueueTemplate") private JmsTemplate jmsTemplate; c.发送消息 jmsTemplate.send("bos_sms111", new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("telephone2", model.getTelephone()); mapMessage.setString("username2", model.getUsername()); mapMessage.setString("randomCode2",randomCode); return mapMessage; } }); 2)新建war工程bos_sms作为消费者,收消息 a.建war工程,继承于common_parent b.配置web.xml,Spring配置文件位置及Spring的核心监听器 c.配置bos_sms/src/main/resources/applicationContext.xml 注意包扫描 <context:component-scan base-package="cn.itcast.bos.mq" /> 注意brokerURL和生产者一致 注意destination="bos_sms111"与生产者一致 d.编写消费者类 //与消费者配置的<jms:listener destination="bos_sms111" ref="smsConsumer123"/>的ref对应 @Service("smsConsumer123") //@Service不写后面的名字,则默认ref="smsConsumer" public class SmsConsumer implements MessageListener { @Override public void onMessage(Message message) { MapMessage mapMessage = (MapMessage) message; SendSmsResponse response = null; try { System.out.println("消费者:" + mapMessage.getString("telephone2") + " " + mapMessage.getString("username2") + " " + mapMessage.getString("randomCode2")); response = AliSmsUtils.sendSms( mapMessage.getString("telephone2"), mapMessage.getString("username2"), mapMessage.getString("randomCode2")); if (response.getCode() != null && response.getCode().equals("OK")) { System.out.println("发送短信成功!"); } } catch (Exception e) { // 发送失败 throw new RuntimeException("短信发送失败="+response); } } }转载地址:http://palti.baihongyu.com/