http://www.voidcn.com/blog/mawming/article/p-3874119.html
Dubbo缺省协议采用单一长连接和NIO异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。缺省协议,使用基于netty3.2.2+hessian3.2.1交互。
连接个数:单链接
连接方式:长连接
传输协议:TCP
传输方式:NIO异步传输
序列化: Hessian二进制序列化
适用范围:传入传出参数数据包较小(建议小于100k),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用dubbo协议传输大文件或超大字符串。
适用场景:常规远程服务方法调用。
为什么要消费者比提供者个数多:
因为dubbo协议采用单一长连接,假设网络为千兆网卡(1024Mbit=128MByte),根据测试经验数据每条连接最多只能压满7MByte(不同的网络环境可能不一样,仅供参考),理论上1个服务提供者需要20个服务消费者才能压满网卡。
为什么不能传大包:
因dubbo协议采用单一长连接,
如果每次请求的数据包大小为500KByte,则单个服务提供者的TPS(每秒处理事务数)最大为:128Mbyte/500KByte=262. 单个消费者调用单个服务提供者的TPS(每秒处理事务数)最大为: 7MByte/500KByte = 14.如果能接受,可以考虑使用,否则网络将成为瓶颈。
为什么采用异步单一长连接:
因为服务的现状大都是服务提供者少,通常只有几台机器,而服务的消费者多,可能整个网络都在访问该服务,比如Morgan的提供者只有6台提供者,却有上百台消费者,每天有1.5亿次调用,如果采用常规的hessian服务,服务提供者很容易就被压垮,通过单一连接,保证单一消费者不会压死提供者,长连接,减少连接握手验证等,并使用异步IO,复用线程池,防止C10K问题。
(1) 约束:
参数及返回值需实现Serializable接口
参数及返回值需有无参构造函数(可以是private的)或者有参构造所有函数允许传入null值。
参数及返回值不能自定义实现List, Map, Number, Date, Calendar等接口,只能用JDK自带的实现,因为hessian会做特殊处理,自定义实现类中的属性值都会丢失。
Hessian序列化,只传成员属性值和值的类型,不传方法或静态变量,兼容情况:(由吴亚军提供)
数据通讯 情况 结果
A->B 类A多一种 属性(或者说类B少一种 属性) 不抛异常,A多的那 个属性的值,B没有, 其他正常
A->B 枚举A多一种 枚举(或者说B少一种 枚举),A使用多 出来的枚举进行传输 抛异常
A->B 枚举A多一种 枚举(或者说B少一种 枚举),A不使用 多出来的枚举进行传输 不抛异常,B正常接 收数据
A->B A和B的属性 名相同,但类型不相同 抛异常
A->B serialId 不相同 正常传输
总结:会抛异常的情况:枚 举值一边多一种,一边少一种,正好使用了差别的那种,或者属性名相同,类型不同
接口增加方法,对客户端无影响,如果该方法不是客户端需要的,客户端不需要重新部署;
输入参数和结果集中增加属性,对客户端无影响,如果客户端并不需要新属性,不用重新
部署;
Dubbo底层采用Socket进行通信。
通信理论:
计算机与外界的信息交换成为通信,基本的通信方法有并行通信和串行通信两种。
1.一组消息(通常是字节)的各位数据被同时传送的通信方式为并行通信。并行通信依靠并行I/O接口实现。并行通信速度快,但传输根线多,只适用于近距离(相距数公尺)的通信。
2.一组消息的各位数据被逐位顺序传送的通信方式成为串行通信。串行通信可通过串行接口来实现,串行通信速度慢,但传输线少,适合长距离通信。
1) 单工
只能一个方向传输数据
2) 半双工
信息能双向传输,但不能同时双向传输
3) 全双工
能双向传输并且可以同时双向传输
- 当前线程怎么让它“暂停”,等结果回来后,再向后执行?
- 正如前面所说,Socket通信是一个全双工的方式,如果有多个线程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是乱七八糟的,server处理完结果后,将结果消息发送给client,client收到很多消息,怎么知道哪个消息结果是原先哪个线程调用的?
- 当前线程怎么让它“暂停”,等结果回来后,再向后执行?
- 正如前面所说,Socket通信是一个全双工的方式,如果有多个线程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是乱七八糟的,server处理完结果后,将结果消息发送给client,client收到很多消息,怎么知道哪个消息结果是原先哪个线程调用的?
关键代码:
com.taobao.remoting.impl.DefaultClient.java //同步调用远程接口 public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException { byte protocol = getProtocol(control); if (!TRConstants.isValidProtocol(protocol)) { throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync."); } ResponseFuture future = invokeWithFuture(appRequest, control); return future.get(); //获取结果时让当前线程等待,ResponseFuture其实就是前面说的callback } public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) { byte protocol = getProtocol(control); long timeout = getTimeout(control); ConnectionRequest request = new ConnectionRequest(appRequest); request.setSerializeProtocol(protocol); Callback2FutureAdapter adapter = new Callback2FutureAdapter(request); connection.sendRequestWithCallback(request, adapter, timeout); return adapter; } |
Callback2FutureAdapter implements ResponseFuture public Object get() throws RemotingException, InterruptedException { synchronized (this) { // 旋锁 while (!isDone) { // 是否有结果了 wait(); //没结果是释放锁,让当前线程处于等待状态 } } if (errorCode == TRConstants.RESULT_TIMEOUT) { throw new TimeoutException("Wait response timeout, request[" + connectionRequest.getAppRequest() + "]."); } else if (errorCode > 0) { throw new RemotingException(errorMsg); } else { return appResp; } } 客户端收到服务端结果后,回调时相关方法,即设置isDone = true并notifyAll() public void handleResponse(Object _appResponse) { appResp = _appResponse; //将远程调用结果设置到callback中来 setDone(); } public void onRemotingException(int _errorType, String _errorMsg) { errorCode = _errorType; errorMsg = _errorMsg; setDone(); } private void setDone() { isDone = true; synchronized (this) { //获取锁,因为前面wait()已经释放了callback的锁了 notifyAll(); // 唤醒处于等待的线程 } } |
com.taobao.remoting.impl.DefaultConnection.java
// 用来存放请求和回调的MAP private final ConcurrentHashMap<Long, Object[]> requestResidents;
//发送消息出去 void sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) { long requestId = connRequest.getId(); long waitBegin = System.currentTimeMillis(); long waitEnd = waitBegin + timeoutMs; Object[] queue = new Object[4]; int idx = 0; queue[idx++] = waitEnd; queue[idx++] = waitBegin; //用于记录日志 queue[idx++] = connRequest; //用于记录日志 queue[idx++] = callback; requestResidents.put(requestId, queue); // 记录响应队列 write(connRequest);
// 埋点记录等待响应的Map的大小 StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(), 1L); } public void write(final Object connectionMsg) { //mina里的IoSession.write()发送消息 WriteFuture writeFuture = ioSession.write(connectionMsg); // 注册FutureListener,当请求发送失败后,能够立即做出响应 writeFuture.addListener(new MsgWrittenListener(this, connectionMsg)); }
/** * 在得到响应后,删除对应的请求队列,并执行回调 * 调用者:MINA线程 */ public void putResponse(final ConnectionResponse connResp) { final long requestId = connResp.getRequestId(); Object[] queue = requestResidents.remove(requestId); if (null == queue) { Object appResp = connResp.getAppResponse(); String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName(); StringBuilder sb = new StringBuilder(); sb.append("Not found response receiver for requestId=[").append(requestId).append("],"); sb.append("from [").append(connResp.getHost()).append("],"); sb.append("response type [").append(appRespClazz).append("]."); LOGGER.warn(sb.toString()); return; } int idx = 0; idx++; long waitBegin = (Long) queue[idx++]; ConnectionRequest connRequest = (ConnectionRequest) queue[idx++]; ResponseCallback callback = (ResponseCallback) queue[idx++]; // ** 把回调任务交给业务提供的线程池执行 ** Executor callbackExecutor = callback.getExecutor(); callbackExecutor.execute(new CallbackExecutorTask(connResp, callback));
long duration = System.currentTimeMillis() - waitBegin; // 实际读响应时间 logIfResponseError(connResp, duration, connRequest.getAppRequest()); } |
CallbackExecutorTask static private class CallbackExecutorTask implements Runnable { final ConnectionResponse resp; final ResponseCallback callback; final Thread createThread;
CallbackExecutorTask(ConnectionResponse _resp, ResponseCallback _cb) { resp = _resp; callback = _cb; createThread = Thread.currentThread(); }
public void run() { // 预防这种情况:业务提供的Executor,让调用者线程来执行任务 if (createThread == Thread.currentThread() && callback.getExecutor() != DIYExecutor.getInstance()) { StringBuilder sb = new StringBuilder(); sb.append("The network callback task [" + resp.getRequestId() + "] cancelled, cause:"); sb.append("Can not callback task on the network io thhread."); LOGGER.warn(sb.toString()); return; }
if (TRConstants.RESULT_SUCCESS == resp.getResult()) { callback.handleResponse(resp.getAppResponse()); //设置调用结果 } else { callback.onRemotingException(resp.getResult(), resp .getErrorMsg()); //处理调用异常 } } } |
另外:
1, 服务端在处理客户端的消息,然后再处理时,使用了线程池来并行处理,不用一个一个消息的处理
同样,客户端接收到服务端的消息,也是使用线程池来处理消息,再回调