RPC(remote procedure call)远程过程调用

RPC是为了在分布式应用中,两台主机的Java进程进行通信,当A主机调用B主机的方法时,过程简洁,就像是调用自己进程里的方法一样。
RPC框架的职责就是,封装好底层调用的细节,客户端只要调用方法,就能够获取服务提供者的响应,方便开发者编写代码。
RPC底层使用的是TCP协议,服务端和客户端和点对点通信。

作用

在RPC的应用场景中,客户端调用服务端的代码

客户端需要有相应的api接口,将方法名、方法参数类型、具体参数等等都发送给服务端

服务端需要有方法的具体实现,在接收到客户端的请求后,根据信息调用对应的方法,并返回响应给客户端

流程图演示

代码实现

首先客户端要知道服务端的接口,然后封装一个请求对象,发送给服务端

要调用一个方法需要有:方法名、方法参数类型、具体参数、执行方法的类名

View Code

由服务端返回给客户端的响应(方法调用结果)也使用一个对象进行封装

View Code

  • 如果是在多线程调用中,需要具体把每个响应返回给对应的请求,可以加一个ID进行标识

将对象通过网络传输,需要先进行序列化操作,这里使用的是jackson工具

  
  1. <dependency>

  2. <groupId>com.fasterxml.jackson.core</groupId>

  3. <artifactId>jackson-databind</artifactId>

  4. <version>2.11.4</version>

  5. </dependency>

View Code

  • 在反序列化过程中,需要指定要转化的类型,而服务端接收request,客户端接收response,二者类型是不一样的,所以在后续传输时指定类型

有了需要传输的数据后,使用Netty开启网络服务进行传输

服务端

绑定端口号,开启连接

  
  1. public class ServerNetty {

  2. public static void connect(int port) throws InterruptedException {

  3. EventLoopGroup workGroup = new NioEventLoopGroup();

  4. EventLoopGroup bossGroup = new NioEventLoopGroup();

  5. ServerBootstrap bootstrap = new ServerBootstrap();

  6. bootstrap.channel(NioServerSocketChannel.class)

  7. .group(bossGroup,workGroup)

  8. .childHandler(new ChannelInitializer<SocketChannel>() {

  9. @Override

  10. protected void initChannel(SocketChannel ch) throws Exception {

  11. /**

  12. * 加入自定义协议的数据处理器,指定接收到的数据类型

  13. * 加入服务端处理器

  14. */

  15. ch.pipeline().addLast(new NettyProtocolHandler(RpcRequest.class));

  16. ch.pipeline().addLast(new ServerHandler());

  17. }

  18. });

  19. bootstrap.bind(port).sync();

  20. }

  21. }

Netty中绑定了两个数据处理器

一个是数据处理器,服务端接收到请求->调用方法->返回响应,这些过程都在数据处理器中执行

  
  1. public class ServerHandler extends SimpleChannelInboundHandler {

  2. @Override

  3. protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

  4. RpcRequest rpcRequest = (RpcRequest)msg;

  5. // 获取使用反射需要的各个参数

  6. String methodName = rpcRequest.getMethodName();

  7. Class[] paramTypes = rpcRequest.getParamType();

  8. Object[] args = rpcRequest.getArgs();

  9. String className = rpcRequest.getClassName();

  10. //从注册中心容器中获取对象

  11. Object object = Server.hashMap.get(className);

  12. Method method = object.getClass().getMethod(methodName,paramTypes);

  13. //反射调用方法

  14. String result = (String) method.invoke(object,args);

  15. // 将响应结果封装好后发送回去

  16. RpcResponse rpcResponse = new RpcResponse();

  17. rpcResponse.setCode(200);

  18. rpcResponse.setResult(result);

  19. ctx.writeAndFlush(rpcResponse);

  20. }

  21. }

  • 这里从hash表中获取对象,有一个预先进行的操作:将有可能被远程调用的对象放入容器中,等待使用

一个是自定义的TCP协议处理器,为了解决TCP的常见问题:因为客户端发送的数据包和服务端接收数据缓冲区之间,大小不匹配导致的粘包、拆包问题。

  
  1. /**

  2. * 网络传输的自定义TCP协议

  3. * 发送时:为传输的字节流添加两个魔数作为头部,再计算数据的长度,将数据长度也添加到头部,最后才是数据

  4. * 接收时:识别出两个魔数后,下一个就是首部,最后使用长度对应的字节数组接收数据

  5. */

  6. public class NettyProtocolHandler extends ChannelDuplexHandler {

  7. private static final byte[] MAGIC = new byte[]{0x15,0x66};

  8. private Class decodeType;

  9. public NettyProtocolHandler() {

  10. }

  11. public NettyProtocolHandler(Class decodeType){

  12. this.decodeType = decodeType;

  13. }

  14. @Override

  15. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

  16. ByteBuf in = (ByteBuf) msg;

  17. //接收响应对象

  18. Object dstObject;

  19. byte[] header = new byte[2];

  20. in.readBytes(header);

  21. byte[] lenByte = new byte[4];

  22. in.readBytes(lenByte);

  23. int len = ByteUtils.Bytes2Int_BE(lenByte);

  24. byte[] object = new byte[len];

  25. in.readBytes(object);

  26. dstObject = JsonSerialization.deserialize(object, decodeType);

  27. //交给下一个数据处理器

  28. ctx.fireChannelRead(dstObject);

  29. }

  30. @Override

  31. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {

  32. ByteBuf byteBuf = Unpooled.buffer();

  33. //写入魔数

  34. byteBuf.writeBytes(MAGIC);

  35. byte[] object = JsonSerialization.serialize(msg);

  36. //数据长度转化为字节数组并写入

  37. int len = object.length;

  38. byte[] bodyLen = ByteUtils.int2bytes(len);

  39. byteBuf.writeBytes(bodyLen);

  40. //写入对象

  41. byteBuf.writeBytes(object);

  42. ctx.writeAndFlush(byteBuf);

  43. }

  44. }

  • 这个数据处理器是服务端和客户端都要使用的,就相当于是一个双方定好传输数据要遵守的协议
  • 在这里进行了对象的序列化和反序列化,所以反序列化类型在这个处理器中指定
  • 这里面要将数据的长度发送,需一个将整数类型转化为字节类型的工具

转化数据工具类

View Code

客户端

将Netty的操作封装了起来,最后返回一个Channle类型,由它进行发送数据的操作

  
  1. public class ClientNetty {

  2. public static Channel connect(String host,int port) throws InterruptedException {

  3. InetSocketAddress address = new InetSocketAddress(host,port);

  4. EventLoopGroup workGroup = new NioEventLoopGroup();

  5. Bootstrap bootstrap = new Bootstrap();

  6. bootstrap.channel(NioSocketChannel.class)

  7. .group(workGroup)

  8. .handler(new ChannelInitializer<SocketChannel>() {

  9. @Override

  10. protected void initChannel(SocketChannel ch) throws Exception {

  11. //自定义协议handler(客户端接收的是response)

  12. ch.pipeline().addLast(new NettyProtocolHandler(RpcResponse.class));

  13. //处理数据handler

  14. ch.pipeline().addLast(new ClientHandler());

  15. }

  16. });

  17. Channel channel = bootstrap.connect(address).sync().channel();

  18. return channel;

  19. }

  20. }

数据处理器负责接收response,并将响应结果放入在future中,future的使用在后续的动态代理中

  
  1. public class ClientHandler extends SimpleChannelInboundHandler {

  2. @Override

  3. protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

  4. RpcResponse rpcResponse = (RpcResponse) msg;

  5. //服务端正常情况返回码为200

  6. if(rpcResponse.getCode() != 200){

  7. throw new Exception();

  8. }

  9. //将结果放到future里

  10. RPCInvocationHandler.future.complete(rpcResponse.getResult());

  11. }

  12. @Override

  13. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

  14. super.exceptionCaught(ctx, cause);

  15. }

  16. }

要让客户端在调用远程方法时像调用本地方法一样,就需要一个代理对象,供客户端调用,让代理对象去调用服务端的实现。

代理对象构造

  
  1. public class ProxyFactory {

  2. public static Object getProxy(Class<?>[] interfaces){

  3. return Proxy.newProxyInstance(ProxyFactory.class.getClassLoader(),

  4. interfaces,

  5. new RPCInvocationHandler());

  6. }

  7. }

客户端代理对象的方法执行

将request发送给服务端后,一直阻塞,等到future里面有了结果为止。

  
  1. public class RPCInvocationHandler implements InvocationHandler {

  2. static public CompletableFuture future;

  3. static Channel channel;

  4. static {

  5. future = new CompletableFuture();

  6. //开启netty网络服务

  7. try {

  8. channel = ClientNetty.connect("127.0.0.1",8989);

  9. } catch (InterruptedException e) {

  10. e.printStackTrace();

  11. }

  12. }

  13. @Override

  14. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

  15. RpcRequest rpcRequest = new RpcRequest();

  16. rpcRequest.setArgs(args);

  17. rpcRequest.setMethodName(method.getName());

  18. rpcRequest.setParamType(method.getParameterTypes());

  19. rpcRequest.setClassName(method.getDeclaringClass().getSimpleName());

  20. channel.writeAndFlush(rpcRequest);

  21. //一个阻塞操作,等待网络传输的结果

  22. String result = (String) future.get();

  23. return result;

  24. }

  25. }

  • 这里用static修饰future和channle,没有考虑到客户端去连接多个服务端和多次远程调用
  • 可以使用一个hash表,存储与不同服务端对应的channle,每次调用时从hash表中获取即可
  • 用hash表存储与不同request对应的future,每个响应的结果与之对应

客户端

要进行远程调用需要拥有的接口

  
  1. public interface OrderService {

  2. public String buy();

  3. }

预先的操作和测试代码

  
  1. public class Client {

  2. static OrderService orderService;

  3. public static void main(String[] args) throws InterruptedException {

  4. //创建一个代理对象给进行远程调用的类

  5. orderService = (OrderService) ProxyFactory.getProxy(new Class[]{OrderService.class});

  6. String result = orderService.buy();

  7. System.out.println(result);

  8. }

  9. }

服务端

要接受远程调用需要拥有的具体实现类

  
  1. public class OrderImpl implements OrderService {

  2. public OrderImpl() {

  3. }

  4. @Override

  5. public String buy() {

  6. System.out.println("调用buy方法");

  7. return "调用buy方法成功";

  8. }

  9. }

预先操作和测试代码

  
  1. public class Server {

  2. public static HashMap<String ,Object> hashMap = new HashMap<>();

  3. public static void main(String[] args) throws InterruptedException {

  4. //开启netty网络服务

  5. ServerNetty.connect(8989);

  6. //提前将需要开放的服务注册到hash表中

  7. hashMap.put("OrderService",new OrderImpl());

  8. }

  9. }

执行结果

原文链接:
https://www.cnblogs.com/davidFB/p/15481823.html

作者:划水的鱼dm