2018年12月08日 23:01
原创作品,转载时请务必以超链接形式标明文章原始出处,否则将追究法律责任。

Html5, 云计算,移动互联网,大数据你知道多少,如果不知道多少,请抓紧时间学习。

今天要说的是消息队列ActiveMQ,这个和cassandra一样也是apache的产品,开源的,支持很多客户端,比如java,C#,ruby,python等。ActiveMQ 是一个完全支持JMS和J2EE规范的 JMS Provider实现。在介绍jms之前,先要介绍一下它的规范,俗话说无规矩不成方圆,就像序列化和反序列化一样,大家要遵循一定的规则才能交互。

JMS的基本构件有

(1).连接工厂, 是客户端用来创建连接的对象,ActiveMQConnectionFactory类。

(2). 连接,JMS Connection封装了客户端和JMS提供者之间的一个虚拟连接,ActiveMQConnectio类。

(3). 会话,JMS Session是生产者和消费者之间Produce message和Consume message的上下文。

会话用于创建消息的生产者,消费者和消息。这个上下文保持了会话的事务性,即发送消息和接受消息被组合到了一个事务中。

(4). 目的地,这个目的地表明了生产消息的目标和消费消息的目标,Destination对象。

JMS规范中定义了两种消息传递域,一种是点对点,一种是发布/订阅。点对点的特点是一个消息只能有一个消费者,消费者和生产者之间没有时间上的相关性,无论消费者在生产者发送消息的时候是否是出于运行状态,他都可以提取到消息。订阅发布模式的特点是一个生产者可以有多个消费者,这种模式存在时间上的相关性,消费者只能消费自从他订阅之后的消息,之前的消息是不能消费的。但是JMS规范允许客户创建持久订阅,即使是在生产者在消费者发送消息的时候处于一个未激活的状态,等他激活以后,依然可以消费未激活之前生产者发送的消息

(5).生产者,是由会话创建的一个对象,它主要职责是将消息发送到目的地

(6).消费者,是由会话创建的一个对象,它主要是接收生产者发送到目的地的消息,消费者可以同步消费和异步消费。


OK,概念就先讲到这里,我们看一下环境

首先从apache 网站上下载ActiveMQ linux版本

activeMQ.PNG

在这里我下载的版本是5.8.0。OK我们把它放到FTP Server上,在CentOS上拷贝至tmp文件夹。

image.png

ok我们用命令解压

image.png

解压完成后,直接运行启动脚本

[root@bogon opt]# ls
apache-activemq-5.8.0  apache-activemq-5.8.0-bin.tar.gz
[root@bogon opt]# cd apache-activemq-5.8.0
[root@bogon apache-activemq-5.8.0]# ls
activemq-all-5.8.0.jar  docs     NOTICE           webapps-demo
bin                     example  README.txt       WebConsole-README.txt
conf                    lib      user-guide.html
data                    LICENSE  webapps
[root@bogon apache-activemq-5.8.0]# cd bin
[root@bogon bin]# ls
activemq        activemq.jar  linux-x86-32  macosx
activemq-admin  diag          linux-x86-64  wrapper.jar
[root@bogon bin]# sh activemq

我们发现如果没有配置java环境变量,就会报错。

image.png

去官网下载一个jdk安装一下,版本不要太高,现在已经收费了。

image.png

下载下来以后我们进行安装。

image.png

右键选择Open with "Software Installer",开始安装。

image.png

我们继续点击Apply,如果提示什么可能damage你的电脑之类,选择Intsall anyway。

image.png

安装成功之后,我们配置一下环境变量,首先我们打开etc/profile文件,然后加上java环境配置,修改profile文件之气那确保你有权限进行修改。先切换到root用户,su root,然后设置文件可修改权限。

JAVA_HOME=/usr/java/jdk1.7.0_21

PATH=$JAVA_HOME/bin:$PATH

CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export JAVA_HOME

export PATH

export CLASSPATH

配置好之后保存,然后我们测试一下,输入java -version,如下就表示环境变量配置成功。

[root@localhost etc]# java -version
java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) Client VM (build 25.191-b12, mixed mode, sharing)
[root@localhost etc]#


OK,接下来开始启动我们的ActiveMQ,输入命sh activemq,到这一步启动成功

image.png

我们看一下它的管理界面,默认端口是8161,在conf目录下的jetty.xml中

<property name="connectors">
            <list>
                <bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector">
                    <property name="port" value="8161" />
                </bean>
                <!--
                    Enable this connector if you wish to use https with web console
                -->
                <!--
                <bean id="SecureConnector" class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector">
                    <property name="port" value="8162" />
                    <property name="keystore" value="file:${activemq.conf}/broker.ks" />
                    <property name="password" value="password" />
                </bean>
                -->
            </list>
        </property>

OK,我们在浏览器中输入Http://localhost:8161/admin,提示输入用户名和密码,用户名:admin,密码admin,

image.png

这个用户名和密码可以在conf/jetty-realm.properties文件中去修改。

image.png

OK,管理界面如下

image.png

因为哥一直做.net开发,所以还是以.net来做个demo,来做个消息的发送可接收。首先需要下载.net客户端。去网站上找到下载.net 客户端的地方

捕获.PNG

在这里我下载的是1.5.3版本,因为1.6.0版本连接有问题。下载下来之后,我们使用.net 4.0版本

image.png

OK,新建一个solution,创建两个WinForm的工程,如下

image.png

注意要引用上面的两个dll,这两个dll分别在build和lib下

image.png

我们看一下代码,没什么好解释的,先看生产者代码

using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using System;
using System.Windows.Forms;

namespace ActiveMQSender
{
    public partial class FrmMsgSender : Form
    {
        IConnectionFactory factory;
        IConnection connection;
        ISession session;
        IDestination destination;
        IMessageProducer prod;
        ITextMessage streamMessage;

        ~FrmMsgSender()
        {
            session.Close();
            connection.Close();
        }

        public FrmMsgSender()
        {
            InitializeComponent();
            this.Init();
        }

        private void Init()
        {
            factory = new ConnectionFactory("tcp://192.168.192.128:61616/");
            connection = factory.CreateConnection("admin", "admin");
            session = connection.CreateSession();
            destination = new ActiveMQTopic("Bruce Test");
            prod = session.CreateProducer(destination);
            streamMessage = prod.CreateTextMessage();
        }

        private void btnSend_Click(object sender, EventArgs e)
        {
            streamMessage.Text = txtSendMsg.Text.Trim();
            prod.Send(streamMessage, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
        }
    }
}

再看消费者代码

using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using System;
using System.Windows.Forms;

namespace FrmMsgReceiver
{
    public partial class FrmMsgReceiver : Form
    {
        IConnectionFactory factory;
        IConnection connection;
        ISession session;
        IDestination destination;
        IMessageConsumer consumer;

        ~FrmMsgReceiver()
        {
            session.Close();
            connection.Close();
        }

        public FrmMsgReceiver()
        {
            InitializeComponent();
        }

        private void Init()
        {
            factory = new ConnectionFactory("tcp://192.168.192.128:61616/");
            connection = factory.CreateConnection("admin", "admin");
            session = connection.CreateSession();
            connection.Start();
            destination = new ActiveMQTopic("Bruce Test");
            consumer = session.CreateConsumer(destination);
        }

        private void FrmMsgReceiver1_Load(object sender, EventArgs e)
        {
            this.Init();
            consumer.Listener += new MessageListener(consumer_Listener);
        }

        private void consumer_Listener(IMessage message)
        {
            txtReceiveMsg.Text = ((ITextMessage)(message)).Text;
        }

    }
}

OK,接下来我们就测试一下这个程序,新建名称为Bruce Test的消息队列。跑我们的winform程序,结果如下。

image.png

接下来我们看一下winform的执行效果,先发送一条消息

image.png

然后消费者再消费一下

image.png

关闭消费端Message Receiver,再看admin界面,我们发现有消费记录,并且没有了消费者。

image.png

OK,这是最简单的点对点消费,后续会有群体消费案例,如果你觉得文章好,请打赏。


如果大家有兴趣的话,看一下这篇文章

http://blog.csdn.net/bodybo/article/details/5647968。

发表评论
匿名  
用户评论

匿名游客

2018年12月09日 16:02
楼主写的真不错,我正需要这样的产品,再次感谢楼主的技术文章。

匿名游客

2018年12月08日 23:16
ActiveMQ 不错,做异步消息处理非常有用,我很喜欢,感谢博主分享。