这文章你就读吧,越读越🥸,一读一个不吱声

可靠的🐰警官:rabbitMQ,功能全面,不丢数据,体量小,容易堆积
- Map<String,Object> params = new HashMap<>();
- params.put("x-queue-type","stream");
- params.put("x-max-length-bytes", 20_000_000_000L); //日志文件的最大字节数 maximum stream size:20 GB
- params.put("x-stream-max-segment-size-bytes", 100_000_000); //每一个日志文件的最大大小 size of segment files: 100 MB
- channel.queueDeclare(QUEUE_NAME, true, false, false, params);
queue持久化不代表消息持久化,消息要看消息的:basicPublish
单队列持久化 重启之后 消息会离你而去,单消息持久化 重启之后 消息早跑了 妥妥的
exchange与queue绑定关系
- void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
- void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;
- void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
- AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
- //对应页面上的Properties部分,传入一些预定的参数值。
- builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode());//持久化
- builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority());//持久化
- //builder.headers(headers);对应页面上的Headers部分。传入自定义的参数值
- builder.build()
- AMQP.BasicProperties prop = builder.build();
- public BasicProperties(
- String contentType,//消息类型如:text/plain
- String contentEncoding,//编码
- Map<String,Object> headers,
- Integer deliveryMode,//1:nonpersistent不持久化 2:persistent持久化
- Integer priority,//优先级
- String correlationId,
- String replyTo,//反馈队列
- String expiration,//expiration到期时间
- String messageId,
- Date timestamp,
- String type,
- String userId,
- String appId,
- String clusterId)
consumer消费消息
- String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback
- cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws
- IOException;
- Callback,可实现handle方法
GetResponse response = channel.basicGet(QUEUE_NAME, boolean autoAck);
autoAck=true,消息被consumer消费成功后,无法再消费,一次性的 懂?
autoAck=false,需手动调channel.basicAck(),不凋则重复消费 资源多就是这么豪横
他又来了 真的是 大水冲了龙王庙 不是一家人不进一家门
一,pro 送一个 msg 到que,不需要exchange,con按que 消费
二,pro发送msg到que,多con消费同一个队列queue
- // 创建连接和通道
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- // 声明交换器
- String exchangeName = "publishSubscribeExchange";
- channel.exchangeDeclare(exchangeName, "fanout");
-
- // 创建随机队列并绑定到交换器
- String queueName = channel.queueDeclare().getQueue();
- channel.queueBind(queueName, exchangeName, "");
-
- // 发送消息
- String message = "Hello, RabbitMQ!";
- channel.basicPublish(exchangeName, "", null, message.getBytes("UTF-8"));
- System.out.println("Sent message: " + message);
-
- // 接收消息
- boolean autoAck = true;
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- AMQP.BasicProperties properties, byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("Received message: " + message);
- }
- };
- channel.basicConsume(queueName, autoAck, consumer);