技术成就梦想

代码如诗

原 activemq, rabbitmq demo [Mac OS X] 京东技术

2017年9月11日 未分类 0

 

 activemq:

下载 apache-activemq-5.11.1-bin.tar

解压 tar -xvf apache-activemq-5.11.1-bin.tar

cd bin/macosx

启动broker及console ./activemq [start | stop | restart]

访问console http://localhost:8161 [conf/jetty.xml] admin/admin登录

查看broker日志 data/activemq.log [conf/activemq.xml]

Producer:

/**
 * Demo JMS producer: sends 100 persistent text messages to the
 * {@code TEST.QUEUE} queue of the broker at {@code tcp://127.0.0.1:61616}.
 */
public class Producer {

    /**
     * Connects to the broker, sends the messages and echoes each one to stdout.
     *
     * @param args ignored
     * @throws Exception if connecting to the broker or sending a message fails
     */
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        Connection connection = factory.createConnection();
        try {
            // Non-transacted session with automatic acknowledgement.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("TEST.QUEUE");
            MessageProducer producer = session.createProducer(queue);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 0; i < 100; i++) {
                TextMessage message = session.createTextMessage("hello world! " + i);
                producer.send(message);
                System.out.println(message);
            }
            producer.close();
        } finally {
            // Always release the connection, even when sending fails part-way;
            // per the JMS contract, closing a connection also closes its
            // sessions and producers.
            connection.close();
        }
    }
}

 Consumer:

/**
 * Demo JMS consumer: registers a listener on the {@code TEST.QUEUE} queue of
 * the broker at {@code tcp://127.0.0.1:61616} and prints every message it
 * is handed.
 */
public class Consumer {

    /**
     * Wires up the listener and then starts the connection. Nothing is closed
     * here, so the connection is left open when {@code main} returns.
     *
     * @param args ignored
     * @throws Exception if the connection, session or consumer cannot be set up
     */
    public static void main(String[] args) throws Exception {
        // Callback that simply dumps each received message to stdout.
        MessageListener printingListener = new MessageListener() {
            @Override
            public void onMessage(Message received) {
                System.out.println(received);
            }
        };

        Connection brokerConnection =
                new ActiveMQConnectionFactory("tcp://127.0.0.1:61616").createConnection();
        Session jmsSession = brokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer queueConsumer = jmsSession.createConsumer(jmsSession.createQueue("TEST.QUEUE"));
        queueConsumer.setMessageListener(printingListener);

        // Started last, after the listener has been registered.
        brokerConnection.start();
    }
}

 测试时,也可通过代码启动broker:

/**
 * Starts an ActiveMQ broker from code, for use while testing.
 *
 * <p>Running {@code win64/activemq.bat} instead starts both the broker and
 * the console.
 */
public class BrokerStartup {

    /**
     * Creates a broker named {@code TestBroker}, binds it to
     * {@code tcp://127.0.0.1:61616} and starts it.
     *
     * <p>An alternative way to obtain the broker is
     * {@code BrokerFactory.createBroker(new URI("broker:tcp://localhost:61616"))}.
     *
     * @param args ignored
     * @throws Exception if the connector cannot be added or the broker fails to start
     */
    public static void main(String[] args) throws Exception {
        BrokerService testBroker = new BrokerService();

        // When several brokers are started, each one must be given a name.
        testBroker.setBrokerName("TestBroker");

        testBroker.addConnector("tcp://127.0.0.1:61616");
        testBroker.start();
    }
}

 

rabbitmq:

安装 brew install rabbitmq

chenhongxideMacBook-Pro:local javahongxi$ brew install rabbitmq
Warning: You are using OS X 10.12.
We do not provide support for this pre-release version.
You may encounter build failures or other breakage.
==> Downloading https://www.rabbitmq.com/releases/rabbitmq-server/v3.5.3/rabbitm
Already downloaded: /Library/Caches/Homebrew/rabbitmq-3.5.3.tar.gz
==> /usr/bin/unzip -qq -j /usr/local/Cellar/rabbitmq/3.5.3/plugins/rabbitmq_mana
==> Caveats
Management Plugin enabled by default at http://localhost:15672


Bash completion has been installed to:
  /usr/local/etc/bash_completion.d

To have launchd start rabbitmq at login:
    ln -sfv /usr/local/opt/rabbitmq/*.plist ~/Library/LaunchAgents
Then to load rabbitmq now:
    launchctl load ~/Library/LaunchAgents/homebrew.mxcl.rabbitmq.plist
Or, if you don't want/need launchctl, you can just run:
    rabbitmq-server
.......
==> Summary
🍺  /usr/local/Cellar/rabbitmq/3.5.3: 1037 files, 28M, built in 3 seconds
chenhongxideMacBook-Pro:local javahongxi$ brew link rabbitmq
Linking /usr/local/Cellar/rabbitmq/3.5.3... 
Error: Could not symlink sbin/rabbitmq-defaults
/usr/local/sbin is not writable.

cd /usr/local/Cellar/rabbitmq/3.5.3/sbin

启动broker ./rabbitmq-server

chenhongxideMacBook-Pro:sbin javahongxi$ ./rabbitmq-server

              RabbitMQ 3.5.3. Copyright (C) 2007-2014 GoPivotal, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
  ######  ##        /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
  ##########
              Starting broker... completed with 10 plugins.

 看到completed with 10 plugins.表示启动OK。

Producer:

/**
 * Demo RabbitMQ producer: publishes a single {@code "Hello World!"} message
 * to the {@code TestQueue} queue on the broker at {@code 127.0.0.1}.
 */
public class Producer {

    /**
     * Declares the queue, publishes one message and closes the channel and
     * connection.
     *
     * @param args ignored
     * @throws Exception if the broker cannot be reached or publishing fails
     */
    public static void main(String[] args) throws Exception {
        String queueName = "TestQueue";
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();

            // Arguments after the name: durable, exclusive, autoDelete, extra arguments.
            channel.queueDeclare(queueName, false, false, false, null);
            String message = "Hello World!";
            // Encode explicitly as UTF-8 so the bytes on the wire do not depend
            // on the platform default charset of the machine running the producer.
            channel.basicPublish("", queueName, null,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");

            channel.close();
        } finally {
            // Release the connection even when declare/publish throws. The
            // isOpen() guard avoids masking the original failure with a second
            // exception when the connection has already been shut down.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}

Consumer:

/**
 * Demo RabbitMQ consumer: prints every message delivered from the
 * {@code TestQueue} queue on the broker at {@code 127.0.0.1}.
 */
public class Consumer {

    /**
     * Declares the queue, subscribes to it and then loops forever, blocking
     * on each delivery and printing its body.
     *
     * @param args ignored
     * @throws Exception if the broker cannot be reached, or if waiting for a
     *                   delivery fails or is interrupted
     */
    public static void main(String[] args) throws Exception {
        String queueName = "TestQueue";
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();

            // Same declaration as the producer side: durable, exclusive and
            // autoDelete are all false, with no extra arguments.
            channel.queueDeclare(queueName, false, false, false, null);
            System.out.println(" [*] Waiting for messages...");

            QueueingConsumer consumer = new QueueingConsumer(channel);
            // Second argument true = automatic acknowledgement.
            channel.basicConsume(queueName, true, consumer);

            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                // Decode explicitly as UTF-8 instead of relying on the platform
                // default charset of the machine running the consumer.
                String message = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println(" [x] Received '" + message + "'");
            }
        } finally {
            // Only reached when the loop above throws. Close the connection so
            // it is not left open, but skip the call if it is already shut down
            // so the original exception is not masked.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}

  https://github.com/javahongxi/whatsmars