JMS和ActiveMQ介绍

消息中间件

Message-oriented middleware (MOM) is software or hardware infrastructure supporting sending and receiving messagesbetweendistributed systems.

MOM能做什么? 收消息和发消息,本质上是通信问题。最直接的通信方式是源端→目的端的两点通信,消息中间件的产生使得通信方式发生转变,变成了源端→消息中间件→目的端的三点通信。

用消息中间件进行收/发消息,关键在于分布式。三个核心特性

  • 解耦: 各系统间松耦合,弹性伸缩

  • 异步: 异步消息

  • 缓冲: 实现消息缓冲,转存储,流量削峰

这三个特性对于大型分布式系统实现高可用、高性能、可扩展具有极其重要的意义。

从JMS说起

谈到Java系的消息中间件,避不开JMS规范。JMS(Java message service)是一种面向消息中间件的API规范, 抽象了应用与MOM间的交互行为,类似于JDBC在应用和不同数据库间的角色。如老牌的ActiveMQ就是一种实现了JMS规范的消息中间件产品。 JMS解决了消息中间件的兼容问题,且极大地简化了Java程序员的编程难度,是程序员的福音,哈哈哈。

JMS

消息结构

JMS规定消息由以下三部分组成:

  • 消息头:包含了路由和识别消息的元数据。

  • 消息属性:除了头字段定义的信息以外,Message 接口包含了内置的设置来支持属性值。因而,这就提供了一种为消息增加可选头信息的机制。通过消息选择器(Selector)可以使客户端让JMS提供者按照应用指定的规则选择消息。

  • 消息体:JMS定义的消息类型很丰富,有以下五种:

TextMessage:包含了一个java.lang.String。

MapMessage:包含了一些列的name-value对,name是String,value是Java primitive类型。消息体中的条目可以被enumerator按照顺序访问,也可以自由访问。

BytesMessage:包含一个不间断的字节流。

StreamMessage:包含了一个Java primitive流对象,这个流可以顺序读取和填充。

ObjectMessage:包含了一个可序列化的Java对象。

消息模型

JMS规定了两种消息模型:

  • Point-to-Point(P2P),点对点模型

  • Publish/Subscribe(Pub/Sub),发布订阅模型

P2P模型中,关注发送者(Sender)、消息队列(Queue)、接收者(Receiver)这三个概念。每个消息都被Sender发送到一个特定的Queue,Receiver从队列中获取消息。队列保留着消息,直到它们被消费或超时。PTP是生产一个消息,一人享用,即一对一的方案。

在Pub/Sub模型中,关注主题(Topic)、发布者(Publisher)、订阅者(Subscriber)这三个概念。多个Subscriber订阅Topic,一旦有Publisher将消息发送到该Topic,系统将这些消息传递给订阅该Topic的Subscriber。 Pub/Sub是生产一个消息,多人享用,即一对多的方案。

同步和异步

JMS规定生产消息和消费消息都支持同步和异步的方式。

生产消息

JMS规范支持同步方式或者异步方式向broker发送消息。 使用何种方式对send方法的延迟有巨大的影响。对于生产者来说,既然延迟是决定吞吐量的重要因素,那么使用异步发送方式会极大地提高系统的性能。

消费消息

同步消费消息时,消费者通过调用receive方法从broker中主动拉取消息,receive方法可以一直阻塞到消息到达。异步消费消息时,消费者注册一个消息监听器以定义在消息到达时所采取的动作。异步消费要比同步消费的时延更小,响应更快。

持久化方式

JMS针对Pub/Sub 支持以下两种消息持久化方式:

  • 持久化(persistent) 持久化方式用于严格的不能丢失消息的场合。Broker收到消息后先将消息持久化到磁盘中,在对生产者进行确认后再分发给消费者。当消息被消费者消费后,会对broker发出确认,broker收到消费者的确认后才会从磁盘中删除消息。这是非常可靠的方式,即使遭遇到系统奔溃,broker重启后也可以从磁盘中恢复所有未被消费的消息。

  • 非持久化(non-persistent) 非持久化方式用于可以允许偶尔丢失消息的场合。消息不需要保存到本地磁盘,消息存储和分发全部是在内存中进行的,处理速度比persistent方式要快很多,但是当broker系统崩溃时,内存的消息会丢失。

根据使用场景对性能和消息可靠性的要求,权衡选择持久化方式,至关重要。

优先级

JMSPriority头属性包含了消息的优先级。JMS定义从0级到9级的十级优先级,优先考虑0-4为正常优先级, 5-9为高优先级。JMS不要求JMS规范实现者严格实现消息的优先级顺序,但是应该尽最大努力(Best Effort)优先于正常消息投递加急消息。JMS支持消息优先级的另外一种可行策略是:JMS Selector,利用JMSPriority对消费者端的消息进行过滤。

编程模型

JMS 1.0定义了两个域相关的API,一个用于点对点的消息处理(queue),一个用于发布订阅的消息处理(topic)。JMS 1.1引入了一个新的统一的一组API,可以同时用于点对点和发布订阅消息模式。JMS 2.0引入了一组简化API,它拥有传统API的全部特性,同时接口更少、使用更方便。所有的接口都在javaxjms包下。

我们以JMS 1.1传统API为例,看看JMS的编程模型。传统API提供的主要接口如下:

ConnectionFactory:客户端用来创建连接的受管对象。

Connection:客户端到JMS提供者之间的活动连接。

Session:发送和接收消息的一个单线程上下文。

MessageProducer:由Session创建的对象,用于发送消息到Queue或Topic。

MessageConsumer:由Session创建的对象,用于接收Queue或Topic中的消息。

Destination:用来指定生产的消息的目标和消费的消息来源的对象。

activemq

基本元素的关系可理解为:

ConnectionFactory → Connection → Session → Message

Destination + Session → MessageProducer

Destination + Session → MessageProducer

ActiveMQ实践

Apache ActiveMQ是完全支持JMS规范的老牌消息中间件,是业界最广泛使用的性能强劲的开源消息总线。 我们以AMQ为例,看看JMS规范些的消息中间件怎么玩的。

生产消息

见如下代码,诸多开关放在成员变量里。

public class ProducerTest {
private static String uri = " ip+ port";
private static String user = "";
private static String password = "";
private static String subject = "TEST.FOO";
private static boolean topic = false;
private static boolean transacted = false;
private static boolean persistent = true;
private static int messageCount = 2000;
private static int messageSize = 100;
private static Connection connection;
private static long messageSend = 0;

public static void main(String[] args) {
    Destination destination = null;

    try {
        // Create connection
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, uri);
        connectionFactory.setProducerWindowSize(16384);
        connection = connectionFactory.createConnection();
        connection.start();
        if (connection == null) {
            System.out.println("can't create connection!");
            System.exit(-1);
        }
        System.out.println("create connection!" + connection);
        // Create Session
        Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
        // Create Topic/Queue
        if (topic){
            destination = session.createTopic(subject);
        }else{
            destination = session.createQueue(subject);
        }
        // Create message producer
        MessageProducer producer = session.createProducer(destination);
        //PERSISTENT or NON_PERSISTENT
        if (persistent){
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        }else{
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        }
        //publish messages
        for(int i = 0; i < messageCount; i++) {
            TextMessage textMessage = session.createTextMessage(createMessage(i));
            System.out.println("send message[" + i + "]");
            producer.send(textMessage);
            messageSend++;
        }

    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
            connection.close();
    }

    private static String createMessage(int index) {
        StringBuffer buffer = new StringBuffer("Message[" + index + "]");
        if (buffer.length() >= messageSize)
        return buffer.substring(0, messageSize);
        for (int i = buffer.length(); i < messageSize; i++)
        buffer.append(i);
        return buffer.toString();
    }
}

消费消息

见如下代码,诸多开关放在成员变量里。

public class ConsumerTest implements MessageListener, Runnable {
private MessageProducer replyProducer;
private Session session;
static final int messageSize = 100;
private String uri = "ip+port";
private String user = "";
private String password = "";
private String subject = "TEST.FOO";
private boolean topic = false;
private boolean durable = true;
private boolean transacted = false;
private int messageCount = 2000;
private int sleepTime = 0;
private boolean syncReceive = false;
private long messageReceive = 0;

public static void main(String[] args) {
    new ConsumerTest().run();

    //覆写“onMessage”方法
    @Override
    public void onMessage(Message message) {

        messageReceive++;

        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage)message;
            try {
                if (textMessage.getText().length() <= 50) {
                System.out.println("recieve Message: " + textMessage.getText());
                } else {
                String string = textMessage.getText().substring(0, 50);
                System.out.println("recieve Message: " + string + " ...");
                }
            //message.acknowledge();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("recieve Message " + message);
        }

        try {
            if(message.getJMSReplyTo() != null) {
            replyProducer.send(message.getJMSReplyTo(),
            session.createTextMessage("Reply: " + message.getJMSCorrelationID()));
        }
        } catch (JMSException e) {
            e.printStackTrace();
        }

        try {
        //simulate a slow consumer
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        Connection connection = null;
        Destination destination = null;
        MessageConsumer consumer = null;
        try {
            // create Connection
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, uri);
            connection = connectionFactory.createConnection();
            connection.setClientID("gsw-win7");
            connection.start();
            // Create Session
            session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
            // Create Topic/Queue
            if (topic){
                destination = session.createTopic(subject);
            }else{
                destination = session.createQueue(subject);
            }
            replyProducer = session.createProducer(null);
            replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            if (durable && topic){
                consumer = session.createDurableSubscriber((Topic)destination, "myDurableSubName");
            }else{
                consumer = session.createConsumer(destination);
            }
            //sync receive
            if (syncReceive && messageCount > 0) {
                for(int i = 0; i < messageCount; i++) {
                Message message = consumer.receive(1000);
                if (message != null) {
                onMessage(message);
                }
                }

                System.out.println("Consumer " + messageReceive + " messages. Close connection!");
                consumer.close();
                session.close();
                connection.close();
            } else { //async receive
                consumer.setMessageListener(this);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
文章目录
  1. 消息中间件
  2. 从JMS说起
  3. JMS
    1. 消息结构
    2. 消息模型
    3. 同步和异步
    4. 生产消息
    5. 消费消息
    6. 持久化方式
    7. 优先级
    8. 编程模型
  4. ActiveMQ实践
    1. 生产消息
    2. 消费消息