消息中间件来解耦服务调用
比如1个登录系统,登录的话需要调用很多系统的其他服务,如果中间调用失败,可能会导致登录信息一致无法返回,同时也增加了系统的耦合度。而用消息中间件的话,则是不发送服务到其他系统,而是发送服务到消息中间件,发完消息就直接返回结果,完美。而消息中间件则是自己吧消息发送给那些服务。
什么是中间件
非底层操作系统软件。非业务应用软件,不是直接给用户的,不能给客户带来直接价值的软件叫中间件。
什么是消息中间件
关注与数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统。
什么是JMS
java消息服务即是JMS,是一个java平台中关于面向消息中间件的API,用于2个程序啊件进行异步通信。就是一个规范。
什么是AMQP
一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。和JMS想对。
案例
简介
第一步,下载安装activemq,第二步,编写发送端和接收端。
下载后可以修改这个消息队列的配置文件,来修改持久化方式,有文件持久化,数据库持久化等。发送端发送消息给消息队列,接收端从消息队列中获取消息。有2个方式来消费消息,一个是队列方式,一个是主体模式。队列模式就是现有的消费者们平均或者按照一定规则消费队列里面的消息。主题模式则是每个消费者完整的消费消息队列里面的消息。
这里写了一个案例来显示
生产者代码
public class Producer {
public static final String url = “tcp://127.0.0.1:61616”;
public static final String queueName = “dh-test-queue”;
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName);
MessageProducer mp = session.createProducer(destination);
for (int i = 0; i < 100; i++) {
TextMessage textMessage = session.createTextMessage(“test” + i);
mp.send(textMessage);
System.out.println(“发送消息” + textMessage.getText());
}
connection.close();
}
这个是先根据url生成一个连接工厂,然后根据这个工厂类生成一个连接。然后根据这个连接创建一个session,然后根据这个session创建一个目的地,这个就是消息队列上你的消息队列的名字,然后根据session和这个目的地,创建一个消息发送者,就可以发送消息到消息队列了。
消费者代码
public class ConsumerApp {
public static final String url = “tcp://127.0.0.1:61616”;
public static final String queueName = “dh-test-queue”;
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName);
MessageConsumer mc= session.createConsumer(destination);
mc.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message)
TextMessage textMessage=(TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//connection.close();
}
前面代码都大致一样,后面不同的就是根据session和目的地生成一个消费者,然后用这个消费者调用一个监听器持续监听来自消息队列的消息,从而获取消息。
rabbitmq
生产者到消息队列确认机制,以及为何发生消息重复
定义几个概念:
A=生成者->消息队列,发送消息到消费者。
B=消息队列加入消息持久化,
C=消息队列->生产者。回复ack,
D=消息队列->消费者,发送消息到消费者。
E=消费者->消息队列,回复ack,
F=消息队列删除消息,持久化。
基本可以分为这几个步骤。
1.第一个是前3个回合。
生产端要维护一个消息发送的表,消息发送的时候记录消息id,这个消息id是消息队列自己生产的id,唯一。在消息成功落地broker磁盘并且进行回调确认(ack)的时候,根据本地消息表和回调确认的消息id进行对比,然后删除。如果这个表里面存在没有收到ack的消息,且超时了,那么生产者会再次发送。且消息id不变。
我们分析,前面3个步骤A,B,C无论哪个出了问题,生产者都没有收到ack,那么都会发送再次发送消息到消息队列,那么势必造成重复,但是消息队列对这个做了去重处理,根据消息id.
第二个回合就是后面3个回合了。
对于消息队列来说,必须收到ack,我才会进行进行删除,不然我会持续发送,但是存在一个情况就是E的时候网络断了,但是消息是确实消费了,消息队列却没有删除,再次消费,则会造成消息重复,这个需要我们自己去处理。
因此第一个回合消息是幂等的,第二个回合消息不是幂等的。
四种交换机
1.扇形交换机
广播模式,将消息发送到绑定到该交换机的所有队列上。
2.直连交换机
每个队列绑定到一个交换机上面,同时有个routing_key对应这个队列到交换机。同时消息也携带一个routing_key。交换机根据这个routing_key将消息指定到对应队列里面。如果希望一个消息发送到多个队列里面,那么就要发送多次.这里面的routing_key是写死的,比如某个队列绑定到交换机的routing_key=back,那么携带back的消息就会被路由分配到这个队列里面。
3.主题交换机
这个是上面直连交换机的加强版。不过routing_key加强了,可以用一部分规则了。规则如下。*:一个单词。#:任意单词。我们队列的routing_key=com.hehe.#,那么任何以com.hehe开头的消息都会被分配到这里。
4.首部交换机
首部交换机个人觉得类似于主题交换机,只不过规则变了,是基于hash了,队列和交换机绑定的时候声明一个hash数据结构,当消息发送的时候,会携带一个hash数组,然后进行匹配,如果匹配成功就会写入到相对应的队列。
持久化
为了保证rabbitMq的可靠性,保证消息的可靠性。需要将quene,exchange,message都持久化。
quene持久化就是将quene的持久化标示设置为true,当服务重启后,之前存在quene会被重洗放到rabbitmq里面。
message持久化:quene持久化,如果message不持久化,那么重启后quene里面的没消费的消息会丢失。设置了队列和消息的持久化之后,当broker服务重启的之后,消息依旧存在。单只设置队列持久化,重启之后消息会丢失;单只设置消息的持久化,重启之后队列消失,既而消息也丢失。单单设置消息持久化而不设置队列的持久化显得毫无意义。
exchange持久化:这个持久化与否没什么影响,但是建议持久化,不然重启后交换机就会丢失。
总结
大致了解了一些消息队列,以及做出了一个简单demo,目前没有用到,用到在细看。