• rocketmq4.9.4 ubuntu环境启动过程


    当前教程属于最简单的启动并与java语言的联通教程。

    下载

    下载页面
    https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip

    解压

    unzip rocketmq-all-4.9.4-bin-release.zip
    
    • 1

    按照等比例修改mq启动内存大小

    因为使用的虚拟机rocketmq默认启动内存申请的比较大,所以测试环境需要改小一点。

    # bin/runserver.sh
     71       JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
     76       JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    
    # bin/runbroker.sh
     85 JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"
    
    # 上面两个文件,按照指定行号,修改制定位置。一定要等比例。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    启动rocketmq

    # 启动naming server
    bin/mqnamesrv
    
    # 启动broker
    bin/mqbroker -n localhost:9876
    
    • 1
    • 2
    • 3
    • 4
    • 5

    引入依赖到pom文件

    		<dependency>
    			<groupId>org.apache.rocketmqgroupId>
    			<artifactId>rocketmq-clientartifactId>
    			<version>4.9.4version>
    		dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    java代码product端

    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    public class TestRocketProduct {
        public final static String WODE_TOPIC = "rockjiangtopic1";
    
        public static void main(String[] args){
            try {
    // 创建生产者对象,指明了生产者组名
                DefaultMQProducer producer = new DefaultMQProducer("simple");
                // 设置服务器地址
                producer.setNamesrvAddr("192.168.3.128:9876");
                // 启动实例
                producer.start();
    
                for (int i = 0; i < 3; i++) {
                    String str = "Hello RocketMQ";
                    // 实例化消息对象
                    Message message = new Message(WODE_TOPIC, "tagA", (str + i).getBytes());
                    // 发送消息
                    SendResult sendResult = producer.send(message);
                    System.out.printf("%s%n", sendResult);
                }
                // 关闭生产者
                producer.shutdown();
            } catch (MQClientException e) {
                e.printStackTrace();
            } catch (RemotingException e) {
                e.printStackTrace();
            } catch (MQBrokerException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    java代码consumer端

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class TestRocketConsumer {
        public static void main(String[] args){
            try {
                //创建一个消息接收对象consumer
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    
                //设定接收消息的命名服务器地址---获取到消息服务器ip
                consumer.setNamesrvAddr("192.168.3.128:9876");
    
                //设置接收消息对应的topic,对应的sub标签为任意*,之前producer没有指定tag。如果producer发送的消息指定了tag,那么也必须指定相应的tag
                consumer.subscribe(TestRocketProduct.WODE_TOPIC, "*");
    
                //开启监听,用于接收消息
                consumer.registerMessageListener(new MessageListenerConcurrently() {
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                        //遍历接收到的消息
                        for (MessageExt msg : list) {
                            System.out.println("msg = " + msg);
                            System.out.println("消息为:" + new String(msg.getBody()));
                        }
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
    
                //启动消息接收服务
                consumer.start();
            } catch(Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
  • 相关阅读:
    ARM接口编程—IIC总线(exynos 4412平台)
    企业实施MES系统的关键点详解
    C语言基础4:数组(一、二维数组的初识化、创建与存储、数组越界、冒泡排序、三子棋、扫雷实现)
    Cadence OrCAD Could not check the CaptureCIS feature licence
    23.1 Bootstrap 表格
    Mybatis完整版详解
    第03章_基本的SELECT语句
    Java collections framework
    【剑指Offer】41.数据流中的中位数
    HIVE基本查询操作(二)——第3关:Hive抽样查询
  • 原文地址:https://blog.csdn.net/jl19861101/article/details/127670349