RocketMQ 系列 生产者

刨析rocketMQ的底层实现

Posted by lichao modified on June 8, 2020

同步发送原理

RocketMQ 使用 Netty 进行发送,Netty 通讯默认都是异步的,那么同步是怎么实现的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();

    try {
        // 1、定义一个 ResponseFuture 来处理响应结果
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    // 2、通过 Netty 执行完成后回调处理请求的结果
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                // 唤醒阻塞的线程
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });
        // 3、请求结果默认等待请求3秒钟,如果超过3秒则抛出异常。
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }

        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}

1、定义一个 ResponseFuture 来处理响应结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ResponseFuture {
    //省略其它代码
    ......
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return this.responseCommand;
    }

    public void putResponse(final RemotingCommand responseCommand) {
        this.responseCommand = responseCommand;
        this.countDownLatch.countDown();
    }
}

ResponseFuture 内部使用了 CountDownLatch 来实现的。当调用 waitResponse() 方法时阻塞当前线程,当返回结果时调用 putResponse() 方法存放结果,然后执行 this.countDownLatch.countDown() 唤醒阻塞的线程。

2、通过 Netty 执行完成后回调处理请求的结果

使用 Netty 进行发送消息,当 Netty 收到结果后会执行自定义的 ChannelFutureListener.operationComplete() 方法。 如果执行完成,调用 responseFuture.putResponse(null); 立即唤醒阻塞的线程,处理请求结果。

3、默认最长等待请求3秒钟,如果超过3秒则抛出异常

调用 responseFuture.waitResponse(timeoutMillis) 方法阻塞等待 Netty 返回结果。默认最长等待时间为 3 秒,如果超过 3 秒则认为调用超时,抛出 RemotingSendRequestException 异常信息。

异步发送原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
    final InvokeCallback invokeCallback)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {

    long beginStartTime = System.currentTimeMillis();
    final int opaque = request.getOpaque();
    //1、尝试获得 semaphore 信号量,semaphore 默认为65535。
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTime) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl call timeout");
        }
        // 2、定义 ResponseFuture 来处理响应的结果
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
        // 存储到把 ResponseFuture  存储到 responseTable 中。
        this.responseTable.put(opaque, responseFuture);
        try {
            // 3、调用 Netty 发送数据
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    // 3.1、成功则设置 responseFuture 的状态为发送成功
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    }
                    // 3.2、发送失败则快速处理失败请求
                    requestFail(opaque);
                    log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
        } else {
            String info =
                String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                    timeoutMillis,
                    this.semaphoreAsync.getQueueLength(),
                    this.semaphoreAsync.availablePermits()
                );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}

1、尝试获得 semaphore 信号量,semaphore 默认为 65535

1
2
public static final int CLIENT_ASYNC_SEMAPHORE_VALUE 
=Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"));

异步发送请求的并发量,默认最大为65535。

2、定义 ResponseFuture 来处理响应的结果

ResponseFuture responseFuture 定义了发送数据响应的结果,同上面介绍的同步发送。 把responseFuture 存储到 responseTable 中。

1
2
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer, ResponseFuture>(256);

opaque 为请求的唯一标识,每次请求创建一个新的(AtomicInteger 自增的)。

3、调用 Netty 发送数据

调用 Netty 异步发送数据。

  1. Netty 发送成功则设置 responseFuture 的状态为发送成功
  2. 发送失败则快速处理失败请求

快速失败是相对下面的定时任务扫描处理响应结果的

处理异步的响应结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//1、 定时间隔3秒钟扫描一次 responseTable
this.timer.scheduleAtFixedRate(new TimerTask() {
    @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);


public void scanResponseTable() {
    final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
    Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<Integer, ResponseFuture> next = it.next();
        ResponseFuture rep = next.getValue();
        // 2、扫描开始执行时间大于 执行超时+1s 的 ResponseFuture 数据,并存放到 rfList 中
        if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
            rep.release();
            it.remove();
            rfList.add(rep);
            log.warn("remove timeout request, " + rep);
        }
    }
    
    // 3、执行处理完成或超时的请求
    for (ResponseFuture rf : rfList) {
        try {
            executeInvokeCallback(rf);
        } catch (Throwable e) {
            log.warn("scanResponseTable, operationComplete Exception", e);
        }
    }
}
  1. 定时间隔3秒钟扫描一次 responseTable 当 Netty 客户端和服务端启动的时候,都会自动这个定时任务。定时的扫描 responseTable 的请求数据。每隔3秒扫描一次。

  2. 扫描开始执行时间大于 执行超时+1s 的 ResponseFuture 数据 Netty 请求的时候默认会有等待的执行超时时间(或可自己设置),超过超时时间的则认为任务超时,需要通过定时任务处理超时的任务。 异步执行完成的请求也会在定时任务中回调执行处理结果。

Oneway 发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {

    request.markOnewayRPC();
    // 尝试获得 semaphore 信号量,semaphore 默认为65535。
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);

    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            // 2、调用 Netty 发送请求
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    once.release();
                    if (!f.isSuccess()) {
                        log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                    }
                }
            });
        } catch (Exception e) {
            once.release();
            log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
        } else {
            String info = String.format(
                "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                timeoutMillis,
                this.semaphoreOneway.getQueueLength(),
                this.semaphoreOneway.availablePermits()
            );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}

1、尝试获得 semaphore 信号量,semaphore 默认为65535

1
2
public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE 
= Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535"));

Oneway 方式发送请求的并发量,默认最大为65535。

2、调用 Netty 发送请求

这里只是发送数据,而没有处理响应结果。 这种方式发送数据,吞吐量更高,但不管数据是否发送成功。

问题

消息发送流程?

rocketMq通信协议是什么?

存储概览 RocketMQ 设计了自己的一个通信协议,用于消息内容和二进制格式之间的转换:

  1. length:4 字节整数,二三四部分长度总和;
  2. header length:4字节整数,第三部分header data长度;
    • 最高8为存储header data的协议
    • 底24位为数据长度
  3. header data:存放Json序列化的数据;
  4. body data:应用自定义二进制序列化的数据。

消息的编码过程

消息的编码是在 RemotingCommand 中 encode 方法中完成的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public ByteBuffer encode() {
    // 1> header length size
    int length = 4;
    // 2> header data length
    byte[] headerData = this.headerEncode();
    length += headerData.length;
    // 3> body data length
    if (this.body != null) {
        length += body.length;
    }
    ByteBuffer result = ByteBuffer.allocate(4 + length);
    // 1.先放入消息的总大小
    result.putInt(length);
    // 2.再放入头部的长度
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
    // 3.接着放入头部数据
    result.put(headerData);
    // 4.最后放入消息体的数据
    if (this.body != null) {
        result.put(this.body);
    }
    result.flip();
    return result;
}

消息的解码过程

消息的解码是在类 RemotingCommand 中 decode方法中完成的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
    int length = byteBuffer.limit();// 获取byteBuffer的总长度
    int oriHeaderLen = byteBuffer.getInt();// 1.获取前4个字节,组装int类型,该长度为总长度 图中 length
    int headerLength = getHeaderLength(oriHeaderLen);// length & 0xFFFFFF 获取消息头的长度,与运算,编码时候的长度即为24位
    byte[] headerData = new byte[headerLength];// 保存header data
    byteBuffer.get(headerData);// 2.从缓冲区中读取headerLength个字节的数据,这个数据就是报文头部的数据
    RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
    int bodyLength = length - 4 - headerLength;// 报文体的数据,减去了第二、三部分的长度
    byte[] bodyData = null;
    if (bodyLength > 0) {
        bodyData = new byte[bodyLength];
        byteBuffer.get(bodyData);// 获取消息体的数据
    }
    cmd.body = bodyData;
    return cmd;
}

rocketMQ 如何处理半包问题?

由于RocketMQ在消息头定义长度字段来标识消息总长度, 可用Netty提供的相应的解码器:LengthFieldBasedFrameDecoder

大多数协议(私有或公有),协议头中会携带长度字段,用于标识消息体或者整包消息的长度,例如SMPP、HTTP协议等。由于基于长度解码需求的通用性,Netty提供了LengthFieldBasedFrameDecoder,自动屏蔽TCP底层的拆包和粘包问题,只需要传入正确的参数,即可轻松解决”读半包”问题。

LengthFieldBasedFrameDecoder构造函数:

1
2
3
4
5
6
7
8
9
10
public LengthFieldBasedFrameDecoder(ByteOrder byteOrder,  // 标示字节流表示的数据是大端还是小端,用于长度域的读取;
                                    int maxFrameLength, // 帧的最大长度,如果帧长度大于此值,抛异常;
                                    int lengthFieldOffset, // 长度域的偏移量
                                    int lengthFieldLength, // 长度域的长度;
                                    int lengthAdjustment, // 该字段加长度字段等于数据帧的长度,包括长度调整的大小,长度域的数值表示的长度加上这个修正值表示的就是带header的包;
                                    int initialBytesToStrip, // 从数据帧中跳过的字节数,表示获取完一个完整的数据包之后,忽略前面的指定的位数个字节,应用解码器拿到的就是不带长度域的数据包;
                                    boolean failFast // 如果为true,则表示读取到长度域,TA的值的超过maxFrameLength,就抛出一个 TooLongFrameException,而为false表示只有当真正读取完长度域的值表示的字节之后,才会抛出 TooLongFrameException,默认情况下设置为true,建议不要修改,否则可能会造成内存溢出。
                                    ) {

   }

LengthFieldBasedFrameDecoder 定义了一个长度字段来表示消息的长度,因此能够处理可变长度的消息。将消息分为消息头和消息体,消息头固定位置增加一个表示长度的字段,通过长度字段来获取整包的信息。LengthFieldBasedFrameDecoder继承了ByteToMessageDecoder,即转换字节这样的工作是由ByteToMessageDecoder来完成,而LengthFieldBasedFrameDecoder只用安心完成他的解码工作就好。