bluetjs

dubbo服务笔记一

604 浏览 0 回复 2016-11-01

http://www.voidcn.com/blog/mawming/article/p-3874119.html


        Dubbo缺省协议采用单一长连接和NIO异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。缺省协议,使用基于netty3.2.2+hessian3.2.1交互。

    连接个数:单链接

    连接方式:长连接

    传输协议:TCP

    传输方式:NIO异步传输

    序列化:    Hessian二进制序列化

    适用范围:传入传出参数数据包较小(建议小于100k),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用dubbo协议传输大文件或超大字符串。

    适用场景:常规远程服务方法调用。

为什么要消费者比提供者个数多:

因为dubbo协议采用单一长连接,假设网络为千兆网卡(1024Mbit=128MByte),根据测试经验数据每条连接最多只能压满7MByte(不同的网络环境可能不一样,仅供参考),理论上1个服务提供者需要20个服务消费者才能压满网卡。

为什么不能传大包:

因dubbo协议采用单一长连接,

如果每次请求的数据包大小为500KByte,则单个服务提供者的TPS(每秒处理事务数)最大为:128Mbyte/500KByte=262. 单个消费者调用单个服务提供者的TPS(每秒处理事务数)最大为: 7MByte/500KByte = 14.如果能接受,可以考虑使用,否则网络将成为瓶颈。

为什么采用异步单一长连接:

因为服务的现状大都是服务提供者少,通常只有几台机器,而服务的消费者多,可能整个网络都在访问该服务,比如Morgan的提供者只有6台提供者,却有上百台消费者,每天有1.5亿次调用,如果采用常规的hessian服务,服务提供者很容易就被压垮,通过单一连接,保证单一消费者不会压死提供者,长连接,减少连接握手验证等,并使用异步IO,复用线程池,防止C10K问题。

    (1) 约束:

    参数及返回值需实现Serializable接口
    参数及返回值需有无参构造函数(可以是private的)或者有参构造所有函数允许传入null值。
    参数及返回值不能自定义实现List, Map, Number, Date, Calendar等接口,只能用JDK自带的实现,因为hessian会做特殊处理,自定义实现类中的属性值都会丢失。
    Hessian序列化,只传成员属性值和值的类型,不传方法或静态变量,兼容情况:(由吴亚军提供)
    数据通讯     情况     结果
    A->B     类A多一种 属性(或者说类B少一种 属性)     不抛异常,A多的那 个属性的值,B没有, 其他正常
    A->B     枚举A多一种 枚举(或者说B少一种 枚举),A使用多 出来的枚举进行传输     抛异常
    A->B     枚举A多一种 枚举(或者说B少一种 枚举),A不使用 多出来的枚举进行传输     不抛异常,B正常接 收数据
    A->B     A和B的属性 名相同,但类型不相同     抛异常
    A->B     serialId 不相同     正常传输

    总结:会抛异常的情况:枚 举值一边多一种,一边少一种,正好使用了差别的那种,或者属性名相同,类型不同

接口增加方法,对客户端无影响,如果该方法不是客户端需要的,客户端不需要重新部署;
输入参数和结果集中增加属性,对客户端无影响,如果客户端并不需要新属性,不用重新
部署;


   Dubbo底层采用Socket进行通信。

  通信理论:

  计算机与外界的信息交换成为通信,基本的通信方法有并行通信和串行通信两种。

 1.一组消息(通常是字节)的各位数据被同时传送的通信方式为并行通信。并行通信依靠并行I/O接口实现。并行通信速度快,但传输根线多,只适用于近距离(相距数公尺)的通信。

2.一组消息的各位数据被逐位顺序传送的通信方式成为串行通信。串行通信可通过串行接口来实现,串行通信速度慢,但传输线少,适合长距离通信。

1)   单工

只能一个方向传输数据


2)   半双工

信息能双向传输,但不能同时双向传输


3)   全双工

能双向传输并且可以同时双向传输


Socket:
 Socket是一种应用接口,TCP/IP是网络传输协议,虽然接口相同,但是不同的协议会有不同的服务性质。创建Socket连接时,可以指定使用的传输层协议,Socket可以支持不同的传输层协议(TCP或UDP),当使用TCP协议进行连接时,该Socket连接就是一个TCP连接。Socket跟TCP/IP并没有必然的联系,Socket编程接口在设计的时候,就希望能适应其他的网络协议,所以Socket的出现只是可以更方便的使用TCP/IP协议栈而已。
Dubbo远程同步调用原理分析
Dubbo缺省协议才用单一长连接和NIO异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。

Dubbo缺省协议,使用基于mina1.1.7+hessian3.2。1的tbremoting交互。
连接个数:单连接
连接方式:  长连接
传输协议:   Tcp
传输方式:NIO异步传输
序列化:Hessian二进制序列化
适用范围:传入传出参数数据包较小(建议小于100k),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用dubbo协议传输大文件或超大字符串。
适用场景:常规远程服务方法调用。
 通常,一个典型的同步远程调用应该是这样的:
1, 客户端线程调用远程接口,向服务端发送请求,同时当前线程应该处于“暂停“状态,即线程不能向后执行了,必需要拿到服务端给自己的结果后才能向后执行
2,   服务端接到客户端请求后,处理请求,将结果给客户端
3,   客户端收到结果,然后当前线程继续往后执行

Dubbo  里使用到了  Socket  (采用  apache mina  框架做底层调用)来建立长连接,发送、接收数据,底层使用 apache mina  框架的  IoSession  进行发送消息。
查看  Dubbo  文档及源代码可知,  Dubbo  底层使用  Socket  发送消息的形式进行数据传递,结合了  mina  框架,使用 IoSession.write()  方法,这个方法调用后对于整个远程调用  (  从发出请求到接收到结果  )  来说是一个异步的,即对于当前线程来说,将请求发送出来,线程就可以往后执行了,至于服务端的结果,是服务端处理完成后,再以消息的形式发送给客户端的。于是这里出现了  2  个问题:
  • 当前线程怎么让它“暂停”,等结果回来后,再向后执行?
  • 正如前面所说,Socket通信是一个全双工的方式,如果有多个线程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是乱七八糟的,server处理完结果后,将结果消息发送给clientclient收到很多消息,怎么知道哪个消息结果是原先哪个线程调用的?
基本原理如下:
1.client一个线程调用远程接口,生成一个唯一的ID(比如一段随机字符串,UUID等),Dubbo是使用AtomicLong从0开始累计数字的。
2.将打包的方法调用信息(如调用的接口名称,方法名称,参数值列表等),和处理结果的回调对象callback,全部封装在一起,组成一个对象object
3.向专门存放调用信息的全局ConcurrentHashMap中put(Id,Object)
4.将ID和打包的方法调用信息封装成以对象connRequest,使用IoSession.write(connRequest)异步发送出去
5.当前线程再使用callback的get()方法视图获取远程方法返回的结果,在get()内部,则使用synchronized获取回调对象callback的锁,再先检测是否已经获取到结果,如果没有,就调用callback的wait()方法,释放callback上的锁,让当前线程处于等待状态。
6.服务端接收到请求并处理后,将结果(此结果中包含了签名的ID,即回传)发送给客户端,客户端socket连接上专门监听消息的线程收到消息,分析结果,取到ID,再从前面的ConcurrentHashMap中get(Id),从而找到callback,将方法调用结果设置到callback对象里。
7.监听线程接着使用synchronized获取回调对象callback的锁(因为前面调用过wait(),那个线程已经释放callback的锁了),再notifyAll(),唤醒前面处于等待状态的线程继续执行(callback的get()方法继续执行就能拿到调用结果了),至此,整个过程结束。
需要注意的是,这里的callback对象是每次调用产生一个新的,不能共享,否则会有问题;另外ID必需至少保证在一个Socket连接里面是唯一的。
现在,前面两个问题已经有答案了,
  • 当前线程怎么让它“暂停”,等结果回来后,再向后执行?
     答:先生成一个对象obj,在一个全局map里put(ID,obj)存放起来,再用synchronized获取obj锁,再调用obj.wait()让当前线程处于等待状态,然后另一消息监听线程等到服务端结果来了后,再map.get(ID)找到obj,再用synchronized获取obj锁,再调用obj.notifyAll()唤醒前面处于等待状态的线程。
  • 正如前面所说,Socket通信是一个全双工的方式,如果有多个线程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是乱七八糟的,server处理完结果后,将结果消息发送给clientclient收到很多消息,怎么知道哪个消息结果是原先哪个线程调用的?
     答:使用一个ID,让其唯一,然后传递给服务端,再服务端又回传回来,这样就知道结果是原先哪个线程的了。


关键代码:

com.taobao.remoting.impl.DefaultClient.java

//同步调用远程接口

public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException {

        byte protocol = getProtocol(control);

        if (!TRConstants.isValidProtocol(protocol)) {

            throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync.");

        }

        ResponseFuture future = invokeWithFuture(appRequest, control);

        return future.get() //获取结果时让当前线程等待,ResponseFuture其实就是前面说的callback

}

public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) {

         byte protocol = getProtocol(control);

         long timeout = getTimeout(control);

         ConnectionRequest request = new ConnectionRequest(appRequest);

         request.setSerializeProtocol(protocol);

         Callback2FutureAdapter adapter = new Callback2FutureAdapter(request);

         connection.sendRequestWithCallback(request, adapter, timeout);

         return adapter;

}

 

Callback2FutureAdapter implements ResponseFuture

public Object get() throws RemotingException, InterruptedException {

synchronized (this) {  // 旋锁

    while (!isDone) {  // 是否有结果了

wait(); //没结果是释放锁,让当前线程处于等待状态

    }

}

if (errorCode == TRConstants.RESULT_TIMEOUT) {

    throw new TimeoutException("Wait response timeout, request["

    + connectionRequest.getAppRequest() + "].");

}

else if (errorCode > 0) {

    throw new RemotingException(errorMsg);

}

else {

    return appResp;

}

}

客户端收到服务端结果后,回调时相关方法,即设置isDone = truenotifyAll()

public void handleResponse(Object _appResponse) {

         appResp = _appResponse; //将远程调用结果设置到callback中来

         setDone();

}

public void onRemotingException(int _errorType, String _errorMsg) {

         errorCode = _errorType;

         errorMsg = _errorMsg;

         setDone();

}

private void setDone() {

         isDone = true;

         synchronized (this) { //获取锁,因为前面wait()已经释放了callback的锁了

             notifyAll(); // 唤醒处于等待的线程

         }

}

 

com.taobao.remoting.impl.DefaultConnection.java

 

// 用来存放请求和回调的MAP

private final ConcurrentHashMap<Long, Object[]> requestResidents;

 

//发送消息出去

void sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) {

         long requestId = connRequest.getId();

         long waitBegin = System.currentTimeMillis();

         long waitEnd = waitBegin + timeoutMs;

         Object[] queue = new Object[4];

         int idx = 0;

         queue[idx++] = waitEnd;

         queue[idx++] = waitBegin;   //用于记录日志

         queue[idx++] = connRequest; //用于记录日志

         queue[idx++] = callback;

         requestResidents.put(requestId, queue); // 记录响应队列

         write(connRequest);

 

         // 埋点记录等待响应的Map的大小

         StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(),

                   1L);

}

public void write(final Object connectionMsg) {

//mina里的IoSession.write()发送消息

         WriteFuture writeFuture = ioSession.write(connectionMsg);

         // 注册FutureListener,当请求发送失败后,能够立即做出响应

         writeFuture.addListener(new MsgWrittenListener(this, connectionMsg));

}

 

/**

在得到响应后,删除对应的请求队列,并执行回调

调用者:MINA线程

*/

public void putResponse(final ConnectionResponse connResp) {

         final long requestId = connResp.getRequestId();

         Object[] queue = requestResidents.remove(requestId);

         if (null == queue) {

             Object appResp = connResp.getAppResponse();

             String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName();

             StringBuilder sb = new StringBuilder();

             sb.append("Not found response receiver for requestId=[").append(requestId).append("],");

             sb.append("from [").append(connResp.getHost()).append("],");

             sb.append("response type [").append(appRespClazz).append("].");

             LOGGER.warn(sb.toString());

             return;

         }

         int idx = 0;

         idx++;

         long waitBegin = (Long) queue[idx++];

         ConnectionRequest connRequest = (ConnectionRequest) queue[idx++];

         ResponseCallback callback = (ResponseCallback) queue[idx++];

         // ** 把回调任务交给业务提供的线程池执行 **

         Executor callbackExecutor = callback.getExecutor();

         callbackExecutor.execute(new CallbackExecutorTask(connResp, callback));

 

         long duration = System.currentTimeMillis() - waitBegin; // 实际读响应时间

         logIfResponseError(connResp, duration, connRequest.getAppRequest());

}

 

CallbackExecutorTask

static private class CallbackExecutorTask implements Runnable {

         final ConnectionResponse resp;

         final ResponseCallback callback;

         final Thread createThread;

 

         CallbackExecutorTask(ConnectionResponse _resp, ResponseCallback _cb) {

             resp = _resp;

             callback = _cb;

             createThread = Thread.currentThread();

         }

 

         public void run() {

             // 预防这种情况:业务提供的Executor,让调用者线程来执行任务

             if (createThread == Thread.currentThread()

                       && callback.getExecutor() != DIYExecutor.getInstance()) {

                   StringBuilder sb = new StringBuilder();

                   sb.append("The network callback task [" + resp.getRequestId() + "] cancelled, cause:");

                   sb.append("Can not callback task on the network io thhread.");

                   LOGGER.warn(sb.toString());

                   return;

             }

 

             if (TRConstants.RESULT_SUCCESS == resp.getResult()) {

                   callback.handleResponse(resp.getAppResponse()); //设置调用结果

             }

             else {

                   callback.onRemotingException(resp.getResult(), resp

                            .getErrorMsg());  //处理调用异常

             }

         }

}

 

另外:

1, 服务端在处理客户端的消息,然后再处理时,使用了线程池来并行处理,不用一个一个消息的处理

同样,客户端接收到服务端的消息,也是使用线程池来处理消息,再回调


收藏
评论加载中...