项目作者: sofastack

项目描述 :
The Node.js implementation of the SOFABolt protocol
高级语言: JavaScript
项目地址: git://github.com/sofastack/sofa-bolt-node.git
创建时间: 2018-05-22T08:18:06Z
项目社区:https://github.com/sofastack/sofa-bolt-node

开源协议:MIT License

下载


sofa-bolt-node

Bolt 协议 Nodejs 实现版本

NPM version
build status
Test coverage
David deps
Known Vulnerabilities
npm download

一、简介

SOFABoltNode 是 SOFABolt 的 Nodejs 实现,它包含了 Bolt 通讯层协议框架,以及 RPC 应用层协议定制。和 Java 版本略有不同的是,它并不包含基础通讯功能(连接管理、心跳、自动重连等等),这些功能会放到专门的 RPC 模块里实现。

二、Bolt 通信层协议设计

Bolt 协议是一个标准的通讯层协议,目前包含两个大版本,定义如下:

V1 版本

  1. Request command protocol for v1
  2. 0 1 2 4 6 8 10 12 14 16
  3. +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
  4. |proto| type| cmdcode |ver2 | requestId |codec| timeout | classLen |
  5. +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
  6. |headerLen | contentLen | ... ... |
  7. +-----------+-----------+-----------+ +
  8. | className + header + content bytes |
  9. + +
  10. | ... ... |
  11. +-----------------------------------------------------------------------------------------------+
  12. Response command protocol for v1
  13. 0 1 2 3 4 6 8 10 12 14 16
  14. +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
  15. |proto| type| cmdcode |ver2 | requestId |codec|respstatus | classLen |headerLen |
  16. +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
  17. | contentLen | ... ... |
  18. +-----------------------+ +
  19. | header + content bytes |
  20. + +
  21. | ... ... |
  22. +-----------------------------------------------------------------------------------------------+

V2 版本

  1. Request command protocol for v2
  2. 0 1 2 4 6 8 10 11 12 14 16
  3. +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+-----+-----+-----+
  4. |proto| ver1|type | cmdcode |ver2 | requestId |codec|switch| timeout |
  5. +-----------+-----------+-----------+-----------+-----------+------------+-----------+-----------+
  6. |classLen |headerLen |contentLen | ... |
  7. +-----------+-----------+-----------+-----------+ +
  8. | className + header + content bytes |
  9. + +
  10. | ... ... | CRC32(optional) |
  11. +------------------------------------------------------------------------------------------------+
  12. Response command protocol for v2
  13. 0 1 2 3 4 6 8 10 11 12 14 16
  14. +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+-----+-----+-----+
  15. |proto| ver1| type| cmdcode |ver2 | requestId |codec|switch|respstatus | classLen |
  16. +-----------+-----------+-----------+-----------+-----------+------------+-----------+-----------+
  17. |headerLen | contentLen | ... |
  18. +-----------------------------------+ +
  19. | className + header + content bytes |
  20. + +
  21. | ... ... | CRC32(optional) |
  22. +------------------------------------------------------------------------------------------------+

V2 相比 V1 版本,主要两点改进:

  1. 增加了协议版本号(ver1)
  2. 协议层面支持了数据包的 CRC32 校验(后面详细介绍)

主要字段介绍:

  • proto: 协议标识位,bolt v1 是 0x01,bolt v2 是 0x02
  • ver1: bolt 协议版本,从 v2 开始 proto 不会再变,升级只变这个版本号
  • type: request/response/request oneway
  • cmdcode: request/response/heartbeat,和 type 有交叉
  • ver2: 应用层协议的版本(暂时没用)
  • requestId: 数据包唯一标识 id
  • codec: body 序列化方式,目前支持 hessian/hessian2/protobuf
  • switch: 是否开启 crc32 校验
  • headerLen: 自定义头部长度
  • contentLen: 内容长度
  • CRC32: 整个数据包通过计算出的 crc32 值(ver1 > 1 时支持)

三、功能介绍

基本 RPC 调用功能

客户端示例

  1. 'use strict';
  2. const net = require('net');
  3. const pump = require('pump');
  4. const protocol = require('sofa-bolt-node');
  5. const options = {
  6. sentReqs: new Map(),
  7. };
  8. const socket = net.connect(12200, '127.0.0.1');
  9. const encoder = protocol.encoder(options);
  10. const decoder = protocol.decoder(options);
  11. socket.once('connect', () => {
  12. console.log('connected');
  13. });
  14. socket.once('close', () => {
  15. console.log('close');
  16. });
  17. socket.once('error', err => {
  18. console.log(err);
  19. });
  20. // 流式 API
  21. pump(encoder, socket, decoder, err => {
  22. console.log(err);
  23. });
  24. // 监听 response / heartbeat_acl
  25. decoder.on('response', res => {
  26. console.log(res);
  27. });
  28. decoder.on('heartbeat_ack', res => {
  29. console.log(res);
  30. });
  31. // 发送 RPC 请求
  32. encoder.writeRequest(1, {
  33. args: [{
  34. $class: 'java.lang.String',
  35. $: 'peter',
  36. }],
  37. serverSignature: 'com.alipay.sofa.rpc.quickstart.HelloService:1.0',
  38. methodName: 'sayHello',
  39. timeout: 3000,
  40. });
  41. // 发送心跳包
  42. encoder.writeHeartbeat(2, { clientUrl: 'xxx' });

服务端示例

  1. 'use strict';
  2. const net = require('net');
  3. const pump = require('pump');
  4. const protocol = require('sofa-bolt-node');
  5. const server = net.createServer(socket => {
  6. const options = {
  7. sentReqs: new Map(),
  8. };
  9. const encoder = protocol.encoder(options);
  10. const decoder = protocol.decoder(options);
  11. pump(encoder, socket, decoder, err => {
  12. console.log(err);
  13. });
  14. decoder.on('request', req => {
  15. console.log(req);
  16. encoder.writeResponse(req, {
  17. isError: false,
  18. appResponse: {
  19. $class: 'java.lang.String',
  20. $: `hello ${req.data.args[0]} !`,
  21. },
  22. });
  23. });
  24. decoder.on('heartbeat', hb => {
  25. console.log(hb);
  26. encoder.writeHeartbeatAck(hb);
  27. });
  28. });
  29. server.listen(12200);

多种序列化方式支持

目前推荐的序列化方式是 protobuf,因为它跨语言性做得比较好。在蚂蚁内部其实我们主要使用的是 hessian 序列化,后面我们会陆续开源关于它的一系列最佳实践,尽请期待。下面我们演示一个 pb 的 demo

通过 *.proto 文件定义接口

  1. syntax = "proto3";
  2. package com.alipay.sofa.rpc.test;
  3. // 可选
  4. option java_multiple_files = false;
  5. service ProtoService {
  6. rpc echoObj (EchoRequest) returns (EchoResponse) {}
  7. }
  8. message EchoRequest {
  9. string name = 1;
  10. Group group = 2;
  11. }
  12. message EchoResponse {
  13. int32 code = 1;
  14. string message = 2;
  15. }
  16. enum Group {
  17. A = 0;
  18. B = 1;
  19. }

客户端使用 protobuf

  1. 'use strict';
  2. const net = require('net');
  3. const path = require('path');
  4. const pump = require('pump');
  5. const protocol = require('sofa-bolt-node');
  6. const protobuf = require('antpb');
  7. // 存放 *.proto 文件的目录,加载 proto
  8. const protoPath = path.join(__dirname, 'proto');
  9. const proto = protobuf.loadAll(protoPath);
  10. // 将 proto 作为参数传入 encoder/decoder
  11. const sentReqs = new Map();
  12. const encoder = protocol.encoder({ sentReqs, proto });
  13. const decoder = protocol.decoder({ sentReqs, proto });
  14. const socket = net.connect(12200, '127.0.0.1');
  15. socket.once('connect', () => {
  16. console.log('connected');
  17. });
  18. socket.once('close', () => {
  19. console.log('close');
  20. });
  21. socket.once('error', err => {
  22. console.log(err);
  23. });
  24. pump(encoder, socket, decoder, err => {
  25. console.log(err);
  26. });
  27. // 指定序列化方式为 protobuf
  28. encoder.codecType = 'protobuf';
  29. const req = {
  30. serverSignature: 'com.alipay.sofa.rpc.test.ProtoService:1.0',
  31. methodName: 'echoObj',
  32. args: [{
  33. name: 'peter',
  34. group: 'B',
  35. }],
  36. timeout: 3000,
  37. };
  38. decoder.on('response', res => {
  39. console.log(res.data.appResponse);
  40. });
  41. // 记录请求、发送请求
  42. sentReqs.set(1, { req });
  43. encoder.writeRequest(1, req);

服务端使用 protobuf

  1. 'use strict';
  2. const net = require('net');
  3. const path = require('path');
  4. const pump = require('pump');
  5. const protocol = require('sofa-bolt-node');
  6. const protobuf = require('antpb');
  7. const protoPath = path.join(__dirname, 'proto');
  8. const proto = protobuf.loadAll(protoPath);
  9. const server = net.createServer(socket => {
  10. const options = {
  11. sentReqs: new Map(),
  12. proto,
  13. };
  14. const encoder = protocol.encoder(options);
  15. const decoder = protocol.decoder(options);
  16. pump(encoder, socket, decoder, err => {
  17. console.log(err);
  18. });
  19. decoder.on('request', req => {
  20. const reqData = req.data.args[0].toObject({ enums: String });;
  21. encoder.writeResponse(req, {
  22. isError: false,
  23. appResponse: {
  24. code: 200,
  25. message: 'hello ' + reqData.name + ', you are in ' + reqData.group,
  26. },
  27. });
  28. });
  29. decoder.on('heartbeat', hb => {
  30. console.log(hb);
  31. encoder.writeHeartbeatAck(hb);
  32. });
  33. });
  34. server.listen(12200);

CRC32 校验

RPC 在网络传输过程中可能会遇到各种各样奇葩的问题,导致二进制被篡改,如果这个接口是和钱有关的,就可能导致资损,所以 Bolt 协议层面引入了一个校验功能,当开启时会在整个数据包后面额外传输 4 个 bytes 是数据包计算出来的 CRC32 值,接收端收到数据包以后先在本地重新计算 CRC32 值然后和附带的值比对,一致继续处理,不一致则直接报错

该功能由客户端开启,但是开启之前一般有一个协商的过程,服务端通过协商告诉客户端它支持 crc32 校验

  1. 'use strict';
  2. const net = require('net');
  3. const pump = require('pump');
  4. const protocol = require('sofa-bolt-node');
  5. const options = {
  6. sentReqs: new Map(),
  7. };
  8. const socket = net.connect(12200, '127.0.0.1');
  9. const encoder = protocol.encoder(options);
  10. const decoder = protocol.decoder(options);
  11. pump(encoder, socket, decoder);
  12. // 客户端开启 crc 校验
  13. encoder.protocolType = 'bolt2'; // v2 版本以上才支持 crc 校验
  14. encoder.boltVersion = 2;
  15. encoder.crcEnable = true;
  16. // 发送
  17. encoder.writeRequest(1, {
  18. args: [{
  19. $class: 'java.lang.String',
  20. $: 'peter',
  21. }],
  22. serverSignature: 'com.alipay.sofa.rpc.quickstart.HelloService:1.0',
  23. methodName: 'sayHello',
  24. timeout: 3000,
  25. });

四、用户接口

全局接口

  • encoder(options) 创建一个 ProtocolEncoder
    • @param {Map} sentReqs - 用于存储发送出去的请求
    • @param {Map} [classCache] - 类定义缓存
    • @param {Object} [classMap] - hessian 序列化的类型定义
    • @param {Object} [proto] - protobuf 序列化的接口定义
    • @param {Url} [address] - TCP socket 地址
    • @param {String} [codecType] - 序列化方式
  • decoder(options) 创建一个 ProtocolDecoder
    • @param {Map} sentReqs - 用于存储发送出去的请求
    • @param {Map} [classCache] - 类定义缓存
    • @param {Object} [classMap] - hessian 序列化的类型定义
    • @param {Object} [proto] - protobuf 序列化的接口定义
  • setOptions(options) 设置一些全局的参数

ProtocolEncoder 接口

  • protocolType 设置协议,bolt/bolt2
  • codecType 设置序列化方式,hessian/hessian2/protobuf
  • boltVersion 设置 bolt 的版本
  • crcEnable 是否开启 crc 校验
  • writeRequest(id, req, [callback]) 发送请求
    • @param {Number} id - 数据包唯一标识
    • @parma {Object} req - 请求对象
      • @param {String} serverSignature - 服务的唯一标识
      • @param {String} methodName - 方法名
      • @param {Array} args - 参数列表
      • @param {Number} timeout - 超时时间
      • @param {Object} requestProps - 额外传递的 kv 参数
  • writeResponse(req, res, [callback]) 发送响应
    • @param {Object} req - 请求对象,有请求才有响应
    • @parma {Object} res - 响应对象
      • @param {Boolean} isError - 是否成功
      • @param {String} errorMsg - 异常信息,isError=false 的话为 null
      • @param {Object} appResponse - 响应对象
      • @param {Object} responseProps - 额外传递的 kv 参数
  • writeHeartbeat(id, hb, [callback]) 发送心跳请求
    • @param {Number} id - 数据包唯一标识
    • @parma {Object} hb - 心跳对象
      • @param {String} clientUrl - 客户端 url
  • writeHeartbeatAck(hb, [callback]) 发送心跳响应
    • @parma {Object} hb - 心跳对象

五、接口设计思想

从上面的介绍和接口定义看,我们对协议的实现核心就是 Encoder 和 Decoder 两个类,并且采用了 Nodejs 里流(Stream)的风格

  1. +---------+ pipe +---------+ pipe +---------+ response
  2. | Encoder | ---> | Socket | ---> | Decoder | ...
  3. +---------+ +---------+ +---------+
  4. | ^
  5. | |
  6. | |
  7. v |
  8. +---------+ pipe +---------+ pipe +---------+ request
  9. | Encoder | ---> | Socket | ---> | Decoder | ...
  10. +---------+ +---------+ +---------+

所有的协议细节,数据的切分都封装在 Encoder/Decoder 两个类中,并且提供标准的 API,所以以后我们要替换其他的通讯层协议(比如:dubbo),那么只需要直接替换就好了

六、如何贡献

请告知我们可以为你做些什么,不过在此之前,请检查一下是否有已经存在的Bug或者意见

如果你是一个代码贡献者,请参考代码贡献规范

七、开源协议

MIT