RPC August 24, 2020

RPC框架初体验之入门

Words count 26k Reading time 24 mins. Read count 0

项目地址:https://github.com/shirukai/learn-demo-rpc.git

RPC全称Remote Procedure Call,顾名思义,远程过程调用的意思。关于RPC的介绍,可以参考一下简书上《如何给老婆解释什么是RPC》这篇文章,很有趣。RPC这个概念,我第一次接触是在《Spark内核设计的艺术》这本书里。后来在看微服务的时候,也提及到了几款RPC框架,比如Thrift、Dubbo、gRPC。所以决定认真的学习一下RPC以及这几种框架。下面将会在本篇文章里入门RPC,动手实现一个简单的RPC,再基于Netty实现一个RPC,最后简单介绍一下几款常见的RPC框架,以及它们的优缺点。后面将会以系列的形式分别介绍这几款常见的RPC框架的使用。

1 动手实现一个简单的RPC

为了深入理解RPC,这里动手实现了一个简单的RPC,服务之前通过简单的socket进行通讯。

1.1 项目描述

在learn-demo-rpc项目下有一个simple-rpc的模块,该模块实现了一个简单的RPC,其中包括四个子模块

simple-rpc/
├── simple-rpc-api
├── simple-rpc-consumer
├── simple-rpc-core
└── simple-rpc-provider
  • simple-rpc-api: 该模块提供服务接口
  • simple-rpc-core: RPC核心实现
  • simple-rpc-consumer: RPC消费者服务
  • simple-rpc-provider: RPC提供者服务

simple-rpc-provider 模块实现simple-rpc-api定义的相关接口,并通过simple-rpc-core模块创建提供者服务。

simple-rpc-consumer通过simple-rpc-core模块创建消费者服务,并通过simple-rpc-api模块的接口进行RPC。

项目演示:

启动simple-rpc-provider模块里的DemoServiceProvider

启动simple-rpc-consumer里的DemoServiceConsumer

1.2 simple-rpc-core模块

该模块为核心模块,分别提供了服务者、消费者服务创建。如下所示,主要包括四个功能。request包定义RPC请求数据类型,response包定义RPC响应数据类型,server提供RPC的provider服务,client提供RPC的consumer服务。

1.2.1 request

在该包下创建RpcRequest类,用来定义请求数据类型实体,该实体主要包含,远程调用的方法名称、参数列表、参数类型列表。

package learn.demo.rpc.simple.core.request;

import java.io.Serializable;
import java.util.Arrays;

/**
 * Created by shirukai on 2019-06-21 15:02
 * RPC 请求
 */
public class RpcRequest implements Serializable {
    private static final long serialVersionUID = 4932007273709224551L;
    /**
     * 方法名称
     */
    private String methodName;

    /**
     * 参数列表
     */
    private Object[] parameters;

    /**
     * 参数类型
     */
    private Class<?>[] parameterTypes;
    /**
     * 省略get、set方法。
     */
}

1.2.2 response

在该包下创建RpcResponse类,用来定义RPC响应的数据类型,其中包含响应状态status,用来描述请求是否执行成功,它有两个状态succeed和failed。响应信息message主要存放异常响应时的错误信息。响应数据data,远程调用方法的返回值。

public class RpcResponse implements Serializable {
    public static String SUCCEED = "succeed";
    public static String FAILED = "failed";
    private static final long serialVersionUID = 6595683424889346485L;

    /**
     * 响应状态
     */
    private String status = "succeed";
    /**
     * 响应信息,如异常信息
     */
    private String message;

    /**
     * 响应数据,返回值
     */
    private Object data;
    /**
     * 省略get、set方法
     */
}

1.2.3 server

在该包下创建RpcProvider类,用来定义创建Provider服务的方法。原理很简单,根据指定的端口创建ServerSocket,监听客户端发送数据。接收到客户端发送数据后,反序列化成Request,获取其中的方法名和参数类型及参数列表,根据传入的接口class和实例,通过反射机制,调用该方法,拿到执行结果后封装成RpcResponse返回给客户端。具体实现如下:

package learn.demo.rpc.simple.core.server;

import learn.demo.rpc.simple.core.request.RpcRequest;
import learn.demo.rpc.simple.core.response.RpcResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * Created by shirukai on 2019-06-21 16:26
 * RPC provider
 */
public class RpcProvider<T> {
    private static final Logger log = LoggerFactory.getLogger(RpcProvider.class);
    private T ref;

    private Class<?> interfaceClass;

    public void setRef(T ref) {
        this.ref = ref;
    }

    public RpcProvider<T> setInterfaceClass(Class<?> interfaceClass) {
        this.interfaceClass = interfaceClass;
        return this;
    }

    public void export(int port) {
        try {
            log.info("The RPC Server is starting, address:{}, bind:{}", InetAddress.getLocalHost().getHostAddress(), port);
            ServerSocket listener = new ServerSocket(port);
            while (true) {
                Socket socket = listener.accept();
                // 接收数据并进行反序列化
                ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());

                // 获取请求对象
                Object object = objectInputStream.readObject();

                if (object instanceof RpcRequest) {
                    RpcRequest request = (RpcRequest) object;
                    log.info("Received request:{}", request);
                    // 处理请求
                    RpcResponse response = handleRequest(request);
                    // 将结果返回给客户端
                    log.info("Send response to client.{}", response);
                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                    objectOutputStream.writeObject(response);
                }
                socket.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    private RpcResponse handleRequest(RpcRequest request) {
        RpcResponse response = new RpcResponse();
        try {
            log.info("The server is handling request.");
            Method method = interfaceClass.getMethod(request.getMethodName(), request.getParameterTypes());
            Object data = method.invoke(ref, request.getParameters());
            response.setData(data);
        } catch (Exception e) {
            response.setStatus(RpcResponse.FAILED).setMessage(e.getMessage());
        }
        return response;
    }
}

1.2.4 client

客户端的实现比较有趣,包含如下内容

client/
├── RpcClient.java
├── RpcConsumer.java
└── proxy
    └── RpcInvocationHandler.java

原理也不复杂,通过上面的结构可以看出,我们clien里包含了一个proxy包,该包主要实现的是一个动态代理。客户端实现API接口的动态代理,生成接口实例,表面上调用的接口方法,通过代理后,经过RpcClient进行的远程调用,也就是我们的定义的RPC,拿到结果后再返回。

1.2.4.1 RpcClient

RpcClient主要是与远程ServerSocket进行通讯的,创建根据IP和端口创建Socket,将RpcRequest进行序列化之后,发送给远程服务。

package learn.demo.rpc.simple.core.client;

import learn.demo.rpc.simple.core.request.RpcRequest;
import learn.demo.rpc.simple.core.response.RpcResponse;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;

/**
 * Created by shirukai on 2019-06-21 15:42
 * Rpc客户端
 */
public class RpcClient {
    /**
     * 服务地址
     */
    private String address;

    /**
     * 服务端口
     */
    private int port;


    public RpcResponse send(RpcRequest rpcRequest) throws Exception {

        Socket socket = new Socket(address, port);

        //请求序列化
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());

        //将请求发给服务提供方
        objectOutputStream.writeObject(rpcRequest);

        // 将响应体反序列化
        ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());

        Object response = objectInputStream.readObject();
        if (response instanceof RpcResponse) {
            return (RpcResponse) response;
        }
        throw new RuntimeException("Return error");
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }
}

1.2.4.2 RpcInvocationHandler

在proxy下创建RpcInvocationHandler,调用处理器,继承InvocationHandler接口并实现其invoke方法。实现代理逻辑。如下所示:

package learn.demo.rpc.simple.core.client.proxy;

import learn.demo.rpc.simple.core.client.RpcClient;
import learn.demo.rpc.simple.core.request.RpcRequest;
import learn.demo.rpc.simple.core.response.RpcResponse;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

/**
 * Created by shirukai on 2019-06-21 15:43
 * RPC 代理处理器
 */
public class RpcInvocationHandler implements InvocationHandler {
    private RpcClient client;

    public RpcInvocationHandler(RpcClient client) {
        this.client = client;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        // 构建请求对象
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setMethodName(method.getName())
                .setParameterTypes(method.getParameterTypes())
                .setParameters(args);
        // 使用客户端发送请求
        RpcResponse response = client.send(rpcRequest);

        // 响应成功返回结果
        if (RpcResponse.SUCCEED.equals(response.getStatus())) {
            return response.getData();
        }
        throw new RuntimeException(response.getMessage());
    }
}

1.2.4.3 RpcConsumer

该类通过实例化RpcClient以及创建代理实例来构建生产者

package learn.demo.rpc.simple.core.client;

import learn.demo.rpc.simple.core.client.proxy.RpcInvocationHandler;

import java.lang.reflect.Proxy;

/**
 * Created by shirukai on 2019-06-21 16:11
 * 生产者构建器
 */
public class RpcConsumer {
    private String address;
    private int port;

    private Class<?> interfaceClass;

    public RpcConsumer setAddress(String address) {
        this.address = address;
        return this;
    }

    public RpcConsumer setPort(int port) {
        this.port = port;
        return this;
    }

    public RpcConsumer setInterface(Class<?> interfaceClass) {
        this.interfaceClass = interfaceClass;
        return this;
    }

    public <T> T get() {
        RpcClient client = new RpcClient();
        client.setAddress(address);
        client.setPort(port);
        // 实例化RPC代理处理器
        RpcInvocationHandler handler = new RpcInvocationHandler(client);
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, handler);
    }
}

1.3 simple-rpc-api模块

上文也提到,该模块只是提供一个公用的接口API,没有特殊方法。如下提供一个名为DemoService的接口,定义如下接口:

package learn.demo.rpc.simple.api;

/**
 * Created by shirukai on 2019-06-21 10:54
 * DemoService 接口
 */
public interface DemoService {
    String sayHello(String name);

    String sayGoodbye(String name);
}

1.4 simple-rpc-provider模块

上面我们已经实现了核心模块core以及接口api,这里我们调用这两个模块进行提供者服务的创建。分为接口实现,和服务创建两部分。

1.4.1 API接口实现

引入我们创建的simple-rpc-api模块

        <dependency>
            <groupId>learn.demo</groupId>
            <artifactId>simple-rpc-api</artifactId>
            <version>1.0</version>
            <scope>compile</scope>
        </dependency>

创建DemoServiceImpl实现DemoService接口

package learn.demo.rpc.simple.provider;

import learn.demo.rpc.simple.api.DemoService;

/**
 * Created by shirukai on 2019-06-21 10:55
 * 接口实现类
 */
public class DemoServiceImpl implements DemoService {
    @Override
    public String sayHello(String name) {
        return "This is simple RPC service.\nHello " + name;
    }

    @Override
    public String sayGoodbye(String name) {
        return "This is simple RPC service.\nGoodbye " + name;
    }
}

1.4.2 RPC Provider服务创建

使用simple-rpc-core模块创建RpcProvider实例,然后启动服务。这里向外暴露端口9090。

package learn.demo.rpc.simple.provider;


import learn.demo.rpc.simple.api.DemoService;
import learn.demo.rpc.simple.core.server.RpcProvider;

/**
 * Created by shirukai on 2019-06-21 10:56
 * 服务提供者
 */
public class DemoServiceProvider {

    public static void main(String[] args) {
        DemoServiceImpl demoService = new DemoServiceImpl();

        RpcProvider<DemoService> provider = new RpcProvider<>();
        provider.setInterfaceClass(DemoService.class)
                .setRef(demoService);

        provider.export(9090);
    }
}

1.5 simple-rpc-consumer模块

通过simple-rpc-cor模块创建RpcConsumer实例,设置Provider地址和端口,然后获取接口实例,调用相关方法。

package learn.demo.rpc.simple.consumer;

import learn.demo.rpc.simple.api.DemoService;
import learn.demo.rpc.simple.core.client.RpcConsumer;

/**
 * Created by shirukai on 2019-06-21 11:29
 * 消费者
 */
public class DemoServiceConsumer {
    public static void main(String[] args) {
        RpcConsumer consumer = new RpcConsumer();
        consumer.setAddress("127.0.0.1");
        consumer.setPort(9090);
        consumer.setInterface(DemoService.class);

        DemoService service = consumer.get();

        System.out.println(service.sayGoodbye("hahah"));
    }
}

2 基于ZooKeeper注册中心的RPC实现

上面我们介绍了通过直连的方式,实现了一个简单RPC,我们也可以通过注册中心的形式去实现RPC,这涉及到了服务注册和服务发现。这里使用ZooKeeper作为注册中心,也简单的进行了RPC的实现,其中有些地方没有进行详细实现,比如服务的负载均衡。该部分的代码在learn-demo-rpc下的zk-registry-rpc模块下,目录结构如simple-rpc相同,如下所示:

zk-registry-rpc/
├── zk-registry-rpc-api
├── zk-registry-rpc-consumer
├── zk-registry-rpc-core
└── zk-registry-rpc-provider

内容改动不大,主要在core的实现上,加入了注册中心,进行服务发现和服务注册。在Rpc的请求上,也加入了ID字段,用来表示需要调用哪个服务。下面将对几处修改的地方进行讲解。

2.1 zk-registry-rpc-core模块

2.1.1 request

上面提到对RpcRequest进行简单修改,加入Id字段,用来描述调用的是那个接口下的服务,所以此id是使用接口名生成的。

    /**
     * 请求ID,接口类名
     */
    private String id;

2.1.2 registry

这里主要是对注册中心的实现,其中包括ProviderInfo实体类,用来描述提供者信息,如id、address、port。另外包括RpcZKRegistryService注册中心服务。

2.1.2.1 ProviderInfo

package learn.demo.rpc.zk.core.registry;

import com.alibaba.fastjson.JSON;

/**
 * Created by shirukai on 2019-06-25 16:34
 * Provider信息
 */
public class ProviderInfo {
    /**
     * 提供者ID
     */
    private String id;
    /**
     * 提供者地址
     */
    private String address;
    /**
     * 提供者端口
     */
    private int port;

    public String getId() {
        return id;
    }

    public ProviderInfo setId(String id) {
        this.id = id;
        return this;
    }

    public String getAddress() {
        return address;
    }

    public ProviderInfo setAddress(String address) {
        this.address = address;
        return this;
    }

    public int getPort() {
        return port;
    }

    public ProviderInfo setPort(int port) {
        this.port = port;
        return this;
    }

    public String toJSONString() {
        return JSON.toJSONString(this);
    }

    @Override
    public String toString() {
        return "ProviderInfo{" +
                "id='" + id + '\'' +
                ", address='" + address + '\'' +
                ", port=" + port +
                '}';
    }
}

2.1.2.2 RpcZKRegistryService

注册中心的实现,主要包括三个功能:服务注册、服务发现、服务监听。Provider通过调用注册中的服务注册,将自己的信息注册到ZK中,Consumer通过调用注册中心的服务发现,查找自己想要请求的服务列表,并通过服务监听,更新服务列表。具体实现如下所示:

package learn.demo.rpc.zk.core.registry;

import com.alibaba.fastjson.JSON;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Created by shirukai on 2019-06-25 16:42
 * Rpc注册服务
 */
public class RpcZKRegistryService {
    private static final Logger log = LoggerFactory.getLogger(RpcZKRegistryService.class);
    private static final String NAMESPACE = "zk-rpc";
    private static final String RPC_PROVIDER_NODE = "/provider";


    private final Map<String, ProviderInfo> remoteProviders = new HashMap<>();
    private CuratorFramework zkClient;

    public RpcZKRegistryService(String zkConnectString) {
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
        // 获取客户端
        this.zkClient = CuratorFrameworkFactory.builder()
                .connectString(zkConnectString)
                .sessionTimeoutMs(10000)
                .retryPolicy(retryPolicy)
                .namespace(NAMESPACE)
                .build();
        this.zkClient.start();
    }

    /**
     * 注册服务
     *
     * @param providerInfo 提供者信息
     */
    public void register(ProviderInfo providerInfo) {
        String nodePath = RPC_PROVIDER_NODE + "/" + providerInfo.getId();
        try {
            // 判断节点存不存在,不存在则创建,存在则报异常
            Stat stat = zkClient.checkExists().forPath(nodePath);
            if (stat == null) {
                // 创建临时节点
                zkClient.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                        .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                        .forPath(nodePath, providerInfo.toJSONString().getBytes());
            } else {
                log.error("The provider already exists.{}", providerInfo.toJSONString());
            }
        } catch (Exception e) {
            log.error("Registration provider failed.{}", e.getMessage());
        }
    }


    /**
     * 订阅服务
     *
     * @param id 提供者ID,接口名字
     */
    public void subscribe(String id) {
        try {
            // 获取所有的Provider
            List<String> providerIds = zkClient.getChildren().forPath(RPC_PROVIDER_NODE);
            for (String providerId : providerIds) {
                // 如果与订阅服务相同,则获取节点信息
                if (providerId.contains(id)) {
                    String nodePath = RPC_PROVIDER_NODE + "/" + providerId;
                    byte[] data = zkClient.getData().forPath(nodePath);
                    ProviderInfo info = JSON.parseObject(data, ProviderInfo.class);
                    this.remoteProviders.put(providerId, info);
                }
            }

            // 添加监听事件
            addProviderWatch(id);
        } catch (Exception e) {
            log.error("Subscription provider failed.");
        }

    }

    /**
     * 添加监听事件
     *
     * @param id 提供者ID,接口名字
     */
    private void addProviderWatch(String id) throws Exception {
        // 创建子节点缓存
        final PathChildrenCache childrenCache = new PathChildrenCache(this.zkClient, RPC_PROVIDER_NODE, true);
        childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

        // 添加子节点监听事件
        childrenCache.getListenable().addListener((client, event) -> {
            String nodePath = event.getData().getPath();
            // 如果监听节点为订阅的ProviderID
            if (nodePath.contains(id)) {

                if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                    // 节点移除
                    this.remoteProviders.remove(nodePath);

                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
                    byte[] data = event.getData().getData();
                    ProviderInfo info = JSON.parseObject(data, ProviderInfo.class);
                    // 节点添加
                    this.remoteProviders.put(nodePath, info);
                }
            }
        });
    }

    public Map<String, ProviderInfo> getRemoteProviders() {
        return remoteProviders;
    }
}

2.1.3 server

server的改动不大,主要是加入了设置zk连接的方法,以及将自己的信息通过注册中心注册到zk的逻辑。

// 设置zk连接    
public RpcProvider<T> setZKConnectString(String zkConnectString) {
        this.registryService = new RpcZKRegistryService(zkConnectString);
        return this;
    }
// 生成服务信息
ProviderInfo providerInfo = new ProviderInfo();
providerInfo.setAddress(InetAddress.getLocalHost().getHostAddress())
        .setPort(port)
        .setId(interfaceClass.getName());
// 创建服务
ServerSocket listener = new ServerSocket(port);

// 服务创建完成后将信息注册到zk
registryService.register(providerInfo);

2.1.4 client

客户端主要修改了RpcConsumer,添加了服务发现,和模拟负载均衡的两个方法。

    /**
     * 获取所有Providers
     *
     * @return list
     */
    private List<ProviderInfo> lookupProviders() {
        // 订阅服务
        registryService.subscribe(interfaceClass.getName());
        // 获取所有Provider
        Map<String, ProviderInfo> providers = registryService.getRemoteProviders();
        return new ArrayList<>(providers.values());
    }

    /**
     * 模拟负载均衡
     *
     * @param providers provider 列表
     * @return ProviderInfo
     */
    private static ProviderInfo chooseTarget(List<ProviderInfo> providers) {
        if (providers == null || providers.isEmpty()) {
            throw new RuntimeException("providers has not exits!");
        }
        return providers.get(0);
    }

然后再创建代理实例之前调用服务发现和负载均衡方法

    public <T> T get() {
        List<ProviderInfo> providers = lookupProviders();

        ProviderInfo provider = chooseTarget(providers);

        RpcClient client = new RpcClient();
        client.setAddress(provider.getAddress());
        client.setPort(provider.getPort());
        // 实例化RPC代理处理器
        RpcInvocationHandler handler = new RpcInvocationHandler(client);
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, handler);
    }

3 常见RPC框架

上面通过动手实现一个简单的RPC,大体对RPC的工作流程有了一定的了解,当然我们写的只是一个简单的RPC,Demo级别的,只能玩玩不能用于生产。如果想提高性能,可以考虑使用NIO Socket进行通信,也可以基于Netty进行RPC通信,也可以通过利用一下已有的RPC框架,这里就简单对比一下几款常见的RPC框架。

PRC对比 Dubbo Motan Thrift Grpc
开发语言 java java 跨语言 跨语言
服务治理
多种序列化 只支持thrift 只支持protobuf
多种注册中心
管理中心
跨语言通讯
整体性能 3 4 5 3

等有时间整理一下Dubbo以及Thrift的简单使用。

0%